00001 //---------------------------------------------------------------------------- 00002 /** @file SgThreadedWorker.hpp 00003 */ 00004 //---------------------------------------------------------------------------- 00005 00006 #ifndef SG_THREADEDWORKER_HPP 00007 #define SG_THREADEDWORKER_HPP 00008 00009 #include "SgDebug.h" 00010 #include <boost/thread.hpp> 00011 #include <boost/thread/barrier.hpp> 00012 #include <boost/thread/mutex.hpp> 00013 #include <boost/shared_ptr.hpp> 00014 00015 //---------------------------------------------------------------------------- 00016 00017 template<typename I, typename O, typename W> 00018 class SgThreadedWorker 00019 { 00020 public: 00021 00022 SgThreadedWorker(std::vector<W>& workers); 00023 00024 ~SgThreadedWorker(); 00025 00026 void DoWork(const std::vector<I>& work, 00027 std::vector<std::pair<I,O> >& output); 00028 00029 private: 00030 00031 void StartDoingWork(); 00032 00033 void WaitForThreadsToFinish(); 00034 00035 void TellThreadsToQuit(); 00036 00037 friend class Thread; 00038 00039 /** Copyable object run in a boost::thread. */ 00040 class Thread 00041 { 00042 public: 00043 Thread(std::size_t threadId, W& worker, 00044 SgThreadedWorker<I,O,W>& threadedWork); 00045 00046 void operator()(); 00047 00048 private: 00049 00050 std::size_t m_id; 00051 00052 W& m_worker; 00053 00054 SgThreadedWorker<I,O,W>& m_boss; 00055 }; 00056 00057 /** Flag telling threads to exit. */ 00058 bool m_quit; 00059 00060 /** Threads must lock this mutex before getting work from list. */ 00061 boost::mutex m_workMutex; 00062 00063 /** Threads must lock this mutex before updating output. */ 00064 boost::mutex m_outputMutex; 00065 00066 /** Threads block on this barrier until told to start. */ 00067 boost::barrier m_startWork; 00068 00069 /** Threads block on this barrier until all are finished. */ 00070 boost::barrier m_workFinished; 00071 00072 /** Index of next problem to solve. */ 00073 std::size_t m_workIndex; 00074 00075 /** Problems to solve. */ 00076 const std::vector<I>* m_workToDo; 00077 00078 /** Solved problems. */ 00079 std::vector<std::pair<I,O> >* m_output; 00080 00081 /** The threads. */ 00082 std::vector<boost::shared_ptr<boost::thread> > m_threads; 00083 }; 00084 00085 //---------------------------------------------------------------------------- 00086 00087 template<typename I, typename O, typename W> 00088 SgThreadedWorker<I,O,W>::SgThreadedWorker(std::vector<W>& workers) 00089 : m_quit(false), 00090 m_startWork(workers.size() + 1), 00091 m_workFinished(workers.size() + 1) 00092 { 00093 for (std::size_t i = 0; i < workers.size(); ++i) 00094 { 00095 Thread runnable((int)i, workers[i], *this); 00096 boost::shared_ptr<boost::thread> thread(new boost::thread(runnable)); 00097 m_threads.push_back(thread); 00098 } 00099 } 00100 00101 template<typename I, typename O, typename W> 00102 SgThreadedWorker<I,O,W>::~SgThreadedWorker() 00103 { 00104 TellThreadsToQuit(); 00105 for (std::size_t i = 0; i < m_threads.size(); ++i) 00106 { 00107 m_threads[i]->join(); 00108 SgDebug() << "SgThreadedWorker: joined " << i << '\n'; 00109 } 00110 } 00111 00112 template<typename I, typename O, typename W> 00113 void SgThreadedWorker<I,O,W>::DoWork(const std::vector<I>& work, 00114 std::vector<std::pair<I,O> >& output) 00115 { 00116 m_workToDo = &work; 00117 m_workIndex = 0; 00118 m_output = &output; 00119 SgDebug() << "SgThreadedWorker::DoWork(): Processing " 00120 << work.size() << " jobs." << '\n'; 00121 StartDoingWork(); 00122 WaitForThreadsToFinish(); 00123 } 00124 00125 template<typename I, typename O, typename W> 00126 SgThreadedWorker<I,O,W>::Thread::Thread(std::size_t threadId, W& worker, 00127 SgThreadedWorker<I,O,W>& threadedWorker) 00128 : m_id(threadId), 00129 m_worker(worker), 00130 m_boss(threadedWorker) 00131 { 00132 } 00133 00134 template<typename I, typename O, typename W> 00135 void SgThreadedWorker<I,O,W>::Thread::operator()() 00136 { 00137 while (true) 00138 { 00139 m_boss.m_startWork.wait(); 00140 if (m_boss.m_quit) 00141 break; 00142 //SgDebug() << "[" << m_id << "]: starting..." << '\n'; 00143 while (true) 00144 { 00145 bool finished = false; 00146 const I* currentWork = 0; 00147 { 00148 boost::mutex::scoped_lock lock(m_boss.m_workMutex); 00149 if (m_boss.m_workIndex < m_boss.m_workToDo->size()) 00150 currentWork = &(*m_boss.m_workToDo)[m_boss.m_workIndex++]; 00151 else 00152 finished = true; 00153 } 00154 if (finished) 00155 break; 00156 O answer = m_worker(*currentWork); 00157 { 00158 boost::mutex::scoped_lock lock(m_boss.m_outputMutex); 00159 m_boss.m_output 00160 ->push_back(std::make_pair(*currentWork, answer)); 00161 } 00162 } 00163 //SgDebug() << "[" << m_id << "]: finished." << '\n'; 00164 m_boss.m_workFinished.wait(); 00165 } 00166 } 00167 00168 template<typename I, typename O, typename W> 00169 void SgThreadedWorker<I,O,W>::StartDoingWork() 00170 { 00171 m_startWork.wait(); 00172 } 00173 00174 template<typename I, typename O, typename W> 00175 void SgThreadedWorker<I,O,W>::WaitForThreadsToFinish() 00176 { 00177 m_workFinished.wait(); 00178 } 00179 00180 template<typename I, typename O, typename W> 00181 void SgThreadedWorker<I,O,W>::TellThreadsToQuit() 00182 { 00183 m_quit = true; 00184 m_startWork.wait(); 00185 } 00186 00187 //---------------------------------------------------------------------------- 00188 00189 #endif // SG_THREADEDWORKER_HPP