root/trunk/whisperlib/net/rpc/lib/client/irpc_client_connection.h

Revision 7, 9.9 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #ifndef __NET_RPC_LIB_CLIENT_IRPC_CLIENT_CONNECTION_H__
33 #define __NET_RPC_LIB_CLIENT_IRPC_CLIENT_CONNECTION_H__
34
35 #include <string>
36 #include <map>
37 #include <whisperlib/common/base/callback.h>
38 #include <whisperlib/common/sync/mutex.h>
39 #include <whisperlib/common/sync/event.h>
40 #include <whisperlib/net/base/selector.h>
41 #include <whisperlib/net/base/timeouter.h>
42 #include <whisperlib/net/rpc/lib/types/rpc_message.h>
43 #include <whisperlib/net/rpc/lib/codec/rpc_codec.h>
44
45 // This is an interface for all client side RPC transport connections.
46 // The interface is capable of sending & receiving RPCMessage through
47 // implementation specific transports.
48
49 namespace rpc {
50
// Identifies the transport layer used by a client side RPC connection.
// Enumerator values are implicit: consecutive integers starting at 0.
enum CONNECTION_TYPE {
  CONNECTION_TCP,
  CONNECTION_HTTP,
  // No trailing comma after the last enumerator: it is ill-formed before
  // C++11 and triggers pedantic warnings on older toolchains.
  CONNECTION_FAILSAFE_HTTP
};

// Declared here, defined elsewhere.
// NOTE(review): presumably returns a printable name for "connection_type";
// the behavior for values outside the enum is not visible from this header.
const char* ConnectionTypeName(CONNECTION_TYPE connection_type);
57
58 class IClientConnection {
59  public:
60   IClientConnection(net::Selector& selector,
61                     rpc::CONNECTION_TYPE connection_type,
62                     rpc::CODEC_ID codec_id);
63   virtual ~IClientConnection();
64
65   // Returns a description of the last error.
66   const string& Error() const {
67     return error_;
68   }
69   // Returns the transport type.
70   rpc::CONNECTION_TYPE GetConnectionType() const {
71     return connection_type_;
72   }
73
74   // Retrieve connection codec.
75   rpc::Codec& GetCodec() const {
76     return *codec_;
77   }
78
79  protected:
80   typedef Callback3<uint32, rpc::REPLY_STATUS, const io::MemoryStream&>
81     ResponseCallback;
82
83   // [THREAD SAFE]
84   //  Generates consecutive XIDs.
85   uint32 GenerateNextXID();
86
87   // [THREAD SAFE]
88   //  Create a rpc::Message call from the given params,
89   //  serialize it and send it over network.
90   void SendQuery(uint32 xid,
91                  const std::string& service,
92                  const std::string& method,
93                  io::MemoryStream& params);
94
95   // [THREAD SAFE]
96   //  Tells implementation to serialize and send given packet.
97   //  "p" was dynamically allocated, implementation takes ownership of it.
98   //  If Send fails, you should call HandleSendError(p).
99   //  Call Error() for an error description.
100   virtual void Send(const rpc::Message* p) = 0;
101
102   // [THREAD SAFE]
103   //  Tells implementation to cancel the Send of the corresponding packet.
104   //  If the packet was sent already, or partially sent, leave it sent, and
105   //  the answer will be ignored.
106   //  This method is only an optimization. Called when a query timed out.
107   virtual void Cancel(uint32 xid) = 0;
108
109   // [THREAD SAFE]
110   //  Called by the implementation, after a rpc::Message is received and
111   //  decoded.
112   // input:
113   //  p: the received message. Dynamically allocated, will be automatically
114   //      deleted here.
115   void HandleResponse(const rpc::Message* p);
116
117   // [THREAD SAFE]
118   //  Called by the implementation if it's unable to send an rpc::Message.
119   //  It may happen because of Send timeout or internal fail.
120   // input:
121   //  p: the message from Send. We take ownership of it.
122   //  status: describes the failure.
123   void NotifySendFailed(const rpc::Message* p, rpc::REPLY_STATUS status);
124
125   // [THREAD SAFE]
126   // Called by the implementation, when the underlying connection was closed.
127   // Useful to notify all waiting RPC responses that there will be no response.
128   // This is just an optimization, as the waiting queries will be completed
129   // on timeout by default.
130   void NotifyConnectionClosed();
131
132   // [THREAD SAFE]
133   //  Called by the timeouter_ (selector thread) when a timeout occurs.
134   //  Here we complete the query (the given xid) by RPC_QUERY_TIMEOUT status.
135   void NotifyTimeout(int64 xid);
136
137  private:
138   // [NOT Thread safe] Call this ONLY with sync_response_map_ LOCKED !
139   void AddResponseCallbackNoSync(uint32 xid,
140                                  ResponseCallback * response_callback);
141   // [THREAD SAFE]
142   ResponseCallback * PopResponseCallbackSync(uint32 xid);
143
144   // [THREAD SAFE] All.
145   //  These methods are needed because we have to synchronize with selector
146   //  thread.
147   void AddTimeout(int64 xid, int64 timeout);
148   void ClearTimeout(int64 xid);
149   void ClearTimeouts();
150
151  public:
152   // [THREAD SAFE]
153   //  Send a remote RPC call packet, wait for RPC reply packet with timeout,
154   //  return call result and status. If timeout expires a RPC_TIMEOUT status
155   //  is returned. This methods implements a synchronous rpc.
156   // input:
157   //  [IN]  service: remote service whose method is to be invoked. Plain text.
158   //  [IN]  method:  call method name. Plain text.
159   //  [IN]  params:  encoded list of parameters to be passed to the
160   //                 remote invoke.
161   //  [IN]  timeout: milliseconds to wait for server reply.
162   //  [OUT] status:  Call status. A successfully call returns RPC_SUCCESS.
163   //                 The rest are errors: errors that appear on the server side
164   //                 during call execution (like server out of memory,
165   //                 or bad parameters), or errors in transport layer
166   //                 (connection error, timeout waiting for server reply).
167   //                 In case of error the return value isn't what you expected:
168   //                 it may be rpc::Void or a rpc::String containing a
169   //                 description of the error.
170   //  [OUT] result: receives the encoded return value of the remote method call
171   //              only if the call succeeds (i.e. status == RPC_SUCCESS).
172   //              If the call failed, the result is either empty or contains
173   //              a RPC::String description of the error.
174   //              The caller should know what type of return value is expected.
175   //   Internal errors:
176   //         - failure of the RPC system to send the query,
177   //         - invalid answer from server,
178   //         - timeout in server response
179   void Query(const std::string& service,
180              const std::string& method,
181              io::MemoryStream& params,
182              uint32 timeout,
183              rpc::REPLY_STATUS& status,
184              io::MemoryStream& result);
185
186   // [THREAD SAFE]
187   //  Sends a remote RPC call packet, and does not wait for the result.
188   //  The result will be asynchronously delivered to the the
189   //  response_callback callback.
190   //  If timeout expires a RPC_TIMEOUT status is sent to the
191   //  response_callback callback.
192   //  This methods implements an asynchronous rpc.
193   // returns:
194   //  query ID. Useful for CancelQuery.
195   //
196   uint32 AsyncQuery(const std::string& service,
197                     const std::string& method,
198                     io::MemoryStream& params,
199                     uint32 timeout,
200                     ResponseCallback* response_callback);
201
202   // [THREAD SAFE]
203   //  Complete synchronous or asynchronous query.
204   //  Calls the ResultCallback delivering the given status & result.
205   void CompleteQuery(uint32 qid, rpc::REPLY_STATUS status,
206                      const io::MemoryStream & result);
207
208   // [THREAD SAFE]
209   //  Complete all synchronous and asynchronous queries.
210   //  Calls the ResultCallback delivering the given status & an empty result.
211   void CompleteAllQueries(rpc::REPLY_STATUS status);
212
213   // [THREAD SAFE]
214   //  Cancel synchronous or asynchronous query.
215   //  The ResultCallback is NOT called, but just deleted.
216   void CancelQuery(uint32 qid);
217
218   // [THREAD SAFE]
219   //  Cancel all pending queries, both synchronous and asynchronous.
220   //  The ResultCallback is NOT called, but just deleted.
221   void CancelAllQueries();
222
223  protected:
224   net::Selector& selector_;
225
226   rpc::CONNECTION_TYPE connection_type_;  // identifies the transport layer
227   rpc::Codec* codec_;                     // the codec used. Allocated locally.
228
229   string error_;                          // description of the last error
230
231   uint32 next_xid_;              // for generating consecutive transaction IDs
232   synch::Mutex sync_next_xid_;   // synchronize access to next_xid_
233
234   // map of pending queries response_callbacks, by xid
235   typedef map<uint32, ResponseCallback*> ResponseCallbackMap;
236   ResponseCallbackMap response_map_;
237   synch::Mutex sync_response_map_;
238
239   // used to set timeouts on Query and AsyncQuery
240   // WARNING: the Timeouter MUST be used in Selector thread ONLY!
241   net::Timeouter timeouter_;
242   synch::Event done_clear_timeouts_;
243
244   // signal the completion of CancelQuery or CancelAllQueries.
245   // This must be auto-reset events, as multiple threads may call CancelQuery.
246   synch::Event done_cancel_query_;
247   synch::Event done_cancel_all_queries_;
248
249  private:
250   DISALLOW_EVIL_CONSTRUCTORS(IClientConnection);
251 };
252 }
253 #endif   // __NET_RPC_LIB_CLIENT_IRPC_CLIENT_CONNECTION_H__
Note: See TracBrowser for help on using the browser.