c++ - How to apply a concurrent solution to a Producer-Consumer like situation


I have an XML file with a sequence of nodes. Each node represents an element that I need to parse and add to a sorted list (the order must be the same as the order in which the nodes appear in the file).

At the moment I am using a sequential solution:

```cpp
struct graphic
{
    bool parse()
    {
        // parsing...
        return parse_outcome;
    }
};

vector<unique_ptr<graphic>> graphics;

void producer()
{
    for (size_t i = 0; i < n_graphics; i++)
    {
        auto g = new graphic();

        if (g->parse())
            graphics.emplace_back(g);
        else
            delete g;
    }
}
```

So, if a graphic (that is, an instance of a class derived from graphic, such as line, rectangle and so on, which is why I use new) can be parsed, it is added to my data structure.

Since I only care about the order in which these graphics are added to my list, I thought I could call the parse method asynchronously: the producer has the task of reading each node from the file and adding the graphic to the data structure, while the consumers have the task of parsing each graphic whenever a new one is ready to be parsed.

Now I have several consumer threads (created in main), and my code looks like the following:

```cpp
queue<pair<graphic*, size_t>> q;
mutex m;
atomic<size_t> n_elements;

void producer()
{
    for (size_t i = 0; i < n_graphics; i++)
    {
        auto g = new graphic();
        graphics.emplace_back(g);
        q.emplace(make_pair(g, i));
    }

    n_elements = graphics.size();
}

void consumer()
{
    pair<graphic*, size_t> item;

    while (true)
    {
        {
            std::unique_lock<std::mutex> lk(m);

            if (n_elements == 0)
                return;

            n_elements--;
            item = q.front();
            q.pop();
        }

        if (!item.first->parse())
        {
            // here I should remove the item from the vector
            assert(graphics[item.second].get() == item.first);
            // Assigning nullptr to the unique_ptr already deletes the graphic;
            // an extra `delete item.first` here would free it twice.
            graphics[item.second] = nullptr;
        }
    }
}
```

I run the producer first of all in main, so when the first consumer starts, the queue is already full.

```cpp
int main()
{
    producer();

    vector<thread> threads;

    for (auto i = 0; i < n_threads; i++)
        threads.emplace_back(consumer);

    for (auto& t : threads)
        t.join();

    return 0;
}
```
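In the code above, producer() runs to completion before any consumer starts, so the queue acts as a pre-filled work list rather than a channel between running threads. A minimal sketch of a variant where the consumers run while the producer is still creating graphics, using std::condition_variable so that a consumer blocks on an empty queue instead of exiting. Assumptions: `graphic` is a hypothetical stand-in whose parse() fails for every 10th node, `parse_streaming` is a hypothetical name, and the producer loop only stands in for reading nodes from the XML file.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for the question's graphic hierarchy:
// parse() fails for every 10th element so the failure path is exercised.
struct graphic {
    explicit graphic(std::size_t id) : id(id) {}
    virtual ~graphic() = default;
    virtual bool parse() { return id % 10 != 0; }
    std::size_t id;
};

// Producer and consumers run at the same time. Slot i of the result holds the
// graphic created from node i (or nullptr if parsing failed), so file order is
// preserved without sorting afterwards.
std::vector<std::unique_ptr<graphic>>
parse_streaming(std::size_t n_graphics, unsigned n_threads) {
    assert(n_threads > 0 && "at least one consumer is needed to drain the queue");

    // Sized up front: the vector never reallocates while consumers write to it.
    std::vector<std::unique_ptr<graphic>> graphics(n_graphics);
    std::queue<std::size_t> q;   // indices of graphics waiting to be parsed
    std::mutex m;
    std::condition_variable cv;
    bool done = false;           // true once the producer has queued every index

    auto consumer = [&] {
        for (;;) {
            std::size_t i;
            {
                std::unique_lock<std::mutex> lk(m);
                cv.wait(lk, [&] { return !q.empty() || done; });
                if (q.empty()) return;   // producer finished and queue drained
                i = q.front();
                q.pop();
            }
            // Each consumer touches only slot i, so no lock is needed here.
            if (!graphics[i]->parse())
                graphics[i].reset();     // frees the graphic, leaves nullptr in slot i
        }
    };

    std::vector<std::thread> threads;
    for (unsigned t = 0; t < n_threads; ++t)
        threads.emplace_back(consumer);

    // Producer: the write to graphics[i] happens before the index is pushed
    // under the mutex, so the consumer that pops i sees the new object.
    for (std::size_t i = 0; i < n_graphics; ++i) {
        graphics[i].reset(new graphic(i));
        {
            std::lock_guard<std::mutex> lk(m);
            q.push(i);
        }
        cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lk(m);
        done = true;
    }
    cv.notify_all();   // wake consumers waiting on an empty queue so they can exit

    for (auto& t : threads)
        t.join();
    return graphics;
}
```

Pre-sizing the vector is what keeps the order: each node owns a fixed slot, so threads can finish in any order and no index bookkeeping is needed afterwards.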

My concurrent version seems to be at least twice as fast as the original one. The full code has been uploaded here.

Now I am wondering:

  • Are there any (synchronization) errors in my code?
  • Is there a way to achieve the same result faster (or better)?

Also, I noticed that on my computer I get the best result (in terms of elapsed time) when I set the number of threads to 8. More (or fewer) threads give me worse results. Why?

> Are there (synchronization) errors in the code?

There aren't any synchronization errors, but I think the memory management could be better, since your code leaks if parse() throws an exception.
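The leak shows up in the sequential producer(): if parse() throws, the raw pointer returned by `new graphic()` is never deleted. A minimal sketch of an exception-safe version that gives ownership to a unique_ptr from the moment of construction. Assumptions: `graphic` is a hypothetical stand-in that throws for node 7 and fails for multiples of 3, and the static `live` counter exists only to make leaks observable.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in: counts live instances so a leak would be visible.
struct graphic {
    static int live;   // number of graphic objects currently alive
    explicit graphic(std::size_t id) : id(id) { ++live; }
    virtual ~graphic() { --live; }
    virtual bool parse() {
        if (id == 7) throw std::runtime_error("malformed node");  // simulated bad node
        return id % 3 != 0;                                       // simulated parse failure
    }
    std::size_t id;
};
int graphic::live = 0;

std::vector<std::unique_ptr<graphic>> producer(std::size_t n_graphics) {
    std::vector<std::unique_ptr<graphic>> graphics;
    for (std::size_t i = 0; i < n_graphics; ++i) {
        // The unique_ptr owns the object immediately, so it is destroyed
        // whether parse() returns false or throws; no explicit delete needed.
        std::unique_ptr<graphic> g(new graphic(i));
        if (g->parse())
            graphics.push_back(std::move(g));
    }
    return graphics;
}
```

If parse() throws, both the current `g` and the partially filled `graphics` vector are destroyed during stack unwinding, so nothing is left allocated.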

> Is there a way to achieve the same result faster (or better)?

Probably. You could use a simple thread pool implementation and a lambda that calls parse() for you.

The code below illustrates this approach. I use the thread pool implementation from here:

```cpp
#include <iostream>
#include <stdexcept>
#include <vector>
#include <memory>
#include <chrono>
#include <future>
#include <utility>
#include <cassert>
#include <threadpool.h>  // third-party thread pool referenced above, not part of the standard library

using namespace std;
using namespace std::chrono;

#define n_graphics (1000*1000*1)
#define n_threads  8

struct graphic;
using gptr = std::unique_ptr<graphic>;

static vector<gptr> graphics;

struct graphic
{
    graphic()
        : status(false)
    {
    }

    bool parse()
    {
        // waste time
        try
        {
            throw runtime_error("");
        }
        catch (const runtime_error&)
        {
        }

        status = true;
        //return false;
        return true;
    }

    bool status;
};

int main()
{
    auto start = system_clock::now();

    auto producer_unit = []() -> gptr {
        std::unique_ptr<graphic> g(new graphic);
        if (!g->parse()) {
            g.reset(); // if g doesn't parse, return nullptr
        }
        return g;
    };

    using resultpool = std::vector<std::future<gptr>>;
    resultpool results;
    // threadpool pool(thread::hardware_concurrency());
    threadpool pool(n_threads);
    for (int i = 0; i < n_graphics; ++i) {
        // running async task
        results.emplace_back(pool.enqueue(producer_unit));
    }
    // futures are read in submission order, so graphics keeps the file order
    for (auto& t : results) {
        auto value = t.get();
        if (value) {
            graphics.emplace_back(std::move(value));
        }
    }

    auto duration = duration_cast<milliseconds>(system_clock::now() - start);
    cout << "elapsed: " << duration.count() << endl;

    for (size_t i = 0; i < graphics.size(); i++)
    {
        if (!graphics[i]->status)
        {
            cerr << "assertion failed! (" << i << ")" << endl;
            break;
        }
    }

    cin.get();
    return 0;
}
```

It is a bit faster (1 s) on my machine, more readable, and removes the need for shared data (synchronization is evil: avoid it, or hide it in a reliable and efficient way).
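The answer's code submits one task per graphic, so a million futures go through the pool's queue. A sketch of a variant that swaps the linked thread pool for std::async (a different technique, not the answer's code) and splits the node indices into one contiguous chunk per task. Each task fills its own vector, and the chunks are concatenated in submission order, so file order is preserved with no shared mutable data. Assumptions: `parse_range` and `parse_parallel` are hypothetical names, `graphic` is a stand-in whose parse() fails for every 10th node, and the default task count comes from std::thread::hardware_concurrency() (which may return 0, hence the fallback to 1). Whether this beats the pool version has not been measured.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in: every 10th node fails to parse.
struct graphic {
    explicit graphic(std::size_t id) : id(id) {}
    virtual ~graphic() = default;
    virtual bool parse() { return id % 10 != 0; }
    std::size_t id;
};

using gptr = std::unique_ptr<graphic>;

// Parses nodes [first, last) and returns the successful ones in node order.
static std::vector<gptr> parse_range(std::size_t first, std::size_t last) {
    assert(first <= last);
    std::vector<gptr> out;
    for (std::size_t i = first; i < last; ++i) {
        gptr g(new graphic(i));
        if (g->parse())
            out.push_back(std::move(g));
    }
    return out;
}

std::vector<gptr> parse_parallel(std::size_t n_graphics, unsigned n_tasks = 0) {
    if (n_tasks == 0)
        n_tasks = std::max(1u, std::thread::hardware_concurrency());
    const std::size_t chunk = (n_graphics + n_tasks - 1) / n_tasks;  // ceiling division

    // One task per chunk instead of one per graphic.
    std::vector<std::future<std::vector<gptr>>> parts;
    for (std::size_t first = 0; first < n_graphics; first += chunk) {
        std::size_t last = std::min(first + chunk, n_graphics);
        parts.push_back(std::async(std::launch::async, parse_range, first, last));
    }

    // Futures are read in submission order, so the result keeps node order.
    std::vector<gptr> graphics;
    for (auto& p : parts) {
        auto part = p.get();
        for (auto& g : part)
            graphics.push_back(std::move(g));
    }
    return graphics;
}
```

`parse_parallel(n)` returns only the graphics that parsed successfully, in node order. Sizing the work to hardware_concurrency() is also a reasonable first guess for the question's observation about 8 threads: once every hardware thread is busy, additional threads for CPU-bound parsing tend to add scheduling overhead rather than throughput, although the post does not say how many hardware threads the test machine has.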

