00001
00002
00003
00004
00005
00006 #ifndef SG_THREADEDWORKER_HPP
00007 #define SG_THREADEDWORKER_HPP
00008
00009 #include "SgDebug.h"
00010 #include <boost/thread.hpp>
00011 #include <boost/thread/barrier.hpp>
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014
00015
00016
00017 template<typename I, typename O, typename W>
00018 class SgThreadedWorker
00019 {
00020 public:
00021
00022 SgThreadedWorker(std::vector<W>& workers);
00023
00024 ~SgThreadedWorker();
00025
00026 void DoWork(const std::vector<I>& work,
00027 std::vector<std::pair<I,O> >& output);
00028
00029 private:
00030
00031 void StartDoingWork();
00032
00033 void WaitForThreadsToFinish();
00034
00035 void TellThreadsToQuit();
00036
00037 friend class Thread;
00038
00039
00040 class Thread
00041 {
00042 public:
00043 Thread(std::size_t threadId, W& worker,
00044 SgThreadedWorker<I,O,W>& threadedWork);
00045
00046 void operator()();
00047
00048 private:
00049
00050 std::size_t m_id;
00051
00052 W& m_worker;
00053
00054 SgThreadedWorker<I,O,W>& m_boss;
00055 };
00056
00057
00058 bool m_quit;
00059
00060
00061 boost::mutex m_workMutex;
00062
00063
00064 boost::mutex m_outputMutex;
00065
00066
00067 boost::barrier m_startWork;
00068
00069
00070 boost::barrier m_workFinished;
00071
00072
00073 std::size_t m_workIndex;
00074
00075
00076 const std::vector<I>* m_workToDo;
00077
00078
00079 std::vector<std::pair<I,O> >* m_output;
00080
00081
00082 std::vector<boost::shared_ptr<boost::thread> > m_threads;
00083 };
00084
00085
00086
00087 template<typename I, typename O, typename W>
00088 SgThreadedWorker<I,O,W>::SgThreadedWorker(std::vector<W>& workers)
00089 : m_quit(false),
00090 m_startWork(workers.size() + 1),
00091 m_workFinished(workers.size() + 1)
00092 {
00093 for (std::size_t i = 0; i < workers.size(); ++i)
00094 {
00095 Thread runnable((int)i, workers[i], *this);
00096 boost::shared_ptr<boost::thread> thread(new boost::thread(runnable));
00097 m_threads.push_back(thread);
00098 }
00099 }
00100
00101 template<typename I, typename O, typename W>
00102 SgThreadedWorker<I,O,W>::~SgThreadedWorker()
00103 {
00104 TellThreadsToQuit();
00105 for (std::size_t i = 0; i < m_threads.size(); ++i)
00106 {
00107 m_threads[i]->join();
00108 SgDebug() << "SgThreadedWorker: joined " << i << '\n';
00109 }
00110 }
00111
00112 template<typename I, typename O, typename W>
00113 void SgThreadedWorker<I,O,W>::DoWork(const std::vector<I>& work,
00114 std::vector<std::pair<I,O> >& output)
00115 {
00116 m_workToDo = &work;
00117 m_workIndex = 0;
00118 m_output = &output;
00119 SgDebug() << "SgThreadedWorker::DoWork(): Processing "
00120 << work.size() << " jobs." << '\n';
00121 StartDoingWork();
00122 WaitForThreadsToFinish();
00123 }
00124
00125 template<typename I, typename O, typename W>
00126 SgThreadedWorker<I,O,W>::Thread::Thread(std::size_t threadId, W& worker,
00127 SgThreadedWorker<I,O,W>& threadedWorker)
00128 : m_id(threadId),
00129 m_worker(worker),
00130 m_boss(threadedWorker)
00131 {
00132 }
00133
00134 template<typename I, typename O, typename W>
00135 void SgThreadedWorker<I,O,W>::Thread::operator()()
00136 {
00137 while (true)
00138 {
00139 m_boss.m_startWork.wait();
00140 if (m_boss.m_quit)
00141 break;
00142
00143 while (true)
00144 {
00145 bool finished = false;
00146 const I* currentWork = 0;
00147 {
00148 boost::mutex::scoped_lock lock(m_boss.m_workMutex);
00149 if (m_boss.m_workIndex < m_boss.m_workToDo->size())
00150 currentWork = &(*m_boss.m_workToDo)[m_boss.m_workIndex++];
00151 else
00152 finished = true;
00153 }
00154 if (finished)
00155 break;
00156 O answer = m_worker(*currentWork);
00157 {
00158 boost::mutex::scoped_lock lock(m_boss.m_outputMutex);
00159 m_boss.m_output
00160 ->push_back(std::make_pair(*currentWork, answer));
00161 }
00162 }
00163
00164 m_boss.m_workFinished.wait();
00165 }
00166 }
00167
00168 template<typename I, typename O, typename W>
00169 void SgThreadedWorker<I,O,W>::StartDoingWork()
00170 {
00171 m_startWork.wait();
00172 }
00173
00174 template<typename I, typename O, typename W>
00175 void SgThreadedWorker<I,O,W>::WaitForThreadsToFinish()
00176 {
00177 m_workFinished.wait();
00178 }
00179
00180 template<typename I, typename O, typename W>
00181 void SgThreadedWorker<I,O,W>::TellThreadsToQuit()
00182 {
00183 m_quit = true;
00184 m_startWork.wait();
00185 }
00186
00187
00188
00189 #endif // SG_THREADEDWORKER_HPP