next_inactive up previous


Threading: Руководство программиста
DocumentId:GradSoft-PR-09.09.2000-vC


Contents

Введение

Пакет Threading предназначен для организации мультипотоковых программ. Он включает в себя набор платформенно-независимых примитивов, оформленных в виде C++ классов, с помощью которых можно обеспечить эффективную платформенно независимую организацию програм с несколькими потоками управления и общими данными.

В этом документе подразумевается знакомство с основными концепциями многопоточности, необходимое введение можно найти в [4], [3]

Об этом пакете

Данное ПО разработанно и поддерживается компанией GradSoft, Киев, Украина. Последняя версия этого пакета доступна на www-сайте: http://www.gradsoft.com.ua. Вы можете свободно использовать этот пакет и включать его в свои программы, в соответствии с лицензией, находящейся в файле docs/LICENSE (либо docs/LICENSE.rus в дистрибутиве пакета GradSoft C++ ToolBox. При необходимости возможно коммерческое сопровождение пакета.

Об этом документе

Данное Руководство Программиста написанно для версии Threading, входящей в состав GradC++ToolBox версии 1.2.0. Тут описано использование API Threading с точки зрения программиста. Порядок инсталляции пакета описан в Руководстве Администратора пакета GradSoft C++ ToolBox [1]

Описание классов

Иерархия исключений

Наиболее часто используемой функцией бибилиотеки классов, без сомнения, является обработка ошибок ;)

Методы классов из Threading генерируют исключения, являющиеся потомками ThreadingExceptions::Failure. Эти исключения образуют иерархию, показанную на следующей схеме:

 std::runtime_error
    |
    |
    *-ThreadingExceptions::Failure
          |
          |----------ThreadingExceptions::NoResources
          |                     |
          |                     |----ThreadingExceptions::NoMemory
          |                     |----ThreadingExceptions::TemporaryNoResources
          |                     *----ThreadingExceptions::NoPermission 
          |
          |----ThreadingExceptions::ResourceBusy
          |----ThreadingExceptions::InvalidResource
          |----ThreadingExceptions::PossibleDeadlock
          |----ThreadingExceptions::SystemError
          |----ThreadingExceptions::InternalError
          *----ThreadingExceptions::NotImplemented

Рассмотрим эту иерархию подробнее:

При обработке исключения программисту доступно сообщение об ошибке и системно-зависимый код ошибки. Для этого используются методы класса ThreadingExceptions::Failure getErrorMessage() и getErrorCode() соответственно.

Thread


Общий принцип

Класс Thread позволяет пользователю создавать свои потоки управления как потомки класса Thread, встраивая свою логику в методе Thread::run.

Сначала рассмотрим простой пример:

class CountThread: public Thread
{
public:

  void run()
  {
    for(int i=0; i<1000; ++i)
    {
     cout << "i=" << i << endl;
     sleep(1);
    } 
  }

};

Этот поток выводит через каждую секунду текущее значение i. Полностью программа, использующая CountThread может выглядеть следующим образом 1:

#include <GradSoft/Threading.h>
 ...
class CountThread {
   ....
}
 ...
int main(int argc, char** argv)
{
 CountThread countThread;
 countThread.start();
 Thread::sleepCurrent(100);
 return 0;
}

Как мы видим, для того что-бы запустить поток необходимо обратится к методу Thread::start(); по окончании программы поток автоматически останавливается.

Заметим, что хотя цикл countThread рассчитан на выполнение в течение 1000 секунд, он будет остановлен в деструкторе countThread в течении 100 секунд после старта.

Модифицируем программу так, что-бы main ждал окончания цикла:

int main(int argc, char** argv)
{
 CountThread countThread;
 countThread.start();
 while(countThread.is_running()) {
   Thread::sleepCurrent(1);
 }
 return 0;
}

Т. е.

Отметим, что использованный в данном примере циклический вызов Thread::is_running() не является единственным способом дождаться завершения процесса - в классе Thread определен статический метод join(const Thread&) с обычной семантикой: ждать пока аргумент завершится и слить его с текущим потоком управления.

Таким образом, функцию main() последнего примера более естественно было бы определить так:

int main(int argc, char** argv)
{
 CountThread countThread;
 countThread.start();
 Thread::join(countThread);
 return 0;
}

