Страниц: [1] 2 3
  Печать  
Автор Тема: multi producer single consumer  (Прочитано 16052 раз)
avo1981
Jr. Member
**
Offline Offline

Сообщений: 56


Просмотр профиля
« : Февраля 24, 2011, 01:35:07 pm »

Всем привет! Подскажите варианты решения задачи. Мне надо сохранять большой объем данных из разных потоков, и записывать их в файлы. Я так понимаю что это задача multi producer single consumer. Вот тут написал свой лисапед, но может кто нибудь посоветует готовое решение или укажет направление. В приведенном коде каждый поток производитель складывает данные в свою очередь (переполнения не будет, т.к. планируется ограничить длину очереди и если данные не влезают, то выбрасывать последний элемент). И есть поток потребитель, который имеет свою очередь, защищенную мьютексом. Время от времени каждый производитель пытается захватить мьютекс и скинуть свои накопленные данные в общую очередь. Заранее спасибо

#include <cstdlib>
#include <iostream>
#include <pthread.h>
#include <sys/neutrino.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/siginfo.h>
#include <iostream>
#include <queue>
#include <string>
#include <fstream>
#include <errno.h>

using namespace std;

deque<std::string> qStr1;
deque<std::string> qStr2;
deque<std::string> MutexQueConsumer;
pthread_cond_t condvar =PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;

int CreateThread( pthread_t *pthread, const pthread_attr_t attrl,void* routine(void*), void *arg );
void*   Produser1( void*  arg );
void*   Produser2( void*  arg );
void*   Produser3( void*  arg );
void*   Consumer( void*  arg );

int main(int argc, char *argv[]) {
   std::cout << "Welcome to the QNX Momentics IDE" << std::endl;
   pthread_attr_t attr[16];
   int read[20];
   CreateThread( &read[0], attr[0], Consumer,     (void*)0 );
   CreateThread( &read[0], attr[0], Produser1,     (void*)0 );
   CreateThread( &read[0], attr[0], Produser2,     (void*)0 );
   while (1)
      sleep(1);
   return EXIT_SUCCESS;
}

int CreateThread( pthread_t *pthread, const pthread_attr_t attrl,
   void* routine(void*), void *arg )
{
   pthread_attr_t attr=attrl;
   pthread_attr_init( &attr );
   pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
   return pthread_create( pthread, &attr, routine, arg );
};

void*   Produser1( void*  arg )
{
   while(1)
   {
      qStr1.push_front("Produser2");
      if (qStr1.size()>1000)
      {
         if (pthread_mutex_trylock(&mutex)==EOK)
         {
            // скопировать данные
            reverse(qStr1.begin(),qStr1.end()); //переворачиваем очередь
            for(deque<string>::iterator i=qStr1.begin(); i!=qStr1.end();i++)
            {
               MutexQueConsumer.push_front(*i);
            }
            qStr1.clear();
            pthread_mutex_unlock(&mutex);
         }
      }

   }
};

void*   Produser2( void*  arg )
{
   while(1)
   {
      qStr2.push_front("Produser3");
      if (qStr2.size()>1000)
      {
         if (pthread_mutex_trylock(&mutex)==EOK)
         {
            // скопировать данные
            reverse(qStr2.begin(),qStr2.end());
            for(deque<string>::iterator i=qStr2.begin(); i!=qStr2.end();i++)
            {
               MutexQueConsumer.push_front(*i);
            }
            qStr2.clear();
            pthread_mutex_unlock(&mutex);
         }
      }
   }
};

void*   Consumer( void*  arg )
{
   std::ofstream ofs;
   ofs.open("out.txt");
   string res1="";
   string res2="";
   while (1)
   {
      if (pthread_mutex_lock(&mutex)==EOK)
      {
         while(MutexQueConsumer.size()>0)
         {

            res1=MutexQueConsumer.back();
            res2=res1;
            MutexQueConsumer.pop_back();
            //std::cout<<res1<<std::endl;
            ofs<<res2<<std::endl;
         }
         pthread_mutex_unlock(&mutex);
      }

   }
};
Записан
oder
Гость
« Ответ #1 : Февраля 24, 2011, 03:55:33 pm »

