root/trunk/whisperlib/common/io/buffer/memory_stream.cc

Revision 7, 15.5 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "common/io/buffer/memory_stream.h"
33 #include "common/base/scoped_ptr.h"
34
35 namespace io {
36
37 MemoryStream::MemoryStream(common::ByteOrder byte_order,
38                            BlockSize block_size)
39   : block_size_(block_size),
40     byte_order_(byte_order),
41     read_pointer_(&blocks_, -1, 0),
42     write_pointer_(&blocks_, -1, 0),
43     scratch_pointer_(&blocks_, -1, 0),
44     invalid_markers_size_(false),
45     size_(0) {
46 }
47
48 MemoryStream::~MemoryStream() {
49   Clear();
50 }
51
52 void MemoryStream::Clear() {
53   while ( MarkerIsSet() )
54     MarkerClear();
55   read_pointer_ = DataBlockPointer(&blocks_, -1, 0);
56   write_pointer_ = DataBlockPointer(&blocks_, -1, 0);
57   for ( BlockDqueue::const_iterator it = blocks_.begin();
58         it != blocks_.end(); ++it ) {
59     (*it)->DecRef();
60   }
61   blocks_.clear();
62   size_ = 0;
63 }
64
65 void MemoryStream::AppendRaw(const char* data, int size,
66                              util::FreeArrayList<char>* disposer) {
67   AppendBlock(new DataBlock(data, size, disposer, NULL));
68 }
69
70 void MemoryStream::AppendBlock(DataBlock* block) {
71   block->IncRef();
72   blocks_.push_back(block);
73   invalid_markers_size_ = true;
74   write_pointer_ = DataBlockPointer(&blocks_, blocks_.end_id() - 1,
75                                     block->size());
76   size_ += block->size();
77 }
78
79 //////////////////////////////////////////////////////////////////////
80
81 int32 MemoryStream::ReadForWritev(struct ::iovec** iov,
82                                   int* iovcnt,
83                                   int32 max_size) {
84   CHECK(scratch_pointer_.IsNull())
85     << "Unconfirmed scratch before Read Next..";
86   if ( !MaybeInitReadPointer() ) {
87     return 0;
88   }
89   MaybeDisposeBlocks();
90   const char* buffer = NULL;
91   int32 size = 0;
92   vector <struct ::iovec> v;
93   // int initial_size = Size();
94   DCHECK_EQ(read_pointer_.Distance(write_pointer_), size_);
95   while ( max_size > 0 && read_pointer_.ReadBlock(&buffer, &size) ) {
96     if ( size > 0 ) {
97       size_ -= size;
98       max_size -= size;
99       v.push_back(::iovec());
100       v.back().iov_base = const_cast<void*>(
101         reinterpret_cast<const void*>(buffer));
102       v.back().iov_len = size;
103     }
104     size = 0;
105   }
106   if ( v.empty() ) return 0;
107   *iov = new struct ::iovec[v.size()];
108   int32 sz = 0;
109   for ( int32 i = 0; i < v.size(); ++i ) {
110     (*iov)[i].iov_base = v[i].iov_base;
111     (*iov)[i].iov_len = v[i].iov_len;
112     sz += v[i].iov_len;
113   }
114   *iovcnt = v.size();
115   CHECK_EQ(read_pointer_.Distance(write_pointer_), size_);
116   return sz;
117 }
118
119
120
121 bool MemoryStream::ReadNext(const char** buffer, int32* size) {
122   CHECK(scratch_pointer_.IsNull())
123     << "Unconfirmed scratch before Read Next..";
124   if ( !MaybeInitReadPointer() ) {
125     return false;
126   }
127   MaybeDisposeBlocks();
128   if ( read_pointer_.ReadBlock(buffer, size) ) {
129     size_ -= *size;
130     return true;
131   }
132   return false;
133 }
134
135 void MemoryStream::GetScratchSpace(char** buffer, int32* size) {
136   invalid_markers_size_ = true;
137   CHECK(scratch_pointer_.IsNull())
138     << "Unconfirmed scratch before new scratch required..";
139   if ( write_pointer_.IsNull() || write_pointer_.AvailableForWrite() == 0 ) {
140     DataBlock* const new_block = new DataBlock(block_size_);
141     new_block->IncRef();
142     blocks_.push_back(new_block);
143     write_pointer_ = DataBlockPointer(&blocks_, blocks_.end_id() - 1, 0);
144   }
145   scratch_pointer_ = write_pointer_;
146   *buffer = ((*write_pointer_.block_it())->mutable_buffer() +
147              write_pointer_.pos());
148   *size = write_pointer_.AdvanceToCurrentBlockEnd();
149 }
150
151 void MemoryStream::ConfirmScratch(int32 size) {
152   invalid_markers_size_ = true;
153   CHECK(!scratch_pointer_.IsNull())
154     << "Must request scratch space before confirming it !";
155   write_pointer_ = scratch_pointer_;
156   write_pointer_.Advance(size);
157   write_pointer_.MarkCurrentBlockEndAtPointer();
158   scratch_pointer_.Clear();
159   size_ += size;
160 }
161
162 int32 MemoryStream::ReadInternal(void* buffer, int32 len, bool dispose) {
163   if ( len == 0 ) {
164     return 0;
165   }
166   if ( !MaybeInitReadPointer() ) {
167     return 0;
168   }
169   const int32 cb =  static_cast<int32>(
170     read_pointer_.ReadData(reinterpret_cast<char*>(buffer), len));
171   DCHECK(read_pointer_ <= write_pointer_);
172   DCHECK_LT(read_pointer_.block_id(), blocks_.end_id());
173   size_ -= cb;
174   if ( dispose ) MaybeDisposeBlocks();
175   return cb;
176 }
177
178 int32 MemoryStream::Skip(int32 len) {
179   if ( !MaybeInitReadPointer() ) {
180     return 0;
181   }
182   const int32 cb = read_pointer_.Advance(len);
183   DCHECK(read_pointer_ <= write_pointer_);
184   size_ -= cb;
185   MaybeDisposeBlocks();
186   return cb;
187 }
188
189 int32 MemoryStream::ReadString(string* s, int32 len) {
190   if ( len == -1 )
191     len = size_;
192   string tmp;
193   tmp.reserve(len);
194   const int32 cb = Read(&tmp[0], len);
195   // this tends to be cheap but we need it in order to set the correct size
196   s->assign(tmp.c_str(), cb);
197   return cb;
198 }
199
200 void MemoryStream::MaybeDisposeBlocks() {
201   while ( blocks_.begin_id() < read_pointer_.block_id() &&
202           blocks_.begin_id() < blocks_.end_id() &&
203           (markers_.empty() ||
204            blocks_.begin_id() < markers_.front().second->block_id()) ) {
205     blocks_.front()->DecRef();
206     blocks_.pop_front();
207   }
208   blocks_.correct_buffer();
209 }
210
211 int32 MemoryStream::Write(const void* buffer, int32 len) {
212   invalid_markers_size_ = true;
213   if ( write_pointer_.IsNull() ) {
214     DataBlock* const pblock = new DataBlock(max(len, block_size_));
215     pblock->IncRef();
216     blocks_.push_back(pblock);
217     write_pointer_ = DataBlockPointer(&blocks_, blocks_.begin_id(), 0);
218   }
219
220   const char* p = reinterpret_cast<const char*>(buffer);
221   int32 crt_len = len;
222   while ( crt_len > 0 ) {
223     const int32 delta = write_pointer_.WriteData(p, crt_len);
224     crt_len -= delta;
225     p += delta;
226     size_ += delta;
227     if ( crt_len > 0 ) {
228       DataBlock* const pblock = new DataBlock(max(crt_len, block_size_));
229       pblock->IncRef();
230       blocks_.push_back(pblock);
231     }
232   }
233   return len - crt_len;
234 }
235
236 string MemoryStream::DumpContent(int32 max_size) const {
237   int32 size = Size();
238   if ( max_size > 0 )
239     size = min(max_size, size);
240   uint8* buffer = new uint8[size];
241   scoped_ptr<uint8> auto_del_buffer(buffer);
242   Peek(buffer, size);
243   return strutil::StringPrintf("Stream size: %d bytes, dump: ", Size()) +
244          strutil::PrintableDataBuffer(buffer, size);
245 }
246
247 string MemoryStream::DumpContentHex(int32 max_size) const {
248   int32 size = Size();
249   if ( max_size > 0 )
250     size = min(max_size, size);
251   uint8* buffer = new uint8[size];
252   scoped_ptr<uint8> auto_del_buffer(buffer);
253   Peek(buffer, size);
254   return strutil::StringPrintf("Stream size: %d bytes, dump: ", Size()) +
255          strutil::PrintableDataBufferHexa(buffer, size);
256 }
257
258 string MemoryStream::DetailedContent() const {
259   string s = "";
260   s += strutil::StringPrintf("  Size: %8d\n", static_cast<int32>(Size()));
261   s += strutil::StringPrintf("  Read Pointer: @ %8d - %8d\n",
262                              static_cast<int32>(read_pointer_.block_id()),
263                              static_cast<int32>(read_pointer_.pos()));
264   s += strutil::StringPrintf(" Write Pointer: @ %8d - %8d\n",
265                              static_cast<int32>(write_pointer_.block_id()),
266                              static_cast<int32>(write_pointer_.pos()));
267   if ( blocks_.empty() ) {
268     s += "[ EMPTY ]\n";
269   } else {
270     int32 size = 0;
271     BlockDqueue::const_iterator it = blocks_.buffer_it(blocks_.begin_id());
272     for ( BlockId i = blocks_.begin_id(); i < blocks_.end_id(); ++i ) {
273       s += strutil::StringPrintf("%4d @%4d [%8d] %c [REF: %2d] %8d\n",
274                                  static_cast<int32>(i),
275                                  static_cast<int32>(size),
276                                  static_cast<int32>((*it)->buffer_size()),
277                                  (*it)->is_mutable() ? 'M' : 'F',
278                                  static_cast<int32>((*it)->ref_count()),
279                                  static_cast<int32>((*it)->size()));
280       size += (*it)->size();
281       ++it;
282     }
283   }
284   if ( markers_.empty() ) {
285     s += "[ No markers ]";
286   } else {
287     s += "Markers:\n";
288     int32 i = 0;
289     for ( MarkersList::const_iterator it_markers = markers_.begin();
290           it_markers != markers_.end(); ++it_markers ) {
291       s += strutil::StringPrintf(
292           "%4d sz:%4d @%4d - %4d\n",
293           static_cast<int32>(i),
294           static_cast<int32>(it_markers->first),
295           static_cast<int32>(it_markers->second->block_id()),
296           static_cast<int32>(it_markers->second->pos()));
297       ++i;
298     }
299   }
300   return s;
301 }
302
303 //////////////////////////////////////////////////////////////////////
304 //
305 // Appends - with PERFORMANCE discussion:
306 //
307
308 //////////////////////////////////////////////////////////////////////
309 //
310 // Performance test:
311 //
312 //  500 simultaneous rtmp clients for h264 streaming,
313 //  through a switching source
314 //  (and rest the same) @ 560 kbps
315 //  on one threaded whispercast server - 2.0 GHz Xeon, Ubuntu 8.04 32 bit
316 //
317
318 //////////////////////////////////////////////////////////////////////
319 //
320 // NO - __APPEND_WITH_BLOCK_REUSE__
321 // (the rest of defines have no effect)
322
323 // CPU: 22.8%  / 358 M  -- w/ initial reserve(kResizeThreshold) for BlockDeque
324 // CPU: 22.2%  / 355 M  -- w/o initial reserve for BlockDeque
325 // #undef  __APPEND_WITH_BLOCK_REUSE__
326
327 //////////////////////////////////////////////////////////////////////
328 //
329 // with __APPEND_WITH_BLOCK_REUSE__
330 //
331
332 // CPU: 18.5 / 352 M -- w/ initial reserve(kResizeThreshold) for BlockDeque
333 // CPU: 17.4 / 348 M -- w/o initial reserve for BlockDeque
334 //
335 // ==== BEST PERFORMANCE ====
336 //
337 // Other performance numbers:
338 //  - for 1000 simultaneous rtmp clients  (and rest the same):
339 //      CPU: 40.64 / 410M
340 //  - for 2000 simultaneous rtmp clients  (and rest the same):
341 //      CPU: 92.44 / 550M
342 //  - for h264 streaming, direct from http source (and rest the same):
343 //      CPU: 16.9% / 350M
344 //  - for h263 streaming, through switching source (and rest the same):
345 //      CPU: 15.5% / 335M
346 //  - for h263 streaming, direct from http source (and rest the same):
347 //      CPU: 14.58% / 335M
348 #define  __APPEND_WITH_BLOCK_REUSE__
349 #undef   __APPEND_WITH_BLOCK_REUSE_PARTIAL__
350 #undef   __APPEND_BLOCK_WITH_PARTIAL_BLOCK_REUSE__
351
352 ////////////////////
353
354 // CPU: 24.3 / 628M  -- w/ initial reserve(kResizeThreshold) for BlockDeque
355 // #define  __APPEND_WITH_BLOCK_REUSE__
356 // #undef   __APPEND_WITH_BLOCK_REUSE_PARTIAL__
357 // #define  __APPEND_BLOCK_WITH_PARTIAL_BLOCK_REUSE__
358
359 //////////////////////////////////////////////////////////////////////
360 //
361 // with __APPEND_WITH_BLOCK_REUSE_PARTIAL__
362 //
363
364 // CPU: 19.33 / 349M  -- w/o initial reserve for BlockDeque
365 // #define  __APPEND_WITH_BLOCK_REUSE__
366 // #define  __APPEND_WITH_BLOCK_REUSE_PARTIAL__
367 // #undef   __APPEND_BLOCK_WITH_PARTIAL_BLOCK_REUSE__
368
369 ////////////////////
370
371 // CPU: 18.66 / 375M -- w/o initial reserve for BlockDeque
372 // #define  __APPEND_WITH_BLOCK_REUSE__
373 // #define  __APPEND_WITH_BLOCK_REUSE_PARTIAL__
374 // #define  __APPEND_BLOCK_WITH_PARTIAL_BLOCK_REUSE__
375
376 //////////////////////////////////////////////////////////////////////
377
378 // The other version performance (in "variant/memory_stream.{h,cc}")
379 // CPU: 20.6 / 409 M
380
381
382 #ifdef __APPEND_WITH_BLOCK_REUSE__
383
384 void MemoryStream::AppendStreamNonDestructive(const MemoryStream* buffer,
385                                               int32 size) {
386   if ( !buffer->MaybeInitReadPointer() ) {
387     return;
388   }
389   DataBlockPointer begin = buffer->read_pointer_;
390   DataBlockPointer end = buffer->write_pointer_;
391   if ( size >= 0 ) {
392     size = min(size, buffer->Size());
393     end = buffer->read_pointer_;
394     end.Advance(size);
395   } else {
396     size = buffer->Size();
397   }
398   if ( !buffer->IsEmpty() ) {
399     BlockDqueue::const_iterator
400         it = buffer->blocks_.buffer_it(begin.block_id());
401     while ( begin.block_id() <= end.block_id() ) {
402       const int32 pos_begin = begin.pos();
403       const int32 pos_end = (begin.block_id() == end.block_id() ?
404                              end.pos() : (*it)->size());
405       if ( pos_begin < pos_end ) {
406 #ifdef __APPEND_WITH_BLOCK_REUSE_PARTIAL__
407         AppendNewBlock((*it), pos_begin, pos_end);
408 #else
409         if ( pos_begin == 0 && pos_end == (*it)->buffer_size() )  {
410           AppendBlock(*it);
411         } else {
412           AppendNewBlock((*it), pos_begin, pos_end);
413         }
414 #endif
415       }
416       ++it;
417       begin.set_block_id(begin.block_id() + 1);
418       begin.set_pos(0);
419     }
420   }
421 }
422 void MemoryStream::AppendStream(MemoryStream* buffer, int32 size) {
423   AppendStreamNonDestructive(buffer, size);
424   if ( size >= 0 && size < buffer->Size() ) {
425     buffer->Skip(size);
426   } else {
427     buffer->Skip(buffer->Size());
428   }
429 }
430
431 #else
432
433 //////////////////////////////////////////////////////////////////////
434 //
435 // Another version for appends w/ more buffer copies.
436 //
437 void MemoryStream::AppendStreamNonDestructive(const MemoryStream* buffer,
438                                               int32 size) {
439   MemoryStream* mbuffer = const_cast<MemoryStream*>(buffer);
440   mbuffer->MarkerSet();
441   AppendStream(mbuffer, size);
442   mbuffer->MarkerRestore();
443 }
444
445 void MemoryStream::AppendStream(MemoryStream* buffer, int32 size) {
446   if ( !buffer->MaybeInitReadPointer() ) {
447     return;
448   }
449   char* my_buf;
450   int32 my_size;
451   GetScratchSpace(&my_buf, &my_size);
452
453   int32 to_read = size == -1 ? buffer->Size() : min(size, buffer->Size());
454   int32 their_size = buffer->Read(my_buf, min(to_read, my_size));
455   ConfirmScratch(their_size);
456   to_read -= their_size;
457   if ( to_read > 0 ) {
458     my_buf = new char[to_read];
459     CHECK_EQ(buffer->Read(my_buf, to_read), to_read);
460     AppendRaw(my_buf, to_read);
461   }
462 }
463
464 #endif  //  __APPEND_WITH_BLOCK_REUSE__
465
466 void MemoryStream::AppendNewBlock(DataBlock* data,
467                                   BlockSize pos_begin, BlockSize pos_end) {
468   DCHECK_GE(pos_end, pos_begin);
469   int32 size = pos_end - pos_begin;
470
471   if ( size > 0 ) {
472 #if defined(__APPEND_BLOCK_WITH_PARTIAL_BLOCK_REUSE__)
473     int32 available = write_pointer_.AvailableForWrite();
474     int32 delta = 0;
475     if ( available > 0 ) {
476       delta = Write(data->buffer() + pos_begin, min(available, size));
477     }
478     if ( delta < size ) {
479       AppendBlock(new DataBlock(data->buffer() + pos_begin + delta,
480                                 size - delta,
481                                 data->GetAllocBlock()));
482     }
483 #else
484     Write(data->buffer() + pos_begin, size);
485 #endif  // __APPEND_BLOCK_WITH_PARTIAL_BLOCK_REUSE__
486   }
487 }
488 }
Note: See TracBrowser for help on using the browser.