root/trunk/whisperlib/common/io/file/aio_file.cc

Revision 7, 8.0 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "common/io/file/aio_file.h"
33 #include "common/base/free_list.h"
34 #include "common/base/errno.h"
35
36 //////////////////////////////////////////////////////////////////////
37
38 DEFINE_int32(max_concurrent_aio_ops,
39              64,
40              "We start at most these many aio operations per thread");
41
42 //////////////////////////////////////////////////////////////////////
43
44 // If this is defined we want to log periodically aio operations
45 #undef __AIO_DEEP_LOG__
46
47 // Helper Namespace
48
49 namespace {
50
51 static const int32 kMaxConcurrentRequests = 65536;
52
53
54 // The guy which perfoms the actual reads - run this in a thread -
55 void MainAioProcessThread(
56   int32 block_type,
57   int lio_opcode,
58   io::AioManager::ReqQueue* reqs,
59   io::AioManager::ReqQueue* resps) {
60   util::FreeList<struct aiocb> aio_freelist(kMaxConcurrentRequests);
61   struct aiocb* ops[kMaxConcurrentRequests];
62   io::AioManager::Request* crt_reqs[kMaxConcurrentRequests];
63
64   LOG_INFO << "Starting AIO processing thread: " << pthread_self()
65            << " Block Type: " << block_type << " - opcode: " << lio_opcode;
66
67   io::AioManager::Request* crt = NULL;
68   int32 crt_ndx = 0;
69 #ifdef __AIO_DEEP_LOG__
70   int64 total_time = 0;
71   int64 total_ops = 0;
72   int64 total_bytes = 0;
73 #endif
74   do {
75     // TODO(cpopescu): Shall we wait a little for the first result ?
76     //                 (like 1 ms ?)
77     while ( (crt = reqs->Get(0)) != NULL &&
78             crt_ndx < FLAGS_max_concurrent_aio_ops ) {
79       crt_reqs[crt_ndx] = crt;
80       struct aiocb* p = aio_freelist.New();
81       ops[crt_ndx] = crt->PrepareAioCb(p, lio_opcode);
82       ++crt_ndx;
83     }
84     if ( crt_ndx > 0 ) {
85       LOG_INFO_IF(crt_ndx >= FLAGS_max_concurrent_aio_ops)
86         << " AIO operations Maxed out !!";
87 #ifdef __AIO_DEEP_LOG__
88       const int64 now = timer::TicksMsec();
89 #endif
90       int64 cb = 0;
91       const int err = lio_listio(LIO_WAIT, ops, crt_ndx, NULL);
92 #ifdef __AIO_DEEP_LOG__
93       const int64 delta = (timer::TicksMsec() - now);
94 #endif
95       if ( err && errno != EIO ) {
96         LOG_ERROR << " ERROR in lio_listio: "
97                   << GetSystemErrorDescription(errno);
98         for ( int32 i = 0; i < crt_ndx; ++i ) {
99           crt_reqs[i]->errno_ = errno;
100           crt_reqs[i]->result_ = -1;
101           resps->Put(crt_reqs[i]);
102           aio_freelist.Dispose(ops[i]);
103         }
104       } else {
105         for ( int32 i = 0; i < crt_ndx; ++i ) {
106           struct aiocb* p = ops[i];
107           crt_reqs[i]->errno_ = aio_error(p);
108           if ( crt_reqs[i]->errno_ < 0 ) {
109             LOG_ERROR << " I/O error on file: " << crt_reqs[i]->errno_
110                       << " - " << GetSystemErrorDescription(errno);
111           }
112           crt_reqs[i]->result_ = crt_reqs[i]->errno_ == 0 ? aio_return(p) : -1;
113           cb += crt_reqs[i]->result_;
114           resps->Put(crt_reqs[i]);
115           aio_freelist.Dispose(p);
116         }
117 #ifdef __AIO_DEEP_LOG__
118         total_time += delta;
119         total_ops += crt_ndx;
120         total_bytes += cb;
121         LOG_INFO << " AIO Ops: " << crt_ndx
122                  << " t=" << delta << " ms"
123                  << " sz=" << cb  << " B  " << cb / (delta + 1) << " KBps"
124                  << " AVG : #ops " << (static_cast<double>(total_ops) * 1000 /
125                                        total_time)
126                  << " op/sec " << " speed: " << total_bytes  / (total_time + 1)
127                  << " KBps.";
128 #endif
129       }
130     }
131     crt_ndx = 0;
132     crt = reqs->Get();     // waiting part :)
133     if ( crt != NULL ) {
134       crt_reqs[crt_ndx] = crt;
135       struct aiocb* p = aio_freelist.New();
136       ops[crt_ndx] = crt->PrepareAioCb(p, lio_opcode);
137       ++crt_ndx;
138     }
139   } while ( crt != NULL);
140   LOG_INFO << "Exiting AIO processing thread: " << pthread_self();
141 }
142 }
143
144 //////////////////////////////////////////////////////////////////////
145
146 namespace io {
147
148 struct aiocb* AioManager::Request::PrepareAioCb(struct aiocb* p,
149                                                 int lio_opcode) const {
150   bzero(p, sizeof(*p));
151   p->aio_fildes = fd_;
152   p->aio_buf = buffer_;
153   p->aio_nbytes = size_;
154   p->aio_offset = offset_;
155
156   p->aio_lio_opcode = lio_opcode;
157   return p;
158 }
159
160 AioManager::AioManager(const char* name, net::Selector* selector)
161   : name_(name),
162     selector_(selector),
163     response_queue_(kMaxConcurrentRequests),
164     response_thread_(NewCallback(
165                        this, &AioManager::ProcessResponses)) {
166   CHECK(response_thread_.SetJoinable());
167   CHECK(response_thread_.SetStackSize(PTHREAD_STACK_MIN + (1 << 20)));
168   CHECK(response_thread_.Start());
169   for ( int32 i = 0; i < NUM_OPS; ++i ) {
170     const int lio_opcode = i < kNumBlockTypes ? LIO_READ : LIO_WRITE;
171     for ( int32 j = 0; j < kNumBlockTypes; ++j ) {
172       const int32 ndx = i * kNumBlockTypes + j;
173       request_queues_[ndx] = new ReqQueue(kMaxConcurrentRequests);
174       Closure* const c = NewCallback(&MainAioProcessThread,
175                                      j,
176                                      lio_opcode,
177                                      request_queues_[ndx],
178                                      &response_queue_);
179       aio_threads_[ndx] = new thread::Thread(c);
180       // 1MB - over the minimum ..
181       CHECK(aio_threads_[ndx]->SetStackSize(PTHREAD_STACK_MIN + (1 << 20)));
182       CHECK(aio_threads_[ndx]->SetJoinable());
183       CHECK(aio_threads_[ndx]->Start());
184     }
185   }
186 }
187
188 AioManager::~AioManager() {
189   LOG_INFO << " Deleting: " << this;
190   for ( size_t i = 0; i < NUMBEROF(request_queues_); ++i ) {
191     request_queues_[i]->Put(NULL);
192     request_queues_[i]->Put(NULL);   // need to put two of them :)
193   }
194   for ( size_t i = 0; i < NUMBEROF(aio_threads_); ++i ) {
195     LOG_INFO << " Waiting for aio_thread " << i << " to stop.";
196     aio_threads_[i]->Join();
197     delete aio_threads_[i];
198   }
199   for ( size_t i = 0; i < NUMBEROF(request_queues_); ++i ) {
200     delete request_queues_[i];
201   }
202   response_queue_.Put(NULL);
203   LOG_INFO << " Waiting for response thread..";
204   response_thread_.Join();
205   LOG_INFO << " AioManager ended !";
206 }
207
208 void AioManager::ProcessResponses() {
209   while ( true ) {
210     AioManager::Request* resp = response_queue_.Get();
211     if ( resp == NULL ) {
212       break;
213     }
214     if ( resp->closure_ == NULL ) {
215       continue;
216     }
217     selector_->RunInSelectLoop(resp->closure_);
218   }
219   LOG_INFO << "Response processor ended ..";
220 }
221
222 void AioManager::Read(Request* req) {
223   const int32 ndx = OP_READ * kNumBlockTypes + SizePool(req->size_);
224   request_queues_[ndx]->Put(req);
225 }
226 void AioManager::Write(Request* req) {
227   const int32 ndx = OP_WRITE * kNumBlockTypes + SizePool(req->size_);
228   request_queues_[ndx]->Put(req);
229 }
230 }
Note: See TracBrowser for help on using the browser.