root/trunk/whisperlib/common/io/logio/logio.cc

Revision 7, 19.4 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include <unistd.h>
33 #include <sys/stat.h>
34 #include <fcntl.h>
35 #include "common/io/logio/logio.h"
36 #include "common/base/strutil.h"
37 #include "common/io/ioutil.h"
38 #include "common/base/log.h"
39
40 namespace {
41 //////////////////////////////////////////////////////////////////////
42
43 const string ComposeFileName(const char* log_dir,
44                              const char* file_base,
45                              int32 block_size,
46                              int32 file_num) {
47   return strutil::NormalizePath(
48       strutil::StringPrintf("%s/%s_%010d_%010d",
49                             log_dir,
50                             file_base,
51                             static_cast<int32>(block_size),
52                             static_cast<int32>(file_num)));
53 }
54 }
55
56 //////////////////////////////////////////////////////////////////////
57
58 namespace io {
59
60 LogWriter::LogWriter(const char* log_dir,
61                      const char* file_base,
62                      bool sync_on_write,
63                      int32 block_size,
64                      int32 blocks_per_file,
65                      bool temporary_incomplete_file,
66                      bool deflate)
67   : log_dir_(log_dir),
68     file_base_(file_base),
69     sync_on_write_(sync_on_write),
70     re_(string("^") + string(file_base_) +
71         strutil::StringPrintf("_%010d", static_cast<int32>(block_size)) +
72         "_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]$"),
73     max_file_size_(block_size * blocks_per_file),
74     block_size_(block_size),
75     blocks_per_file_(blocks_per_file),
76     temporary_incomplete_file_(temporary_incomplete_file),
77     deflate_(deflate),
78     file_num_(-1),
79     fd_(INVALID_FD_VALUE),
80     recorder_(block_size, deflate) {
81 }
82 LogWriter::~LogWriter() {
83   Flush();
84   CloseLog();
85 }
86
87
88 bool LogWriter::ProcessWriteRecord(bool result, bool force_flush) {
89   const int32 leftover = recorder_.leftover();
90   if ( result ) {
91     if ( leftover == 0 || buf_.Size() > block_size_ ) {
92       if ( buf_.Size() % block_size_ != 0 ) {
93         recorder_.FinalizeContent(&buf_);
94       }
95       return WriteBuffer(force_flush);
96     }
97   }
98   if ( force_flush ) {
99     recorder_.FinalizeContent(&buf_);
100     WriteBuffer(force_flush);
101   }
102   return true;
103 }
104
105 bool LogWriter::WriteRecord(io::MemoryStream* in, bool force_flush) {
106   return ProcessWriteRecord(recorder_.AppendRecord(in, &buf_),
107                             force_flush);
108 }
109
110 bool LogWriter::WriteRecord(const char* buffer, int32 size, bool force_flush) {
111   return ProcessWriteRecord(recorder_.AppendRecord(buffer, size, &buf_),
112                             force_flush);
113 }
114
115 bool LogWriter::Flush() {
116   recorder_.FinalizeContent(&buf_);
117   if ( buf_.IsEmpty() ) {
118     return true;
119   }
120   return WriteBuffer(true);
121 }
122
123 string LogWriter::ComposeFileName(bool temp) const {
124   if ( file_num_ < 0 ) return "";
125   const string normal_file_name = ::ComposeFileName(
126       log_dir_.c_str(), file_base_.c_str(), block_size_, file_num_);
127   if ( !temp || io::Exists(normal_file_name) ) {
128     return normal_file_name;
129   }
130   return ::ComposeFileName(strutil::JoinPaths(log_dir_, "temp").c_str(),
131                            file_base_.c_str(), block_size_, file_num_);
132 }
133
134 LogPos LogWriter::Tell() const {
135   if ( fd_ == INVALID_FD_VALUE ) {
136     return LogPos();
137   }
138   int64 pos = ::lseek64(fd_, 0, SEEK_END);
139   if ( pos < 0 ) {
140     return LogPos(file_num_, 0, 0);
141   }
142   return LogPos(file_num_, pos / block_size_ + 1, 0);
143 }
144
145 //////////////////////////////////////////////////////////////////////
146
147 bool LogWriter::WriteBuffer(bool force_flush) {
148   if ( fd_ == INVALID_FD_VALUE ) {
149     if ( !OpenLog() ) return false;
150   }
151   int64 pos = 0;
152   do {
153     pos = ::lseek64(fd_, 0, SEEK_END);
154     if ( pos < 0 ) {
155       LOG_ERROR << " Error in lseek64: " << errno
156                 << " - " << ::GetLastSystemErrorDescription();
157       return false;
158     }
159     if ( pos >= max_file_size_ ) {
160       LOG_INFO << "Over the position: "
161                << file_name_ << " pos: "
162                << pos << " max_file_size_: " << max_file_size_;
163       if ( !OpenLog() ) return false;
164       continue;
165     }
166   } while ( pos >= max_file_size_ );
167   if ( pos % block_size_ != 0 ) {
168     if ( !NormalizeFileSize(pos) ) {
169       return false;
170     }
171   }
172   string content;
173   buf_.MarkerSet();
174   buf_.ReadString(&content);
175   CHECK_EQ(content.size() % block_size_, 0)
176     << " - content.size(): " << content.size()
177     << " - block_size_: " << block_size_;
178
179   // We are in a good position - do the write
180   const int cb = ::write(fd_, content.data(), content.size());
181   if ( cb < 0 ) {
182     buf_.MarkerRestore();
183     LOG_ERROR << " Error writing the log" << errno
184               << " - " << ::GetLastSystemErrorDescription();
185     return false;
186   }
187   if ( content.size() != cb ) {
188     buf_.MarkerRestore();
189     LOG_ERROR << " Trouble in log write - succeded only" << cb
190               << " out of " << content.size();
191     return false;
192   }
193   buf_.MarkerClear();
194   if ( force_flush ) {
195     fdatasync(fd_);
196   }
197   return true;
198 }
199
200 bool LogWriter::OpenLog() {
201   if ( fd_ == INVALID_FD_VALUE ) {
202     file_num_ = io::GetLastNumberedFile(log_dir_, &re_, 10);
203     if ( file_num_ < -1 ) {
204       return false;
205     }
206     if ( file_num_  < 0 ) {
207       file_num_ = 0;
208     }
209     if ( temporary_incomplete_file_ ) {
210       if ( !io::Mkdir(strutil::JoinPaths(log_dir_, "temp")) ) {
211         LOG_ERROR << "Failed to create temporary directory";
212         return false;
213       }
214     }
215   } else {
216     CloseLog();
217     CHECK_GE(file_num_, 0);
218     ++file_num_;
219   }
220   const string filename(ComposeFileName(temporary_incomplete_file_));
221   DLOG_INFO << "New log file: " << filename;
222   fd_ = ::open(filename.c_str(),
223                (sync_on_write_ ? O_SYNC : 0) |
224                O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE,
225                0644);
226   if ( fd_ < 0 ) {
227     LOG_ERROR << " Error opening log writer for file : "<< filename
228               << " err: " << errno << " - "
229               << ::GetLastSystemErrorDescription();
230     fd_ = INVALID_FD_VALUE;
231     file_num_ = -1;
232     file_name_ = "";
233     return false;
234   }
235   file_name_ = filename;
236   return true;
237 }
238 void LogWriter::CloseLog() {
239   if ( fd_ == INVALID_FD_VALUE ) {
240     return;
241   }
242   ::close(fd_);
243   fd_ = INVALID_FD_VALUE;
244   if ( temporary_incomplete_file_ ) {
245     const string final_file_name = ComposeFileName(false);
246     if ( final_file_name != file_name_ ) {
247       if ( !io::Rename(file_name_, final_file_name, false) ) {
248         LOG_ERROR << "Error renaming temporary file: [" << file_name_ << "]"
249                      " to: [" << final_file_name << "]";
250       }
251     }
252   }
253   file_name_ = "";
254 }
255
256 bool LogWriter::NormalizeFileSize(int64 pos) {
257   const int32 delta = pos % block_size_;
258   if ( delta == 0 ) return true;
259   CHECK_NE(fd_, INVALID_FD_VALUE);
260
261   const int fd_copy = ::open(file_name_.c_str(),
262                              O_WRONLY | O_SYNC | O_LARGEFILE,
263                              0644);
264   if ( fd_copy < 0 ) {
265     LOG_ERROR << " Error opening a file for normalization: "<< file_name_
266               << " err: " << errno << " - "
267               << ::GetLastSystemErrorDescription();
268     return false;
269   }
270   LOG_WARNING << " Normalizing a file: " << file_name_;
271   if ( ::lseek64(fd_copy, pos - delta, SEEK_SET) < 0 ) {
272     LOG_ERROR << " Error seeking while normalizing a file: "<< file_name_
273               << " err: " << errno << " - "
274               << ::GetLastSystemErrorDescription();
275     ::close(fd_copy);
276     return false;
277   }
278   if ( ::lockf(fd_copy, F_LOCK, block_size_) ) {
279     LOG_ERROR << " Error locking while normalizing a file: "<< file_name_
280               << " err: " << errno << " - "
281               << ::GetLastSystemErrorDescription();
282     return false;
283   }
284   const int64 locked_pos = ::lseek64(fd_copy, 0, SEEK_END);
285   if ( locked_pos < 0 ) {
286     LOG_ERROR << "Everything got screwed. Cannot recover .."
287               << " err: " << errno << " - "
288               << ::GetLastSystemErrorDescription();
289     ::close(fd_copy);   // unlocks
290     return false;
291   }
292   const int32 locked_delta = locked_pos % block_size_;
293   bool success = false;
294   if ( locked_delta == 0 ) {
295     LOG_INFO << " Already normalized !!";
296     success = true;
297   } else if ( locked_pos != pos ) {
298     // This is really strange ... btw
299     LOG_WARNING << "Amazingly strange situation: pos: " << pos
300                << " locked_pos: " << locked_pos
301                << " filename: " << file_name_;
302     success = NormalizeFileSize(locked_pos);
303   } else {
304     if ( ::lseek64(fd_copy, pos - delta, SEEK_SET) < 0 ) {
305       LOG_ERROR << " Error re-seeking while normalizing a file: "<< file_name_
306                 << " err: " << errno << " - "
307                 << ::GetLastSystemErrorDescription();
308       ::close(fd_copy);  // unlocks
309       return false;
310     }
311     if ( ::lseek64(fd_copy, delta, SEEK_CUR) > 0 ) {
312       char* const buf = new char[block_size_ - delta];
313       if ( write(fd_copy, buf, block_size_ - delta) != block_size_ - delta ) {
314         LOG_ERROR << " Error writing while normalizing a file: "<< file_name_
315                   << " err: " << errno << " - "
316                   << ::GetLastSystemErrorDescription();
317       } else {
318         LOG_INFO << " Normalized file: " << file_name_
319                  << "@: " << pos << " w/ " << block_size_ - delta << " bytes.";
320         success = true;
321       }
322     } else {
323       LOG_ERROR << " Error seeking while preparing to normalize a file: "
324                 << file_name_ << " err: " << errno << " - "
325                 << ::GetLastSystemErrorDescription();
326     }
327   }
328   if ( ::lseek64(fd_copy, pos - delta, SEEK_SET) < 0 ) {
329     LOG_ERROR << " Error seeking after a file normalization: "<< file_name_
330               << " err: " << errno << " - "
331               << ::GetLastSystemErrorDescription()
332               << " - we must fail (as the file is locked..";
333   } else {
334     if ( ::lockf(fd_copy, F_ULOCK, block_size_) ) {
335       LOG_ERROR << " Error unlocking after normalization: "<< file_name_
336                 << " err: " << errno << " - "
337                 << ::GetLastSystemErrorDescription()
338                 << " - we must fail (as the file is locked..";
339     }
340   }
341   ::close(fd_copy);   // this should unlock everything anyway
342   return success;
343 }
344
345 //////////////////////////////////////////////////////////////////////
346
347
348 LogReader::LogReader(const char* log_dir,
349                      const char* file_base,
350                      int32 block_size,
351                      int32 blocks_per_file)
352   : log_dir_(log_dir),
353     file_base_(file_base),
354     re_(string("^") + string(file_base_) +
355         strutil::StringPrintf("_%010d", static_cast<int32>(block_size)) +
356         "_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]$"),
357     block_size_(block_size),
358     blocks_per_file_(blocks_per_file),
359     fd_(INVALID_FD_VALUE),
360     crt_pos_(),
361     last_read_pos_(),
362     reader_(block_size),
363     num_errors_(0),
364     last_errno_(0) {
365 }
366
367 LogReader::~LogReader() {
368   Close();
369 }
370
371 bool LogReader::Seek(const LogPos& log_pos) {
372   last_errno_ = 0;
373   if ( log_pos.IsNull() ) {
374     return Rewind();
375   }
376   reader_.Clear();
377   buf_.Clear();
378   if ( !OpenFile(log_pos.file_num_) ) {
379     return false;
380   }
381   return AdvanceToPosInFile(log_pos);
382 }
383
384 bool LogReader::GetNextRecord(io::MemoryStream* out) {
385   last_errno_ = 0;
386   if ( fd_ == INVALID_FD_VALUE ) {
387     if ( !Rewind() ) {
388       return false;
389     }
390   }
391   while ( true ) {
392     CHECK(!crt_pos_.IsNull());
393     RecordReader::ReadResult res = reader_.ReadRecord(&buf_, out);
394     if ( res == RecordReader::READ_NO_DATA ) {
395       // LOG_ERROR << " No more data=========:";
396       if ( !ReadNextBlock() ) {
397         return false;
398       }
399     } else {
400       if ( res == RecordReader::READ_OK ) {
401         crt_pos_.record_num_++;
402         last_read_pos_ = crt_pos_;
403         return true;
404       }
405       LOG_ERROR << " Log error: CRC in [" << file_name_
406                 << " @: " << crt_pos_.ToString();
407       buf_.Clear();
408       reader_.Clear();  // force the read of the next block
409       num_errors_++;
410     }
411   }
412   return false;
413 }
414
415 bool LogReader::Rewind() {
416   CloseInternal(false);
417   vector<string> files;
418   // TODO(cpopescu): well .. this is almost there ..
419   //                 as we don't count in the block_size
420   if ( !DirList(log_dir_ + "/", &files, false, &re_) ) {
421     LOG_WARNING << " No file to open for reader: "
422                 << log_dir_ << "/" << file_base_ << "_x_x";
423     return false;
424   }
425   if ( files.empty() ) {
426     // Not an error though ..
427     LOG_INFO << " No log file to read found .. "
428              << log_dir_ << "/" << file_base_ << "_x_x";
429     return false;
430   }
431   sort(files.begin(), files.end());
432
433   CHECK_GT(files.front().size(), 10);
434   errno = 0;  // essential as strtol would not set a 0 errno
435   const int32 file_num = strtol(
436     files.front().c_str() + files.front().size() - 10, NULL, 10);
437   if ( errno || file_num < 0 ) {
438     LOG_ERROR << " Invalid file found: " << files.back();
439     return false;
440   }
441   return OpenFile(file_num);
442 }
443
444 string LogReader::ComposeFileName(int32 file_num) const {
445   if ( file_num < 0 ) return "";
446   return ::ComposeFileName(log_dir_.c_str(), file_base_.c_str(),
447                            block_size_, file_num);
448 }
449
450 //////////////////////////////////////////////////////////////////////
451
452 void LogReader::CloseInternal(bool on_error) {
453   if ( fd_ != INVALID_FD_VALUE ) {
454     ::close(fd_);
455   }
456   fd_ = INVALID_FD_VALUE;
457   file_name_ = "";
458   crt_pos_ = LogPos();
459   last_read_pos_ = LogPos();
460   if ( on_error ) {
461     last_errno_ = errno;
462     num_errors_++;
463   }
464   reader_.Clear();
465   buf_.Clear();
466 }
467
468 bool LogReader::OpenFile(int32 file_num) {
469   if ( fd_ != INVALID_FD_VALUE ) {
470     if ( file_num == crt_pos_.file_num_ ) {
471       if ( ::lseek64(fd_, 0, SEEK_SET) < 0 ) {
472         CloseInternal(true);
473         return false;
474       }
475       return true;
476     }
477     CloseInternal(false);
478   }
479   const string filename = ComposeFileName(file_num);
480   fd_ = ::open(filename.c_str(), O_RDONLY | O_LARGEFILE);
481   if ( fd_ < 0 ) {
482     LOG_ERROR << " Error opening log reader for file : "<< filename
483               << " err: " << errno << " - "
484               << ::GetLastSystemErrorDescription();
485     CloseInternal(true);
486     return false;
487   }
488   crt_pos_ = LogPos(file_num, 0, 0);
489   last_read_pos_ = crt_pos_;
490   file_name_ = filename;
491   return true;
492 }
493
494 bool LogReader::AdvanceToPosInFile(const LogPos& log_pos) {
495   CHECK_EQ(log_pos.file_num_, crt_pos_.file_num_);
496   CHECK_NE(fd_, INVALID_FD_VALUE);
497   crt_pos_.block_num_ = log_pos.block_num_ - 1;
498   crt_pos_.record_num_ = 0;
499
500
501   if ( ::lseek64(fd_, crt_pos_.block_num_ * block_size_, SEEK_SET) < 0 ) {
502     CloseInternal(true);
503     return false;
504   }
505   reader_.Clear();
506   buf_.Clear();
507   crt_pos_.record_num_ = 0;
508   if ( !ReadNextBlock() ) {
509     if ( log_pos.record_num_ > 0 ) {
510       CloseInternal(true);
511       return false;
512     }
513     if ( crt_pos_.block_num_ < 0 ) {
514       crt_pos_.block_num_ = 0;
515     }
516     last_read_pos_ = crt_pos_;
517     return true;
518   }
519   last_read_pos_ = crt_pos_;
520   while ( crt_pos_.record_num_ < log_pos.record_num_ ) {
521     RecordReader::ReadResult res = reader_.ReadRecord(&buf_, NULL);
522     if ( res != RecordReader::READ_OK ) {
523       CloseInternal(true);
524       return false;
525     }
526     ++crt_pos_.record_num_;
527   }
528   last_read_pos_ = crt_pos_;
529   return true;
530 }
531
532 bool LogReader::ReadNextBlock() {
533   CHECK_NE(fd_, INVALID_FD_VALUE);
534
535   char* buffer = new char[block_size_];
536   const int32 cb = ::read(fd_, buffer, block_size_);
537   if ( cb < 0 ) {
538     delete [] buffer;
539     LOG_ERROR << " Error reading: " << file_name_
540               << " errno: " << errno
541               << " - " << ::GetLastSystemErrorDescription();
542     CloseInternal(true);
543     return false;
544   }
545
546   // Incomlete read..
547   if ( cb < block_size_ ) {
548     delete [] buffer;
549     // It is a possibility to be in the new file !
550     if ( crt_pos_.block_num_ >= blocks_per_file_ ) {
551       const string filename = ComposeFileName(crt_pos_.file_num_ + 1);
552       const int fd = ::open(filename.c_str(), O_RDONLY | O_LARGEFILE);
553       if ( fd < 0 ) {
554         // A new file not available yet..
555         return false;
556       }
557       // Cool - advance to the new file
558       ::close(fd_);
559       fd_ = fd;
560       crt_pos_ = LogPos(crt_pos_.file_num_ + 1, 0, 0);
561       file_name_ = filename;
562       return ReadNextBlock();
563     }
564     if ( cb > 0 ) {
565       // Strange - but possible.. we need to revert
566       LOG_WARNING << "Strange block read from : " << file_name_
567                   << " size: " << cb
568                   << " @: " << crt_pos_.ToString();
569       if ( ::lseek64(fd_, -cb, SEEK_CUR) < 0 ) {
570         LOG_ERROR << " Error seeking back: " << file_name_
571                   << " errno: " << errno
572                   << " - " << ::GetLastSystemErrorDescription();
573         CloseInternal(true);
574       }
575     }
576     return false;
577   }
578   crt_pos_.record_num_ = 0;
579   crt_pos_.block_num_++;
580
581   buf_.AppendRaw(buffer, block_size_);
582
583   return true;
584 }
585
586 //////////////////////////////////////////////////////////////////////
587
588 int32 CleanLog(const char* log_dir, const char* file_base, LogPos first_pos,
589                int32 block_size) {
590   re::RE re(string("^") + string(file_base) +
591             strutil::StringPrintf("_%010d", static_cast<int32>(block_size)) +
592             "_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]$");
593   vector<string> files;
594   if ( !DirList(string(log_dir) + "/", &files, false, &re) ) {
595     return 0;
596   }
597   if ( files.empty() ) {
598     return 0;
599   }
600   sort(files.begin(), files.end());
601
602   int32 num_deleted = 0;
603   for ( int32 i = 0; i < files.size(); ++i ) {
604     CHECK_GT(files[i].size(), 10);
605     errno = 0;  // essential as strtol would not set a 0 errno
606     const int32 file_num = strtol(files[i].c_str() + files[i].size() - 10,
607                                   NULL, 10);
608     if ( errno || file_num < 0 ) {
609       LOG_ERROR << " FileNum : " << file_num <<  " for " << files[i];
610       continue;
611     }
612     if ( file_num >= first_pos.file_num_ ) {
613       return num_deleted;
614     }
615     const string filename = ComposeFileName(log_dir, file_base,
616                                             block_size, file_num);
617     if ( unlink(filename.c_str()) ) {
618       LOG_ERROR << "Error removing file: " << filename << " : " << errno
619                 << " - " << ::GetLastSystemErrorDescription();
620     } else {
621       ++num_deleted;
622     }
623   }
624   return num_deleted;
625 }
626
627 //////////////////////////////////////////////////////////////////////
628 }
Note: See TracBrowser for help on using the browser.