Точки останова

Теперь предложим Вашему вниманию следующий пример:

 class Forever: public Thread
 {
 public:
   void run(void)
   {
     for(;;);
   }
 };

int main(int,char**)
{
 Forever forever;
 forever.start();

 Thread::sleepCurrent(10);

 return 0;
}
Попробуйте окомпилировать и запустить этот пример на нескольких платформах, скажем на Sun Solaris и на Linux. Вы обнаружите, что программа будет вести себя по разному: на Sun Solaris она проживет 10 секунд и завершится; на Linux - будет длиться вечно, пока мы не прервем ее средствами операционной системы. Так происходит потому, что различные операционные системы поддерживают различные модели выполнения потоков (например, асинхронную или задержанную), и в некоторых ОС останов потока внешним событием возможен только в т.н. точках останова (cancelation points), которые могут содержаться в функциях операционной системы, а могут и не содержаться.

Так вот, класс Thread определяет несколько методов, в которые точки останова точно встроенны. Это следующие методы:

Если Вы хотите чтобы запущенный Вами поток гарантировано завершался при завершении программы, Вы должны вы должны регулярно вызывать какой-то из этих методов в методе run(). Отметим, что класс CountThread раздела 2.2.1 использует метод Thread::sleep() в перегруженном методе CountThread::run(), поэтому следующий код:
int main(int argc, char** argv)
{
 CountThread countThread;
 countThread.start();
 Thread::sleepCurrent(5);
 return 0;
}
приведет к завершению запущенного процесса через не более чем через 6 секунд после старта.

Точки переключения

Еще одно полезное понятие в проектировании параллельных програм - понятие точки переключения: в этой точке планировщик операционной системы выполняет переключение текущего процессора на другую задачу. В Threading это переключение можно инициировать, с помощью статического метода Thread::yield().

Вызов yield() бывает полезен при организации сервис-ориентирванных приложений.

Контекст потока

В системном программировании нам часто бывает надо ассоциировать с потоком какие-то данные, о которых ничего не известно при создании потока. Пример: асинхронный ввод/вывод, при котором операционная система или ORB активирует callback приема сообщения в потоке, неизвестном заранее либо обработка неявных транзакций, которые зависят от потока выполнения.

Для этих целей в Threading существует инфраструктура работы с контекстами потока (ThreadContext).

Посмотрим на основные определения:

Сначала посмотрим на механизм слотов: что это такое ? - ответ: все, что вам угодно. Слоты создаются пользователем, ThreadContext хранит в себе индексированную последовательность слотов. Вы можете создать слот и ассоциировать его с экземпляром ThreadContext и индексом, также вы можете взять слот по данному индексу.

Пример: пусть мы проектируем приложение, в котором сетевые сообщения передаются в поставляемые пользователем классы, и потоки приема сообщений являются абсолютно независимыми от этих классов. Тогда, нам возможно надо ассоциировать с потоком две вещи: идентификатор текущей операции потока и ссылку на соединение, обрабатывающее этот поток. Мы заранее распределяем индексы слотов, начиная с нуля:

#define CURRENT_THS_INDEX 0
#define CONNECTION_THS_INDEX 0

Определяем слот для сообщения:

class CurrentThreadSlot: public ThreadContext::Slot
{
public:

  .........
 
 // unmarshall message and set parameters and operation.
 void setData(BinaryBuffer message);

 static CurrentThreadSlot*  getOrCreate();

private:
 std::string operation_;
 std::auto_ptr<Parameters> params_;
};

В месте приема сообщения создаем/выбираем нужный нам слот:

 accept..
 // 
 .....
 unmarshall(buffer,operation,params);
 CurrentThreadSlot()->getOrCreate()->setData();
 ///

Где метод getOrCreate выглядит следующим образом:

CurrentThreadSlot* CurrentThreaSlot::getOrCreate()
{
 ThreadContext* threadContext=ThreadContext::current();
 ThreadContext::Slot* ts= treadContext->getSlot(CURRENT_THS_INDEX);
 CurrentThreadSlot* cts=NULL;
 if (ts==NULL) {
   cts = new CurrentThreadSlot();
   treadContext->alloc(cts,CURRENT_THS_INDEX);
 }else{
   cts=dynamic_cast<CurrentThreadSlot*>(ts);
   if (cts==NULL) throw InternalError("bad value in operation thread context");
 }
 return cts; 
}

