00001 #include "util/stream/chain.hh"
00002
00003 #include "util/stream/io.hh"
00004
00005 #include "util/exception.hh"
00006 #include "util/pcqueue.hh"
00007
00008 #include <cstdlib>
00009 #include <new>
00010 #include <iostream>
00011 #include <stdint.h>
00012
00013 namespace util {
00014 namespace stream {
00015
00016 ChainConfigException::ChainConfigException() throw() { *this << "Chain configured with "; }
00017 ChainConfigException::~ChainConfigException() throw() {}
00018
00019 Thread::~Thread() {
00020 thread_.join();
00021 }
00022
00023 void Thread::UnhandledException(const std::exception &e) {
00024 std::cerr << e.what() << std::endl;
00025 abort();
00026 }
00027
00028 void Recycler::Run(const ChainPosition &position) {
00029 for (Link l(position); l; ++l) {
00030 l->SetValidSize(position.GetChain().BlockSize());
00031 }
00032 }
00033
00034 const Recycler kRecycle = Recycler();
00035
00036 Chain::Chain(const ChainConfig &config) : config_(config), complete_called_(false) {
00037 UTIL_THROW_IF(!config.entry_size, ChainConfigException, "zero-size entries.");
00038 UTIL_THROW_IF(!config.block_count, ChainConfigException, "block count zero");
00039 UTIL_THROW_IF(config.total_memory < config.entry_size * config.block_count, ChainConfigException, config.total_memory << " total memory, too small for " << config.block_count << " blocks of containing entries of size " << config.entry_size);
00040
00041 block_size_ = config.total_memory / (config.block_count * config.entry_size) * config.entry_size;
00042 }
00043
00044 Chain::~Chain() {
00045 Wait();
00046 }
00047
00048 ChainPosition Chain::Add() {
00049 if (!Running()) Start();
00050 PCQueue<Block> &in = queues_.back();
00051 queues_.push_back(new PCQueue<Block>(config_.block_count));
00052 return ChainPosition(in, queues_.back(), this, progress_);
00053 }
00054
00055 Chain &Chain::operator>>(const WriteAndRecycle &writer) {
00056 threads_.push_back(new Thread(Complete(), writer));
00057 return *this;
00058 }
00059
00060 Chain &Chain::operator>>(const PWriteAndRecycle &writer) {
00061 threads_.push_back(new Thread(Complete(), writer));
00062 return *this;
00063 }
00064
00065 void Chain::Wait(bool release_memory) {
00066 if (queues_.empty()) {
00067 assert(threads_.empty());
00068 return;
00069 }
00070 if (!complete_called_) CompleteLoop();
00071 threads_.clear();
00072 for (std::size_t i = 0; queues_.front().Consume(); ++i) {
00073 if (i == config_.block_count) {
00074 std::cerr << "Chain ending without poison." << std::endl;
00075 abort();
00076 }
00077 }
00078 queues_.clear();
00079 progress_.Finished();
00080 complete_called_ = false;
00081 if (release_memory) memory_.reset();
00082 }
00083
00084 void Chain::Start() {
00085 Wait(false);
00086 if (!memory_.get()) {
00087
00088 assert(threads_.empty());
00089 assert(queues_.empty());
00090 std::size_t malloc_size = block_size_ * config_.block_count;
00091 memory_.reset(MallocOrThrow(malloc_size));
00092 }
00093
00094 queues_.push_back(new PCQueue<Block>(config_.block_count));
00095
00096 uint8_t *base = static_cast<uint8_t*>(memory_.get());
00097 for (std::size_t i = 0; i < config_.block_count; ++i) {
00098 queues_.front().Produce(Block(base, block_size_));
00099 base += block_size_;
00100 }
00101 }
00102
00103 ChainPosition Chain::Complete() {
00104 assert(Running());
00105 UTIL_THROW_IF(complete_called_, util::Exception, "CompleteLoop() called twice");
00106 complete_called_ = true;
00107 return ChainPosition(queues_.back(), queues_.front(), this, progress_);
00108 }
00109
00110 Link::Link() : in_(NULL), out_(NULL), poisoned_(true) {}
00111
00112 void Link::Init(const ChainPosition &position) {
00113 UTIL_THROW_IF(in_, util::Exception, "Link::Init twice");
00114 in_ = position.in_;
00115 out_ = position.out_;
00116 poisoned_ = false;
00117 progress_ = position.progress_;
00118 in_->Consume(current_);
00119 }
00120
00121 Link::Link(const ChainPosition &position) : in_(NULL) {
00122 Init(position);
00123 }
00124
00125 Link::~Link() {
00126 if (current_) {
00127
00128 std::cerr << "Last input should have been poison." << std::endl;
00129 abort();
00130 } else {
00131 if (!poisoned_) {
00132
00133
00134
00135
00136
00137
00138 out_->Produce(current_);
00139 }
00140 }
00141 }
00142
00143 Link &Link::operator++() {
00144 assert(current_);
00145 progress_ += current_.ValidSize();
00146 out_->Produce(current_);
00147 in_->Consume(current_);
00148 if (!current_) {
00149 poisoned_ = true;
00150 out_->Produce(current_);
00151 }
00152 return *this;
00153 }
00154
00155 void Link::Poison() {
00156 assert(!poisoned_);
00157 current_.SetToPoison();
00158 out_->Produce(current_);
00159 poisoned_ = true;
00160 }
00161
00162 }
00163 }