00001 #include "util/stream/rewindable_stream.hh"
00002 #include "util/pcqueue.hh"
00003
00004 #include <iostream>
00005
00006 namespace util {
00007 namespace stream {
00008
00009 RewindableStream::RewindableStream()
00010 : current_(NULL), in_(NULL), out_(NULL), poisoned_(true) {
00011
00012 }
00013
00014 void RewindableStream::Init(const ChainPosition &position) {
00015 UTIL_THROW_IF2(in_, "RewindableStream::Init twice");
00016 in_ = position.in_;
00017 out_ = position.out_;
00018 hit_poison_ = false;
00019 poisoned_ = false;
00020 progress_ = position.progress_;
00021 entry_size_ = position.GetChain().EntrySize();
00022 block_size_ = position.GetChain().BlockSize();
00023 block_count_ = position.GetChain().BlockCount();
00024 blocks_it_ = 0;
00025 marked_ = NULL;
00026 UTIL_THROW_IF2(block_count_ < 2, "RewindableStream needs block_count at least two");
00027 AppendBlock();
00028 }
00029
00030 RewindableStream &RewindableStream::operator++() {
00031 assert(*this);
00032 assert(current_ < block_end_);
00033 assert(current_);
00034 assert(blocks_it_ < blocks_.size());
00035 current_ += entry_size_;
00036 if (UTIL_UNLIKELY(current_ == block_end_)) {
00037
00038 if (++blocks_it_ == blocks_.size()) {
00039 if (!marked_) {
00040 Flush(blocks_.begin() + blocks_it_);
00041 blocks_it_ = 0;
00042 }
00043 AppendBlock();
00044 assert(poisoned_ || (blocks_it_ == blocks_.size() - 1));
00045 if (poisoned_) return *this;
00046 }
00047 Block &cur_block = blocks_[blocks_it_];
00048 current_ = static_cast<uint8_t*>(cur_block.Get());
00049 block_end_ = current_ + cur_block.ValidSize();
00050 }
00051 assert(current_);
00052 assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
00053 assert(current_ < block_end_);
00054 assert(block_end_ == blocks_[blocks_it_].ValidEnd());
00055 return *this;
00056 }
00057
00058 void RewindableStream::Mark() {
00059 marked_ = current_;
00060 Flush(blocks_.begin() + blocks_it_);
00061 blocks_it_ = 0;
00062 }
00063
00064 void RewindableStream::Rewind() {
00065 if (current_ != marked_) {
00066 poisoned_ = false;
00067 }
00068 blocks_it_ = 0;
00069 current_ = marked_;
00070 block_end_ = static_cast<const uint8_t*>(blocks_[blocks_it_].ValidEnd());
00071
00072 assert(current_);
00073 assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
00074 assert(current_ < block_end_);
00075 assert(block_end_ == blocks_[blocks_it_].ValidEnd());
00076 }
00077
00078 void RewindableStream::Poison() {
00079 if (blocks_.empty()) return;
00080 assert(*this);
00081 assert(blocks_it_ == blocks_.size() - 1);
00082
00083
00084 blocks_.back().SetValidSize(current_ - static_cast<uint8_t*>(blocks_.back().Get()));
00085 Flush(blocks_.end());
00086 blocks_it_ = 0;
00087
00088 Block poison;
00089 if (!hit_poison_) {
00090 in_->Consume(poison);
00091 }
00092 poison.SetToPoison();
00093 out_->Produce(poison);
00094 hit_poison_ = true;
00095 poisoned_ = true;
00096 }
00097
00098 void RewindableStream::AppendBlock() {
00099 if (UTIL_UNLIKELY(blocks_.size() >= block_count_)) {
00100 std::cerr << "RewindableStream trying to use more blocks than available" << std::endl;
00101 abort();
00102 }
00103 if (UTIL_UNLIKELY(hit_poison_)) {
00104 poisoned_ = true;
00105 return;
00106 }
00107 Block get;
00108
00109
00110 do {
00111 in_->Consume(get);
00112 if (UTIL_LIKELY(get)) {
00113 blocks_.push_back(get);
00114 } else {
00115 hit_poison_ = true;
00116 poisoned_ = true;
00117 return;
00118 }
00119 } while (UTIL_UNLIKELY(get.ValidSize() == 0));
00120 current_ = static_cast<uint8_t*>(blocks_.back().Get());
00121 block_end_ = static_cast<const uint8_t*>(blocks_.back().ValidEnd());
00122 blocks_it_ = blocks_.size() - 1;
00123 }
00124
00125 void RewindableStream::Flush(std::deque<Block>::iterator to) {
00126 for (std::deque<Block>::iterator i = blocks_.begin(); i != to; ++i) {
00127 out_->Produce(*i);
00128 progress_ += i->ValidSize();
00129 }
00130 blocks_.erase(blocks_.begin(), to);
00131 }
00132
00133 }
00134 }