И в API предоставляемом пользователю определим функцию получения текущего сообщения:

class Current
{
public:
  static std::string get_operation()
  {
   return getThreadSlot()->get_operation();
  }
  ........
private:

  CurrentThreadSlot* getThreadSlot()
  {
   ThreadContext* threadContext=ThreadContext::current();
   ThreadContext::Slot* ts= treadContext->getSlot(CURRENT_THS_INDEX);
   if (ts==NULL) throw NoOperation();
   CurrentThreadSlot* cts=dynamic_cast<CurrentThreadSlot*>(ts);
   if (ts==NULL) throw InternalError("bad value in operation thread context");
   return cts; 
  }

}

Со вторым слотом производим аналогичные действия.

Ну и наконец unsigned int ThreadContext::allocSlot(x) предназанчена для использования в более сложных ситуациях, когда нам необходимо сформировать множество индексов слотов динамически.

Так, со слотами разобрались, теперь перейдем собственно к Threading: как соотносятся контексты потоков и сами потоки:

Время жизни ThreadContext это время существования потока - все слоты автоматически удаляются системой после завершения соответствующего потока.

Так, например, использование следующего слота выполнения потока будет приводить к тому, что через некоторое время после завершения потока будет выводится сообщения:

class EndThreadSlot: public GradSoft::ThreadSlot
{
 int id_;
public:
 EndThreadSlot(int id):id_(id) {  }
 ~EndThreadSlot() 
  { cerr << "thread which binded context with id " << id_ 
         << " was finished some time ago" << endl; }
};

Кстати, этот пример также иллюстрирует зависмость поведенияч от системы: для posix-совместимых систем это сообщение будет выводится во время разрушения потока, для Win32 - действительно, спустя некоторое время ;)

Итого:

  1. Программист определяет свои потоки исполнения путем создания объекта, который наследуется от Thread, и перегрузки в нем метода run.
  2. Для старта потока должнен быть вызван метод Thread::start();
  3. Для возможности останова потока "извне" один из методов testcancel, sleep, nanosleep должен регулярно вызываться из run.
  4. Для проверки статуса потока можно воспользоваться методом Thread::is_running();
  5. Поток останавливается, когда:
    1. функция run оканчивает работу.
    2. Происходит вызов Thread::cancel (при условии достижения в run точек останова)
    3. Происходит вызов деструктора (при условии достижения в run точек останова)
  6. Вы можете принудительно инициировать переключение активных процессов в операционной системе (что может привести к переключению текущего потока в данном процессе) с помощью вызова Thread::yield
  7. Поток утилизируется операционной системой, когда:
    1. Поток остановился и вызвана Thread::join
    2. Происходит вызов деструктора Thread.
  8. С каждым потоком связан 'контекст потока', который организован как индексированный набор слотов.
  9. Этот контекст может быть ассоциирован как с потоками созданными средствами GradSoft C++ ToolBox, так и с потоками, созданными другими компонентами
  10. Слоты в контексте автоматически удаляются после завершения соответствующего потока.

Примитивы сихронизации

При написании мультипотоковых программ необходимо организовывать совместный доступ к ресурсам из различных потоков управления. Для этой цели в Threading определенны следующие примитивы синхронизации:

Mutex

(от слов Mutual Exclusive Lock). Как видно из названия, mutex предусматривает использование в ресурса данный момент времени только из одного потока.

У класса Mutex есть 3 метода: lock, try_lock и unlock. Процесс должен перед использованием ресурса заблокировать соответствующий mutex (т. е. вызвать Mutex::lock), потом разблокировать (вызвать Mutex::unlock). try_lock делает попытку заблокировать mutex без ожидания. Если она неудачна (т. е. ресурс к этому моменту уже занят) - возвращает false.

......
resourceMutex.lock();
 .. work with shared resource here ...
resourceMutex.unlock();
..........

Заметим, что операции с mutex-ами должны быть атомарными (в смысле неделимыми).Т. е. кратчайший способ добиться ситуации deadlock (тупика) - это вызвать одну операцию, блокирующую ресурс, из другой, которая также блокирует доступ к этому-же ресурсу.

