root/trunk/whisperlib/net/rpc/lib/server/rpc_server_connection.h

Revision 7, 8.4 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #ifndef __NET_RPC_LIB_SERVER_RPC_SERVER_CONNECTION_H__
33 #define __NET_RPC_LIB_SERVER_RPC_SERVER_CONNECTION_H__
34
35 #include <string>
36 #include <whisperlib/common/base/types.h>
37 #include <whisperlib/common/io/buffer/io_memory_stream.h>
38 #include <whisperlib/common/sync/mutex.h>
39 #include <whisperlib/net/base/connection.h>
40 #include <whisperlib/net/rpc/lib/types/rpc_loggable.h>
41 #include <whisperlib/net/rpc/lib/types/rpc_all_types.h>
42 #include <whisperlib/net/rpc/lib/server/irpc_result_handler.h>
43 #include <whisperlib/net/rpc/lib/server/irpc_async_query_executor.h>
44
45 // This is the TCP connection from server side working as transport layer
46 // for RPC calls & results. Can send & receive rpc::Message packets.
47
48 namespace rpc {
49
50 class Server;
51 class ServerConnection
52     : public rpc::IResultHandler,
53       public rpc::Loggable {
54
55  public:
56   ServerConnection(net::Selector* selector,
57                    bool auto_delete_on_close,
58                    net::NetConnection* net_connection,
59                    rpc::IAsyncQueryExecutor& queryExecutor);
60   ~ServerConnection();
61
62  protected:
63   //  Helper methods for synchronized incrementing and decrementing of
64   //  the expectedWriteReplyCalls_ .
65   // The working pattern is:
66   //   - an external thread(worker) sending a packet: increments
67   //     the expectedWriteReplyCalls_ and queues a closure in selector
68   //     (which does the actual send to network)
69   //   - the closure: sends the packet, decrements the expectedWriteReplyCalls_,
70   //     and if the connection is closed and there are no more
71   //     expectedWriteReplyCalls it deletes the connection according
72   //     to autoCloseOnDelete_ flag.
73   void IncExpectedWriteReplyCalls();
74   void DecExpectedWriteReplyCallsAndPossiblyDeleteConnection();
75
76  public:
77   //////////////////////////////////////////////////////////////
78   //
79   //  Methods available to any external thread (worker threads).
80   //
81
82   // create a reply rpc-packet, and queue a closure to send it.
83   void WriteReply(uint32 xid,
84                   rpc::REPLY_STATUS status,
85                   const io::MemoryStream& result);
86
87   //  Queue a closure to send the given packet to the network.
88   //  The selector does the encoding.
89   //  NOTE: this saves time in the calling thread, loading the selector.
90   // input:
91   //   p: the packet to be sent. Must be dynamically allocated.
92   void WriteWithEncodeInSelector(const rpc::Message* p);
93
94   //  Encode the packet in a dynamically allocate a memory stream,
95   //  then queues a closure to send the stream from selector thread.
96   //  NOTE: this saves time in selector, loading the current thread.
97   void WriteWithEncodeNow(const rpc::Message& p);
98
99
100  protected:
101   //////////////////////////////////////////////////////////////
102   //
103   //     Methods available only from the selector thread.
104   //
105
106   //  Sends the given RPC packet to underlying BufferedConnection.
107   //  Does not wait for a response, not event for a complete write
108   //  (the data to send is put in BufferedConnection's output buffer).
109   // input:
110   //   msg: the rpc message to be sent. Must be dynamically allocated& will
111   //        be automatically deleted.
112   void CallbackSendRPCPacket(const rpc::Message* msg);
113
114   //  Sends all the data in the given memory stream to the network
115   // input:
116   //   ms: the stream containing the data to be sent.
117   //       Must be dynamically allocated & will be automatically deleted.
118   void CallbackSendData(const io::MemoryStream* ms);
119
120   //////////////////////////////////////////////////////////////////////
121   //
122   //             net::BufferedConnection methods
123   //
124   //  Handle incoming data.
125   //  Use net::BufferedConnection::HandleRead() to buffer incoming data
126   //  then process data in net::BufferedConnection::inbuf()
127   // returns:
128   //   true - data successfully read. Even if an incomplete packet was received.
129   //   false - error reading data from network. The caller will close
130   //           the connection immediately on return.
131   virtual void XHandleRead() {
132   }
133   bool ConnectionReadHandler();
134
135   virtual void XHandleWrite() {
136   }
137   bool ConnectionWriteHandler();
138   virtual void XHandleConnect() {
139   }
140   virtual void XHandleError() {
141   }
142   void ConnectionCloseHandler(int err, net::NetConnection::CloseWhat what);
143   virtual void XHandleTimeout() {
144   }
145   virtual void XHandleAccept() {
146   }
147
148   //////////////////////////////////////////////////////////////////////
149   //
150   //                net::Connection methods
151   //
152
153   //////////////////////////////////////////////////////////////////////
154   //
155   //  Methods available to any external thread (worker threads).
156   //
157
158   //////////////////////////////////////////////////////////////////////
159   //
160   //          rpc::IResultHandler interface methods
161   //
162   // Called by the execution system (possibly a worker) to announce
163   // the result of a query.
164   void HandleRPCResult(const rpc::Query& q);
165
166   //////////////////////////////////////////////////////////////////////
167   //
168   //              rpc::Loggable interface methods
169   //
170   // Returns a description of this connection. Good for logging.
171   string ToString() const;
172
173  private:
174   net::Selector * selector_;
175
176   // we own the underlying TCP connection
177   net::NetConnection * net_connection_;
178
179   // Buffer used by the Write function to serialize every packet before
180   // sending it to network.
181   io::MemoryStream cachedPacketBuffer_;
182
183   // Synchronize access to cachedPacketBuffer_ . The execution model is
184   // asynchronous, so query results may arrive & be sent back to client
185   // anytime anyorder.
186   synch::Mutex syncCachedPacketBuffer_;
187
188   enum HANDSHAKE_STATE {
189     HS_WAITING_REQUEST = 0,
190     HS_WAITING_RESPONSE = 1,
191     HS_CONNECTED = 2,
192     HS_FAILURE = 3,
193   };
194
195   HANDSHAKE_STATE handshakeState_;
196
197   enum {
198     HANDSHAKE_RANDOM_SIZE = 32,   // protocol constant
199   };
200
201   // random generated data, used in handshake.
202   uint8 handshakeServerRandomData_[HANDSHAKE_RANDOM_SIZE];
203
204   // the query executor. Estabilished in constructor.
205   rpc::IAsyncQueryExecutor& asyncQueryExecutor_;
206
207   // true if we're registered in the executor.
208   bool registeredToQueryExecutor_;
209
210   // the codec used by this connection. It is estabilished in the hanshake.
211   rpc::Codec* codec_;
212
213   // the number of WriteWith... closures queued in selector and not run yet
214   uint32 expectedWriteReplyCalls_;
215
216   // synchronize access expectedWriteReplyCalls_
217   synch::Mutex accessExpectedWriteReplyCalls_;
218
219   bool auto_delete_on_close_;
220
221   // Does the initial handshake.
222   // input:
223   //  in: contains client data. There may be less than a full handshake message
224   //      bytes in the input stream, in which case the method should do nothing.
225   //      When more data arrives the method is called again, and the
226   //      input stream will contain more (old+new) data.
227   void ProtocolHandleHandshake(io::MemoryStream& in);
228
229   // Execute client query and send back the result.
230   void ProtocolHandleMessage(const rpc::Message& msg);
231
232   DISALLOW_EVIL_CONSTRUCTORS(ServerConnection);
233 };
234 }
235 #endif   // __NET_RPC_LIB_SERVER_RPC_SERVER_CONNECTION_H__
Note: See TracBrowser for help on using the browser.