root/trunk/whisperlib/net/rpc/lib/client/irpc_client_connection.cc

Revision 7, 13.5 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #include "common/base/errno.h"
33 #include "common/base/scoped_ptr.h"
34 #include "net/rpc/lib/client/irpc_client_connection.h"
35 #include "net/rpc/lib/codec/rpc_codec_factory.h"
36
37 namespace rpc {
38
39 const char* ConnectionTypeName(rpc::CONNECTION_TYPE connection_type) {
40   switch ( connection_type ) {
41     case rpc::CONNECTION_TCP:
42       return "tcp";
43     case rpc::CONNECTION_HTTP:
44       return "http";
45     default:
46       LOG_FATAL << "no such connection_type: " << connection_type;
47       return "unknown";
48   }
49 }
50
51 IClientConnection::IClientConnection(net::Selector& selector,
52                                      rpc::CONNECTION_TYPE connection_type,
53                                      rpc::CODEC_ID codec_id)
54   : selector_(selector),
55     connection_type_(connection_type),
56     codec_(rpc::CodecFactory::Create(codec_id)),
57     error_(),
58     next_xid_(1),
59     sync_next_xid_(),
60     response_map_(),
61     sync_response_map_(),
62     timeouter_(&selector_,
63                NewPermanentCallback(this, &IClientConnection::NotifyTimeout)),
64     done_clear_timeouts_(false, true),
65     done_cancel_query_(false, false),
66     done_cancel_all_queries_(false, false) {
67 }
68
69 IClientConnection::~IClientConnection() {
70   // The implementation(by deriving from us) is the first to be destroyed.
71   // So it MUST call NotifyConnectionClosed, triggering the completion
72   // of all pending queries.
73
74   /* TODO(cosmin): remove. See comment above.
75   // Cancel all pending queries. These are here because the
76   // connection may have been closed while waiting for rpc results.
77   CancelAllQueries();
78   */
79
80   CHECK(response_map_.empty())
81     << "#" << response_map_.size() << " calls pending";
82
83   delete codec_;
84   codec_ = NULL;
85 }
86
87 uint32 IClientConnection::GenerateNextXID() {
88   synch::MutexLocker lock(&sync_next_xid_);
89   return next_xid_++;
90 }
91
92 void IClientConnection::SendQuery(uint32 xid,
93                                   const std::string& service,
94                                   const std::string& method,
95                                   io::MemoryStream& params) {
96   rpc::Message* const p = new rpc::Message();
97
98   // fill in the header
99   rpc::Message::Header& header = p->header_;
100   header.xid_ = xid;
101   header.msgType_ = RPC_CALL;
102
103   // fill in call body
104   rpc::Message::CallBody& body = p->cbody_;
105   body.service_ = service;
106   body.method_ = method;
107   body.params_.AppendStreamNonDestructive(&params);
108
109   Send(p);
110 }
111
112 void IClientConnection::HandleResponse(const rpc::Message* msg) {
113   CHECK(selector_.IsInSelectThread());
114   CHECK_NOT_NULL(msg);
115   scoped_ptr<const rpc::Message> auto_del_msg(msg);
116
117   // check msg type correctness
118   if ( msg->header_.msgType_ != RPC_REPLY ) {
119     LOG_ERROR << "Received msgType="
120               << rpc::MessageTypeName(msg->header_.msgType_)
121               << " expected: " << rpc::MessageTypeName(RPC_REPLY);
122     return;
123   }
124
125   LOG_DEBUG << "Received packet: " << *msg;
126
127   const uint32 xid = msg->header_.xid_;
128   const rpc::REPLY_STATUS status =
129     static_cast<rpc::REPLY_STATUS>(msg->rbody_.replyStatus_.Get());
130   const io::MemoryStream& result = msg->rbody_.result_;
131   CompleteQuery(xid, status, result);
132
133   // - the msg is auto-deleted
134 }
135
136 void IClientConnection::NotifySendFailed(const rpc::Message* msg,
137                                          rpc::REPLY_STATUS status) {
138   CHECK(selector_.IsInSelectThread());
139   CHECK_NOT_NULL(msg);
140   CHECK_GE(status, 100);  // the status should be a client failure
141   scoped_ptr<const rpc::Message> auto_del_msg(msg);
142
143   // check msg type correctness
144   CHECK_EQ(msg->header_.msgType_ , RPC_CALL)
145       << "HandleSendError msgType="
146       << rpc::MessageTypeName(msg->header_.msgType_)
147       << " expected: " << rpc::MessageTypeName(RPC_CALL);
148
149   const uint32 xid = msg->header_.xid_;
150
151   // Remove the timeout on current xid, to avoid unnecessary HandleTimeout call.
152   // However, not removing it is not a bug. The HandleTimeout() may be executing
153   // right now.
154   ClearTimeout(xid);
155
156   // there should be a ResponseCallback for the failed message
157   //
158   ResponseCallback * response_callback = PopResponseCallbackSync(xid);
159   if ( !response_callback ) {
160     LOG_ERROR << "Cannot find a ResponseCallback, xid: " << xid;
161     return;
162   }
163
164   // Careful not to call response_callback or anything else in
165   // ServiceWrapper while keeping the sync_response_map_ locked.
166   // Because another thread may try to send something through
167   // the ServiceWrapper and then through us.
168
169   // deliver FAIL result through asynchronous callback
170   //
171   const io::MemoryStream empty_result;
172   response_callback->Run(xid, status, empty_result);
173
174   // the response_callback may destroy itself, but that's his business.
175 }
176 void IClientConnection::NotifyConnectionClosed() {
177   //
178   // By default: all pending queries will complete on timeout.
179   // But for performance and clarity: go through all pending queries and
180   // finish them with error response.
181   //
182   CompleteAllQueries(RPC_CONN_CLOSED);
183 }
184
185 void IClientConnection::NotifyTimeout(int64 xid) {
186   // cancel Send packet in implementation specific layer
187   // Not required, this is just an optimization.
188   Cancel(xid);
189
190   const io::MemoryStream empty_result;
191   CompleteQuery(xid, RPC_QUERY_TIMEOUT, empty_result);
192 }
193
194 void IClientConnection::AddResponseCallbackNoSync(
195     uint32 xid, ResponseCallback * response_callback) {
196   // WARNING! this method is not SYNCHRONIZED!
197   // Use it only with sync_response_map_ LOCKED!
198   const bool success = response_map_.insert(
199       make_pair(xid, response_callback)).second;
200   CHECK(success) << "The client is already waiting a response on xid=" << xid;
201 }
202
203 IClientConnection::ResponseCallback *
204 IClientConnection::PopResponseCallbackSync(uint32 xid) {
205   synch::MutexLocker lock(&sync_response_map_);
206   ResponseCallbackMap::iterator it = response_map_.find(xid);
207   if ( it == response_map_.end() ) {
208     return NULL;
209   }
210   ResponseCallback * response_callback = it->second;
211   response_map_.erase(it);
212   return response_callback;
213 }
214
215 void IClientConnection::AddTimeout(int64 xid, int64 timeout) {
216   if ( !selector_.IsInSelectThread() ) {
217     selector_.RunInSelectLoop(
218         NewCallback(this, &IClientConnection::AddTimeout, xid, timeout));
219     return;
220   }
221   timeouter_.SetTimeout(xid, timeout);
222 }
223
224 void IClientConnection::ClearTimeout(int64 xid) {
225   if ( !selector_.IsInSelectThread() ) {
226     selector_.RunInSelectLoop(
227         NewCallback(this, &IClientConnection::ClearTimeout, xid));
228     return;
229   }
230   timeouter_.UnsetTimeout(xid);
231 }
232
233 void IClientConnection::ClearTimeouts() {
234   if ( !selector_.IsInSelectThread() ) {
235     done_clear_timeouts_.Reset();
236     selector_.RunInSelectLoop(
237         NewCallback(this, &IClientConnection::ClearTimeouts));
238     if ( !done_clear_timeouts_.Wait(10000) ) {
239       LOG_ERROR << "Timeout waiting for ClearTimeouts()";
240     }
241     return;
242   }
243   timeouter_.UnsetAllTimeouts();
244   done_clear_timeouts_.Signal();
245 }
246
247 void __ReceiveSynchronousQueryResult(
248     synch::Event* received,
249     rpc::REPLY_STATUS* status_out, io::MemoryStream* result_out,
250     uint32 qid, rpc::REPLY_STATUS status_in, const io::MemoryStream& result) {
251   *status_out = status_in;
252   result_out->AppendStreamNonDestructive(&result);
253   received->Signal();
254 }
255
256 void IClientConnection::Query(const std::string& service,
257                               const std::string& method,
258                               io::MemoryStream& params,
259                               uint32 timeout,
260                               rpc::REPLY_STATUS& status,
261                               io::MemoryStream& result) {
262   CHECK(result.IsEmpty());
263   DLOG_DEBUG << "Query service=" << service
264              << " method=" << method
265              << " params=" << params.DebugString()
266              << " timeout=" << timeout;
267
268   synch::Event received(false, true);
269   AsyncQuery(service, method, params, timeout,
270       NewCallback(&__ReceiveSynchronousQueryResult,
271                   &received, &status, &result));
272   received.Wait();
273 }
274
275 uint32 IClientConnection::AsyncQuery(const std::string& service,
276                                      const std::string& method,
277                                      io::MemoryStream& params,
278                                      uint32 timeout,
279                                      ResponseCallback* response_callback) {
280   CHECK(response_callback);
281
282   LOG_DEBUG << "AsyncQuery service=" << service
283             << " method=" << method
284             << " params=" << params.DebugString()
285             << " timeout=" << timeout;
286
287   // generate transaction ID
288   const uint32 xid = GenerateNextXID();
289   {
290     synch::MutexLocker lock(&sync_response_map_);
291     AddResponseCallbackNoSync(xid, response_callback);
292     AddTimeout(xid, timeout);
293   }
294
295   // WARNING: SendQuery with sync_response_map_ UNLOCKED !
296   //          Because SendQuery may call HandleResponse (on FAIL) which locks
297   //          sync_response_map_ !
298   SendQuery(xid, service, method, params);
299   return xid;
300 }
301
302 void IClientConnection::CompleteQuery(uint32 qid,
303                                       rpc::REPLY_STATUS status,
304                                       const io::MemoryStream & result) {
305   // Remove the timeout on current xid, to avoid unnecessary HandleTimeout call.
306   // However, not removing it is not a bug. The HandleTimeout() may be executing
307   // right now.
308   ClearTimeout(qid);
309
310   ResponseCallback * response_callback = PopResponseCallbackSync(qid);
311   if ( !response_callback ) {
312     LOG_ERROR << "Cannot find a ResponseCallback, qid: " << qid;
313     return;
314   }
315
316   // deliver given status & result through asynchronous callback
317   //
318   response_callback->Run(qid, status, result);
319
320   // - the response_callback may auto-delete itself, but that's its business.
321 }
322
323 void IClientConnection::CompleteAllQueries(rpc::REPLY_STATUS status) {
324   ClearTimeouts();
325
326   // make a copy of response_map, so that we may call the response handler
327   // without locking the sync_response_map_
328   //
329   ResponseCallbackMap tmp;
330   {
331     synch::MutexLocker lock(&sync_response_map_);
332     tmp = response_map_;
333     response_map_.clear();
334   }
335
336   if ( !tmp.empty() ) {
337     LOG_ERROR << "CompleteAllQueries: completing "
338               << tmp.size() << " pending queries with status: "
339               << status << "(" << rpc::ReplyStatusName(status) << ")";
340   }
341
342   // deliver given status to all pending queries
343   //
344   const io::MemoryStream empty_result;
345   for ( ResponseCallbackMap::iterator it = tmp.begin();
346         it != tmp.end(); ++it) {
347     uint32 qid = it->first;
348     ResponseCallback & response_callback = *it->second;
349     response_callback.Run(qid, status, empty_result);
350   }
351 }
352
353 // Synchronizes with selector thread.
354 // Waits for selector thread to call this method, witch means the selector
355 // is not processing something else.
356 void WaitForSelectorThread(net::Selector * selector, synch::Event * ev = NULL) {
357   if ( !selector->IsInSelectThread() ) {
358     synch::Event sync_selector(false, true);
359     selector->RunInSelectLoop(
360         NewCallback(&WaitForSelectorThread, selector, &sync_selector));
361     sync_selector.Wait();
362     return;
363   }
364
365   if ( ev ) {
366     ev->Signal();
367   }
368 }
369
370 void IClientConnection::CancelQuery(uint32 qid) {
371   ClearTimeout(qid);
372
373   ResponseCallback * response_callback = PopResponseCallbackSync(qid);
374   if ( !response_callback ) {
375     LOG_ERROR << "Cannot find a ResponseCallback for qid: " << qid;
376     WaitForSelectorThread(&selector_);
377     return;
378   }
379
380   // Careful not to call response_callback or anything else in
381   // ServiceWrapper while keeping the sync_response_map_ locked.
382   // Because another thread may try to send something through
383   // the ServiceWrapper and then through us.
384
385   if ( !response_callback->is_permanent() ) {
386     delete response_callback;
387   }
388 }
389
390 void IClientConnection::CancelAllQueries() {
391   ClearTimeouts();
392
393   synch::MutexLocker lock(&sync_response_map_);
394
395   // remove all ResponseCallbacks, delete only the non-permanent ones
396   // Do NOT call ResponseCallback with sync_response_map_ LOCKED!
397   //
398   for ( ResponseCallbackMap::iterator it = response_map_.begin();
399         it != response_map_.end(); ++it) {
400     ResponseCallback * response_callback = it->second;
401     if ( !response_callback->is_permanent() ) {
402       delete response_callback;
403     }
404   }
405   response_map_.clear();
406
407   WaitForSelectorThread(&selector_);
408 }
409 }
Note: See TracBrowser for help on using the browser.