Страниц: 1 2 [3]
  Печать  
Автор Тема: multi producer single consumer  (Прочитано 16082 раз)
vshemm
Sr. Member
****
Offline Offline

Сообщений: 317


Просмотр профиля
« Ответ #30 : Марта 03, 2011, 10:52:10 pm »

Привожу модифицированные исходники с номерами строк, чтобы проще было следить. Из терминологии P1, P2,.. - нити производители, C1, C2,... - нити потребители. Добавлю про терминологию: "P1-3 захватывает мьютекс" означает, что нить производителя P1 захватывает мьютекс в строке 3. "С1-20" - мы в функции consumer в 20-й строчке, нить С1.

Код:
1  void producer(buffer_t *b, char item)
2  {
3     pthread_mutex_lock(&b->mutex);
4     while (b->occupied >= BSIZE)
5         pthread_cond_wait(&b->less, &b->mutex);
6
7     assert(b->occupied < BSIZE);
8
9     b->buf[b->nextin++] = item;
10    b->nextin %= BSIZE;
11    b->occupied++;
12
13    /* now: either b->occupied < BSIZE and b->nextin is the index
14       of the next empty slot in the buffer, or
15       b->occupied == BSIZE and b->nextin is the index of the
16       next (occupied) slot that will be emptied by a consumer
17       (such as b->nextin == b->nextout) */
18
19    pthread_mutex_unlock(&b->mutex);
20    pthread_cond_signal(&b->more);
21 }

и

Код:
1 char consumer(buffer_t *b)
2 {
3     char item;
4     pthread_mutex_lock(&b->mutex);
5     while(b->occupied <= 0)
6         pthread_cond_wait(&b->more, &b->mutex);
7
8     assert(b->occupied > 0);
9
10    item = b->buf[b->nextout++];
11    b->nextout %= BSIZE;
12    b->occupied--;
13
14    /* now: either b->occupied > 0 and b->nextout is the index
15       of the next occupied slot in the buffer, or
16       b->occupied == 0 and b->nextout is the index of the next
17       (empty) slot that will be filled by a producer (such as
18       b->nextout == b->nextin) */
19
20    pthread_mutex_unlock(&b->mutex);
21    pthread_cond_signal(&b->less);
22
23    return(item);
24 }

Edit: поправил в коде порядок отпускания мьютекса и сигналов. Добавил в терминологию.
« Последнее редактирование: Марта 03, 2011, 11:39:59 pm от vshemm » Записан
vshemm
Sr. Member
****
Offline Offline

Сообщений: 317


Просмотр профиля
« Ответ #31 : Марта 04, 2011, 10:22:43 pm »

Ну, что же, execution path действительно расставил все на свои места...
В данном случае потеря сигнала несущественна.
Формально я спор проиграл, шлите реквизиты/способы оплаты.

Но по сути все равно остаются вопросы о целесообразности выноса сигнала за критическую секцию, плюс вторая оптимизация - о ней ничего не известно.
Записан
oder
Гость
« Ответ #32 : Марта 04, 2011, 11:37:39 pm »

Ну, что же, execution path действительно расставил все на свои места...
В данном случае потеря сигнала несущественна.
Формально я спор проиграл, шлите реквизиты/способы оплаты.
Отправил личным сообщением.

Но по сути все равно остаются вопросы о целесообразности выноса сигнала за критическую секцию, ...
Ну как же! Известно, что чем меньше времени занята критическая секция, тем больше свободы для потоков и тем больше потенциальный параллелизм их исполнения. Сигнализирование кондвара - не элементарная операция. Это вход в ядро с неизвестно какой кучей кода внутри и с потенциальной блокировкой, если ядро занято другим потоком (для SMP). Во всяком случае кода там внутри ядра исполнится явно больше, чем в самих функциях производителя и потребителя целиком.

