root/trunk/whisperlib/common/io/file/aio_file.h

Revision 7, 6.0 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31 //
32 // Engine for AIO operations. Here is how things go:
33 //  - the AIO thread runs in a different thread(s) and performs commands
34 //  - submit commands to us from another thread which contain an operation code,
35 //    parameters and a completion callback.
36 //  - we split commands by operation (read / write) and by request size.
37 //  - you provide us w/ a selector where we run your completed callbacks.
38 //  - upon error we automatically delete the guilty file descriptor.
39 //
40 // IMPORTANT - use one AioManager per physical disk !!
41 //
42 #ifndef __COMMON_IO_FILE_AIO_FILE__
43 #define __COMMON_IO_FILE_AIO_FILE__
44
45 #include <aio.h>
46
47 #include <whisperlib/net/base/selector.h>
48 #include <whisperlib/common/base/types.h>
49 #include <whisperlib/common/base/callback.h>
50 #include <whisperlib/common/sync/thread.h>
51 #include <whisperlib/common/sync/producer_consumer_queue.h>
52
53 namespace io {
54
55 class AioManager {
56  public:
57   AioManager(const char* name, net::Selector* selector);
58   ~AioManager();
59
60   //////////////////////////////////////////////////////////////////////
61
62   // The interface is done through this structure.
63   struct Request {
64     const int fd_;       // a request regarding this file
65     int64 offset_;       // offset where to perform operations
66                          // *** IMPORTANT:  MUST be disk block alligned ***
67     void* buffer_;       // memory buffer involved in the operation
68                          // for write we write here, for read we put data
69                          // here
70                          // *** IMPORTANT:  MUST be allocated at disk block
71                          //     size alligned memory ***
72     int32 size_;         // size of buffer_
73                          // *** IMPORTANT:  MUST be a multiple of disk block
74                          //     size alligned memory ***
75     Closure* closure_;   // we run this closure in the 'selector_' thread
76                          // upon request completion
77     int errno_;          // error status of the request 0 meand OK
78     int result_;         // result associated w/ the operation
79                          // for read: the number of bytes read from the file
80                          // for write: the number of bytes written to the file
81     Request(int fd, int64 offset, void* buffer, int32 size, Closure* closure)
82       :  fd_(fd),
83          offset_(offset),
84          buffer_(buffer),
85          size_(size),
86          closure_(closure),
87          errno_(0),
88          result_(0) {
89     }
90     struct aiocb* PrepareAioCb(struct aiocb* p, int lio_opcode) const;
91   };
92
93   //////////////////////////////////////////////////////////////////////
94
95   // Initiate a read request, as instructed by 'req'
96   void Read(Request* req);
97
98   // Initiate a write request, as instructed by 'req'
99   void Write(Request* req);
100
101   const string& name() const { return name_; }
102
103   // For passing messages between different threads ..
104   typedef synch::ProducerConsumerQueue<Request*> ReqQueue;
105
106  private:
107   enum Op {
108     OP_READ = 0,
109     OP_WRITE,
110     NUM_OPS,
111   };
112   // We process results from response_queue_ here
113   void ProcessResponses();
114
115   // We have threads for :
116   //  OPERATIONS: OP_READ     OP_WRITE
117   //  SIZE:
118   //       size < 8 block
119   //       8 blocks <= size < 32 blocks
120   //       32 blocks <= size
121   //
122   // TODO(cpopescu): !!! Parametrize !!!
123   static const int32 kBlockSize = 4096;     // most standard..
124   static const int32 kBlockSizeShift = 12;  // i.e. 2^12 = 4096
125   static const int32 kNumBlockTypes = 3;
126
127   int32 SizePool(size_t size) const {
128     const size_t nblocks = size >> kBlockSizeShift;
129     return ((nblocks >> 3) != 0 ) + ((nblocks >> 6) != 0);
130   }
131   int32 OpPool(Op op) const {
132     return static_cast<int32>(op);
133   }
134
135   const string name_;   // you can give a name to this guy..
136   net::Selector* const selector_;   // all requests should come through
137                                     // this selector thread
138
139   // We run one thread per operation x request size bucket
140   thread::Thread* aio_threads_[NUM_OPS * kNumBlockTypes];
141
142
143   // These threads read their requests through a reques queue..
144   ReqQueue* request_queues_[NUM_OPS * kNumBlockTypes];
145   // .. and write back the responses to the response thread through this
146   ReqQueue response_queue_;
147
148   // this guy reads responses from 'response_queue_' and calls the
149   // corresponding closures in the selector
150   thread::Thread response_thread_;
151
152   // TODO(cpopescu):  More statistics then that ugly log
153
154   DISALLOW_EVIL_CONSTRUCTORS(AioManager);
155 };
156 }
157 #endif  // __COMMON_IO_FILE_AIO_FILE__
Note: See TracBrowser for help on using the browser.