Ужасно.
Результаты вызовов не проверяются.
STL используется бездарно до безобразия.
Синхронизация используется крайне неэффективно.
Функции потоков-производителей продублированы без какой-либо надобности.

+ В слове "Producer" - грамматическая ошибка.
++ Самое страшное забыл (пришлось даже заново включать компьютер и подключать dial-up Wink): жор процессора в потоке-потребителе с потенциальной остановкой программы на однопроцессорной машине в случае постоянного/динамического повышения приоритета потока-потребителя.
« Последнее редактирование: Февраля 24, 2011, 04:06:59 pm от oder » Записан
avo1981
Jr. Member
**
Offline Offline

Сообщений: 56


Просмотр профиля
« Ответ #2 : Февраля 24, 2011, 04:09:08 pm »

Ужасно.
Результаты вызовов не проверяются.
STL используется бездарно до безобразия.
Синхронизация используется крайне неэффективно.
Функции потоков-производителей продублированы без какой-либо надобности.

+ В слове "Producer" - грамматическая ошибка.

Критика частично принимается. Но это примерный набросок. К тому же я и не претендую на то что это правильное решение. Функции потоков-производителей продублированы как раз чтобы заострить внимание что их больше одного. Вообще говоря в интернете конечно можно кое-что найти но в основном под Windows. Опять же если бы я знал как это нормально сделать, наверное бы не стал спрашивать совета.
Записан
oder
Гость
« Ответ #3 : Февраля 24, 2011, 06:31:32 pm »

Опять же если бы я знал как это нормально сделать, наверное бы не стал спрашивать совета.

Вот! А для этого надо читать и думать, читать и думать! TFM читать! И уж только потом за клавиатуру садиться.

От себя скажу: чтоб нормально сделать - это вам не пять минут и никто этим серьезно заниматься не будет. Конечно, вам сейчас помогут сделать (или наже набросают) какую-нибудь халтурку, но это может быть нормально в вашем понимании. А на профессиональном уровне к каждой задаче надо подходить серьезно и выжимать из нее все, что возможно. И просто так (и даже "не просто так") я, например, этого для вас делать не буду.
А объяснять вам придеся дольше, чем самому бы это сделать.
« Последнее редактирование: Февраля 24, 2011, 06:33:44 pm от oder » Записан
oder
Гость
« Ответ #4 : Февраля 24, 2011, 07:35:30 pm »

Ладно, пока я (типа) больной и сижу дома - шлангом прикидываюсь, - набросаю основные цели, которых надо достичь в данной задаче.

1) Минимизировать выделение/освобождение памяти для элементов списков (тоесть, элементы списков должны передаваться от производителей потребителю и потом переиспользоваться обратно). О такой дури как "reverse()" я, вообще, упоминать не хочу - есть push_back() (не только push_front()) и есть reverse_iterator.
2) Перебрасывать весь накопившийся список к потребителю за одну операцию (ну и, конечно, никак не после 1000 элементов, а пробовать непосредственно после каждого). Делать ли trylock или lock - зависит от того, как часто гарантируется поступание входных данных, а, иначе, через неудачный trylock можно с "зависшим" в очереди элементом данных уснуть на пол часа, пока что-нибудь еще на вход не придет. Исходя из того, что гарантировать что-либо у вас может только Госстрах, использовать trylock вообще бы не советовал (заодно отпадает и необходимость держать входные очереди и перебрасывать куда-либо элементы).
3) Убрать потенциально блокирующие операции, типа записи в файл, из-под мютексов. В том числе, убрать оттуда и выделение памяти элементов списков (если оно происходит).
4) Сделать нормальную синхронизацию: на одних мютексах без condvar вы обработку списка не сделаете. Делается так: производитель добавляет элемент(ы) в список потребителю и сигнализирует condvar, если список был пуст. Потребитель извлекает элементы из списка, пока они есть, а если их нет - засыпает на condvar.

Пока-что, все.
Записан
avo1981
Jr. Member
**
Offline Offline