Например, следующий фрагмент кода введет программу в бесконечное ожидание:

mutex.lock();
mutex.lock(); // -- deadlock here
mutex.unlock();
mutex.unlock();

Заметим, что на некоторых платформах будет сгенерированно прерывание PossibleDeadlock, на некоторых - смертельные объятия действительно произойдут.

MutexLocker

Часто бывает удобно "спрятать" блокировку в конструктор/деструктор объекта. Для этого предназначен класс MutexLocker с двумя методами:

Пример его использования:

 Y X::f() 
 {
  MutexLocker l(yMutex_);
  return sharedY_;
 }

вместо:

Y X::f() 
{
 yMutex_.lock();
 Y retval = sharedY_;
 yMutex_.unlock();
 return retval;
}

RWLock

Еще одна часто встречающаяся модель : так называемая read/write блокировка. То есть ресурс доступен для чтения и для записи. В конкретный момент времени к нему могут обращаться либо несколько читателей, либо один писатель.

Соответствующий класс предусмотрен в нашей модели и называется: RWLock.

Посмотрим внимательней на сигнатуру этого класса:

/**
 * Read/Write lock
 * 
 *  allow multiple readers/single writer.
 *  access to object with such lock must be 
 *  sequence of atomic read-only or write-only
 *  operations.
 *
 *  (i. e. rwlock.read_lock(), rwlock.write_lock() is
 * the fastest way to deadlock state.
 **/
class RWLock
{
 ........
public:

   RWLock()  throw(ThreadingExceptions::NoResources,
                   ThreadingExceptions::InternalError);

   virtual ~RWLock() throw(ThreadingExceptions::ResourceBusy,
                           ThreadingExceptions::InternalError);

   void  read_lock() const
                      throw(ThreadingExceptions::NoResources,
                            ThreadingExceptions::PossibleDeadlock,
                            ThreadingExceptions::InternalError);

   bool  try_read_lock() const
                      throw(ThreadingExceptions::NoResources,
                            ThreadingExceptions::PossibleDeadlock,
                            ThreadingExceptions::InternalError);

   void  read_unlock() const
                      throw(ThreadingExceptions::NoPermission,
                            ThreadingExceptions::InternalError);

   void  write_lock()
                      throw(ThreadingExceptions::NoResources,
                            ThreadingExceptions::PossibleDeadlock,
                            ThreadingExceptions::InternalError);

   bool  try_write_lock()
                      throw(ThreadingExceptions::NoResources,
                            ThreadingExceptions::PossibleDeadlock,
                            ThreadingExceptions::InternalError);

   void  write_unlock() const
                      throw(ThreadingExceptions::NoPermission,
                            ThreadingExceptions::InternalError);

};

Доступ к ресурсу на чтение вы должны обрамлять парой read_lock / read_unlock, на запись: write_lock, write_unlock.

Контроль атомарности операций предоставляется программисту, исполнение следующего фрагмента кода:

rwlock.read_lock();
rwlock.write_lock(); // -- deadlock here
непременно приведет к тупиковой ситуации.

ReadLocker, WriteLocker

Это объекты, инкапсулирующие блокировку доступа на чтение и на запись соответственно. Их использование очевидно:

 {
  ReadLocker rl(rwlock);
  ...
  read-from-resource
  ...
 }

RWLocked

Часто бывает удобно манипулировать классами, состоящими из разделяемого ресурса и соответствующей блокировки. Для целого ряда случаев, когда ресурс обладает свойствами "Создаваемый по умолчанию", "Присваиваемый" и "Сравниваемый" в терминологии STL [5] GradSoft C++ Toolbox предполагает автоматическое конструирование этого класса.

template<class T>
class RWLocked
{
public:

 typedef T locked_type;

protected:

 T v_;
 RWLock rwlock_;

public:
 
 RWLocked()
  :v_(),rwlock_() {}

 RWLocked(const T& v)
  :v_(v),rwlock_() {} 

 RWLocked(const RWLocked& x);

 RWLocked& operator=(const RWLocked& x);

 virtual ~RWLocked();

 bool  operator==(const RWLocked& x);
 bool  operator==(const T& x);
 bool  operator!=(const RWLocked& x);
 bool  operator!=(const T& x);
 
public:

 T&       get_value_()  { return v_; }
 const T& get_value_() const { return v_; }

 void  read_lock() const
 void  read_unlock() const

 void  write_lock()
 void  write_unlock() const

};

template<class T>
class RWLockedPtr:public RWLocked<T*>
{
};

Как мы видим, этот шаблон корректно определяет набор операций для аргументов. Им удобно пользоваться для определения своего класса, производного от RWLocked. Также полезно использовать RWLocked как шаблон проектирования.

Использование STL контейнеров

Сегодня програмирование на C++ немыслимо без использования STL контейнеров. Однако в многопотоковых программах мы не можем свободно использовать разделяемые STL контейнеры: как сказано в [5]

The SGI implementation of STL is thread-safe only in the sense that simultaneous accesses to distinct containers are safe, and simultaneous read accesses to to shared containers are safe. If multiple threads access a single container, and at least one thread may potentially write, then the user is responsible for ensuring mutual exclusion between the threads during the container accesses.

Поэтому мы включили в пакет Threading набор адаптеров к STL контейнерам, которые состоят из комбинации RWLock и контейнера, делегируют те методы, которые можно сделать безопасными и предоставляют доступ непосредственно к контейнеру и локу, для "ручного управления" блокировками. Это позволяет программисту выбрать оптимальное соотношение между эффективностью и сложностью кода.

treadsafe_biseq

Т. е. мультипотоковая двунаправленная последовательность. (Back Insertion Sequence). Этот адаптер предназначен для STL-моделей "Back Insertion Sequence" (т. е. vector, deque, list). Сначала приведем его сигнатуру, а потом прокомментируем ее:

/**
 * threadsafe wrapper arround back insertion sequence.
 **/
template<class container>
class threadsafe_biseq: public RWLocked<container>
{
public:

  typedef threadsafe_biseq self_type;
  typedef ReadLocker rlocker;
  typedef WriteLocker wlocker;
  
  typedef container container_type;

  typedef typename container::value_type value_type;
  typedef typename container::reference reference;
  typedef typename container::const_reference const_reference;
  typedef typename container::pointer pointer;

  typedef typename container::iterator iterator;
  typedef typename container::const_iterator const_iterator;

  typedef typename container::reverse_iterator reverse_iterator;
  typedef typename container::const_reverse_iterator const_reverse_iterator;

  typedef typename container::difference_type difference_type;
  typedef typename container::size_type size_type;

public:

  threadsafe_biseq();
  threadsafe_biseq(const threadsafe_biseq& x);
  threadsafe_biseq(iterator beg, iterator end);
  threadsafe_biseq(size_type n);

  void swap(const threadsafe_biseq& x);

  bool operator<(const threadsafe_biseq& x);
  bool operator<=(const threadsafe_biseq& x);
  bool operator>(const threadsafe_biseq& x);
  bool operator>=(const threadsafe_biseq& x);

  size_type size();
  size_type max_size();

  bool      empty();

  iterator begin_();
  const_iterator begin_() const;

  iterator end_();
  const_iterator end_() const;

  reverse_iterator rbegin_();
  const_reverse_iterator rbegin_() const;

  reverse_iterator rend_();
  const_reverse_iterator rend_() const;

  reference front();
  reference front_();
  const_reference front() const;
  const_reference front_() const;

  reference back();
  reference back_();
  const_reference back() const;
  const_reference back_() const;

  void push_back(const value_type& v);
  void push_back_(const value_type& v);

  void pop_back(void);
  void pop_back_(void);

  iterator insert(iterator it, const value_type& v);
  iterator insert_(iterator it, const value_type& v);
  iterator insert(iterator it, size_type n, const value_type& v);
  iterator insert_(iterator it, size_type n, const value_type& v);
  iterator insert(iterator it, iterator p, iterator q);
  iterator insert_(iterator it, iterator p, iterator q);

  iterator erase(iterator p);
  iterator erase_(iterator p);
  iterator erase(iterator p, iterator q);
  iterator erase_(iterator p, iterator q);
  
  void  clear();
  void  clear_();

  void  resize(size_type n, const value_type* v);
  void  resize_(size_type n, const value_type* v);

  const container& get_container_() const;
  { return v_; }
  container& get_container_();
  { return v_; }

};

Как мы видим этот класс определяет обычные для контейнеров типы и делегирует две версии каждой операции. Операция, знак которой не содержит в конце знака подчеркивания, блокирует соответствующий RWLock автоматически. Операции со знаком подчеркивания не трогают его, оставляя контроль блокировки программисту.

Несколько примеров использования (как правильного, так и неправильного)

  typedef threadsafe_biseq<vector<int> > StorageType;
  StorageType storage; // 0
  .....................
  storage.push_back(10); // 1. безопасно.
  .......................
  storage.write_lock();
  storage.push_back_(10);  // 2. то-же что и 1.
  storage.write_unlock();
  ....................
  storage.write_lock();
  storage.push_back(10); // 3. - состояние deadlock-а
  storage.write_unlock();
  ....................
  storage.write_lock();
  remove_if(storage,storage.begin_(),storage.end_(),10); // 4 - Ok
  storage.write_unlock();
  ..................

threadsafe_uac

Этот адаптер предназначен для "Уникальных Ассоциативных Контейнеров". (Unique Asssociative Container). В STL этой модели удовлетворяют set, map, hash_set, hash_map.

Так как его реализация в целом похожа на threadsafe_biseq, мы не будем приводить сигнатуру класса и описывать ее. Обратитесь к API документации для подробного изучения; примеры использования можно найти в поддиректори demo/Containers пакета Threading.

threadsafe_mac

Как Вы уже вероятно догадались, это адаптер для Multiple Associative Containers (multiset, mulimap, hash_multiset, hash_multimap).

Использования этого адаптера в целом аналогично ичспользованию threadsafe_uac.

Классы-указатели и мультипоточность

Часто-встречающаяся техника программирования на C++ - использование т. н. ``умных указателей'' - классов, содержащих указатели на разделяемые объекты и счетчики ссылок. При работе с подобными вещами следует учитывать, что использование разделяемых указателей в программах должно учитывать многопоточность. Grad-Soft C++ ToolBox предоставляет для этой цели класс counted_mt_ptr, который представляет собой безопасный относительно многопоточности указатель со счетчиком ссылок.

Использование counted_mt_ptr в точности такое-же, как и у counted_ptr из раздела ptrs этого-же пакета.

Типичный пример использования:

 GradSoft::counted_mt_ptr<MyObject,GradSoft::ptr::safe> obj(new MyObject());

 callSomethingInParallelThread(obj);
 
 .........

 try {
   obj->myFun()
 }catch(const GradSoft::NullPointerException& ex){
   Object was settet to NULL, do something 
 }

Здесь второй аргумент шаблоне играет ту же роль, что и в counted_ptr, см. [2].

В дополнение к безопасным методам доступа (get(), *, ->, ->_) и усатновки assign определены методы get()_ и assign()_, который возвращает указатель не трогая соответствующий мьютекс, и метод getMutex(), возвращающий мьютекс ``в чистом виде''.

Примитивы асинхронного взаимодействия: ThreadEvent

Кроме синхронизации доступа к соместно используемым данным, Threading предоставляет набор примитивов асинхронного взаимодействия потоков.

Эти примитивы инкапсулированыы в класс ThreadEvent, сигнатура которого выглядит следующим образом:

/**
 * Thread Event (Condition) class
 **/
class ThreadEvent
{
public:
   ///
   ThreadEvent() throw(ThreadingExceptions::NoResources,
                            ThreadingExceptions::InternalError);

   ///
   ~ThreadEvent() throw(ThreadingExceptions::ResourceBusy,
                            ThreadingExceptions::InternalError);

   ///
   void wait()  throw(ThreadingExceptions::PossibleDeadlock,
                      ThreadingExceptions::InternalError);

   ///
   void wait(long timeout)
                  throw(ThreadingExceptions::PossibleDeadlock,
                        ThreadingExceptions::InternalError);

   ///
   void notify() throw();

   ///
   void notifyAll() throw();

private:

  ....

};

Значение методов следуйщие:

Семантика этой модели довольно подробно описанна в литературе, Если мы посмотрим на другие известные API многопоточности, то увидим прямое соответствие семейству функций pthread_cond в pthread и семейству методов асинхронного взаемодейсвия в языке Java .

