root/trunk/whisperlib/net/rpc/lib/client/rpc_service_wrapper.h

Revision 7, 10.7 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #ifndef __NET_RPC_LIB_CLIENT_RPC_SERVICE_WRAPPER_H__
33 #define __NET_RPC_LIB_CLIENT_RPC_SERVICE_WRAPPER_H__
34
35 #include <stdarg.h>
36 #include <string>
37 #include <map>
38 #include <whisperlib/common/base/callback.h>
39 #include <whisperlib/net/rpc/lib/types/rpc_all_types.h>
40 #include <whisperlib/net/rpc/lib/client/irpc_client_connection.h>
41
42 namespace rpc {
43 // The return of any RPC call.
44 template <typename RET>
45 struct CallResult {
46   bool success_;      // success status
47   string error_;      // error description (if success == false)
48   RET result_;        // RPC return value
49 };
50 // an identifier for asynchronous RPC calls.
51 typedef int64 CALL_ID;
52
53 // A base class for calling remote service methods.
54 // It takes a IRPCClientConnection implementation (as transport layer) and
55 // it offers a 2 simple function:
56 //  - void Call(method, encoded_params, result) which
57 //      sends a RPC query to the underlying RPCConnection,
58 //      waits(with timeout) for response, decodes and returns the result.
59 //  - void AsyncCall(method, encoded_params, callback) which
60 //      sends a RPC query to the underlying RPCConnection,
61 //      and returns immediately. The SUCCESS or TIMEOUT result
62 //      is asynchronously delivered to the provided "callback".
63 //      You may also cancel an executing AsyncCall.
64 class ServiceWrapper {
65  public:
66   // input:
67   //   rpcConnection: a previously created rpc client connection.
68   //                  The wrapper uses this connection to invoke
69   //                  remote service methods.
70   //   serviceName: the name of the service you wish to wrap.
71   ServiceWrapper(IClientConnection& rpc_connection,
72                  const string& service_class_name,
73                  const string& service_instance_name);
74
75   virtual ~ServiceWrapper();
76
77   //  Sets the timeout expressed in milliseconds for both synchronous and
78   //  asynchronous calls.
79   // input:
80   //   timeout: milliseconds to wait for a remote call return.
81   void SetCallTimeout(uint32 timeout);
82
83   // Returns the current timeout value.
84   uint32 GetCallTimeout() const;
85
86   // Returns the instance name of the remote service.
87   const std::string& GetServiceName() const;
88   // Returns the class name of the remote service.
89   const std::string& GetServiceClassName() const;
90
91   // Returns the codec used in the underlying connection.
92   Codec& GetCodec() const;
93
94  private:
95   CALL_ID MakeCallId(uint32 qid) {
96     return static_cast<CALL_ID>(qid);
97   }
98   uint32 MakeQid(CALL_ID call_id) {
99     return static_cast<uint32>(call_id);
100   }
101
102   void AddResultCallbackNoSync(CALL_ID call_id,
103                               Callback* callback) {
104     bool s = result_callbacks_.insert(make_pair(call_id, callback)).second;
105     CHECK(s) << "Duplicate call_id: " << call_id;
106   }
107   Callback* PopResultCallbackSync(CALL_ID call_id) {
108     synch::MutexLocker lock(&sync_);
109     ResultCallbackMap::iterator it = result_callbacks_.find(call_id);
110     if ( it == result_callbacks_.end() ) {
111       return NULL;
112     }
113     Callback* result_handler = it->second;
114     result_callbacks_.erase(it);
115     return result_handler;
116   }
117
118   // callback: the connection deliver results here
119   template <typename RET>
120   void HandleCallResult(std::string method,
121                         uint32 qid,
122                         rpc::REPLY_STATUS status,
123                         const io::MemoryStream& cresult) {
124     ////////////////////////////////////////////////////////
125     // Find result callback
126     //
127     const CALL_ID call_id = MakeCallId(qid);
128     Callback* result_callback = PopResultCallbackSync(call_id);
129     if ( result_callback == NULL ) {
130       LOG_ERROR << "No ResultCallback for call_id: " << call_id;
131       return;
132     }
133     typedef Callback1<const CallResult<RET>& > CallResultCallback;
134     CallResultCallback* call_result_callback =
135       reinterpret_cast<CallResultCallback*>(result_callback);
136
137     ////////////////////////////////////////////////////////
138     // decode result and deliver it to the client result callback
139     //
140     io::MemoryStream& result = const_cast<io::MemoryStream&>(cresult);
141     result.MarkerSet();
142
143     CallResult<RET> call_result;
144     call_result.success_ = false;   // assume the worst
145
146     do {
147       if ( status != RPC_SUCCESS )  {
148         call_result.success_ = false;
149         call_result.error_ = ReplyStatusName(status);
150         LOG_ERROR << "rpc::Call failed for method: '" << method << "'"
151                   << " on service '" << GetServiceName() << "'."
152                   << " Returned Status: " << ReplyStatusName(status);
153         if ( result.Size() > 0 ) {
154           rpc::String reason;
155           if ( GetCodec().Decode(result, reason) == DECODE_RESULT_SUCCESS ) {
156             call_result.error_ += string(" Hint: ") + reason.CStr();
157             LOG_ERROR << "Call failed hint: " << reason;
158           }
159         }
160         break;
161       }
162
163       // decode returned value
164       //
165       if ( GetCodec().Decode(result, call_result.result_) ==
166            DECODE_RESULT_ERROR ) {
167         DLOG_ERROR << "RPC cannot decode return value as " << RET::Name()
168                    << ". Bytes: " << result.DebugString();
169         call_result.success_ = false;
170         call_result.error_ =
171             "Error decoding data, the server returned a wrong type";
172         break;
173       }
174       call_result.success_ = true;
175     } while ( false );
176
177     result.MarkerRestore();
178
179     // call the external result handler
180     call_result_callback->Run(call_result);
181   }
182
183   // Helper for synchronous calls.
184   // Basically we use an asynchronous call, catch the result here, and signal
185   // the waiting thread.
186   template <typename RET>
187   void ReceiveSynchronousResult(synch::Event* received,
188                                 CallResult<RET>* out,
189                                 const CallResult<RET>& in) {
190     *out = in;
191     received->Signal();
192   }
193
194  public:
195   //  Call a remote method on the enclosed service. This method blocks at most
196   //  "call_timeout_" milliseconds waiting for remote call result.
197   // input:
198   //   [OUT] returnValue: the return value is copied here.
199   //   method: remote method name. Plain text.
200   //   params: contains remote method parameters encoded.
201   // return:
202   //   Success state. In order for the remote call to succeed
203   //   these requirements must be met:
204   //     - the wrapper must be connected (IsConnected() == true);
205   //     - the method name must exist;
206   //     - the parameters number and types must be correct;
207   //     - the return type must be the one expected.
208   //         The remote call must return the same object type as typename RET.
209   template <typename RET>
210   void Call(const std::string& method,
211             io::MemoryStream& params,
212             CallResult<RET>& return_value) {
213     // we wait call result on this event
214     synch::Event response_received(false, true);
215
216     AsyncCall(method, params,
217         NewCallback(this, &ServiceWrapper::ReceiveSynchronousResult<RET>,
218                           &response_received,
219                           &return_value));
220
221     // A response should be delivered either on success or on timeout.
222     response_received.Wait();
223   }
224
225   //  Call a remote method on the enclosed service. This method returns
226   //  immediately and the remote call result shall be asynchronously delivered
227   //  to the "handle_return" callback. The call has a timeout of "call_timeout_"
228   //  milliseconds.
229   //  You will get an asynchronous result either if call succeeds(server
230   //  responds) or if a timeout occurs.
231   // input:
232   //  method: remote method name. Plain text.
233   //  params: contains remote method parameters encoded according GetCodec()
234   //  handle_return: callback to be called when the server response arrives
235   //                 or a timeout occurs.
236   // return:
237   //  An identifier for this call. You may cancel the call using CancelCall
238   //  and providing the same id.
239   template <typename RET>
240   CALL_ID AsyncCall(const std::string& method,
241                     io::MemoryStream& params,
242                     Callback1<const CallResult<RET>&>* handle_return) {
243     CALL_ID call_id = 0;
244     {
245       synch::MutexLocker lock(&sync_);
246       const uint32 qid = rpc_connection_.AsyncQuery(
247                            GetServiceName(),
248                            method, params,
249                            GetCallTimeout(),
250                            NewCallback(this,
251                                        &ServiceWrapper::HandleCallResult<RET>,
252                                        std::string(method)));
253       call_id = MakeCallId(qid);
254       AddResultCallbackNoSync(call_id, handle_return);
255     }
256
257     return call_id;
258   }
259
260   // Cancel an asynchronous call based on call ID.
261   void CancelCall(CALL_ID call_id);
262
263   // Cancels all asynchronous call.
264   void CancelAllCalls();
265
266  private:
267   // the rpc-connection used by this service wrapper
268   IClientConnection& rpc_connection_;
269
270   // the name of the remote service for this wrapper
271   const string service_class_name_;
272   const string service_instance_name_;
273
274   // milliseconds timeout on calling a remote method
275   uint32 call_timeout_;
276
277   // asynchronous executing calls
278   // map: call id -> client callback
279   typedef std::map<CALL_ID, Callback*> ResultCallbackMap;
280   ResultCallbackMap result_callbacks_;
281
282   // synchronize result_callbacks_
283   synch::Mutex sync_;
284
285  private:
286   DISALLOW_EVIL_CONSTRUCTORS(ServiceWrapper);
287 };
288 }
289 #endif  // __NET_RPC_LIB_CLIENT_RPC_SERVICE_WRAPPER_H__
Note: See TracBrowser for help on using the browser.