Сообщений: 56


Просмотр профиля
« Ответ #5 : Февраля 25, 2011, 12:01:29 pm »

Oder, спасибо за ценные советы. Попробую воспользоваться.
Записан
ed1k
QOR.Moderator
*****
Offline Offline

Сообщений: 739


Просмотр профиля WWW
« Ответ #6 : Февраля 26, 2011, 06:08:03 am »

У сана была хорошая мурзилка. Сейчас вот только кусок нашел
http://www.shrubbery.net/solaris9ab/SUNWdev/MTP/p27.html
все оракл поработил...
В общем, если не мудрить, то по описанию внизу страницы все хорошо работает.
Записан
ed1k
QOR.Moderator
*****
Offline Offline

Сообщений: 739


Просмотр профиля WWW
« Ответ #7 : Февраля 26, 2011, 06:17:18 am »

Вот нашел куда оракл запрятал, внизу страницы полный пример.
http://download.oracle.com/docs/cd/E19455-01/806-5257/6je9h032r/index.html
Записан
oder
Гость
« Ответ #8 : Февраля 26, 2011, 03:20:54 pm »

Вот нашел куда оракл запрятал, внизу страницы полный пример.
http://download.oracle.com/docs/cd/E19455-01/806-5257/6je9h032r/index.html

У этого примера есть стандартная проблема всех примеров: его основное предназначение - быть простым и понятным и из-за этого он становится неэффективным.
Нет никакой необходимости сигналить кондвары в одну и в другую сторону, пока буфер не станет полным или пустым.
Записан
oder
Гость
« Ответ #9 : Февраля 26, 2011, 03:41:31 pm »

И, кроме того, в данном примере можно сигналить кондвары уже выйдя из критической секции.
Так что, как минимум, две возможности к оптимизации в простой функции.
Записан
vshemm
Sr. Member
****
Offline Offline

Сообщений: 317


Просмотр профиля
« Ответ #10 : Марта 01, 2011, 12:01:49 am »

Подобные "оптимизации" приведут к полной неработоспособности кода в случае отличном от 1-1.
Действительно, если сигналить не каждый раз в потребителе, то будет работать только один производитель
после того как очередь очередь наполнится. Аналогично, при пустой очереди будут голодать нити-потребители,
если сигналить не каждый раз в производителе. И не нужно про броадкасты говорить, внутри них точно такой же
цикл, поэтому никакого выигрыша не будет.
Если же вынести сигнал за пределы критической секции, то мы легко попадем в assert().
При обеих "оптимизациях" еще добавляется возможность дедлока.
Главная причина всего этого - отсутствие памяти у кондваров.

А вот это

У этого примера есть стандартная проблема всех примеров: его основное предназначение - быть простым и понятным и из-за этого он становится неэффективным.

вызывает, по меньшей мере, недоумение. Код простой, ясный и работает при любых условиях. Именно так и следует
писать. Единственная причина для написания другого кода - это боттлнек. Только когда данный механизм станет
узким местом, тогда и стоит его усложнять, причем оптимизации будут носить совсем другой характер, например,
избавления от примитивов синхронизации вообще.
Записан
oder
Гость
« Ответ #11 : Марта 01, 2011, 01:55:41 am »

Подобные "оптимизации" приведут к полной неработоспособности кода в случае отличном от 1-1.
Действительно, если сигналить не каждый раз в потребителе, то будет работать только один производитель
после того как очередь очередь наполнится. Аналогично, при пустой очереди будут голодать нити-потребители,
если сигналить не каждый раз в производителе. И не нужно про броадкасты говорить, внутри них точно такой же
цикл, поэтому никакого выигрыша не будет.
Вот люблю я весёлых людей! Smiley Сколько вы мне заплатите, если я вам приведу пример кода с оптимизациями, о которых я упоминал, который будет работать для случая m-n и не будет владеть никакими из тех страшных недостатков, о которых вы рассказываете (во всяком случае, не больше, чем изначальный пример)? Wink Даже pthread_cond_broadcast не будет - смысла нет! Smiley

