00001
00002
00003
00004
00005
00006
00007
00008 template<typename Token>
00009 class Bitext<Token>
00010 ::agenda
00011 {
00012 public:
00013 class job;
00014 class worker;
00015 private:
00016 boost::mutex lock;
00017 std::list<SPTR<job> > joblist;
00018 std::vector<SPTR<boost::thread> > workers;
00019 bool shutdown;
00020 size_t doomed;
00021
00022 public:
00023
00024
00025 Bitext<Token> const& bt;
00026
00027 agenda(Bitext<Token> const& bitext);
00028 ~agenda();
00029
00030 void
00031 add_workers(int n);
00032
00033 SPTR<pstats>
00034 add_job(Bitext<Token> const* const theBitext,
00035 typename TSA<Token>::tree_iterator const& phrase,
00036 size_t const max_samples, SPTR<SamplingBias const> const& bias,
00037 bool const track_sids);
00038
00039
00040
00041
00042 SPTR<job>
00043 get_job();
00044 };
00045
00046 template<typename Token>
00047 class
00048 Bitext<Token>::agenda::
00049 worker
00050 {
00051 agenda& ag;
00052 public:
00053 worker(agenda& a) : ag(a) {}
00054 void operator()();
00055 };
00056
00057 #include "ug_bitext_agenda_worker.h"
00058 #include "ug_bitext_agenda_job.h"
00059
00060 template<typename Token>
00061 void Bitext<Token>
00062 ::agenda
00063 ::add_workers(int n)
00064 {
00065 static boost::posix_time::time_duration nodelay(0,0,0,0);
00066 boost::lock_guard<boost::mutex> guard(this->lock);
00067
00068 int target = std::max(1, int(n + workers.size() - this->doomed));
00069
00070 for (size_t i = 0; i < workers.size(); )
00071 {
00072 if (workers[i]->timed_join(nodelay))
00073 {
00074 if (i + 1 < workers.size())
00075 workers[i].swap(workers.back());
00076 workers.pop_back();
00077 }
00078 else ++i;
00079 }
00080
00081 if (int(workers.size()) > target)
00082 this->doomed = workers.size() - target;
00083 else
00084 while (int(workers.size()) < target)
00085 {
00086 SPTR<boost::thread> w(new boost::thread(worker(*this)));
00087 workers.push_back(w);
00088 }
00089 }
00090
00091
00092 template<typename Token>
00093 SPTR<pstats> Bitext<Token>
00094 ::agenda
00095 ::add_job(Bitext<Token> const* const theBitext,
00096 typename TSA<Token>::tree_iterator const& phrase,
00097 size_t const max_samples, SPTR<SamplingBias const> const& bias,
00098 bool const track_sids)
00099 {
00100 boost::unique_lock<boost::mutex> lk(this->lock);
00101 static boost::posix_time::time_duration nodelay(0,0,0,0);
00102 bool fwd = phrase.root == bt.I1.get();
00103 SPTR<job> j(new job(theBitext, phrase, fwd ? bt.I1 : bt.I2,
00104 max_samples, fwd, bias, track_sids));
00105 j->stats->register_worker();
00106
00107 joblist.push_back(j);
00108 if (joblist.size() == 1)
00109 {
00110 size_t i = 0;
00111 while (i < workers.size())
00112 {
00113 if (workers[i]->timed_join(nodelay))
00114 {
00115 if (doomed)
00116 {
00117 if (i+1 < workers.size())
00118 workers[i].swap(workers.back());
00119 workers.pop_back();
00120 --doomed;
00121 }
00122 else
00123 workers[i++] = SPTR<boost::thread>(new boost::thread(worker(*this)));
00124 }
00125 else ++i;
00126 }
00127 }
00128 return j->stats;
00129 }
00130
00131 template<typename Token>
00132 SPTR<typename Bitext<Token>::agenda::job>
00133 Bitext<Token>
00134 ::agenda
00135 ::get_job()
00136 {
00137
00138 SPTR<job> ret;
00139 if (this->shutdown) return ret;
00140 boost::unique_lock<boost::mutex> lock(this->lock);
00141 if (this->doomed)
00142 {
00143 --this->doomed;
00144 return ret;
00145 }
00146
00147 typename std::list<SPTR<job> >::iterator j = joblist.begin();
00148 while (j != joblist.end())
00149 {
00150 if ((*j)->done())
00151 {
00152 (*j)->stats->release();
00153 joblist.erase(j++);
00154 }
00155 else if ((*j)->workers >= 4) ++j;
00156 else break;
00157 }
00158 if (joblist.size())
00159 {
00160 ret = j == joblist.end() ? joblist.front() : *j;
00161
00162
00163 boost::lock_guard<boost::mutex> jguard(ret->lock);
00164 ++ret->workers;
00165 }
00166 return ret;
00167 }
00168
00169 template<typename Token>
00170 Bitext<Token>::
00171 agenda::
00172 ~agenda()
00173 {
00174 this->lock.lock();
00175 this->shutdown = true;
00176 this->lock.unlock();
00177 for (size_t i = 0; i < workers.size(); ++i)
00178 workers[i]->join();
00179 }
00180
00181 template<typename Token>
00182 Bitext<Token>::
00183 agenda::
00184 agenda(Bitext<Token> const& thebitext)
00185 : shutdown(false), doomed(0), bt(thebitext)
00186 { }
00187
00188