Проиллюстритуем типичное использование ThreadEvent на классическом примере ограниченного буфера: 2 потока: поставщик и потребитель, взаимодействуют друг с другом посредством ограниченного буфера, в котором может уместиться maxBufferSize элементов. Поставщик добавляет элементы в буфер, вызывая метод put, потребитель читает, вызывая метод get. Когда буфер полон, поставщик в методе get приостанавливает работу, ожидая когда в буфере появится свободное место; когда буфер пуст соответственно останавливается потребитель.

 class BoundedBuffer
 {
  ThreadEvent elementsExists_;
  ThreadEvent freeSpaceExists_;
   ..........
  public:

   void put(ElementType element)
   {
    if (getNumberElements() >= maxBufferSize_) {
      freeSpaceExists.wait(); 
    }
    ... do actual put
    elementsExists_.notify();
   }

   ElementType get()
   {
    if (getNumberElements() == 0) {
      elementsExists_.wait();
    }
    ... do actual get
    freeSpaceExists_.notify();
    return retval;
   }

   .....

 };

Как видим, у нас есть 2 события, которые связаны с изменением определенных логических условий 3. Одно событие происходит, когда в буфере появляется хотя-бы один элемент, второе - когда в буфере есть место для хотя-бы одного элемента. Если условие не выполняется, то мы ждем соответствующего события: так, например если в BoundedBuffer::get в буфере нет ни одного элемента, то мы ждем когда он появится. Мы извещаем потребителя о том, что элемент появился в конце метода BoundedBuffer::put, когда это условие точно выполняется - ведь мы только-чо положили туда элемент.

Кстати, заметим что данный пример несколько неоптимален и поддается очевидной оптимизации.

Понятие сервиса потока (ThreadService)

Один из известных и полезных шаблонов проектирования - организация асинхронной обработки запросов. Эта техника позволяет минимизировать время отклика приложения (когда работает "длинный" запрос приложение остается активным и может отвечать на другие запросы) и увеличить параметры масштабируемости и живучести приложения.

GradSoft C++ ToolBox поддерживает использование такого стиля и определяет некоторую инфраструктуру, в рамках которой программисту предоставляется набор готовых элементов для асинхронного выполнения запросов и готовых "исполнителей" запроса типа ThreadPool, SingleThreadBlocking и. т. д.

Типичное использование сервисов потоков выглядит следующим образом:

Теперь - подробнее:

Класс Runnable выглядит следующим образом:

/**
 * Abstract class for runnable
 * Runnable is item of execution
 **/
class Runnable
{
public:

  ///
  Runnable();

  ///
  virtual ~Runnable();

  ///
  virtual void run() = 0;

private:

  Runnable(const Runnable& );
  Runnable& operator=(const Runnable&);

};

Как видим, определение очень похоже на соответствующий java интерфейс.

ThreadService это абстрактный класс, определенный следующим образом:

/**
 * ThreadService: entity which process Runnable
 * (Runnable may be events, network connections, etc)
 * Typical usage pattern:
 *  1. Generator generates Runnable
 *  2. this Runnables are passed to ThreadServices,
 *    with help of call ThreadService::process 
 *  3. ThreadService process this runnable, asynchronicly or
 *   synchronisly.
 *
 * ThreadService can be in active or inactive state.
 * When it in active state, it can process requests.
 * When in inactive - can't.
 * 
 **/
class ThreadService
{
public:

   /**
    * This exception is throwed, when we try to use
    * not-activated ThreadService
    **/
   struct NotActive {};

private:

   <implementation-depended>

public:

   ///
   ThreadService();

   ///
   virtual ~ThreadService();

   ///
   virtual void  process(Runnable* runnable)=0;

   ///
   bool is_active() const { return active_.value(); }

   ///
   virtual void  activate();

   ///
   virtual void  deactivate(bool shutdown);

protected:

   virtual void mark_deactivate();

   ..............

private:

   ThreadService(const ThreadService&);
   ThreadService& operator=(const ThreadService&);

};

Опишем методы ThreadService более подробно:

Несколько конкретных реализаций сервисов исполнения поставляются в составе GradSoft C++ ToolBox. Заметим, что для их использования вы должны включить соответствующий включаемый файл. Среди них:

SingleThreadBlocking