Если же вынести сигнал за пределы критической секции, то мы легко попадем в assert().
При обеих "оптимизациях" еще добавляется возможность дедлока.
Главная причина всего этого - отсутствие памяти у кондваров.
То же самое предложение: давайте, я вам делаю здесь исходничек, вы его у себя компилируете-запускаете, он у вас дедлочится, вы снимаете с процесса дампером корку и цепляете её здесь - я вам 1024 рубля. Если за неделю не сможете задедлочить - вы мне 1024 рубля (больше мне совестно вас разорять - что ж я, изверг какой?) Smiley.
Записан
vshemm
Sr. Member
****
Offline Offline

Сообщений: 317


Просмотр профиля
« Ответ #12 : Марта 01, 2011, 02:12:11 am »

Вот люблю я весёлых людей! Smiley Сколько вы мне заплатите, если я вам приведу пример кода с оптимизациями, о которых я упоминал, который будет работать для случая m-n и не будет владеть никакими из тех страшных недостатков, о которых вы рассказываете (во всяком случае, не больше, чем изначальный пример)? Wink Даже pthread_cond_broadcast не будет - смысла нет! Smiley
Нисколько Smiley Теперь ваша очередь приводить примеры кода. Внесете вышеумомянутые оптимизации в вышеупомянутый код так, что бы он работал, и работал лучше оригинального кода, - признаю свою неправоту. Иначе это все бла-бла-бла.

Цитировать
То же самое предложение: давайте, я вам делаю здесь исходничек, вы его у себя компилируете-запускаете, он у вас дедлочится, вы снимаете с процесса дампером корку и цепляете её здесь - я вам 1024 рубля. Если за неделю не сможете задедлочить - вы мне 1024 рубля (больше мне совестно вас разорять - что ж я, изверг какой?) Smiley.
Исходник не нужен, все это проделано мозгом, сиречь - аналитически. Встречный конкретный вопрос - сколько я получу, если докажу, что вынос сигналов за критическую секцию приведет к ассертам?
Записан
oder
Гость
« Ответ #13 : Марта 01, 2011, 02:49:29 am »

Нисколько Smiley Теперь ваша очередь приводить примеры кода. Внесете вышеумомянутые оптимизации в вышеупомянутый код так, что бы он работал, и работал лучше оригинального кода, - признаю свою неправоту. Иначе это все бла-бла-бла.
Нет - за просто так не буду. Smiley Немножко соврал я, конечно: для случая больше одного потока pthread_cond_broadcast, таки, нужен (но вы же его и не запрещали!). А в остальном договор остаётся в силе. Предложите сумму - будем разговаривать. По-моему, из моих начальных постов и так очевидно, какие изменения надо сделать.

Исходник не нужен, все это проделано мозгом, сиречь - аналитически. Встречный конкретный вопрос - сколько я получу, если докажу, что вынос сигналов за критическую секцию приведет к ассертам?
Вы меня, право, ставите в неловкое положение. Даю вам ещё один день подумать о своей неправоте. Если за день на вас прозрение не снизойдёт - избираем суддейскую коллегию в числе 3 человек из числа желающих и вы ей пробуете обьяснить, почему при выносе pthread_cond_signal за пределы критической секции начнут падать ассерты (assert я вижу в каждой из функций только один), а я той же коллегии объясню, почему они падать не начнут. Как коллегия большинством голосов решит - тот, кто неправ, тому, кто прав, 1024 рублей и переводит (Privat Money, например).
« Последнее редактирование: Марта 01, 2011, 02:51:14 am от oder » Записан
ed1k
QOR.Moderator
*****
Offline Offline

Сообщений: 739


Просмотр профиля WWW
« Ответ #14 : Марта 01, 2011, 07:36:54 am »

Цирк какой-то... Человек спросил, не видел ли кто велосипед. Я сказал, что видел, и где я его видел. А вы говорите, что на одном колесе маневренность лучше, и только лохи ездят на велосипедах с равновеликими колесами... Чепуха какая-то.
Записан
Страниц: [1] 2 3
  Печать  
 
Перейти в: