root/trunk/whisperlib/common/io/logio/recordio.cc

Revision 7, 8.3 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "common/io/logio/recordio.h"
33
34 namespace {
35 int32 ComputeCRC(io::MemoryStream* buf) {
36   int32 crc = static_cast<int32>(crc32(0L, Z_NULL, 0));
37   buf->MarkerSet();
38   while ( !buf->IsEmpty() ) {
39     int32 crt_size = buf->Size();
40     const char* crt_buf = NULL;
41     CHECK(buf->ReadNext(&crt_buf, &crt_size));
42     crc = crc32(crc, reinterpret_cast<const Bytef*>(crt_buf), crt_size);
43   }
44   buf->MarkerRestore();
45   return crc;
46 }
47 }
48
49 // COMPILE_ASSERT(sizeof(uLong) == sizeof(int32),  zlib_uLong_should_be_32_bit);
50
51 namespace io {
52
53 //////////////////////////////////////////////////////////////////////
54
55 RecordWriter::RecordWriter(int32 block_size,
56                            bool deflate,
57                            float dumpable_percent)
58   : block_size_(block_size),
59     dumpable_size_(static_cast<int32>(dumpable_percent * block_size)),
60     content_(common::BIGENDIAN, block_size_),
61     padding_(new char[block_size_]),
62     zlib_(deflate ? new ZlibDeflateWrapper() : NULL),
63     zlib_content_(common::BIGENDIAN, block_size_) {
64   CHECK_LT(block_size_, kMaximumRecordBlockSize);
65   bzero(padding_, block_size_);
66 }
67
68
69 RecordWriter::~RecordWriter() {
70   CHECK(content_.IsEmpty());
71   delete [] padding_;
72   delete zlib_;
73   zlib_ = NULL;
74 }
75
76 bool RecordWriter::AppendRecord(io::MemoryStream* in,
77                                 io::MemoryStream* out,
78                                 bool is_zipped) {
79   bool retval = false;
80   if ( zlib_ && !is_zipped ) {
81     DCHECK(zlib_content_.IsEmpty());
82     CHECK(zlib_->Deflate(in, &zlib_content_));
83     return AppendRecord(&zlib_content_, out, true);
84   }
85   do {
86     const int32 max_to_write = max_block_record_size() - content_.Size();
87     if ( max_to_write <= 0 ) {
88       retval = true;
89       FinalizeContent(out);
90     } else if ( max_to_write >= in->Size() ) {
91       io::NumStreamer::WriteByte(&content_,
92                                  is_zipped ? IS_ZIPPED : 0,
93                                  common::BIGENDIAN);
94       io::NumStreamer::WriteUInt24(&content_, in->Size(), common::BIGENDIAN);
95       content_.AppendStream(in);
96     } else if ( content_.Size() > dumpable_size_ ) {
97       retval = true;
98       FinalizeContent(out);
99     } else {
100       io::NumStreamer::WriteByte(&content_,
101                                  is_zipped ? IS_ZIPPED | HAS_CONT : HAS_CONT,
102                                  common::BIGENDIAN);
103       io::NumStreamer::WriteUInt24(&content_, max_to_write, common::BIGENDIAN);
104       content_.AppendStream(in, max_to_write);
105     }
106   } while ( !in->IsEmpty() );
107
108   return retval;
109 }
110
111 bool RecordWriter::AppendRecord(const char* buffer, int32 size,
112                                 io::MemoryStream* out) {
113   if ( zlib_ ) {
114     CHECK(zlib_->Deflate(buffer, size, &zlib_content_));
115     return AppendRecord(&zlib_content_, out, true);
116   }
117   bool retval = false;
118   const char* p = buffer;
119   do {
120     const int32 max_to_write = max_block_record_size() - content_.Size();
121     if ( max_to_write <= 0 ) {
122       retval = true;
123       FinalizeContent(out);
124     } else if ( max_to_write >= size ) {
125       io::NumStreamer::WriteByte(&content_, 0, common::BIGENDIAN);
126       io::NumStreamer::WriteUInt24(&content_, size, common::BIGENDIAN);
127       content_.Write(p, size);
128       p += size;
129       size = 0;
130     } else if ( content_.Size() > dumpable_size_ ) {
131       retval = true;
132       FinalizeContent(out);
133     } else {
134       io::NumStreamer::WriteByte(&content_, HAS_CONT, common::BIGENDIAN);
135       io::NumStreamer::WriteUInt24(&content_, max_to_write, common::BIGENDIAN);
136       content_.Write(p, max_to_write);
137       p += max_to_write;
138       size -= max_to_write;
139     }
140   } while ( size > 0 );
141
142   return retval;
143 }
144
145 void RecordWriter::FinalizeContent(io::MemoryStream* out) {
146   if ( content_.IsEmpty() ) return;
147   const int32 content_size = content_.Size();
148   const int32 padding_size = block_size_ - content_size - kTrailerEnd;
149   CHECK_GE(padding_size, 0) << " content: " << content_.DetailedContent();
150   if ( padding_size > 0 ) {
151     content_.Write(padding_, padding_size);
152   }
153   io::NumStreamer::WriteInt32(&content_, content_size, common::BIGENDIAN);
154   io::NumStreamer::WriteInt32(&content_, ComputeCRC(&content_),
155                               common::BIGENDIAN);
156   out->AppendStream(&content_);
157
158   DCHECK(content_.IsEmpty());
159 }
160
161 //////////////////////////////////////////////////////////////////////
162
163 RecordReader::RecordReader(int32 block_size)
164   : block_size_(block_size),
165     temp_(common::BIGENDIAN, block_size_),
166     content_(common::BIGENDIAN, block_size_),
167     zip_content_(common::BIGENDIAN, block_size_) {
168 }
169
170 RecordReader::~RecordReader() {
171 }
172
173 RecordReader::ReadResult RecordReader::ReadNextBlock(io::MemoryStream* in) {
174   if ( in->Size() < block_size_ ) {
175     return READ_NO_DATA;
176   }
177   temp_.Clear();
178   temp_.AppendStream(in, block_size_ - kTrailerEnd);
179   const int32 content_size = io::NumStreamer::ReadInt32(in, common::BIGENDIAN);
180   io::NumStreamer::WriteInt32(&temp_, content_size, common::BIGENDIAN);
181   const int32 crc = io::NumStreamer::ReadInt32(in, common::BIGENDIAN);
182   if ( ComputeCRC(&temp_) != crc ) {
183     temp_.Clear();
184     return READ_CRC_CORRUPTED;
185   }
186   CHECK_LE(content_size, block_size_ - kTrailerEnd);
187   content_.AppendStream(&temp_, content_size);
188   return READ_OK;
189 }
190
191 RecordReader::ReadResult RecordReader::ReadRecord(io::MemoryStream* in,
192                                                   io::MemoryStream* out) {
193   if ( content_.IsEmpty() ) {
194     RecordReader::ReadResult ret = ReadNextBlock(in);
195     if ( ret != READ_OK )
196       return ret;
197   }
198   // At this point we have verified data in content_.
199   uint8 flags = 0;
200   int zip_error = Z_STREAM_END;
201   bool unzip = false;
202   zlib_.Clear();
203
204   do {
205     // TODO(cpopescu): probably this meas a bug somewhere at reading or writing
206     //                 shall we mark a corruption and live ?
207     CHECK_GE(content_.Size(), sizeof(int32));
208     flags = io::NumStreamer::ReadByte(&content_,
209                                       common::BIGENDIAN);
210     int32 crt_len = io::NumStreamer::ReadUInt24(&content_,
211                                                 common::BIGENDIAN);
212     if ( crt_len > 0 ) {
213       if ( out ) {
214         if ( !(flags & RecordWriter::IS_ZIPPED) ) {
215           out->AppendStream(&content_, crt_len);
216         } else {
217           zip_content_.AppendStream(&content_, crt_len);
218           unzip = true;
219         }
220       } else {
221         content_.Skip(crt_len);
222       }
223     }
224     if ( (flags & RecordWriter::HAS_CONT) && content_.IsEmpty() ) {
225       RecordReader::ReadResult ret = ReadNextBlock(in);
226       if ( ret != READ_OK )
227         return ret;
228     }
229   } while ( flags & RecordWriter::HAS_CONT );
230
231   if ( !zip_content_.IsEmpty() ) {
232     if ( out ) {
233       zip_error = zlib_.Inflate(&zip_content_, out);
234       if ( zip_error != Z_STREAM_END ) {
235         zip_content_.Clear();
236         return READ_ZIP_CORRUPTED;
237       }
238     }
239     zip_content_.Clear();
240   }
241   return READ_OK;
242 }
243 }
Note: See TracBrowser for help on using the browser.