...плюс вторая оптимизация - о ней ничего не известно.
Вторая оптимизация состоит в том, что нет надобности сигналить кондвар каждый раз, ибо если список не был полон, то производители не могли быть заблокированными, а если список не был пуст, то не могли быть заблокированы потребители (во всяком случае, не больше их, чем сейчас есть свободных ячеек для первых или зянятых - для вторых). Таким образом, сигнализоворать кондвар можно лишь в граничных случаях. Но тут есть одна тонкость! Если не вынносить pthread_cond_signal за критическую секцию, то его можно так и оставить, а если выносить - его уже надо менять на pthread_cond_broadcast. Тоесть, с обеими оптимизациями функции приобретают следующий вид.
Код:
void producer(buffer_t *b, char item)
{
   int was_empty;
   
   pthread_mutex_lock(&b->mutex);
   while (b->occupied >= BSIZE)
       pthread_cond_wait(&b->less, &b->mutex);

   assert(b->occupied < BSIZE);

   b->buf[b->nextin++] = item;
   b->nextin %= BSIZE;
   was_empty = b->occupied++ == 0;

   pthread_mutex_unlock(&b->mutex);

   if (was_empty)
      pthread_cond_broadcast(&b->more);
}

и

Код:
char consumer(buffer_t *b)
{
   char item;
   int was_full;
   
   pthread_mutex_lock(&b->mutex);
   while(b->occupied <= 0)
       pthread_cond_wait(&b->more, &b->mutex);

   assert(b->occupied > 0);

   item = b->buf[b->nextout++];
   b->nextout %= BSIZE;
   was_full = b->occupied-- == BSIZE;

   pthread_mutex_unlock(&b->mutex);

   if (was_full)
      pthread_cond_broadcast(&b->less);

   return item;
}

P.S. Оптимизация выглядит достаточно подозрительно опасно - я даже сам-вот сейчас засомневался и долго проверял в уме - но должно быть всё нормально.
Записан
oder
Гость
« Ответ #33 : Марта 05, 2011, 12:41:43 am »

Всё-таки, наврал немножко: менять на pthread_cond_broadcast надо, даже если не выносить за пределы критической секции (в смысле, если сигналить только в граничных случаях).
Записан
avo1981
Jr. Member
**
Offline Offline

Сообщений: 56


Просмотр профиля
« Ответ #34 : Марта 14, 2011, 06:59:45 pm »

Вот нашел куда оракл запрятал, внизу страницы полный пример.
http://download.oracle.com/docs/cd/E19455-01/806-5257/6je9h032r/index.html

Спасибо ed1k за хороший пример, и всем остальным за ценные замечания. Но у меня еще вопрос возник. Получается, что если делать на условных переменных, то потоки будут часто блокироваться на
pthread_cond_wait. Но мне очень важно, чтобы потоки производители как можно меньше висели и желательно не блокировались, а продолжали работать дальше, по возможности не теряя данные. Может быть данную задачу надо решать опираясь на сообщения (асинхронные) между производителями и потребителем, или вынести сохранение данных в отдельный процесс и передавать туда данные через очереди?
Записан
oder
Гость
« Ответ #35 : Марта 14, 2011, 07:16:24 pm »

Спасибо ed1k за хороший пример, и всем остальным за ценные замечания. Но у меня еще вопрос возник. Получается, что если делать на условных переменных, то потоки будут часто блокироваться на pthread_cond_wait.
Только если получатели не будут успевать разгребать поток данных, который генерируют производители. А это значит, что у вас система несбалансирована и нужно либо искусственно ограничивать производителей либо распараллеливать получателей на несколько машин (если сделать их работу быстрее в пределах одной машины не представляется возможным).
Если система сбалансирована, очередь большинство времени должна быть, в среднем, наполовину заполнена и никаких блокировок не будет происходить вообще (ну, блокировки на мютексе - не в счёт Wink).

Но мне очень важно, чтобы потоки производители как можно меньше висели и желательно не блокировались, а продолжали работать дальше, по возможности не теряя данные. Может быть данную задачу надо решать опираясь на сообщения (асинхронные) между производителями и потребителем, или вынести сохранение данных в отдельный процесс и передавать туда данные через очереди?
А отдельный процесс что - резиновый? Вы точно так же можете организовать динамическое увеличение длины очереди и в собственном процессе.
Записан
Страниц: 1 2 [3]
  Печать  
 
Перейти в: