00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "ThreadPool.h"
00024
00025 #ifdef WITH_THREADS
00026
00027 using namespace std;
00028 using namespace Moses;
00029
00030 namespace Moses
00031 {
00032
00033 ThreadPool::ThreadPool( size_t numThreads )
00034 : m_stopped(false), m_stopping(false), m_queueLimit(0)
00035 {
00036 for (size_t i = 0; i < numThreads; ++i) {
00037 m_threads.create_thread(boost::bind(&ThreadPool::Execute,this));
00038 }
00039 }
00040
00041 void ThreadPool::Execute()
00042 {
00043 do {
00044 boost::shared_ptr<Task> task;
00045 {
00046
00047 boost::mutex::scoped_lock lock(m_mutex);
00048 if (m_tasks.empty() && !m_stopped) {
00049 m_threadNeeded.wait(lock);
00050 }
00051 if (!m_stopped && !m_tasks.empty()) {
00052 task = m_tasks.front();
00053 m_tasks.pop();
00054 }
00055 }
00056
00057 if (task) {
00058 task->Run();
00059 }
00060 m_threadAvailable.notify_all();
00061 } while (!m_stopped);
00062 }
00063
00064 void ThreadPool::Submit(boost::shared_ptr<Task> task)
00065 {
00066 boost::mutex::scoped_lock lock(m_mutex);
00067 if (m_stopping) {
00068 throw runtime_error("ThreadPool stopping - unable to accept new jobs");
00069 }
00070 while (m_queueLimit > 0 && m_tasks.size() >= m_queueLimit) {
00071 m_threadAvailable.wait(lock);
00072 }
00073 m_tasks.push(task);
00074 m_threadNeeded.notify_all();
00075 }
00076
00077 void ThreadPool::Stop(bool processRemainingJobs)
00078 {
00079 {
00080
00081 boost::mutex::scoped_lock lock(m_mutex);
00082 if (m_stopped) return;
00083 m_stopping = true;
00084 }
00085 if (processRemainingJobs) {
00086 boost::mutex::scoped_lock lock(m_mutex);
00087
00088 while (!m_tasks.empty() && !m_stopped) {
00089 m_threadAvailable.wait(lock);
00090 }
00091 }
00092
00093 {
00094 boost::mutex::scoped_lock lock(m_mutex);
00095 m_stopped = true;
00096 }
00097 m_threadNeeded.notify_all();
00098
00099 m_threads.join_all();
00100 }
00101
00102 }
00103 #endif //WITH_THREADS
00104