root/trunk/whisperlib/net/rpc/lib/client/rpc_client_connection_tcp.h

Revision 7, 9.7 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #ifndef __NET_RPC_LIB_CLIENT_RPC_CLIENT_CONNECTION_TCP_H__
33 #define __NET_RPC_LIB_CLIENT_RPC_CLIENT_CONNECTION_TCP_H__
34
35 #include <string>
36 #include <map>
37 #include <whisperlib/common/sync/event.h>
38 #include <whisperlib/net/base/selector.h>
39 #include <whisperlib/net/base/connection.h>
40 #include <whisperlib/net/rpc/lib/types/rpc_all_types.h>
41 #include <whisperlib/net/rpc/lib/types/rpc_message.h>
42 #include <whisperlib/net/rpc/lib/codec/rpc_codec.h>
43 #include <whisperlib/net/rpc/lib/rpc_version.h>
44 #include <whisperlib/net/rpc/lib/client/irpc_client_connection.h>
45
46 namespace rpc {
47 // The TCP connection from client side working as transport layer for
48 // RPC calls & results.
49 // Can send & receive rpc::Message packets.
50
51 class ClientConnectionTCP
52     : public rpc::IClientConnection {
53  private:
54   enum HANDSHAKE_STATE {
55     NOT_INITIALIZED,   // initial state: handshake not started
56     WAITING_RESPONSE,  // request sent, waiting response
57     CONNECTED,         // response received and verified.
58                        // Connection estabilished.
59     FAILURE,           // some error occured in handshake procedure.
60                        // For more information call Error();
61   };
62  public:
63   // @param: success state. Call Error()
64   typedef Callback1<bool> OpenCallback;
65
66   //////////////////////////////////////////////////////////////
67   //
68   //        Methods available to any external thread.
69   //
70   ClientConnectionTCP(net::Selector& selector,
71                       net::NetFactory& net_factory,
72                       net::PROTOCOL net_protocol,
73                       const net::HostPort& remote_addr,
74                       rpc::CODEC_ID codec_id,
75                       int64 open_timeout_ms = 20000,
76                       uint32 max_paralel_queries = 100);
77   virtual ~ClientConnectionTCP();
78
79   //////////////////////////////////////////////////////////////////////
80   //
81   //              Generic connection management
82   //
83
84   //  Connect to remote address (ctor param) and execute RPC handshake.
85   //  If open_callback == NULL this method blocks untill TCP connect
86   //   and RPC handshake are completed or the timeout expires.
87   //  If open_callback != NULL this method returns immediately and the
88   //   open_callback will be called after RPC handshake completes or
89   //   the timeout expires.
90   //
91   //  //This is a one shot method. Calling Open a second time gives
92   //  // unpredictable results.
93   //
94   //  In BLOCKING mode, while Open is in progress, you cannot delete
95   //   the connection.
96   //  In NON-BLOCKING mode, you can delete the connection anytime. And
97   //   the completion callback will be immediately called with fail status.
98   //
99   // input:
100   //  open_callback: callback to be run after Open completes or fails.
101   //                 In selector thread context.
102   //                 If you wish to destroy the connection in callback then
103   //                 DO NOT call delete, use selector.DeleteInSelectLoop(..).
104   //
105   // return:
106   //  If open_callback == NULL (i.e. BLOCKING):
107   //   - true = success.
108   //   - false = failure. Call Error() for aditional information.
109   //  If open_callback != NULL (i.e. UNBLOCKING):
110   //   - true = connect & handshake in progress, the open_callback will
111   //            be called after connect & handshake completes/fails,
112   //            or the timeout expires.
113   //            The callback indicates the Open success status;
114   //            you can also check IsOpen().
115   //   - false = something went bad right from the begining,
116   //             The open_callback won't be called, but will be destroyed if
117   //             non-permanent. Check Error().
118   //
119   //  Possible failure reasons are:
120   //   - unable to connect to remote host. SYSTEM_ERROR: ECONNREFUSED
121   //   - handshake failed. RPC_HANDSHAKE_FAILED
122   //     Bad version, bad codec, invalid data, or timeout waiting for hand
123   //     reply.
124   //   - open_timeout_ expired
125   //
126   bool Open(OpenCallback* open_callback = NULL);
127
128   // Test if tcp is alive & RPC handshake is ok.
129   bool IsOpen() const;
130
131   // Close RPC connection.
132   void Shutdown(synch::Event* signal_me_when_done = NULL);
133
134   //////////////////////////////////////////////////////////////////////
135   //
136   //          rpc::IClientConnectin interface methods
137   //
138
139  protected:
140   // [THREAD SAFE]
141   //  Forward to SendPacket(..).
142   // NOTE: "p" was dynamically allocated and must be deleted.
143   virtual void Send(const rpc::Message* p);
144
145   // [THREAD SAFE]
146   //  Cancel the send of the corresponding packet.
147   virtual void Cancel(uint32 xid);
148
149  protected:
150   //////////////////////////////////////////////////////////////
151   //
152   //     Methods available only from the selector thread.
153   //
154
155   //////////////////////////////////////////////////////////////////////
156   //
157   //             RPC connection control callbacks
158   //
159
160   // Initiates the opening operation.
161   void StartOpen();
162   // Open ended. This happens on Connect error or Handshake error
163   // or Handshake succeeded.
164   void EndOpen(const char* err = NULL);
165
166   // starts a TCP connect to the given address
167   void StartConnect();
168   // Connect ended. If "err" not NULL, it will be set as last error_.
169   void EndConnect(const char* err = NULL);
170
171   // starts the handshake procedure. The connection must be connected.
172   void StartHandshake();
173   // End handshake process with the given error "err".
174   void EndHandshake(HANDSHAKE_STATE state, const char* err = NULL);
175
176   // Exchange RPC handshake on the internal connected BufferedConnection.
177   // After this function returns, check handshake_state_ for completion,
178   // in progress or error status.
179   void DoHandshake();
180
181   // [THREAD SAFE]
182   //  Serialize and send given packet.
183   //  If disconnected, this method starts connect and delays send until after
184   //  connect completed. If Send fails, we call NotifySendFailed(p).
185   // NOTE: "p" was dynamically allocated and we take ownership of it here.
186   void SendPacket(const rpc::Message* p);
187
188   //  net::Connection methods
189   bool ConnectionReadHandler();
190   bool ConnectionWriteHandler();
191   void ConnectionConnectHandler();
192   void ConnectionCloseHandler(int err, net::NetConnection::CloseWhat what);
193
194  private:
195   void TimeoutHandler(int64 timeout_id);
196
197  private:
198   net::Selector& selector_;
199
200   net::NetFactory& net_factory_; // used to create the transport layer
201
202   net::NetConnection* net_connection_; // the underlying transport layer
203
204   static const int kOpenEvent = 1;
205   static const int HANDSHAKE_RANDOM_SIZE = 32;  // protocol constant
206
207   // random generated data, used in handshake.
208   uint8 kHandshakeClientRandomData[HANDSHAKE_RANDOM_SIZE];
209
210   HANDSHAKE_STATE handshake_state_;
211
212   // we are connected to this guy:
213   const net::HostPort remote_addr_;
214   // Milliseconds timeout for the Open process (TCP connect + handshake)
215   const int64 open_timeout_ms_;
216   // queries exceeding this number will fail with RPC_TOO_MANY_QUERIES
217   const uint32 max_paralel_queries_;
218
219   synch::Event open_completed_;        // signal TCP connect & handshake done
220
221   // True if open is in progress.
222   bool is_opening_;
223
224   // True if shutdown is executing.
225   // Tells ConnectionCloseHandler he is invoked as a consequence of Shutdown,
226   // so it should not call Shutdown recursively.
227   bool shutdown_is_executing_;
228
229   struct ClientQuery {
230     const rpc::Message* msg_;    // the whole message to be sent, we own it
231     io::MemoryStream buf_;     // contains the serialized message
232     explicit ClientQuery(const rpc::Message* msg)
233         : msg_(msg), buf_() {
234     }
235     ~ClientQuery() {
236       delete msg_;
237       msg_ = NULL;
238     }
239   };
240   typedef map<uint32, ClientQuery*> ClientQueryMap;
241   // Map of pending queries, to be sent to remote address.
242   // While the connection is established these are popped one by one
243   // in active_query_ and written to network.
244   ClientQueryMap queries_;
245
246   // The current query being written to network.
247   ClientQuery* active_query_;
248
249   // synchronization is achieved by accessing queries_ from selector thread only
250
251  private:
252   // called when asynchronous Open completes:
253   //  - error in connect
254   //  - error in handshake
255   //  - handshake succeeded = SUCCESS
256   //  - timeout
257   OpenCallback* open_callback_;
258
259   net::Timeouter timeouter_;
260
261  private:
262   DISALLOW_EVIL_CONSTRUCTORS(ClientConnectionTCP);
263 };
264 }
265 #endif  // __NET_RPC_LIB_CLIENT_RPC_CLIENT_CONNECTION_TCP_H__
Note: See TracBrowser for help on using the browser.