Index   Main   Namespaces   Classes   Hierarchy   Annotated   Files   Compound   Global   Pages  

SgThreadedWorker.h

Go to the documentation of this file.
00001 //----------------------------------------------------------------------------
00002 /** @file SgThreadedWorker.h
00003  */
00004 //----------------------------------------------------------------------------
00005 
00006 #ifndef SG_THREADEDWORKER_HPP
00007 #define SG_THREADEDWORKER_HPP
00008 
00009 #include "SgDebug.h"
00010 #include <boost/thread.hpp>
00011 #include <boost/thread/barrier.hpp>
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014 
00015 //----------------------------------------------------------------------------
00016 
00017 template<typename I, typename O, typename W>
00018 class SgThreadedWorker
00019 {
00020 public:
00021     
00022     SgThreadedWorker(std::vector<W>& workers);
00023 
00024     ~SgThreadedWorker();
00025 
00026     void DoWork(const std::vector<I>& work, 
00027                 std::vector<std::pair<I,O> >& output);
00028     
00029 private:
00030 
00031     void StartDoingWork();
00032 
00033     void WaitForThreadsToFinish();
00034 
00035     void TellThreadsToQuit();
00036 
00037     friend class Thread;
00038 
00039     /** Copyable object run in a boost::thread. */
00040     class Thread
00041     {
00042     public:
00043         Thread(std::size_t threadId, W& worker, 
00044                SgThreadedWorker<I,O,W>& threadedWork);
00045 
00046         void operator()();
00047 
00048     private:
00049 
00050         std::size_t m_id;
00051 
00052         W& m_worker;
00053             
00054         SgThreadedWorker<I,O,W>& m_boss;
00055     };
00056 
00057     /** Flag telling threads to exit. */
00058     bool m_quit;
00059 
00060     /** Threads must lock this mutex before getting work from list. */
00061     boost::mutex m_workMutex;
00062 
00063     /** Threads must lock this mutex before updating output. */
00064     boost::mutex m_outputMutex;
00065 
00066     /** Threads block on this barrier until told to start. */
00067     boost::barrier m_startWork;
00068 
00069     /** Threads block on this barrier until all are finished. */
00070     boost::barrier m_workFinished;
00071 
00072     /** Index of next problem to solve. */
00073     std::size_t m_workIndex;
00074 
00075     /** Problems to solve. */
00076     const std::vector<I>* m_workToDo;
00077 
00078     /** Solved problems. */
00079     std::vector<std::pair<I,O> >* m_output;
00080 
00081     /** The threads. */
00082     std::vector<boost::shared_ptr<boost::thread> > m_threads;
00083 };
00084 
00085 //----------------------------------------------------------------------------
00086 
00087 template<typename I, typename O, typename W>
00088 SgThreadedWorker<I,O,W>::SgThreadedWorker(std::vector<W>& workers)
00089     : m_quit(false),
00090       m_startWork(workers.size() + 1),
00091       m_workFinished(workers.size() + 1)
00092 {
00093     for (std::size_t i = 0; i < workers.size(); ++i)
00094     {
00095         Thread runnable((int)i, workers[i], *this);
00096         boost::shared_ptr<boost::thread> thread(new boost::thread(runnable));
00097         m_threads.push_back(thread);
00098     }
00099 }
00100 
00101 template<typename I, typename O, typename W>
00102 SgThreadedWorker<I,O,W>::~SgThreadedWorker()
00103 {
00104     TellThreadsToQuit();
00105     for (std::size_t i = 0; i < m_threads.size(); ++i)
00106     {
00107         m_threads[i]->join();
00108         SgDebug() << "SgThreadedWorker: joined " << i << '\n';
00109     }
00110 }
00111 
00112 template<typename I, typename O, typename W>
00113 void SgThreadedWorker<I,O,W>::DoWork(const std::vector<I>& work,
00114                                    std::vector<std::pair<I,O> >& output)
00115 {
00116     m_workToDo = &work;
00117     m_workIndex = 0;
00118     m_output = &output;
00119     SgDebug() << "SgThreadedWorker::DoWork(): Processing " 
00120               << work.size() << " jobs." << '\n';
00121     StartDoingWork();
00122     WaitForThreadsToFinish();
00123 }
00124 
00125 template<typename I, typename O, typename W>
00126 SgThreadedWorker<I,O,W>::Thread::Thread(std::size_t threadId, W& worker, 
00127                                       SgThreadedWorker<I,O,W>& threadedWorker)
00128     : m_id(threadId),
00129       m_worker(worker),
00130       m_boss(threadedWorker)
00131 {
00132 }
00133 
00134 template<typename I, typename O, typename W>
00135 void SgThreadedWorker<I,O,W>::Thread::operator()()
00136 {
00137     while (true)
00138     {
00139         m_boss.m_startWork.wait();
00140         if (m_boss.m_quit) 
00141             break;
00142         //SgDebug() << "[" << m_id << "]: starting..."  << '\n';
00143         while (true)
00144         {
00145             bool finished = false;
00146             const I* currentWork = 0;
00147             {
00148                 boost::mutex::scoped_lock lock(m_boss.m_workMutex);
00149                 if (m_boss.m_workIndex < m_boss.m_workToDo->size())
00150                     currentWork = &(*m_boss.m_workToDo)[m_boss.m_workIndex++];
00151                 else
00152                     finished = true;
00153             }
00154             if (finished)
00155                 break;
00156             O answer = m_worker(*currentWork);
00157             {
00158                 boost::mutex::scoped_lock lock(m_boss.m_outputMutex);
00159                 m_boss.m_output
00160                     ->push_back(std::make_pair(*currentWork, answer));
00161             }
00162         }
00163         //SgDebug() << "[" << m_id << "]: finished." << '\n';
00164         m_boss.m_workFinished.wait();
00165     }
00166 }
00167 
00168 template<typename I, typename O, typename W>
00169 void SgThreadedWorker<I,O,W>::StartDoingWork()
00170 {
00171     m_startWork.wait();
00172 }
00173 
00174 template<typename I, typename O, typename W>
00175 void SgThreadedWorker<I,O,W>::WaitForThreadsToFinish()
00176 {
00177     m_workFinished.wait();
00178 }
00179 
00180 template<typename I, typename O, typename W>
00181 void SgThreadedWorker<I,O,W>::TellThreadsToQuit()
00182 {
00183     m_quit = true;
00184     m_startWork.wait();
00185 }
00186 
00187 //----------------------------------------------------------------------------
00188 
00189 #endif // SG_THREADEDWORKER_HPP


17 Jun 2010 Doxygen 1.4.7