Пакет Threading предназначен для организации мультипотоковых программ. Он включает в себя набор платформенно-независимых примитивов, оформленных в виде C++ классов, с помощью которых можно обеспечить эффективную платформенно независимую организацию програм с несколькими потоками управления и общими данными.
В этом документе подразумевается знакомство с основными концепциями многопоточности, необходимое введение можно найти в [4], [3]
Данное ПО разработанно и поддерживается компанией GradSoft, Киев, Украина.
Последняя версия этого пакета доступна на www-сайте: http://www.gradsoft.com.ua.
Вы можете свободно использовать этот пакет и включать его в свои программы,
в соответствии с лицензией, находящейся в файле docs/LICENSE (либо docs/LICENSE.rus в дистрибутиве пакета GradSoft C++ ToolBox.
При необходимости возможно коммерческое сопровождение пакета.
Данное Руководство Программиста написанно для версии Threading, входящей в состав GradC++ToolBox версии 1.2.0. Тут описано использование API Threading с точки зрения программиста. Порядок инсталляции пакета описан в Руководстве Администратора пакета GradSoft C++ ToolBox [1]
Наиболее часто используемой функцией бибилиотеки классов, без сомнения, является обработка ошибок ;)
Методы классов из Threading генерируют исключения, являющиеся потомками
ThreadingExceptions::Failure. Эти исключения образуют иерархию,
показанную на следующей схеме:
std::runtime_error
|
|
*-ThreadingExceptions::Failure
|
|----------ThreadingExceptions::NoResources
| |
| |----ThreadingExceptions::NoMemory
| |----ThreadingExceptions::TemporaryNoResources
| *----ThreadingExceptions::NoPermission
|
|----ThreadingExceptions::ResourceBusy
|----ThreadingExceptions::InvalidResource
|----ThreadingExceptions::PossibleDeadlock
|----ThreadingExceptions::SystemError
|----ThreadingExceptions::InternalError
*----ThreadingExceptions::NotImplemented
Рассмотрим эту иерархию подробнее:
Failure - просто базовый класс для всех исключений из Threading
NoResources - группа исключений, связанных с нехваткой ресурсов.
Это также базовый класс, и вам может понадобиться обрабатывать разные подкласcы этого класса по разному.
NoMemory - низлежащая функция операционной системы не
смогла выполнить свою работу вследствие нехватки памяти.
TemporaryNoResources - низлежащая функция операционой
системы не смогла выполнить свою работу вследствие временной нехватки
каких-либо ресурсов (например, общего количества потоков в системе).
Возможно эти ресурсы могут быть освобождены другими потоками.
NoPermission - программе не хватает привелегий для
выполнения низлежащей функции операционной системы.
ResourceBusy - данный ресурс сейчас занят;
к примеру, это исключение возникает при попытке уничтожения занятого Mutex-а.
PossibleDeadlock - Операция приводит к ситуации тупика.
Как правило, иллюстрирует ошибку в логике прикладной программы - например повторная блокировка одного и того-же ресурса в одном потоке. Заметим, что это прерывание возникает не на всех платформах - на некоторых, в зависимости от флагов компиляции и настроек окружения вы просто получите Deadlock.
SystemError - непроквалифицированная ошибка низлежащей функции
операционной системы.
InternalError - означает внутреннюю ошибку пакета Threading.
NotImplemented - данная функция не реализованна для данной платформы.
При обработке исключения программисту доступно сообщение об ошибке и
системно-зависимый код ошибки. Для этого используются методы класса
ThreadingExceptions::Failure getErrorMessage() и
getErrorCode() соответственно.
Сначала рассмотрим простой пример:
class CountThread: public Thread
{
public:
void run()
{
for(int i=0; i<1000; ++i)
{
cout << "i=" << i << endl;
sleep(1);
}
}
};
Этот поток выводит через каждую секунду текущее значение i. Полностью программа, использующая CountThread может выглядеть следующим образом 1:
#include <GradSoft/Threading.h>
...
class CountThread {
....
}
...
int main(int argc, char** argv)
{
CountThread countThread;
countThread.start();
Thread::sleepCurrent(100);
return 0;
}
Как мы видим, для того что-бы запустить поток необходимо обратится к методу Thread::start(); по окончании программы поток автоматически останавливается.
Заметим, что хотя цикл countThread рассчитан на выполнение в течение 1000 секунд, он будет остановлен в деструкторе countThread в течении 100 секунд после старта.
Модифицируем программу так, что-бы main ждал окончания цикла:
int main(int argc, char** argv)
{
CountThread countThread;
countThread.start();
while(countThread.is_running()) {
Thread::sleepCurrent(1);
}
return 0;
}
Т. е.
Thread::sleepCurrent() - статический, и относится к текущему потоку выполнения.
Отметим, что использованный в данном примере циклический вызов Thread::is_running() не является единственным способом дождаться завершения процесса - в классе Thread определен статический метод join(const Thread&) с обычной семантикой: ждать пока аргумент завершится и слить его с текущим потоком управления.
Таким образом, функцию main() последнего примера более естественно было бы определить так:
int main(int argc, char** argv)
{
CountThread countThread;
countThread.start();
Thread::join(countThread);
return 0;
}
Теперь предложим Вашему вниманию следующий пример:
class Forever: public Thread
{
public:
void run(void)
{
for(;;);
}
};
int main(int,char**)
{
Forever forever;
forever.start();
Thread::sleepCurrent(10);
return 0;
}
Попробуйте окомпилировать и запустить этот пример на нескольких платформах,
скажем на Sun Solaris и на Linux.
Вы обнаружите, что программа будет вести себя по разному: на Sun Solaris
она проживет 10 секунд и завершится; на Linux - будет длиться вечно, пока
мы не прервем ее средствами операционной системы.
Так происходит потому, что различные операционные системы поддерживают различные модели выполнения потоков
(например, асинхронную или задержанную),
и в некоторых ОС останов потока внешним событием возможен только в т.н. точках останова (cancelation points),
которые могут содержаться в функциях операционной системы, а могут и не содержаться.
Так вот, класс Thread определяет несколько методов, в которые точки останова точно встроенны. Это следующие методы:
int main(int argc, char** argv)
{
CountThread countThread;
countThread.start();
Thread::sleepCurrent(5);
return 0;
}
приведет к завершению запущенного процесса через не более чем через 6 секунд после старта.
Еще одно полезное понятие в проектировании параллельных програм - понятие
точки переключения: в этой точке планировщик операционной системы
выполняет переключение текущего процессора на другую задачу. В Threading
это переключение можно инициировать, с помощью статического метода
Thread::yield().
Вызов yield() бывает полезен при организации сервис-ориентирванных приложений.
В системном программировании нам часто бывает надо ассоциировать с потоком какие-то данные, о которых ничего не известно при создании потока. Пример: асинхронный ввод/вывод, при котором операционная система или ORB активирует callback приема сообщения в потоке, неизвестном заранее либо обработка неявных транзакций, которые зависят от потока выполнения.
Для этих целей в Threading существует инфраструктура работы с
контекстами потока (ThreadContext).
Посмотрим на основные определения:
class Thread
{
public:
.....
ThreadContext* getThreadContext();
....
};
ThreadContext:
/**
* Thread-Specific context.
**/
class ThreadContext
{
public:
/**
* one abstract entry in thread context.
* (ThreadContext itself is just sequence of such slots)
**/
class Slot
{
public:
///
virtual ~Slot() {}
};
public:
/**
* get current thread.
* return NULL, is this thread is not created
* by our ToolBox.
**/
Thread* getThread();
/**
* get Context of current thread.
* (note, that ThreadContext is automatically created on this call.)
**/
static ThreadContext* current();
/**
* alloc slot in current context and return it's index.
**/
unsigned int allocSlot(Slot* slot);
/**
* alloc slot on specified index.
*@return true, if
**/
bool allocSlot(Slot* slot, unsigned int index);
/**
* return slot on specified index or NULL, if slot does not exists.
**/
Slot* getSlot(unsigned int index);
private:
.......
};
#endif
Сначала посмотрим на механизм слотов: что это такое ? - ответ: все, что вам угодно. Слоты создаются пользователем, ThreadContext хранит в себе индексированную последовательность слотов. Вы можете создать слот и ассоциировать его с экземпляром ThreadContext и индексом, также вы можете взять слот по данному индексу.
Пример: пусть мы проектируем приложение, в котором сетевые сообщения передаются в поставляемые пользователем классы, и потоки приема сообщений являются абсолютно независимыми от этих классов. Тогда, нам возможно надо ассоциировать с потоком две вещи: идентификатор текущей операции потока и ссылку на соединение, обрабатывающее этот поток. Мы заранее распределяем индексы слотов, начиная с нуля:
#define CURRENT_THS_INDEX 0 #define CONNECTION_THS_INDEX 0
Определяем слот для сообщения:
class CurrentThreadSlot: public ThreadContext::Slot
{
public:
.........
// unmarshall message and set parameters and operation.
void setData(BinaryBuffer message);
static CurrentThreadSlot* getOrCreate();
private:
std::string operation_;
std::auto_ptr<Parameters> params_;
};
В месте приема сообщения создаем/выбираем нужный нам слот:
accept.. // ..... unmarshall(buffer,operation,params); CurrentThreadSlot()->getOrCreate()->setData(); ///
Где метод getOrCreate выглядит следующим образом:
CurrentThreadSlot* CurrentThreaSlot::getOrCreate()
{
ThreadContext* threadContext=ThreadContext::current();
ThreadContext::Slot* ts= treadContext->getSlot(CURRENT_THS_INDEX);
CurrentThreadSlot* cts=NULL;
if (ts==NULL) {
cts = new CurrentThreadSlot();
treadContext->alloc(cts,CURRENT_THS_INDEX);
}else{
cts=dynamic_cast<CurrentThreadSlot*>(ts);
if (cts==NULL) throw InternalError("bad value in operation thread context");
}
return cts;
}
И в API предоставляемом пользователю определим функцию получения текущего сообщения:
class Current
{
public:
static std::string get_operation()
{
return getThreadSlot()->get_operation();
}
........
private:
CurrentThreadSlot* getThreadSlot()
{
ThreadContext* threadContext=ThreadContext::current();
ThreadContext::Slot* ts= treadContext->getSlot(CURRENT_THS_INDEX);
if (ts==NULL) throw NoOperation();
CurrentThreadSlot* cts=dynamic_cast<CurrentThreadSlot*>(ts);
if (ts==NULL) throw InternalError("bad value in operation thread context");
return cts;
}
}
Со вторым слотом производим аналогичные действия.
Ну и наконец unsigned int ThreadContext::allocSlot(x) предназанчена
для использования в более сложных ситуациях, когда нам необходимо сформировать множество индексов слотов динамически.
Так, со слотами разобрались, теперь перейдем собственно к Threading:
как соотносятся контексты потоков и сами потоки:
ThreadContext* Thread::getThreadContext()
Thread* ThreadContext::getThread()Этот метод может возвратить
NULL, если поток выполнения,
ассоциированный с этим контекстом был создан не с помощью GradSoft C++ ToolBox,
а каким-то иным образом.
Время жизни ThreadContext это время существования потока - все слоты автоматически удаляются системой после завершения соответствующего потока.
Так, например, использование следующего слота выполнения потока будет приводить к тому, что через некоторое время после завершения потока будет выводится сообщения:
class EndThreadSlot: public GradSoft::ThreadSlot
{
int id_;
public:
EndThreadSlot(int id):id_(id) { }
~EndThreadSlot()
{ cerr << "thread which binded context with id " << id_
<< " was finished some time ago" << endl; }
};
Кстати, этот пример также иллюстрирует зависмость поведенияч от системы:
для posix-совместимых систем это сообщение будет выводится во время
разрушения потока, для Win32 - действительно, спустя некоторое время
;)
Thread::yield
GradSoft C++ ToolBox, так и с потоками, созданными
другими компонентами
При написании мультипотоковых программ необходимо организовывать совместный доступ к ресурсам из различных потоков управления. Для этой цели в Threading определенны следующие примитивы синхронизации:
(от слов Mutual Exclusive Lock). Как видно из названия, mutex предусматривает использование в ресурса данный момент времени только из одного потока.
У класса Mutex есть 3 метода: lock, try_lock и unlock. Процесс должен перед использованием ресурса заблокировать соответствующий mutex (т. е. вызвать Mutex::lock), потом разблокировать (вызвать Mutex::unlock). try_lock делает попытку заблокировать mutex без ожидания. Если она неудачна (т. е. ресурс к этому моменту уже занят) - возвращает false.
...... resourceMutex.lock(); .. work with shared resource here ... resourceMutex.unlock(); ..........
Заметим, что операции с mutex-ами должны быть атомарными (в смысле неделимыми).Т. е. кратчайший способ добиться ситуации deadlock (тупика) - это вызвать одну операцию, блокирующую ресурс, из другой, которая также блокирует доступ к этому-же ресурсу.
Например, следующий фрагмент кода введет программу в бесконечное ожидание:
mutex.lock(); mutex.lock(); // -- deadlock here mutex.unlock(); mutex.unlock();
Заметим, что на некоторых платформах будет сгенерированно прерывание
PossibleDeadlock, на некоторых - смертельные объятия действительно
произойдут.
Часто бывает удобно "спрятать" блокировку в конструктор/деструктор объекта. Для этого предназначен класс MutexLocker с двумя методами:
MutexLocker(const Mutex&) - конструктор, блокирующий Mutex
~MutexLocker() - деструктор, разблокирующий его.
Пример его использования:
Y X::f()
{
MutexLocker l(yMutex_);
return sharedY_;
}
вместо:
Y X::f()
{
yMutex_.lock();
Y retval = sharedY_;
yMutex_.unlock();
return retval;
}
Еще одна часто встречающаяся модель : так называемая read/write блокировка. То есть ресурс доступен для чтения и для записи. В конкретный момент времени к нему могут обращаться либо несколько читателей, либо один писатель.
Соответствующий класс предусмотрен в нашей модели и называется: RWLock.
Посмотрим внимательней на сигнатуру этого класса:
/**
* Read/Write lock
*
* allow multiple readers/single writer.
* access to object with such lock must be
* sequence of atomic read-only or write-only
* operations.
*
* (i. e. rwlock.read_lock(), rwlock.write_lock() is
* the fastest way to deadlock state.
**/
class RWLock
{
........
public:
RWLock() throw(ThreadingExceptions::NoResources,
ThreadingExceptions::InternalError);
virtual ~RWLock() throw(ThreadingExceptions::ResourceBusy,
ThreadingExceptions::InternalError);
void read_lock() const
throw(ThreadingExceptions::NoResources,
ThreadingExceptions::PossibleDeadlock,
ThreadingExceptions::InternalError);
bool try_read_lock() const
throw(ThreadingExceptions::NoResources,
ThreadingExceptions::PossibleDeadlock,
ThreadingExceptions::InternalError);
void read_unlock() const
throw(ThreadingExceptions::NoPermission,
ThreadingExceptions::InternalError);
void write_lock()
throw(ThreadingExceptions::NoResources,
ThreadingExceptions::PossibleDeadlock,
ThreadingExceptions::InternalError);
bool try_write_lock()
throw(ThreadingExceptions::NoResources,
ThreadingExceptions::PossibleDeadlock,
ThreadingExceptions::InternalError);
void write_unlock() const
throw(ThreadingExceptions::NoPermission,
ThreadingExceptions::InternalError);
};
Доступ к ресурсу на чтение вы должны обрамлять парой read_lock / read_unlock, на запись: write_lock, write_unlock.
Контроль атомарности операций предоставляется программисту, исполнение следующего фрагмента кода:
rwlock.read_lock(); rwlock.write_lock(); // -- deadlock hereнепременно приведет к тупиковой ситуации.
Это объекты, инкапсулирующие блокировку доступа на чтение и на запись соответственно. Их использование очевидно:
{
ReadLocker rl(rwlock);
...
read-from-resource
...
}
Часто бывает удобно манипулировать классами, состоящими из разделяемого ресурса и соответствующей блокировки. Для целого ряда случаев, когда ресурс обладает свойствами "Создаваемый по умолчанию", "Присваиваемый" и "Сравниваемый" в терминологии STL [5] GradSoft C++ Toolbox предполагает автоматическое конструирование этого класса.
template<class T>
class RWLocked
{
public:
typedef T locked_type;
protected:
T v_;
RWLock rwlock_;
public:
RWLocked()
:v_(),rwlock_() {}
RWLocked(const T& v)
:v_(v),rwlock_() {}
RWLocked(const RWLocked& x);
RWLocked& operator=(const RWLocked& x);
virtual ~RWLocked();
bool operator==(const RWLocked& x);
bool operator==(const T& x);
bool operator!=(const RWLocked& x);
bool operator!=(const T& x);
public:
T& get_value_() { return v_; }
const T& get_value_() const { return v_; }
void read_lock() const
void read_unlock() const
void write_lock()
void write_unlock() const
};
template<class T>
class RWLockedPtr:public RWLocked<T*>
{
};
Как мы видим, этот шаблон корректно определяет набор операций для аргументов. Им удобно пользоваться для определения своего класса, производного от RWLocked. Также полезно использовать RWLocked как шаблон проектирования.
Сегодня програмирование на C++ немыслимо без использования STL контейнеров. Однако в многопотоковых программах мы не можем свободно использовать разделяемые STL контейнеры: как сказано в [5]
The SGI implementation of STL is thread-safe only in the sense that simultaneous accesses to distinct containers are safe, and simultaneous read accesses to to shared containers are safe. If multiple threads access a single container, and at least one thread may potentially write, then the user is responsible for ensuring mutual exclusion between the threads during the container accesses.
Поэтому мы включили в пакет Threading набор адаптеров к STL контейнерам, которые состоят из комбинации RWLock и контейнера, делегируют те методы, которые можно сделать безопасными и предоставляют доступ непосредственно к контейнеру и локу, для "ручного управления" блокировками. Это позволяет программисту выбрать оптимальное соотношение между эффективностью и сложностью кода.
Т. е. мультипотоковая двунаправленная последовательность. (Back Insertion Sequence). Этот адаптер предназначен для STL-моделей "Back Insertion Sequence" (т. е. vector, deque, list). Сначала приведем его сигнатуру, а потом прокомментируем ее:
/**
* threadsafe wrapper arround back insertion sequence.
**/
template<class container>
class threadsafe_biseq: public RWLocked<container>
{
public:
typedef threadsafe_biseq self_type;
typedef ReadLocker rlocker;
typedef WriteLocker wlocker;
typedef container container_type;
typedef typename container::value_type value_type;
typedef typename container::reference reference;
typedef typename container::const_reference const_reference;
typedef typename container::pointer pointer;
typedef typename container::iterator iterator;
typedef typename container::const_iterator const_iterator;
typedef typename container::reverse_iterator reverse_iterator;
typedef typename container::const_reverse_iterator const_reverse_iterator;
typedef typename container::difference_type difference_type;
typedef typename container::size_type size_type;
public:
threadsafe_biseq();
threadsafe_biseq(const threadsafe_biseq& x);
threadsafe_biseq(iterator beg, iterator end);
threadsafe_biseq(size_type n);
void swap(const threadsafe_biseq& x);
bool operator<(const threadsafe_biseq& x);
bool operator<=(const threadsafe_biseq& x);
bool operator>(const threadsafe_biseq& x);
bool operator>=(const threadsafe_biseq& x);
size_type size();
size_type max_size();
bool empty();
iterator begin_();
const_iterator begin_() const;
iterator end_();
const_iterator end_() const;
reverse_iterator rbegin_();
const_reverse_iterator rbegin_() const;
reverse_iterator rend_();
const_reverse_iterator rend_() const;
reference front();
reference front_();
const_reference front() const;
const_reference front_() const;
reference back();
reference back_();
const_reference back() const;
const_reference back_() const;
void push_back(const value_type& v);
void push_back_(const value_type& v);
void pop_back(void);
void pop_back_(void);
iterator insert(iterator it, const value_type& v);
iterator insert_(iterator it, const value_type& v);
iterator insert(iterator it, size_type n, const value_type& v);
iterator insert_(iterator it, size_type n, const value_type& v);
iterator insert(iterator it, iterator p, iterator q);
iterator insert_(iterator it, iterator p, iterator q);
iterator erase(iterator p);
iterator erase_(iterator p);
iterator erase(iterator p, iterator q);
iterator erase_(iterator p, iterator q);
void clear();
void clear_();
void resize(size_type n, const value_type* v);
void resize_(size_type n, const value_type* v);
const container& get_container_() const;
{ return v_; }
container& get_container_();
{ return v_; }
};
Как мы видим этот класс определяет обычные для контейнеров типы и делегирует две версии каждой операции. Операция, знак которой не содержит в конце знака подчеркивания, блокирует соответствующий RWLock автоматически. Операции со знаком подчеркивания не трогают его, оставляя контроль блокировки программисту.
Несколько примеров использования (как правильного, так и неправильного)
typedef threadsafe_biseq<vector<int> > StorageType; StorageType storage; // 0 ..................... storage.push_back(10); // 1. безопасно. ....................... storage.write_lock(); storage.push_back_(10); // 2. то-же что и 1. storage.write_unlock(); .................... storage.write_lock(); storage.push_back(10); // 3. - состояние deadlock-а storage.write_unlock(); .................... storage.write_lock(); remove_if(storage,storage.begin_(),storage.end_(),10); // 4 - Ok storage.write_unlock(); ..................
Этот адаптер предназначен для "Уникальных Ассоциативных Контейнеров". (Unique Asssociative Container). В STL этой модели удовлетворяют set, map, hash_set, hash_map.
Так как его реализация в целом похожа на threadsafe_biseq, мы не будем
приводить сигнатуру класса и описывать ее. Обратитесь к API документации
для подробного изучения; примеры использования можно найти в поддиректори
demo/Containers пакета Threading.
Как Вы уже вероятно догадались, это адаптер для Multiple Associative Containers (multiset, mulimap, hash_multiset, hash_multimap).
Использования этого адаптера в целом аналогично ичспользованию threadsafe_uac.
Часто-встречающаяся техника программирования на C++ - использование т. н.
``умных указателей'' - классов, содержащих указатели на разделяемые объекты
и счетчики ссылок. При работе с подобными вещами следует учитывать, что
использование разделяемых указателей в программах должно учитывать
многопоточность. Grad-Soft C++ ToolBox предоставляет для этой цели класс
counted_mt_ptr, который представляет собой безопасный относительно
многопоточности указатель со счетчиком ссылок.
Использование counted_mt_ptr в точности такое-же, как и у
counted_ptr из раздела ptrs этого-же пакета.
Типичный пример использования:
GradSoft::counted_mt_ptr<MyObject,GradSoft::ptr::safe> obj(new MyObject());
callSomethingInParallelThread(obj);
.........
try {
obj->myFun()
}catch(const GradSoft::NullPointerException& ex){
Object was settet to NULL, do something
}
Здесь второй аргумент шаблоне играет ту же роль, что и в counted_ptr,
см. [2].
В дополнение к безопасным методам доступа (get(), *, ->, ->_) и
усатновки assign
определены методы get()_ и assign()_, который возвращает
указатель не трогая соответствующий мьютекс, и метод getMutex(),
возвращающий мьютекс ``в чистом виде''.
Кроме синхронизации доступа к соместно используемым данным, Threading предоставляет набор примитивов асинхронного взаимодействия потоков.
Эти примитивы инкапсулированыы в класс ThreadEvent, сигнатура
которого выглядит следующим образом:
/**
* Thread Event (Condition) class
**/
class ThreadEvent
{
public:
///
ThreadEvent() throw(ThreadingExceptions::NoResources,
ThreadingExceptions::InternalError);
///
~ThreadEvent() throw(ThreadingExceptions::ResourceBusy,
ThreadingExceptions::InternalError);
///
void wait() throw(ThreadingExceptions::PossibleDeadlock,
ThreadingExceptions::InternalError);
///
void wait(long timeout)
throw(ThreadingExceptions::PossibleDeadlock,
ThreadingExceptions::InternalError);
///
void notify() throw();
///
void notifyAll() throw();
private:
....
};
Значение методов следуйщие:
te.notify() или
te.notifyAll()
timeout (в милисекундах)
Семантика этой модели довольно подробно описанна в литературе,
Если мы посмотрим на другие известные API многопоточности, то увидим
прямое соответствие семейству функций pthread_cond в pthread и
семейству методов асинхронного взаемодейсвия в языке Java .
Проиллюстритуем типичное использование ThreadEvent на классическом
примере ограниченного
буфера: 2 потока: поставщик и потребитель, взаимодействуют друг с другом
посредством ограниченного буфера, в котором может уместиться
maxBufferSize
элементов. Поставщик добавляет элементы в буфер, вызывая метод put,
потребитель читает, вызывая метод get. Когда буфер полон, поставщик
в методе get приостанавливает работу, ожидая когда в буфере появится
свободное место; когда буфер пуст соответственно останавливается потребитель.
class BoundedBuffer
{
ThreadEvent elementsExists_;
ThreadEvent freeSpaceExists_;
..........
public:
void put(ElementType element)
{
if (getNumberElements() >= maxBufferSize_) {
freeSpaceExists.wait();
}
... do actual put
elementsExists_.notify();
}
ElementType get()
{
if (getNumberElements() == 0) {
elementsExists_.wait();
}
... do actual get
freeSpaceExists_.notify();
return retval;
}
.....
};
Как видим, у нас есть 2 события, которые связаны с изменением
определенных логических условий
3. Одно событие происходит, когда в буфере появляется
хотя-бы один элемент, второе - когда в буфере есть место для хотя-бы одного
элемента. Если условие не выполняется, то мы ждем соответствующего события:
так, например если в BoundedBuffer::get в буфере нет ни одного
элемента, то мы ждем когда он появится. Мы извещаем потребителя о том,
что элемент появился в конце метода BoundedBuffer::put, когда
это условие точно выполняется - ведь мы только-чо положили туда элемент.
Кстати, заметим что данный пример несколько неоптимален и поддается очевидной оптимизации.
Один из известных и полезных шаблонов проектирования - организация асинхронной обработки запросов. Эта техника позволяет минимизировать время отклика приложения (когда работает "длинный" запрос приложение остается активным и может отвечать на другие запросы) и увеличить параметры масштабируемости и живучести приложения.
GradSoft C++ ToolBox поддерживает использование такого стиля и определяет
некоторую инфраструктуру, в рамках которой программисту предоставляется набор
готовых элементов для асинхронного выполнения запросов и готовых "исполнителей"
запроса типа ThreadPool, SingleThreadBlocking и. т. д.
Типичное использование сервисов потоков выглядит следующим образом:
run
process, который их асинхронно обрабатывает (либо возвращает
прерывание TemoraryNoResources если сервис не способен обработать
запрос в данный момент).
Теперь - подробнее:
Класс Runnable выглядит следующим образом:
/**
* Abstract class for runnable
* Runnable is item of execution
**/
class Runnable
{
public:
///
Runnable();
///
virtual ~Runnable();
///
virtual void run() = 0;
private:
Runnable(const Runnable& );
Runnable& operator=(const Runnable&);
};
Как видим, определение очень похоже на соответствующий java интерфейс.
ThreadService это абстрактный класс, определенный следующим образом:
/**
* ThreadService: entity which process Runnable
* (Runnable may be events, network connections, etc)
* Typical usage pattern:
* 1. Generator generates Runnable
* 2. this Runnables are passed to ThreadServices,
* with help of call ThreadService::process
* 3. ThreadService process this runnable, asynchronicly or
* synchronisly.
*
* ThreadService can be in active or inactive state.
* When it in active state, it can process requests.
* When in inactive - can't.
*
**/
class ThreadService
{
public:
/**
* This exception is throwed, when we try to use
* not-activated ThreadService
**/
struct NotActive {};
private:
<implementation-depended>
public:
///
ThreadService();
///
virtual ~ThreadService();
///
virtual void process(Runnable* runnable)=0;
///
bool is_active() const { return active_.value(); }
///
virtual void activate();
///
virtual void deactivate(bool shutdown);
protected:
virtual void mark_deactivate();
..............
private:
ThreadService(const ThreadService&);
ThreadService& operator=(const ThreadService&);
};
Опишем методы ThreadService более подробно:
activate - активировать сервис потоков. Этот метод должен
быть вызван перед вызовами process. Его работа заключается в
установке внутренних флагов и возможном запуске внутренних потоков
исполнения.
is_active - возвращает true, если сервис находится
в активном состоянии, иначе false
process - сосбственно исполняет Runnable. Это может происходить
синхронно или асинхронно, в зависимости от избранной конкретной реализации
сервиса. Этот метод бросает прерывание: ThreadService::NotActive,
если сервис находится в неактивном состоянии;
ThreadingExceptions::TemporaryNoResources, если сервис не может
принять к исполнению аргумент runnable в данный момент времени.
deactivate - деактивирует сервис. Параметр shutdown
указывает: останавливать ли сервис немедленно, приостанавлвая выполнение
текущих заданий (true) или перевести сервис в неактивное
состояние после выполнения текущего списка заданий.
mark_deactivate может быть полезен при реализации своих
сервисов исполнения. Его реализация по умолчанию просто устанавливает флаг
активации в false
Несколько конкретных реализаций сервисов исполнения поставляются в составе
GradSoft C++ ToolBox. Заметим, что для их использования вы должны
включить соответствующий включаемый файл. Среди них:
Это самый "тривиальный" сервис исполнения, который исполняет задания в текущем
потоке (т. е. process - синхронный вызов).
Запросы выполняются асинхронно в выделенном потоке этого сервиса. Очередь
запросов не организовывается, поэтому если сервис занят исполнением одного запроса, вызов process приведет к прерыванию TemporaryNoResources.
Запросы выполняются асинхронно в выделенном потоке этого сервиса.
Организованна очередь запросов. Размер очереди должен быть передан
в конструктор SingleThreadReactive.
Поведение process зависит от флага режима, переданного в конструкторе.
это может быть один из:
SingleThreadReactive::Blocked - если очередь полна, то
process блокирует текущий поток исполнения, пока в очереди заданий
не появится свободное место.
SingleThreadReactive::Checked - если очередь полна, то
process вызывает прерывание TemporaryNoResources
SingleThreadReactive::CheckedWithTimeout - если очередь полна,
то process ждет timeout милисекунд, после чего вызывает
прерывание TemporaryNoResources если во входная очередь по прежнему
полна.
Запросы выполняются асинхронно. Для каждой задачи выделяется отдельный поток исполнения, заканчивающийся вместе с задачей.
Запросы выполняются асинхронно в пуле потоков этого сервиса: любой свободный
поток исполняет текущий запрос; если все потоки заняты, то запрос кладется
в очередь. Количество потоков в пуле и размер очереди должны быть заданны
в параметрах конструктора.
Поведение process при переполнении очереди зависит от значения флага
режима: (ThreadPool::Blocked или ThreadPool::Checked или
ThreadPool::CheckedWithTimeout ). Семантика флагов режима
такая-же, как и в SingleThreadReactive.
ThreadingConfig.h, генерируемом при компиляции пакета.
Потенциально возможен конфликт между определениями в ThreadingConfig.h
и определниями из других макропакетов. Для того, что-бы этого не
произошло мы рекомендуем заключать ваши макроопределения autoconf в
предложения условной компиляции:
#ifndef HAVE_Xxx #undef HAVE_Xxx #endif
Threading.h,
либо определить препроцессорный символ _GNU_SOURCE до любого включения системных заголовочных файлов.
Вы должны компилировать пакеты, использующие пакет Threading с libThreading (для UNIX) или Threading для Windows NT.
Соответствующий ключ компилятору: oбычно -lThreading.
ThreadContext
counted_mt_ptr