root/trunk/whisperlib/common/io/logio/test/logio_test.cc

Revision 7, 9.8 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 // Simple tests for logio module
33
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>

#include "common/base/types.h"
#include "common/base/log.h"
#include "common/base/timer.h"
#include "common/base/system.h"
#include "common/sync/thread.h"

#include "common/io/buffer/memory_stream.h"
#include "common/io/logio/logio.h"
46
47 //////////////////////////////////////////////////////////////////////
48
49 DEFINE_string(test_dir,
50               "/tmp",
51               "Where to write test logs");
52
53 DEFINE_string(test_filebase,
54               "testlog",
55               "Write logs w/ this prefix");
56
57 DEFINE_int32(rand_seed,
58              3274,
59              "Seed the random number generator w/ this");
60
61 DEFINE_int64(num_records,
62              20000,
63              "Number of records for the test");
64
65 DEFINE_int32(block_size,
66              16385,
67              "Create blocks of this size");
68
69 DEFINE_int32(blocks_per_file,
70              1003,
71              "Create files of this size (in blocks)");
72
73 DEFINE_int32(record_size,
74              200,
75              "Generate records of this size");
76
77 DEFINE_int32(writer_mess_probability,
78              10000,
79              "We mess-up the log w/ this probability :)");
80
81 DEFINE_int32(writer_stop_probability,
82              100000,
83              "Reintiialize the writer w/ this probability");
84
85 DEFINE_int32(reader_stop_probability,
86              100000,
87              "Reintiialize the reader w/ this probability");
88
89 DEFINE_int32(num_writers,
90              5,
91              "Use these many concurrent writers");
92
93 DEFINE_bool(deflate,
94             false,
95             "Zip records for writing");
96
97 //////////////////////////////////////////////////////////////////////
98
99 static unsigned int rand_seed;
100
101 void GenerateRecord(int64 rid,
102                     int32 tid,
103                     io::MemoryStream* rec) {
104   char* buf = new char[FLAGS_record_size];
105   memset(buf, 0, FLAGS_record_size);
106   *(reinterpret_cast<int32*>(buf)) = tid;
107   *(reinterpret_cast<int64*>(buf + sizeof(tid))) = rid;
108   rec->AppendRaw(buf, FLAGS_record_size);
109 }
110
111 int32 VerifyRecord(bool fresh_reader,
112                    int64* rid,
113                    int32 num_tids,
114                    io::MemoryStream* rec) {
115   CHECK_EQ(rec->Size(), FLAGS_record_size);
116   string s;
117   rec->ReadString(&s, FLAGS_record_size);
118   const int32 tid = *(reinterpret_cast<const int32*>(s.data()));
119   CHECK_LE(tid, num_tids);
120   CHECK_GE(tid, 0);
121   const int64 crt_rid =
122       *(reinterpret_cast<const int64*>(s.data() + sizeof(tid)));
123   if ( rid[tid] >= 0 ) {
124     if ( FLAGS_writer_mess_probability == 0 ) {
125       CHECK_EQ(rid[tid] + 1, crt_rid);
126     } else {
127       CHECK_LE(rid[tid] + 1, crt_rid);
128       if ( rid[tid] + 1 < crt_rid ) {
129         if ( !fresh_reader ) {
130           CHECK_EQ((crt_rid - rid[tid]) / (FLAGS_block_size - 4) %
131                    FLAGS_record_size,
132                    0) << " crt_rid: " << crt_rid
133                       << " rid[tid]: " << rid[tid];
134         }
135       }
136     }
137   }
138   rid[tid] = crt_rid;
139   return tid;
140 }
141
142 void WriterThread(int32 tid) {
143   io::LogWriter* writer =  new io::LogWriter(FLAGS_test_dir.c_str(),
144                                              FLAGS_test_filebase.c_str(),
145                                              false,
146                                              FLAGS_block_size,
147                                              FLAGS_blocks_per_file,
148                                              false,
149                                              FLAGS_deflate);
150   {
151     io::MemoryStream rec;
152     for ( int64 i = 0; i < FLAGS_num_records; ++i ) {
153       LOG_INFO_IF((i % 1000) == 0) << "tid: " << tid
154                                    << " Writing record: " << i;
155       GenerateRecord(i, tid, &rec);
156       CHECK(writer->WriteRecord(&rec));
157       const bool should_reopen =
158           (rand_r(&rand_seed) % FLAGS_writer_stop_probability) == 0;
159       if ( should_reopen ) {
160         LOG_INFO << " Re-opening the writer: " << tid << " @" << i;
161         delete writer;
162         writer =  new io::LogWriter(FLAGS_test_dir.c_str(),
163                                     FLAGS_test_filebase.c_str(),
164                                     false,
165                                     FLAGS_block_size,
166                                     FLAGS_blocks_per_file,
167                                     false,
168                                     FLAGS_deflate);
169       }
170       if ( FLAGS_writer_mess_probability > 0 ) {
171         const bool should_mess = (
172             rand_r(&rand_seed) % FLAGS_writer_mess_probability) == 0;
173         if ( should_mess && i < (FLAGS_num_records -
174                                  5 * (FLAGS_block_size / FLAGS_record_size)) ) {
175           LOG_INFO << " Messing file: " << writer->file_name();
176           int fd = ::open(writer->file_name().c_str(),
177                           O_WRONLY | O_APPEND | O_SYNC | O_CREAT | O_LARGEFILE,
178                           0644);
179           if ( fd >= 0 ) {
180             const int32 size = FLAGS_block_size * 3 / 4;
181             char* buf = new char[size];
182             CHECK_EQ(size, ::write(fd, buf, size));
183             delete [] buf;
184             ::close(fd);
185           }
186         }
187       }
188     }
189   }
190   delete writer;
191 }
192
193 void ReaderThread(io::LogPos pos, int32 num_tids) {
194   io::LogReader* reader = new io::LogReader(FLAGS_test_dir.c_str(),
195                                             FLAGS_test_filebase.c_str(),
196                                             FLAGS_block_size,
197                                             FLAGS_blocks_per_file);
198   int64* rids = new int64[num_tids];
199   for ( int32 i = 0; i < num_tids; ++i ) {
200     rids[i] = -1LL;
201   }
202   if ( !pos.IsNull() ) {
203     CHECK(reader->Seek(pos));
204   }
205   io::MemoryStream rec;
206   int32 tids_solved = 0;
207   bool fresh_reader = true;
208   while ( tids_solved < num_tids ) {
209     const bool should_reopen = (
210         FLAGS_reader_stop_probability > 0 &&
211         ((rand_r(&rand_seed) % FLAGS_reader_stop_probability) == 0));
212     if ( should_reopen ) {
213       io::LogPos pos(reader->Tell());
214       LOG_INFO << "Reopening Reader at: " << pos.ToString()
215                << " Vs. : " << reader->crt_pos().ToString();
216       delete reader;
217       reader = new io::LogReader(FLAGS_test_dir.c_str(),
218                                  FLAGS_test_filebase.c_str(),
219                                  FLAGS_block_size,
220                                  FLAGS_blocks_per_file);
221       if ( !pos.IsNull() ) {
222         CHECK(reader->Seek(pos));
223         CHECK(reader->Tell() == pos) << " AT: " << reader->Tell().ToString()
224                                      << " vs. " << pos.ToString();
225         LOG_INFO << " OK - > AT: " << reader->Tell().ToString()
226                  << " vs: " << reader->crt_pos().ToString();
227       }
228       fresh_reader = true;
229     }
230     if ( !reader->GetNextRecord(&rec) ) {
231       if ( FLAGS_writer_mess_probability == 0 ) {
232         CHECK_EQ(reader->num_errors(), 0);
233       }
234       sleep(1);
235     } else {
236       int32 tid = VerifyRecord(fresh_reader, rids, num_tids, &rec);
237       LOG_INFO_IF((rids[tid] % 1000) == 0) << " Reader: " << tid
238                                            << " @ " << rids[tid];
239       fresh_reader = false;
240       if ( rids[tid] > FLAGS_num_records - 50 ) {
241         if ( rids[tid] >= FLAGS_num_records - 1 ) {
242           tids_solved++;
243           LOG_INFO << " ===========> Solved TID: " << tid;
244         }
245       }
246       rec.Clear();
247     }
248   }
249 }
250
251 int main(int argc, char* argv[]) {
252   common::Init(argc, argv);
253   rand_seed = FLAGS_rand_seed;
254   srand(rand_seed);
255
256   LOG_INFO << "Clearing old files.."
257            << system(strutil::StringPrintf(
258                          "rm -f %s/%s_??????????_??????????",
259                          FLAGS_test_dir.c_str(),
260                          FLAGS_test_filebase.c_str()).c_str());
261   LOG_INFO << "Creating threads..";
262   thread::Thread** iothreads = new thread::Thread*[FLAGS_num_writers];
263   for ( int32 i = 0; i < FLAGS_num_writers; ++i ) {
264     iothreads[i] = new thread::Thread(NewCallback(&WriterThread, i));
265     iothreads[i]->SetJoinable();
266     iothreads[i]->Start();
267   }
268   sleep(2);
269
270   thread::Thread reader(NewCallback(&ReaderThread,
271                                     io::LogPos(),
272                                     FLAGS_num_writers));
273   reader.SetJoinable();
274   reader.Start();
275
276   LOG_INFO << " Waiting to join !";
277   for ( int32 i = 0; i < FLAGS_num_writers; ++i ) {
278     iothreads[i]->Join();
279     delete iothreads[i];
280     LOG_INFO << " Join: " << i;
281   }
282   delete [] iothreads;
283   reader.Join();
284   LOG_INFO << " Reader Join";
285 }
Note: See TracBrowser for help on using the browser.