00001 #ifndef UTIL_PCQUEUE_H
00002 #define UTIL_PCQUEUE_H
00003
00004 #include "util/exception.hh"
00005
00006 #include <boost/interprocess/sync/interprocess_semaphore.hpp>
00007 #include <boost/scoped_array.hpp>
00008 #include <boost/thread/mutex.hpp>
00009 #include <boost/utility.hpp>
00010
00011 #include <cerrno>
00012
00013 #ifdef __APPLE__
00014 #include <mach/semaphore.h>
00015 #include <mach/task.h>
00016 #include <mach/mach_traps.h>
00017 #include <mach/mach.h>
00018 #endif // __APPLE__
00019
00020 namespace util {
00021
00022
00023
00024
00025 #ifdef __APPLE__
00026
00027 #define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure")
00028
00029 class Semaphore {
00030 public:
00031 explicit Semaphore(int value) : task_(mach_task_self()) {
00032 MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value));
00033 }
00034
00035 ~Semaphore() {
00036 MACH_CALL(semaphore_destroy(task_, back_));
00037 }
00038
00039 void wait() {
00040 MACH_CALL(semaphore_wait(back_));
00041 }
00042
00043 void post() {
00044 MACH_CALL(semaphore_signal(back_));
00045 }
00046
00047 private:
00048 semaphore_t back_;
00049 task_t task_;
00050 };
00051
00052 inline void WaitSemaphore(Semaphore &semaphore) {
00053 semaphore.wait();
00054 }
00055
00056 #else
00057 typedef boost::interprocess::interprocess_semaphore Semaphore;
00058
00059 inline void WaitSemaphore (Semaphore &on) {
00060 while (1) {
00061 try {
00062 on.wait();
00063 break;
00064 }
00065 catch (boost::interprocess::interprocess_exception &e) {
00066 if (e.get_native_error() != EINTR) {
00067 throw;
00068 }
00069 }
00070 }
00071 }
00072
00073 #endif // __APPLE__
00074
00082 template <class T> class PCQueue : boost::noncopyable {
00083 public:
00084 explicit PCQueue(size_t size)
00085 : empty_(size), used_(0),
00086 storage_(new T[size]),
00087 end_(storage_.get() + size),
00088 produce_at_(storage_.get()),
00089 consume_at_(storage_.get()) {}
00090
00091
00092 void Produce(const T &val) {
00093 WaitSemaphore(empty_);
00094 {
00095 boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_);
00096 try {
00097 *produce_at_ = val;
00098 }
00099 catch (...) {
00100 empty_.post();
00101 throw;
00102 }
00103 if (++produce_at_ == end_) produce_at_ = storage_.get();
00104 }
00105 used_.post();
00106 }
00107
00108
00109 T& Consume(T &out) {
00110 WaitSemaphore(used_);
00111 {
00112 boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_);
00113 try {
00114 out = *consume_at_;
00115 }
00116 catch (...) {
00117 used_.post();
00118 throw;
00119 }
00120 if (++consume_at_ == end_) consume_at_ = storage_.get();
00121 }
00122 empty_.post();
00123 return out;
00124 }
00125
00126
00127
00128 T Consume() {
00129 T ret;
00130 Consume(ret);
00131 return ret;
00132 }
00133
00134 private:
00135
00136 Semaphore empty_;
00137
00138 Semaphore used_;
00139
00140 boost::scoped_array<T> storage_;
00141
00142 T *const end_;
00143
00144
00145 T *produce_at_;
00146 boost::mutex produce_at_mutex_;
00147
00148
00149 T *consume_at_;
00150 boost::mutex consume_at_mutex_;
00151
00152 };
00153
00154 }
00155
00156 #endif // UTIL_PCQUEUE_H