00001 #ifndef UTIL_THREAD_POOL_H
00002 #define UTIL_THREAD_POOL_H
00003
00004 #include "util/pcqueue.hh"
00005
00006 #include <boost/ptr_container/ptr_vector.hpp>
00007 #include <boost/optional.hpp>
00008 #include <boost/thread.hpp>
00009
00010 #include <iostream>
00011 #include <cstdlib>
00012
00013 namespace util {
00014
00015 template <class HandlerT> class Worker : boost::noncopyable {
00016 public:
00017 typedef HandlerT Handler;
00018 typedef typename Handler::Request Request;
00019
00020 template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, const Request &poison)
00021 : in_(in), handler_(construct), poison_(poison), thread_(boost::ref(*this)) {}
00022
00023
00024 void operator()() {
00025 Request request;
00026 while (1) {
00027 in_.Consume(request);
00028 if (request == poison_) return;
00029 try {
00030 (*handler_)(request);
00031 }
00032 catch(const std::exception &e) {
00033 std::cerr << "Handler threw " << e.what() << std::endl;
00034 abort();
00035 }
00036 catch(...) {
00037 std::cerr << "Handler threw an exception, dropping request" << std::endl;
00038 abort();
00039 }
00040 }
00041 }
00042
00043 void Join() {
00044 thread_.join();
00045 }
00046
00047 private:
00048 PCQueue<Request> &in_;
00049
00050 boost::optional<Handler> handler_;
00051
00052 const Request poison_;
00053
00054 boost::thread thread_;
00055 };
00056
00057 template <class HandlerT> class ThreadPool : boost::noncopyable {
00058 public:
00059 typedef HandlerT Handler;
00060 typedef typename Handler::Request Request;
00061
00062 template <class Construct> ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) {
00063 for (size_t i = 0; i < workers; ++i) {
00064 workers_.push_back(new Worker<Handler>(in_, handler_construct, poison));
00065 }
00066 }
00067
00068 ~ThreadPool() {
00069 for (size_t i = 0; i < workers_.size(); ++i) {
00070 Produce(poison_);
00071 }
00072 for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) {
00073 i->Join();
00074 }
00075 }
00076
00077 void Produce(const Request &request) {
00078 in_.Produce(request);
00079 }
00080
00081
00082 PCQueue<Request> &In() { return in_; }
00083
00084 private:
00085 PCQueue<Request> in_;
00086
00087 boost::ptr_vector<Worker<Handler> > workers_;
00088
00089 Request poison_;
00090 };
00091
00092 }
00093
00094 #endif // UTIL_THREAD_POOL_H