root/trunk/whisperlib/net/rpc/lib/client/rpc_client_connection_tcp.cc

Revision 7, 19.9 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #include "common/base/log.h"
33 #include "common/base/errno.h"
34 #include "common/base/timer.h"
35 #include "common/base/scoped_ptr.h"
36 #include "common/io/buffer/memory_stream.h"
37 #include "net/rpc/lib/client/rpc_client_connection_tcp.h"
38 #include "net/rpc/lib/codec/rpc_encoder.h"
39 #include "net/rpc/lib/codec/rpc_decoder.h"
40
41 // TODO(cpopescu): very important - all timeouts should be configurable
42
43 namespace rpc {
44
45 //////////////////////////////////////////////////////////////
46 //
47 //        Methods available to any external thread.
48 //
49 rpc::ClientConnectionTCP::ClientConnectionTCP(
50     net::Selector& selector,
51     net::NetFactory& net_factory,
52     net::PROTOCOL net_protocol,
53     const net::HostPort& remote_addr,
54     rpc::CODEC_ID codec_id,
55     int64 open_timeout_ms,
56     uint32 max_paralel_queries)
57     : IClientConnection(selector, rpc::CONNECTION_TCP, codec_id),
58       selector_(selector),
59       net_factory_(net_factory),
60       net_connection_(net_factory.CreateConnection(net_protocol)),
61       handshake_state_(NOT_INITIALIZED),
62       remote_addr_(remote_addr),
63       open_timeout_ms_(open_timeout_ms),
64       max_paralel_queries_(max_paralel_queries),
65       open_completed_(false, true),
66       is_opening_(false),
67       shutdown_is_executing_(false),
68       queries_(),
69       active_query_(NULL),
70       open_callback_(NULL),
71       timeouter_(&selector, NewPermanentCallback(
72           this, &rpc::ClientConnectionTCP::TimeoutHandler)) {
73   net_connection_->SetReadHandler(NewPermanentCallback(
74       this, &rpc::ClientConnectionTCP::ConnectionReadHandler), true);
75   net_connection_->SetWriteHandler(NewPermanentCallback(
76       this, &rpc::ClientConnectionTCP::ConnectionWriteHandler), true);
77   net_connection_->SetConnectHandler(NewPermanentCallback(
78       this, &rpc::ClientConnectionTCP::ConnectionConnectHandler), true);
79   net_connection_->SetCloseHandler(NewPermanentCallback(
80       this, &rpc::ClientConnectionTCP::ConnectionCloseHandler), true);
81   //  generate random data
82   for ( uint32 i = 0; i < HANDSHAKE_RANDOM_SIZE; i++ ) {
83     kHandshakeClientRandomData[i] = 'a' + i % ('z'-'a');
84   }
85 }
86
87 ClientConnectionTCP::~ClientConnectionTCP() {
88   Shutdown();
89   CHECK_NULL(open_callback_);
90   CHECK(queries_.empty());
91   CHECK_NULL(active_query_);
92   delete net_connection_;
93   net_connection_ = NULL;
94 }
95
96 bool ClientConnectionTCP::Open(OpenCallback* open_callback) {
97   CHECK_EQ(net_connection_->state(), net::NetConnection::DISCONNECTED)
98       << "Already Opened or Open in progress.";
99   CHECK_EQ(handshake_state_, NOT_INITIALIZED)
100       << "Already Opened or Open in progress.";
101
102   open_callback_ = open_callback;
103
104   // Start Open procedure
105   StartOpen();
106
107   if ( open_callback_ ) {
108     // Asynchronous Open
109     return true;
110   }
111
112   // Synchronous Open
113   CHECK(!selector_.IsInSelectThread())
114       << "Cannot wait for Open in selector thread. Use async Open instead.";
115
116   // Wait for TCP connect + RPC handshake completion.
117   // In the worst case a Timeout notification will wake us.
118   open_completed_.Wait();
119
120   if ( !IsOpen() ) {
121     Shutdown();
122     return false;
123   }
124
125   return true;
126 }
127
128 bool ClientConnectionTCP::IsOpen() const {
129   return net_connection_->state() == net::NetConnection::CONNECTED &&
130       handshake_state_ == CONNECTED;
131 }
132
133 void ClientConnectionTCP::Shutdown(synch::Event* signal_me_when_done) {
134   // Run this function no mater what. Maybe the selector just closed the
135   // connection, or a HandleError is in progress.. it doesn't matter.
136   // Testing for socket alive does not assure that the selector is not executing
137   // something in here.
138   //  e.g. [selector] -> RPCClientConnectionTCP::Close which closes fd but does
139   //                     not finish execution of Close()
140   //       [external] -> destructor -> Close -> test fd_ is invalid
141   //                     -> go on with destructor
142   //                  ==> Crash
143   if ( !selector_.IsInSelectThread() ) {
144     synch::Event shutdown_completed(false, true);
145     selector_.RunInSelectLoop(
146         NewCallback(this, &ClientConnectionTCP::Shutdown, &shutdown_completed));
147     shutdown_completed.Wait();
148     return;
149   }
150
151   //
152   // selector thread
153   //
154
155   // tells ConnectionCloseHandler it should not call Shutdown
156   shutdown_is_executing_ = true;
157
158   // Closes underlying TCP connection.
159   // Triggers a call to ConnectionCloseHandler.
160   net_connection_->ForceClose();
161
162   shutdown_is_executing_ = false;
163
164   CHECK_EQ(net_connection_->state(), net::NetConnection::DISCONNECTED);
165   handshake_state_ = NOT_INITIALIZED;
166
167   // if an asynchronous Open was in progress, clean it up.
168   if ( is_opening_ ) {
169     EndOpen("Disconnected");
170   }
171   CHECK_NULL(open_callback_);
172
173   // clear pending queries
174   if ( active_query_ ) {
175     delete active_query_;
176     active_query_ = NULL;
177   }
178   for ( ClientQueryMap::iterator it = queries_.begin();
179         it != queries_.end(); ++it ) {
180     ClientQuery* q = it->second;
181     delete q;
182   }
183   queries_.clear();
184
185   // Notify IClientConnection about us being broken
186   NotifyConnectionClosed();
187
188   LOG_DEBUG << "Disconnect from "
189             << net_connection_->remote_address()
190             << " completed";
191
192   // Let Signal() be the last statement! The destructor may be waiting
193   // for Shutdown completion.
194   if ( signal_me_when_done != NULL ) {
195     signal_me_when_done->Signal();
196   }
197 }
198
199 //////////////////////////////////////////////////////////////////////
200 //
201 //                    RPC I/O methods
202 //
203
204 // [THREAD SAFE]
205 void ClientConnectionTCP::Send(const rpc::Message* p) {
206   // IMPORTANT:
207   //  The query result (or failure) MUST be delivered on a different call !
208   //  Many times we synchronize the Call and HandleCallResult, so the Call
209   //  should not call HandleCallResult right away.
210   //  We also defend against stack-overflow:
211   //   - call async function
212   //     - call completion handler
213   //       - call another async function
214   //         - ...
215   selector_.RunInSelectLoop(
216       NewCallback(this, &ClientConnectionTCP::SendPacket, p));
217 }
218
219 void ClientConnectionTCP::Cancel(uint32 xid) {
220   if ( !selector_.IsInSelectThread() ) {
221     selector_.RunInSelectLoop(
222         NewCallback(this, &ClientConnectionTCP::Cancel, xid));
223     return;
224   }
225   ClientQueryMap::iterator it = queries_.find(xid);
226   if ( it == queries_.end() ) {
227     return;
228   }
229   ClientQuery* q = it->second;
230   delete q;
231   queries_.erase(it);
232 }
233
234 //////////////////////////////////////////////////////////////
235 //
236 //     Methods available only from the selector thread.
237 //
238 //////////////////////////////////////////////////////////////
239
240 void ClientConnectionTCP::StartOpen() {
241   if ( !selector_.IsInSelectThread() ) {
242     selector_.RunInSelectLoop(
243         NewCallback(this, &ClientConnectionTCP::StartOpen));
244     return;
245   }
246   is_opening_ = true;
247   timeouter_.SetTimeout(kOpenEvent, open_timeout_ms_);
248   StartConnect();
249 }
250
251 void ClientConnectionTCP::EndOpen(const char* err) {
252   CHECK ( is_opening_ );
253   // maybe set last error
254   if ( err != NULL && error_.empty() ) {
255     error_ = err;
256   }
257
258   if ( err ) {
259     // RPC connection failed
260     LOG_ERROR << "Open completed with error: " << Error();
261   } else {
262     // RPC connection established
263     LOG_INFO << " Connected to: " << net_connection_->remote_address()
264              << " Local address is: " << net_connection_->local_address()
265              << " #" << queries_.size() << " queries pending on send.";
266   }
267
268   timeouter_.UnsetTimeout(kOpenEvent);
269   is_opening_ = false;
270   open_completed_.Signal();
271   if ( open_callback_ ) {
272     LOG_DEBUG << "Running open_callback(" << boolalpha << IsOpen() << ")";
273     open_callback_->Run(IsOpen());
274     open_callback_ = NULL;
275   }
276 }
277
278 void ClientConnectionTCP::StartConnect() {
279   if ( !net_connection_->Connect(remote_addr_) ) {
280     // connection refused, or host unreachable
281     LOG_ERROR << "Failed to connect to remote host: " << remote_addr_
282               << ". Error: " << GetLastSystemErrorDescription();
283     EndConnect(GetLastSystemErrorDescription());
284   }
285 }
286
287 void ClientConnectionTCP::EndConnect(const char* err) {
288   if ( err ) {
289     EndOpen(err);
290     return;
291   }
292   StartHandshake();
293 }
294
295 void ClientConnectionTCP::StartHandshake() {
296   LOG_INFO << "Starting handshake";
297   CHECK_EQ(handshake_state_, NOT_INITIALIZED);
298   DoHandshake();
299 }
300
301 void ClientConnectionTCP::EndHandshake(
302     ClientConnectionTCP::HANDSHAKE_STATE state, const char* err) {
303   if ( handshake_state_ == FAILURE || handshake_state_ == CONNECTED ) {
304     LOG_FATAL << "Handshake already ended with handshake_state_="
305               << handshake_state_;
306   }
307   handshake_state_ = state;
308   EndOpen(err);
309 }
310
311 void ClientConnectionTCP::DoHandshake() {
312   // It's a 3 way handshake. The involved packets are as follows:
313   //  1. client -> server
314   //    - 3 bytes: "rpc"
315   //    - 2 bytes: rpc-protocol-version (hi, lo)
316   //    - 1 byte: codec identifier
317   //    - 32 bytes: client random generated data.
318   // 2. server -> client
319   //    - 3 bytes: "rpc"
320   //    - 2 bytes: rpc-protocol-version. Should match the client version.
321   //               Otherwise don't send this packet and drop the handshake.
322   //    - 1 byte: codec identifier. Should match the client codec.
323   //    - 32 bytes: server random generated data. Different from the
324   //                client data.
325   //    - 32 bytes: repeat client data.
326   // 3. client -> server
327   //    - 3 bytes: "rpc"
328   //    - 2 bytes: rpc-protocol-version. Should match the server version.
329   //               Otherwise don't send this packet and drop the handshake.
330   //    - 1 byte: codec id. Should be identical with the first packet.
331   //    - 32 bytes: repeat server data.
332   //
333   CHECK_EQ(net_connection_->state(), net::NetConnection::CONNECTED);
334   CHECK_NE(handshake_state_, CONNECTED);
335   CHECK_NE(handshake_state_, FAILURE);
336
337   // TODO(cosmin): take out all the inlined consts from here they look ugly
338   switch ( handshake_state_ ) {
339     case NOT_INITIALIZED: {  // initial state: handshake not started
340       // 1. create client first handshake packet
341       LOG_INFO << "Handshake step 1: sending client request";
342       uint8 clientHand[3+2+1+HANDSHAKE_RANDOM_SIZE];
343       memcpy(clientHand, "rpc", 3);        // "rpc" head
344       clientHand[3] = RPC_VERSION_MAJOR;   // version HI
345       clientHand[4] = RPC_VERSION_MINOR;   // version LO
346       clientHand[5] = GetCodec().GetCodecID();
347       memcpy(clientHand + 6,
348              kHandshakeClientRandomData,
349              HANDSHAKE_RANDOM_SIZE);  // 32 bytes client random data
350
351       // Send it to server
352       net_connection_->Write(clientHand, sizeof(clientHand));
353
354       handshake_state_ = WAITING_RESPONSE;
355       LOG_INFO << "Handshake step 1: completed. Client request sent";
356       return;
357     }
358     case WAITING_RESPONSE: {  // request sent, waiting response
359       // 2. wait for server reply ended, check reply
360       LOG_INFO << "Handshake step 2: waiting server response";
361       uint8 serverHand[3+2+1+HANDSHAKE_RANDOM_SIZE+HANDSHAKE_RANDOM_SIZE];
362       if ( net_connection_->inbuf()->Size() < sizeof(serverHand) ) {
363         return;   // not enough data received
364       }
365
366       uint32 read = net_connection_->inbuf()->Read(
367           serverHand, sizeof(serverHand));
368       CHECK_EQ(read, sizeof(serverHand));
369
370       // decode server reply
371       const uint8* replyMark = serverHand + 0;     // 3 bytes long
372       const uint8 replyVersionHi = serverHand[3];  // 1 byte
373       const uint8 replyVersionLo = serverHand[4];  // 1 byte
374       const uint8 replyCodecID = serverHand[5];    // 1 byte
375       const uint8* replyServerRandom = serverHand + 6;
376                          // HANDSHAKE_RANDOM_SIZE bytes
377       const uint8* replyClientRandom = serverHand + 6 + HANDSHAKE_RANDOM_SIZE;
378                         // HANDSHAKE_RANDOM_SIZE bytes
379       // check server reply for correctness.
380       if ( ::memcmp(replyMark, "rpc", 3) ) {
381         LOG_ERROR << "Handshake: bad mark. Server reply starts with: "
382                   << strutil::PrintableDataBuffer(replyMark, 3);
383         EndHandshake(FAILURE, "Bad handshake mark");
384         return;
385       }
386       if ( replyVersionHi != RPC_VERSION_MAJOR ||
387            replyVersionLo != RPC_VERSION_MINOR ) {
388         LOG_ERROR << "Handshake: bad version. Server is "
389                   << replyVersionHi << "." << replyVersionLo
390                   << " while client is " << RPC_VERSION_STR;
391         EndHandshake(FAILURE, "Bad handshake version");
392         return;
393       }
394       if ( replyCodecID != GetCodec().GetCodecID() ) {
395         LOG_ERROR << "Hanshake: bad codec. Server has codec_id=" << replyCodecID
396                   << " while client has codec_id=" << GetCodec().GetCodecID();
397         EndHandshake(FAILURE, "Bad codec id");
398         return;
399       }
400       if ( ::memcmp(replyClientRandom, kHandshakeClientRandomData,
401                     HANDSHAKE_RANDOM_SIZE) ) {
402         LOG_ERROR << "Handshake: bad random";
403         EndHandshake(FAILURE, "Bad handshake random");
404         return;
405       }
406       LOG_INFO << "Handshake step 2: completed. Server response OK.";
407
408       // 3. create client acknowledge packet
409       LOG_INFO << "Handshake step 3: sending client acknowledge";
410       uint8 clientHand[3+2+1+HANDSHAKE_RANDOM_SIZE];
411       memcpy(clientHand, "rpc", 3);        // "rpc" head
412       clientHand[3] = RPC_VERSION_MAJOR;   // version HI
413       clientHand[4] = RPC_VERSION_MINOR;   // version LO
414       clientHand[5] = GetCodec().GetCodecID();  // codec identifier
415       memcpy(clientHand + 6, replyServerRandom,
416              HANDSHAKE_RANDOM_SIZE);       // 32 bytes server random data
417
418       // Send it to server
419       net_connection_->Write(clientHand, sizeof(clientHand));
420
421       LOG_INFO << "Handshake step 3: completed. Handshake successful";
422
423       // Handshake successful
424       EndHandshake(CONNECTED);
425       return;
426     }
427     case CONNECTED:  // response received and verified. Connection estabilished.
428       LOG_FATAL << "Handshake already succeeded.";
429     case FAILURE:
430       LOG_FATAL << "Handshake already failed.";
431     default:
432       LOG_FATAL << "No such handshake_state_ = " << handshake_state_;
433   }
434 }
435
436 void ClientConnectionTCP::SendPacket(const rpc::Message* p) {
437   if ( !selector_.IsInSelectThread() ) {
438     selector_.RunInSelectLoop(
439         NewCallback(this, &ClientConnectionTCP::SendPacket, p));
440     return;
441   }
442
443   if ( queries_.size() >= max_paralel_queries_ ) {
444     NotifySendFailed(p, RPC_TOO_MANY_QUERIES);
445     return;
446   }
447
448   // create a new query which will store the message until sent
449   ClientQuery* q = new ClientQuery(p);
450
451   // encode the RPC message
452   GetCodec().EncodePacket(q->buf_, *p);
453
454   // Always keep a read marker on the stream begin,
455   // to be able to rewind on write error.
456   q->buf_.MarkerSet();
457
458   // enqueue the packet for send
459   pair<ClientQueryMap::iterator, bool>
460       result = queries_.insert(make_pair(p->header_.xid_, q));
461   CHECK(result.second) << "Duplicate XID";
462
463   if ( !IsOpen() ) {
464     if ( net_connection_->state() == net::NetConnection::DISCONNECTED ) {
465       StartOpen();
466     }
467     return;
468   }
469
470   // enable write (will trigger write events which write queries_ to network)
471   net_connection_->RequestWriteEvents(true);
472 }
473
474 //////////////////////////////////////////////////////////////////////
475 //
476 //             net::BufferedConnection methods
477 //
478 //
479 bool ClientConnectionTCP::ConnectionReadHandler() {
480   CHECK(selector_.IsInSelectThread());
481
482   // wrap connection's input buffer
483   io::MemoryStream* const in = net_connection_->inbuf();
484
485   if ( handshake_state_ != CONNECTED && handshake_state_ != FAILURE ) {
486     // still on handshake stage
487     DoHandshake();
488
489     if ( handshake_state_ == FAILURE ) {
490       LOG_ERROR << "Handshake failed, closing connection: " << Error();
491       return false;
492     }
493     if ( handshake_state_ != CONNECTED ) {
494       // handshake did not finish, more data to come.
495       return true;
496     }
497   }
498   CHECK_EQ(handshake_state_, CONNECTED);
499
500   // read & decode & handle as many rpc::Message-s as possible
501   while ( true ) {
502     // create a new rpc::Message to fill up
503     scoped_ptr<rpc::Message> msg(new rpc::Message());
504
505     in->MarkerSet();
506     DECODE_RESULT result = GetCodec().DecodePacket(*in, *msg);
507     if ( result == DECODE_RESULT_ERROR ) {
508       // no need to restore data. But do it for logging the "in" buffer.
509       in->MarkerRestore();
510       LOG_ERROR << "Failed to decode packet from...";
511       error_ = "Bad data. Decoder error.";
512       return false;            // close connection
513     }
514     if ( result == DECODE_RESULT_NOT_ENOUGH_DATA ) {
515       in->MarkerRestore();  // restore read data.
516       return true;            // not enough data, keep waiting.
517     }
518
519     // successfully received RPC message
520     CHECK_EQ(result, DECODE_RESULT_SUCCESS);
521     in->MarkerClear();
522
523     // send the received packet to IClientConnection interface
524     IClientConnection::HandleResponse(msg.release());
525   }
526 }
527
528 bool ClientConnectionTCP::ConnectionWriteHandler() {
529   if ( !net_connection_->outbuf()->IsEmpty() ) {
530     //
531     // wait for outbuf_ to be consumed
532     //
533     return true;
534   }
535
536   if ( handshake_state_ != CONNECTED ) {
537     // handhshake incomplete and outbuf_ empty
538     return true;
539   }
540
541   CHECK_EQ(handshake_state_, CONNECTED);
542   CHECK(net_connection_->outbuf()->IsEmpty());
543
544   //
545   // consume queries_
546   //
547   while ( true ) {
548     // maybe pop a new query
549     if ( !active_query_ && !queries_.empty() ) {
550       ClientQueryMap::iterator it = queries_.begin();
551       active_query_ = it->second;
552       queries_.erase(it);
553       LOG_INFO << "Sending query xid=" << active_query_->msg_->header_.xid_;
554     }
555
556     // no active query? nothing to send
557     if ( !active_query_ ) {
558       break;
559     }
560
561     // send active query
562     net_connection_->Write(&active_query_->buf_);
563
564     // query sent, delete it
565     active_query_->buf_.MarkerClear();
566     delete active_query_;
567     active_query_ = NULL;
568   }
569
570   return true;
571 }
572 void ClientConnectionTCP::ConnectionConnectHandler() {
573   LOG_INFO << "Connected to: " << net_connection_->remote_address();
574   EndConnect();   // connect process ended
575 }
576 void ClientConnectionTCP::ConnectionCloseHandler(
577     int err, net::NetConnection::CloseWhat what) {
578   if ( what != net::NetConnection::CLOSE_READ_WRITE ) {
579     return;
580   }
581   LOG_ERROR << "ConnectionCloseHandler"
582                " what=" << net::NetConnection::CloseWhatName(what)
583             << " , err: " << err;
584   if ( err != 0 ) {
585     error_ = GetSystemErrorDescription(err);
586   } else {
587     error_ = "Connection closed.";
588   }
589   if ( !shutdown_is_executing_ ) {
590     Shutdown();
591   }
592 }
593
594 void ClientConnectionTCP::TimeoutHandler(int64 timeout_id) {
595   switch ( timeout_id ) {
596     case kOpenEvent:
597       LOG_ERROR << "Timeout connecting to " << net_connection_->remote_address();
598       EndOpen("Open Timeout");
599       break;
600     default:
601       LOG_FATAL << "Unknown timeout_id: " << timeout_id;
602       break;
603   }
604 }
605
606 }
Note: See TracBrowser for help on using the browser.