Это самый "тривиальный" сервис исполнения, который исполняет задания в текущем потоке (т. е. process - синхронный вызов).

SingleThreadChecking

Запросы выполняются асинхронно в выделенном потоке этого сервиса. Очередь запросов не организовывается, поэтому если сервис занят исполнением одного запроса, вызов process приведет к прерыванию TemporaryNoResources.

SingleThreadReactive

Запросы выполняются асинхронно в выделенном потоке этого сервиса. Организованна очередь запросов. Размер очереди должен быть передан в конструктор SingleThreadReactive. Поведение process зависит от флага режима, переданного в конструкторе. это может быть один из:

ThreadPerClient

Запросы выполняются асинхронно. Для каждой задачи выделяется отдельный поток исполнения, заканчивающийся вместе с задачей.

ThreadPool

Запросы выполняются асинхронно в пуле потоков этого сервиса: любой свободный поток исполняет текущий запрос; если все потоки заняты, то запрос кладется в очередь. Количество потоков в пуле и размер очереди должны быть заданны в параметрах конструктора. Поведение process при переполнении очереди зависит от значения флага режима: (ThreadPool::Blocked или ThreadPool::Checked или ThreadPool::CheckedWithTimeout ). Семантика флагов режима такая-же, как и в SingleThreadReactive.

Требования к программному окружению

Переменные макропроцессора

  1. Несколько переменных макропроцессора определены в файле ThreadingConfig.h, генерируемом при компиляции пакета. Потенциально возможен конфликт между определениями в ThreadingConfig.h и определниями из других макропакетов. Для того, что-бы этого не произошло мы рекомендуем заключать ваши макроопределения autoconf в предложения условной компиляции:
    #ifndef HAVE_Xxx
    #undef HAVE_Xxx
    #endif
    
  2. Если Вы работаете в среде LINUX, то Вы должны либо включить Threading.h, либо определить препроцессорный символ _GNU_SOURCE до любого включения системных заголовочных файлов.
  3. Если Вы работаете под управлением Windows NT, Вам необходимо:
    1. определить макрос WIN32 перед включением файла Threading.h
    2. использовать "новые" библиотеки и, соответственно, заголовочные файлы iostream, fstream и т.п. вместо аналогичных iostream.h, fstream.h и т.п.

Библиотеки

Вы должны компилировать пакеты, использующие пакет Threading с libThreading (для UNIX) или Threading для Windows NT.

Соответствующий ключ компилятору: oбычно -lThreading.

Перечень изменений

09.10.2002
- косметическая правка.
11.07.2002
- добавлено описание ThreadContext
26.03.2002
- добавлено описание counted_mt_ptr
03.01.2002
- коррекция в соответствии с новым релизом GradC++ToolBox 1.4.0.
06.07.2001
- добавлено описание SystemError.
24.05.2001
- добавлено описание yield.
18.05.2001
- описана инфраструктура вокруг ThreadService.
25.04.2001
- описан класс ThreadEvent
23.04.2001
- описана иерархия исключений v. 1.0.5, добавлены условия лицензирования.
17.02.2001
- добавлены формальные атрибуты эксплуатациооной документации.
09.09.2000
- создание.

Bibliography

1
Ukraine GradSoft, Kiev.
GradSoft C++ ToolBox: Administration Guide, 2000,2001.
GradSoft-AD-e-04.09.2000-vC.

2
Ukraine GradSoft, Kiev.
GradSoft C++ ToolBox: ptrs: Programming Guide, 2002.
GradSoft-PR-e-07.02.2002-vC.

3
Bil Lewis.
COMP.PROGRAMMING.THREADS FAQ, 2000.
http://www.lambdaCS.com/newsgroup/FAQ.html.

4
Shashi Prasad.
Mutlithreading Programming Techniques.
McGras-Hill, 1997.
ISBN 0201379279.

5
inc; Hewlett-Packard company Silicon Graphics Computer System.
Standart Template Library Programmers Guide, 1999.
http://www.sgi.com/Technology/STL/.



Footnotes

... образом1
В примерах для простоты опущена обработка ошибок
... te.wait()2
Здесь te - экземпляр класса ThreadEvent
... условий3
недаром в отдельных пакетах подобная модель называется thread condition

next_inactive up previous
GradSoft