root/trunk/whisperlib/net/base/connection.cc

Revision 7, 56.2 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Authors: Cosmin Tudorache & Catalin Popescu
31
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <linux/types.h>
36 #include <linux/errqueue.h>
37 #include <unistd.h>
38 #include <fcntl.h>
39
40 #include "common/base/errno.h"
41 #include "common/base/log.h"
42 #include "common/base/timer.h"
43
44 #include "net/base/connection.h"
45
46 DEFINE_bool(net_connection_debug,
47             false,
48             "Enable debug messages in NetConnection");
49
50 namespace net {
51
// Statement prefix: the stream expression that follows is evaluated only
// when --net_connection_debug is set. The "if ( !cond ); else" shape is
// deliberate: the macro is a complete if/else on its own, so it never
// captures the "else" of an enclosing unbraced if.
#define IF_NET_DEBUG if ( !FLAGS_net_connection_debug ); else

// Loggers that prefix every message with this->PrefixInfo().
// INFO and WARNING are gated by --net_connection_debug,
// ERROR and FATAL are always emitted.
#define ICONNLOG  IF_NET_DEBUG LOG_INFO << this->PrefixInfo()
#define WCONNLOG  IF_NET_DEBUG LOG_WARNING << this->PrefixInfo()
#define ECONNLOG              LOG_ERROR << this->PrefixInfo()
#define FCONNLOG              LOG_FATAL << this->PrefixInfo()

// log always
#define AICONNLOG             LOG_INFO << this->PrefixInfo()

#ifdef _DEBUG
#  define DCONNLOG IF_NET_DEBUG LOG_DEBUG << this->PrefixInfo()
#  define D10CONNLOG IF_NET_DEBUG LOG(10) << this->PrefixInfo()
#else
// Compiled out in non-debug builds: the stream expression is never
// evaluated. Same "if ( cond ); else" shape as IF_NET_DEBUG; a bare
// "if ( false ) LOG_DEBUG" would silently steal the "else" of
//   if ( x ) DCONNLOG << "..."; else ...
#  define DCONNLOG if ( true ); else LOG_DEBUG
#  define D10CONNLOG if ( true ); else LOG(10)
#endif
69
70
71 void NetAcceptor::SetFilterHandler(FilterHandler* filter_handler, bool own) {
72   CHECK(filter_handler->is_permanent());
73   DetachFilterHandler();
74   filter_handler_ = filter_handler;
75   own_filter_handler_ = own;
76 }
77 void NetAcceptor::DetachFilterHandler() {
78   if ( own_filter_handler_ ) {
79     delete filter_handler_;
80   }
81   filter_handler_ = NULL;
82   own_filter_handler_ = false;
83 }
84 void NetAcceptor::SetAcceptHandler(AcceptHandler* accept_handler, bool own) {
85   CHECK(accept_handler->is_permanent());
86   DetachAcceptHandler();
87   accept_handler_ = accept_handler;
88   own_accept_handler_ = own;
89 }
90 void NetAcceptor::DetachAcceptHandler() {
91   if ( own_accept_handler_ ) {
92     delete accept_handler_;
93   }
94   accept_handler_ = NULL;
95   own_accept_handler_ = false;
96 }
97 void NetAcceptor::DetachAllHandlers() {
98   DetachFilterHandler();
99   DetachAcceptHandler();
100 }
101
102 bool NetAcceptor::InvokeFilterHandler(const net::HostPort& peer_address) {
103   return !filter_handler_ ||
104           filter_handler_->Run(peer_address);
105 }
106 void NetAcceptor::InvokeAcceptHandler(NetConnection* new_connection) {
107   CHECK_NOT_NULL(accept_handler_) << "missing accept_handler_ !";
108   accept_handler_->Run(new_connection);
109 }
110
111 //////////////////////////////////////////////////////////////////////
112 //////////////////////////////////////////////////////////////////////
113 //////////////////////////////////////////////////////////////////////
114
115 void NetConnection::SetConnectHandler(ConnectHandler* connect_handler,
116                                       bool own) {
117   CHECK(connect_handler->is_permanent());
118   DetachConnectHandler();
119   connect_handler_ = connect_handler;
120   own_connect_handler_ = own;
121 }
122 void NetConnection::DetachConnectHandler() {
123   if ( own_connect_handler_ ) {
124     delete connect_handler_;
125   }
126   connect_handler_ = NULL;
127   own_connect_handler_ = false;
128 }
129 void NetConnection::SetReadHandler(ReadHandler* read_handler, bool own) {
130   CHECK(read_handler->is_permanent());
131   DetachReadHandler();
132   read_handler_ = read_handler;
133   own_read_handler_ = own;
134 }
135 void NetConnection::DetachReadHandler() {
136   if ( own_read_handler_ ) {
137     delete read_handler_;
138   }
139   read_handler_ = NULL;
140   own_read_handler_ = false;
141 }
142 void NetConnection::SetWriteHandler(WriteHandler* write_handler, bool own) {
143   CHECK(write_handler->is_permanent());
144   DetachWriteHandler();
145   write_handler_ = write_handler;
146   own_write_handler_ = own;
147 }
148 void NetConnection::DetachWriteHandler() {
149   if ( own_write_handler_ ) {
150     delete write_handler_;
151   }
152   write_handler_ = NULL;
153   own_write_handler_ = false;
154 }
155 void NetConnection::SetCloseHandler(CloseHandler* close_handler, bool own) {
156   CHECK(close_handler->is_permanent());
157   DetachCloseHandler();
158   close_handler_ = close_handler;
159   own_close_handler_ = own;
160 }
161 void NetConnection::DetachCloseHandler() {
162   if ( own_close_handler_ ) {
163     delete close_handler_;
164   }
165   close_handler_ = NULL;
166   own_close_handler_ = false;
167 }
168 void NetConnection::DetachAllHandlers() {
169   DetachConnectHandler();
170   DetachReadHandler();
171   DetachWriteHandler();
172   DetachCloseHandler();
173 }
174
175 void NetConnection::InvokeConnectHandler() {
176   AICONNLOG << "Connected! invoking application connect handler.. ";
177   CHECK_NOT_NULL(connect_handler_) << "no connect_handler found";
178   connect_handler_->Run();
179 }
180 bool NetConnection::InvokeReadHandler() {
181   CHECK_NOT_NULL(read_handler_) << "no read_handler found";
182   return read_handler_->Run();
183   // TODO(cosmin): check return value on read_handler_ or make it return void
184 }
185 bool NetConnection::InvokeWriteHandler() {
186   CHECK_NOT_NULL(write_handler_) << "no write_handler found";
187   return write_handler_->Run();
188   // TODO(cosmin): check return value on write_handler_ or make it return void
189 }
190 void NetConnection::InvokeCloseHandler(int err, CloseWhat what) {
191   if ( !close_handler_ ) {
192     // TODO(cosmin): remove fatal/warning.
193     //               Default behavior should FlushAndClose.
194     FCONNLOG << "No close_handler_ found";
195     FlushAndClose();
196     return;
197   }
198   close_handler_->Run(err, what);
199 }
200
201 //////////////////////////////////////////////////////////////////////
202 //////////////////////////////////////////////////////////////////////
203 //////////////////////////////////////////////////////////////////////
204
205 TcpAcceptor::TcpAcceptor(Selector* selector,
206                          const TcpAcceptorParams& tcp_params)
207   : NetAcceptor(tcp_params),
208     Selectable(selector),
209     tcp_params_(tcp_params),
210     fd_(INVALID_FD_VALUE) {
211 }
212 TcpAcceptor::~TcpAcceptor() {
213   InternalClose(0);
214   CHECK_EQ(fd_, INVALID_FD_VALUE);
215   CHECK_EQ(state(), DISCONNECTED);
216 }
217
218 bool TcpAcceptor::Listen(const HostPort& local_addr) {
219   struct sockaddr_storage addr;
220   local_addr.SockAddr(&addr);
221
222   CHECK_EQ(fd_, INVALID_FD_VALUE) << "Attempting Listen on valid socket";
223   CHECK_EQ(state(), DISCONNECTED) << "Attempting Listen on listening socket";
224
225   // create socket
226   fd_ = ::socket(addr.ss_family, SOCK_STREAM, 0);
227   if ( fd_ < 0 ) {
228     ECONNLOG << "::socket failed, err: " << GetLastSystemErrorDescription();
229     return false;
230   }
231
232   // set socket options
233   if ( !SetSocketOptions() ) {
234     ECONNLOG << "SetSocketOptions failed, closing socket fd: " << fd_;
235     int result = ::close(fd_);
236     if ( result != 0 ) {
237       ECONNLOG << "::close failed for fd=" << fd_
238                << " with error: " << GetLastSystemErrorDescription();
239     }
240     fd_ = INVALID_FD_VALUE;
241     return false;
242   }
243
244   // bind socket
245   if ( ::bind(fd_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) ) {
246     ECONNLOG << "Error binding fd: " << fd_ << " to "
247              << local_addr << " : " << GetLastSystemErrorDescription();
248     int result = ::close(fd_);
249     if ( result != 0 ) {
250       ECONNLOG << "::close failed for fd=" << fd_
251                << " with error: " << GetLastSystemErrorDescription();
252     }
253     fd_ = INVALID_FD_VALUE;
254     return false;
255   }
256
257   // listen on socket
258   if ( ::listen(fd_, tcp_params_.backlog_) ) {
259     ECONNLOG << "::listen failed for fd: " << fd_
260              << " , backlog: " << tcp_params_.backlog_
261              << " , local_address: " << local_addr
262              << " , err: " << GetLastSystemErrorDescription();
263     int result = ::close(fd_);
264     if ( result != 0 ) {
265       ECONNLOG << "::close failed for fd=" << fd_
266                << " with error: " << GetLastSystemErrorDescription();
267     }
268     fd_ = INVALID_FD_VALUE;
269     return false;
270   }
271
272   // register to selector
273   if ( !selector_->Register(this) ) {
274     ECONNLOG << "selector_->Register failed, closing socket fd: " << fd_;
275     int result = ::close(fd_);
276     if ( result != 0 ) {
277       ECONNLOG << "::close failed for fd=" << fd_
278                << " with error: " << GetLastSystemErrorDescription();
279     }
280     fd_ = INVALID_FD_VALUE;
281     return false;
282   }
283
284   set_local_address(local_addr);
285   set_state(LISTENING);
286   AICONNLOG << "Bound and listening on " << local_address();
287
288   // TODO(cosmin): remove these, Read should be enabled by default
289   //selector_->EnableReadCallback(this, true);
290   //selector_->EnableWriteCallback(this, false);
291
292   return true;
293 }
294 void TcpAcceptor::Close() {
295   InternalClose(0);
296 }
297 string TcpAcceptor::PrefixInfo() const {
298   ostringstream oss;
299   oss << StateName() << " : [" << local_address() << " (fd: " << fd_ << ")] ";
300   return oss.str();
301 }
302
303 bool TcpAcceptor::SetSocketOptions() {
304   CHECK_NE(fd_, INVALID_FD_VALUE);
305   // Enable non blocking (critical for using selector)
306   const int flags = fcntl(fd_, F_GETFL, 0);
307   if ( flags < 0 ) {
308     ECONNLOG << "::fcntl failed for fd=" << fd_
309              << " err: " << GetLastSystemErrorDescription();
310     return false;
311   }
312   const int new_flags = flags | O_NONBLOCK;
313   int result = fcntl(fd_, F_SETFL, new_flags);
314   if ( result < 0 ) {
315     ECONNLOG << "::fcntl failed for fd=" << fd_
316              << " new_flags=" << new_flags
317              << " err: " << GetLastSystemErrorDescription();
318     return false;
319   }
320   // Enable fast bind reusing (without this option, closing the socket
321   //  will switch OS port to CLOSE_WAIT state for ~1 minute, during which
322   //  bind fails with EADDRINUSE)
323   const int true_flag = 1;
324   if ( setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR,
325                   reinterpret_cast<const char *>(&true_flag),
326                   sizeof(true_flag)) < 0 ) {
327     ECONNLOG << "::setsockopt failed for fd_=" << fd_
328              << " err: " << GetLastSystemErrorDescription();
329     return false;
330   }
331   return true;
332 }
333
334 int TcpAcceptor::ExtractSocketErrno() {
335   return TcpConnection::ExtractSocketErrno(fd_);
336 }
337
338 void TcpAcceptor::InternalClose(int err) {
339   if ( fd_ == INVALID_FD_VALUE ) {
340     CHECK_EQ(state(), DISCONNECTED);
341     return;
342   }
343   D10CONNLOG << "Unregistering acceptor (fd: " << fd_ << ")...";
344   selector_->Unregister(this);
345   D10CONNLOG << "Performing ::close... ";
346   int result = ::close(fd_);
347   if ( result != 0 ) {
348     ECONNLOG << "::close failed for fd=" << fd_
349              << " with error: " << GetLastSystemErrorDescription();
350   }
351   fd_ = INVALID_FD_VALUE;
352   set_state(DISCONNECTED);
353   set_last_error_code(err);
354 }
355
356 bool TcpAcceptor::HandleReadEvent(int events) {
357   // new client connection
358
359   //perform ::accept
360   struct sockaddr_storage address;
361   socklen_t addrlen = sizeof(address);
362   const int client_fd = ::accept(fd_,
363                                  reinterpret_cast<sockaddr*>(&address),
364                                  &addrlen);
365   if ( client_fd < 0 ) {
366     if ( errno == EAGAIN || errno == EWOULDBLOCK ) {
367       // This could happen if the connecting client goes away just before
368       // we execute "accept".
369       ECONNLOG << "HandleReadEvent with no pending connection request!: "
370                << GetLastSystemErrorDescription();
371       return true;
372     }
373     ECONNLOG << "::accept failed: " << GetLastSystemErrorDescription();
374     return false;
375   }
376
377   HostPort hp(&address);
378   AICONNLOG << "TcpConnection accepted from " << hp;
379
380   // filter client
381   if ( !InvokeFilterHandler(hp) ) {
382     ECONNLOG << "Dumping connection from " << hp
383              << " because filter_handler_ refused it";
384     if ( ::close(client_fd) ) {
385       ECONNLOG << "::close fd: " << fd_ << " failed: "
386                << GetLastSystemErrorDescription();
387     }
388     return true;
389   }
390   Selector* const selector_to_use = tcp_params_.GetNextSelector();
391   if ( selector_to_use != NULL ) {
392     selector_to_use->RunInSelectLoop(
393         NewCallback(this, &TcpAcceptor::InitializeAceptedConnection,
394                     selector_to_use, client_fd));
395   } else {
396     InitializeAceptedConnection(selector_, client_fd);
397   }
398   return true;
399 }
400
401 void TcpAcceptor::InitializeAceptedConnection(Selector* selector,
402                                               int client_fd) {
403   DCHECK(selector->IsInSelectThread());
404
405   // create a TcpConnection object for this client
406   TcpConnection* client = new TcpConnection(selector_,
407                                             tcp_params_.tcp_connection_params_);
408   if ( !client->Wrap(client_fd) ) {
409     ECONNLOG << "Failed to Wrap incoming client fd: " << client_fd
410              << " dumping connection..";
411     if ( ::close(client_fd) ) {
412       ECONNLOG << "::close fd: " << fd_ << " failed: "
413                << GetLastSystemErrorDescription();
414     }
415   }
416
417   // for TCP an accepted fd is already fully connected
418   CHECK_EQ(client->state(), TcpConnection::CONNECTED);
419
420   // deliver this new client to application
421   InvokeAcceptHandler((NetConnection*)client);
422 }
423
424 bool TcpAcceptor::HandleWriteEvent(int events) {
425   FCONNLOG << "Erroneous call to HandleWriteEvent on server socket";
426   return false;
427 }
428 bool TcpAcceptor::HandleErrorEvent(int events) {
429   ECONNLOG << "HandleErrorEvent: 0x" << std::hex << events;
430   if ( events & EPOLLHUP ) {
431     FCONNLOG << "HUP on server socket";
432     return false;
433   }
434   if ( events & EPOLLRDHUP ) {
435     FCONNLOG << "RDHUP on server socket";
436     return false;
437   }
438   if ( events & EPOLLERR ) {
439     int err = ExtractSocketErrno();
440     ECONNLOG << "HandleErrorEvent err: " << GetSystemErrorDescription(err)
441              << " closing socket.";
442     InternalClose(err);
443     return false;
444   }
445   FCONNLOG << "HandleErrorEvent: unknown event: 0x" << std::hex << events;
446   return false;
447 }
448
449 //////////////////////////////////////////////////////////////////////
450 //////////////////////////////////////////////////////////////////////
451 //////////////////////////////////////////////////////////////////////
452
453 TcpConnection::TcpConnection(Selector* selector,
454                              const TcpConnectionParams& tcp_params)
455     : NetConnection(tcp_params),
456       Selectable(selector),
457       tcp_params_(tcp_params),
458       fd_(INVALID_FD_VALUE),
459       local_address_(),
460       remote_address_(),
461       write_closed_(true),
462       read_closed_(true),
463       timeouter_(selector,
464           NewPermanentCallback(this, &TcpConnection::HandleTimeoutEvent)) {
465 }
466
467 TcpConnection::~TcpConnection() {
468   InternalClose(0, true);
469   CHECK_EQ(state(), DISCONNECTED);
470   DetachAllHandlers();
471 }
472
473 void TcpConnection::Close(CloseWhat what) {
474   if ( fd_ == INVALID_FD_VALUE ) {
475     CHECK_EQ(state(), DISCONNECTED);
476     return;
477   }
478
479   ///////////////////////////////////////////
480   // Ignore CLOSE_READ, we should never need it.
481
482   ////////////////////////////////////////////
483   // If CLOSE_WRITE requested , go to FLUSHING state
484   if ( what == CLOSE_WRITE ||
485        what == CLOSE_READ_WRITE ) {
486     if ( !write_closed() ) {
487       set_state(FLUSHING);
488       RequestWriteEvents(true);
489       // NOTE: when outbuf_ gets empty we execute ::shutdown(write)
490       //       and set write_closed_ = true
491     }
492   }
493 }
494
495 //////////////////////////////////////////////////////////////////////
496
497 bool TcpConnection::Wrap(int fd) {
498   fd_ = fd;
499   if ( !SetSocketOptions() ) {
500     return false;
501   }
502   if ( !selector_->Register(this) ) {
503     fd_ = INVALID_FD_VALUE;
504     return false;
505   }
506   set_state(TcpConnection::CONNECTED);
507   set_read_closed(false);
508   set_write_closed(false);
509   InitializeLocalAddress();
510   InitializeRemoteAddress();
511   RequestReadEvents(true);
512   return true;
513 }
514
515 bool TcpConnection::Connect(const HostPort& remote_addr) {
516   struct sockaddr_storage addr;
517   remote_addr.SockAddr(&addr);
518   CHECK_EQ(state(), DISCONNECTED) << "Cannot connect in this state: "
519                                   << StateName();
520   CHECK_EQ(fd_, INVALID_FD_VALUE) << "FD already created?!";
521   // create socket
522   fd_ = ::socket(addr.ss_family, SOCK_STREAM, 0);
523   if ( fd_ < 0 ) {
524     ECONNLOG << "::socket failed: " << GetLastSystemErrorDescription();
525     fd_ = INVALID_FD_VALUE;
526     return false;
527   }
528   // set socket options: non-blocking, ...
529   if ( !SetSocketOptions() ) {
530     ECONNLOG << "SetSocketOptions failed, aborting Connect.. ";
531     ::close(fd_);
532     fd_ = INVALID_FD_VALUE;
533     return false;
534   }
535   // register with selector
536   if ( !selector_->Register(this) ) {
537     ECONNLOG << "Failed to register with selector, aborting Connect.. ";
538     ::close(fd_);
539     fd_ = INVALID_FD_VALUE;
540     return false;
541   }
542
543   // begin connect
544   set_state(CONNECTING);
545   set_read_closed(false);
546   set_write_closed(false);
547   remote_address_ = remote_addr;
548
549   if ( ::connect(fd_,
550                  reinterpret_cast<const sockaddr*>(&addr),
551                  sizeof(addr)) ) {
552     CHECK_NE(errno, EALREADY) << "a previous connection attempt has not yet"
553                                  " been completed";
554     if ( errno == EINPROGRESS ) {
555       AICONNLOG << "Connecting to " << remote_address();
556       RequestWriteEvents(true);
557       RequestReadEvents(true);
558       return true;
559     } else {
560       ECONNLOG << "Error connecting fd: " << fd_
561                << " to addr: " << remote_addr
562                << ", family: " << addr.ss_family
563                << ", error: " << GetLastSystemErrorDescription();
564       InternalClose(GetLastSystemError(), false);
565       return false;
566     }
567   }
568
569   // TODO(cosmin): connect already completed.
570   //               But to simplify logic, we wait for the first HandleWrite
571   //               and call InvokeConnectHandler there.
572   RequestWriteEvents(true);
573   RequestReadEvents(true);
574   return true;
575   /*
576   // connect completed
577   set_state(CONNECTED);
578   InitializeLocalAddress();
579
580   // Even in this unlikely event, we do not run HandleConnect from inside
581   // Connect - it may deadlock: the application usually synchronizes
582   // Connect and HandleConnect.
583   selector_->RunInSelectLoop(
584       NewCallback(this, &TcpConnection::InvokeConnectHandler));
585   return true;
586   */
587 }
588
589 void TcpConnection::FlushAndClose() {
590   Close(CLOSE_WRITE);
591 }
592 void TcpConnection::ForceClose() {
593   InternalClose(0, true);
594 }
595
596 bool TcpConnection::SetSendBufferSize(int size) {
597   if ( ::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF,
598                     reinterpret_cast<const char *>(&size), sizeof(size)) ) {
599     ECONNLOG << "::setsockopt failed: " << GetLastSystemErrorDescription();
600     return false;
601   }
602   return true;
603 }
604 bool TcpConnection::SetRecvBufferSize(int size) {
605   if ( ::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF,
606                     reinterpret_cast<const char *>(&size), sizeof(size)) ) {
607     ECONNLOG << "::setsockopt failed: " << GetLastSystemErrorDescription();
608     return false;
609   }
610   return true;
611 }
612
613 void TcpConnection::RequestReadEvents(bool enable) {
614   D10CONNLOG << "RequestReadEvents => " << std::boolalpha << enable;
615   selector_->EnableReadCallback(this, enable);
616 }
617 void TcpConnection::RequestWriteEvents(bool enable) {
618   D10CONNLOG << "RequestWriteEvents => " << std::boolalpha << enable;
619   selector_->EnableWriteCallback(this, enable);
620 }
621
622 const HostPort& TcpConnection::local_address() const {
623   return local_address_;
624 }
625 const HostPort& TcpConnection::remote_address() const {
626   return remote_address_;
627 }
628
629 string TcpConnection::PrefixInfo() const {
630   ostringstream oss;
631   oss << StateName() << " : ["
632       << local_address() << " => " << remote_address()
633       << " (fd: " << fd_ << ")] ";
634   return oss.str();
635 }
636
637
638 //////////////////////////////////////////////////////////////////////
639
640 bool TcpConnection::HandleReadEvent(int events) {
641   CHECK(state() != DISCONNECTED) << "Invalid state: " << StateName();
642   D10CONNLOG << "HandleReadEvent: " << std::hex << events;
643
644   if ( state() == CONNECTING ) {
645     set_state(CONNECTED);
646     // read and write events should be enabled
647     InitializeLocalAddress();
648     InvokeConnectHandler();
649     CHECK(state() == CONNECTED ||
650           state() == DISCONNECTED ||
651           state() == FLUSHING);
652     // either the application closed the connection in "ConnectHandler"
653     // or the connection goes on in CONNECTED state
654     return state() == CONNECTED;
655   }
656
657   CHECK(state() == CONNECTED ||
658         state() == FLUSHING) << "Illegal state: " << StateName();
659
660   // Read from network into inbuf_
661   int32 cb = Selectable::Read(inbuf());
662   if ( cb < 0 ) {
663     ECONNLOG << "Closing connection because Read failed: "
664              << GetLastSystemErrorDescription();
665     InternalClose(ExtractSocketErrno(), true);
666     return false;
667   }
668
669   D10CONNLOG << "HandleReadEvent: #" << cb << " bytes read,"
670              << " #" << inbuf()->Size() << " total bytes in inbuf_";
671   inc_bytes_read(cb);
672
673   if ( cb > 0 ) {
674     // call application read_handler_
675     if ( !InvokeReadHandler() ) {
676       WCONNLOG << "Closing TcpConnection because read_handler_ said so";
677       InternalClose(0, true);
678       return false;
679     }
680     D10CONNLOG << "HandleReadEvent: after read_handler_"
681                << " #" << inbuf()->Size() << " bytes still remaining in inbuf_";
682   }
683
684   if ( cb == 0 ) {
685     WCONNLOG << "Previous read returned 0 bytes, READ half closed";
686     set_read_closed(true);
687   }
688
689   if ( read_closed() ) {
690     InvokeCloseHandler(0, CLOSE_READ);
691     if ( fd_ != INVALID_FD_VALUE ) {
692       // we need this because EPOLLIN continuously fires
693       RequestReadEvents(false);
694       // TODO(cosmin): remove, application should close WRITE
695       Close(CLOSE_WRITE);
696     }
697     return true;
698   }
699   return true;
700 }
701
702 bool TcpConnection::HandleWriteEvent(int events) {
703   CHECK(state() != DISCONNECTED) << "Invalid state: " << StateName();
704   D10CONNLOG << "HandleWriteEvent: " << std::hex << events;
705
706   if ( state() == CONNECTING ) {
707     set_state(CONNECTED);
708     // read and write events should be enabled
709     InitializeLocalAddress();
710     InvokeConnectHandler();
711     CHECK(state() == CONNECTED ||
712           state() == DISCONNECTED ||
713           state() == FLUSHING);
714     // either the application closed the connection in "ConnectHandler"
715     // or the connection goes on in CONNECTED state
716     return state() == CONNECTED;
717   }
718
719   CHECK(state() == CONNECTED ||
720         state() == FLUSHING) << "Illegal state: " << StateName();
721
722   // write data from outbuf_ to network
723   const int32 cb = Selectable::Write(outbuf());
724   if ( cb < 0 ) {
725     ECONNLOG << "Closing connection because Write failed: "
726              << GetLastSystemErrorDescription();
727     InternalClose(ExtractSocketErrno(), true);
728     return false;
729   }
730   D10CONNLOG << "HandleWriteEvent: #" << cb << " bytes written"
731              << " to: " << remote_address();
732   inc_bytes_written(cb);
733
734   if ( state() != FLUSHING ) {
735     // call application write_handler_
736     if ( !InvokeWriteHandler() ) {
737       WCONNLOG << "Closing connection because write_handler_ said so";
738       InternalClose(0, true);
739       return false;
740     }
741   }
742
743   if ( outbuf()->IsEmpty() ) {
744     RequestWriteEvents(false);   // stop write events.
745
746     if ( state() == FLUSHING ) {
747       // FLUSHING finished sending all buffered data.
748       // Execute ::shutdown write half.
749       WCONNLOG << "Flushing finished, executing shutdown WRITE half.";
750       int result = ::shutdown(fd_, SHUT_WR);
751       if ( result != 0 ) {
752         ECONNLOG << "::shutdown failed with fd=" << fd_ << " how=SHUT_WR"
753                  << " err: " << GetLastSystemErrorDescription();
754         InternalClose(0, true);
755         return false;
756       }
757       set_write_closed(true);
758       // We closed the write half, the peer is notified by RDHUP.
759       // Now we wait him to close the connection too, and when it does
760       // we get a HUP.
761       // In case of linger_timeout we force close the connection.
762       timeouter_.SetTimeout(kShutdownTimeoutId,
763                             tcp_params_.shutdown_linger_timeout_ms_);
764       return true;
765     }
766   }
767
768   return true;
769 }
770
771 bool TcpConnection::HandleErrorEvent(int events) {
772   // Possible error events, according to epoll_ctl(2) manual page:
773   // ("events" is a combination of one or more of these)
774   //
775   // EPOLLRDHUP  Stream socket peer closed connection, or shut down
776   //             writing half of connection. (This flag is especially useful
777   //             for writing simple code to detect peer shutdown when using
778   //             Edge Triggered monitoring.)
779   //
780   // EPOLLERR    Error condition happened on the associated file descriptor.
781   //
782   // EPOLLHUP    Hang up happened on the associated file descriptor.
783   //
784
785   CHECK_NE(state(), DISCONNECTED);
786
787   if ( (events & EPOLLERR) == EPOLLERR ) {
788     ECONNLOG << "HandleErrorEvent errno=" << GetLastSystemErrorDescription();
789     const int err = ExtractSocketErrno();
790     InternalClose(err, true);
791     return false;
792   }
793
794   // IMPORTANT:
795   // The chain of events on a connected socket:
796   //        A                            B
797   //    -----------                 -----------
798   // executes:
799   // a) close fd, or
800   // b) shutdown write
801   //     ========================>
802   //                               receives RDHUP, and executes
803   //                               c) close fd, or
804   //                               d) shutdown write
805   //     <========================
806   // a) nothing happens
807   // b) receives HUP,
808   //    and executes close fd
809   //     ========================>
810   //                               c) nothing happens
811   //                               d) receives HUP,
812   //                                  and executes close fd
813
814   if ( (events & EPOLLHUP) == EPOLLHUP ) {
815     // peer completely closed the connection
816     WCONNLOG << "HandleErrorEvent: EPOLLHUP, both READ and WRITE halves closed";
817     set_write_closed(true);
818     if ( events & EPOLLIN ) {
819       // don't close here, let the next HandleReadEvent read pending data.
820       // EPOLLHUP is continuously generated.
821       return true;
822     }
823     WCONNLOG << "Closing connection because both"
824                 " READ and WRITE halves are closed.";
825     InternalClose(0, true);
826     return false;
827   }
828   if ( (events & EPOLLRDHUP) == EPOLLRDHUP ) {
829     WCONNLOG << "HandleErrorEvent: EPOLLRDHUP, READ half closed";
830     if ( events & EPOLLIN ) {
831       // peer closed write half of the connection
832       // there may be pending data on read. So wait until recv() returns 0,
833       // then set read_closed_ = true;
834       return true;
835     }
836     // no EPOLLIN means READ disabled ..
837     ECONNLOG << "Peer closed on us - treat as error !";
838     InternalClose(0, true);
839     return false;
840   }
841   FCONNLOG << "Unknown error event: " << events;
842   return true;
843 }
844
845 void TcpConnection::Close() {
846   // this call comes from Selectable interface
847   InternalClose(0, true);
848 }
849
850 //////////////////////////////////////////////////////////////////////
851
852 //////////////////////////////////////////////////////////////////////
853
854 bool TcpConnection::SetSocketOptions() {
855   // enable non-blocking
856   CHECK_GE(fd_, 0);
857   const int flags = fcntl(fd_, F_GETFL, 0);
858   if ( flags < 0 ) {
859     ECONNLOG << "::fcntl failed for fd=" << fd_
860              << " err: " << GetLastSystemErrorDescription();
861     return false;
862   }
863   const int new_flags = flags | O_NONBLOCK;
864   int result = ::fcntl(fd_, F_SETFL, new_flags);
865   if ( result < 0 ) {
866     ECONNLOG << "::fcntl failed for fd=" << fd_
867              << " new_flags=" << new_flags
868              << " err: " << GetLastSystemErrorDescription();
869     return false;
870   }
871   // disable Nagel buffering algorithm
872   const int true_flag = 1;
873   if ( ::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY,
874                     reinterpret_cast<const char *>(&true_flag),
875                     sizeof(true_flag)) < 0 ) {
876     ECONNLOG << "::setsockopt failed for fd=" << fd_
877              << " err: " << GetLastSystemErrorDescription();
878     return false;
879   }
880   // set tcp parameters
881   if ( tcp_params_.send_buffer_size_ != -1 ) {
882     if ( !SetSendBufferSize(tcp_params_.send_buffer_size_) ) {
883       return false;
884     }
885   }
886   if ( tcp_params_.recv_buffer_size_ != -1 ) {
887     if ( !SetRecvBufferSize(tcp_params_.recv_buffer_size_) ) {
888       return false;
889     }
890   }
891   return true;
892 }
893
894 // static
895 int TcpConnection::ExtractSocketErrno(int fd) {
896   // TODO(cosmin): implement
897   LOG_ERROR << "ExtractSocketErrno not implemented! using global errno: "
898             << errno;
899   return errno;
900 }
901
902 void TcpConnection::InitializeLocalAddress() {
903   struct sockaddr_storage addr;
904   socklen_t len = sizeof(addr);
905   if ( !::getsockname(fd_,
906                       reinterpret_cast<sockaddr*>(&addr), &len) ) {
907     local_address_ = HostPort(&addr);
908   } else {
909     ECONNLOG << "::getsockname failed: " << GetLastSystemErrorDescription();
910   }
911 }
912 void TcpConnection::InitializeRemoteAddress() {
913   struct sockaddr_storage addr;
914   socklen_t len = sizeof(addr);
915   if ( !::getpeername(fd_,
916                       reinterpret_cast<sockaddr*>(&addr), &len) ) {
917     remote_address_ = HostPort(&addr);
918   } else {
919     ECONNLOG << "::getpeername failed: " << GetLastSystemErrorDescription();
920   }
921 }
922
923 void TcpConnection::InternalClose(int err, bool call_close_handler) {
924   if ( fd_ == INVALID_FD_VALUE ) {
925     CHECK_EQ(state(), DISCONNECTED);
926     return;
927   }
928   D10CONNLOG << "Unregistering connection.. ";
929   selector_->Unregister(this);
930   ::shutdown(fd_, SHUT_RDWR);
931   DCONNLOG << "Performing the ::close... ";
932   if ( ::close(fd_) < 0 ) {
933     ECONNLOG << "Error closing fd: " << fd_ << " err: "
934              << GetLastSystemErrorDescription();
935   }
936   fd_ = INVALID_FD_VALUE;
937   set_state(DISCONNECTED);
938   set_read_closed(true);
939   set_write_closed(true);
940   set_last_error_code(err);
941   timeouter_.UnsetAllTimeouts();
942   inbuf()->Clear();
943   outbuf()->Clear();
944   if ( call_close_handler ) {
945     InvokeCloseHandler(err, CLOSE_READ_WRITE);
946   }
947 }
948
949 void TcpConnection::HandleTimeoutEvent(int64 timeout_id) {
950   if ( timeout_id == kShutdownTimeoutId ) {
951     ECONNLOG << "Shutdown linger timeout. Forcing close...";
952     InternalClose(0, true);
953     return;
954   }
955   FCONNLOG << "Unknown timeout_id: " << timeout_id;
956   InternalClose(0, true);
957 }
958
959 //////////////////////////////////////////////////////////////////////
960 //////////////////////////////////////////////////////////////////////
961 //////////////////////////////////////////////////////////////////////
962
963 SslAcceptor::SslAcceptor(Selector* selector,
964                          const SslAcceptorParams& ssl_params)
965   : NetAcceptor(ssl_params),
966     selector_(selector),
967     params_(ssl_params),
968     tcp_acceptor_(selector) {
969   tcp_acceptor_.SetFilterHandler(NewPermanentCallback(
970       this, &SslAcceptor::TcpAcceptorFilterHandler), true);
971   tcp_acceptor_.SetAcceptHandler(NewPermanentCallback(
972       this, &SslAcceptor::TcpAcceptorAcceptHandler), true);
973 }
974 SslAcceptor::~SslAcceptor() {
975   SslClear();
976 }
977
978 bool SslAcceptor::TcpAcceptorFilterHandler(const net::HostPort& peer_addr) {
979   return InvokeFilterHandler(peer_addr);
980 }
981 void SslAcceptor::TcpAcceptorAcceptHandler(NetConnection* net_connection) {
982   TcpConnection* tcp_connection = static_cast<TcpConnection*>(net_connection);
983   SslConnection* ssl_connection =
984       new SslConnection(tcp_connection->selector(),
985                         params_.ssl_connection_params_);
986   AICONNLOG << "SslConnection allocated: " << ssl_connection
987             << " staring SSL setup";
988   // Set temporary handlers in the new ssl_connection. We'll be notified
989   // when the ssl connect is completed. Only after the ssl connection
990   // is fully established we pass it to application.
991   ssl_connection->SetConnectHandler(NewPermanentCallback(
992       this, &SslAcceptor::SslConnectionConnectHandler, ssl_connection), true);
993   ssl_connection->SetCloseHandler(NewPermanentCallback(
994       this, &SslAcceptor::SslConnectionCloseHandler, ssl_connection), true);
995   ssl_connection->Wrap(tcp_connection);
996 }
997
998 void SslAcceptor::SslConnectionConnectHandler(SslConnection* ssl_connection) {
999   AICONNLOG << "SslConnection setup done: " << ssl_connection
1000            << " forwarding to application";
1001   // Ssl connection ready, we detach our temporary handlers
1002   // to let the application attach and use it.
1003   ssl_connection->DetachAllHandlers();
1004   // pass ssl connection to application
1005   InvokeAcceptHandler(ssl_connection);
1006 }
1007 void SslAcceptor::SslConnectionCloseHandler(SslConnection* ssl_connection,
1008     int err, NetConnection::CloseWhat what) {
1009   if ( what != NetConnection::CLOSE_READ_WRITE ) {
1010     // ignore partial close
1011     return;
1012   }
1013   // ssl connection broken, we have to delete it
1014   // NOTE: we are called from SslConnection !! don't use "delete" here
1015   ECONNLOG << "SslConnection setup failed: " << ssl_connection
1016            << " deleting..";
1017   ssl_connection->selector()->DeleteInSelectLoop(ssl_connection);
1018 }
1019
1020 bool SslAcceptor::Listen(const net::HostPort& local_addr) {
1021   return SslInitialize() && tcp_acceptor_.Listen(local_addr);
1022 }
1023 void SslAcceptor::Close() {
1024   tcp_acceptor_.Close();
1025 }
1026 string SslAcceptor::PrefixInfo() const {
1027   return tcp_acceptor_.PrefixInfo() + " [SSL]: ";
1028 }
// Set to true once the SSL library initialization has been performed
// (see SslConnection::SslLibraryInit).
// TODO(cosmin): test, remove
bool g_ssl_initialized = false;
1031 bool SslAcceptor::SslInitialize() {
1032   if ( params_.ssl_connection_params_.ssl_context_ == NULL ) {
1033     ECONNLOG << "Missing SSL context";
1034     return false;
1035   }
1036   if ( SSL_CTX_check_private_key(
1037          params_.ssl_connection_params_.ssl_context_) != 1) {
1038     ECONNLOG << "SslAcceptor needs an SSL certificate & key";
1039     //return false;
1040   }
1041
1042   return true;
1043 }
1044 void SslAcceptor::SslClear() {
1045 }
1046
1047 //////////////////////////////////////////////////////////////////////
1048
1049 SslConnection::SslConnection(Selector* selector,
1050                              const SslConnectionParams& ssl_params)
1051   : NetConnection(ssl_params),
1052     selector_(selector),
1053     ssl_params_(ssl_params),
1054     tcp_connection_(NULL),
1055     is_server_side_(false),
1056     p_ctx_(NULL),
1057     p_bio_read_(NULL),
1058     p_bio_write_(NULL),
1059     p_ssl_(NULL),
1060     handshake_finished_(false),
1061     read_blocked_(false),
1062     read_blocked_on_write_(false),
1063     write_blocked_on_read_(false),
1064     ssl_out_count_(0),
1065     ssl_in_count_(0),
1066     timeouter_(selector,
1067         NewPermanentCallback(this, &SslConnection::HandleTimeoutEvent)) {
1068 }
1069 SslConnection::~SslConnection() {
1070   ForceClose();
1071   delete tcp_connection_;
1072   tcp_connection_ = NULL;
1073 }
1074
1075 void SslConnection::Wrap(TcpConnection* tcp_connection) {
1076   CHECK_NULL(tcp_connection_);
1077   tcp_connection_ = tcp_connection;
1078   tcp_connection_->SetConnectHandler(NewPermanentCallback(
1079       this, &SslConnection::TcpConnectionConnectHandler), true);
1080   tcp_connection_->SetCloseHandler(NewPermanentCallback(
1081       this, &SslConnection::TcpConnectionCloseHandler), true);
1082   tcp_connection_->SetReadHandler(NewPermanentCallback(
1083       this, &SslConnection::TcpConnectionReadHandler), true);
1084   tcp_connection_->SetWriteHandler(NewPermanentCallback(
1085       this, &SslConnection::TcpConnectionWriteHandler), true);
1086   set_state(CONNECTING);
1087   is_server_side_ = true;
1088   // resume from the point where the TCP is connected and SSL handshake should start
1089   TcpConnectionConnectHandler();
1090 }
1091 bool SslConnection::Connect(const HostPort& remote_addr) {
1092   CHECK_NULL(tcp_connection_);
1093   tcp_connection_ = new TcpConnection(selector_);
1094   tcp_connection_->SetConnectHandler(NewPermanentCallback(
1095       this, &SslConnection::TcpConnectionConnectHandler), true);
1096   tcp_connection_->SetCloseHandler(NewPermanentCallback(
1097       this, &SslConnection::TcpConnectionCloseHandler), true);
1098   tcp_connection_->SetReadHandler(NewPermanentCallback(
1099       this, &SslConnection::TcpConnectionReadHandler), true);
1100   tcp_connection_->SetWriteHandler(NewPermanentCallback(
1101       this, &SslConnection::TcpConnectionWriteHandler), true);
1102   set_state(CONNECTING);
1103   is_server_side_ = false;
1104   AICONNLOG << "Connecting to " << remote_addr;
1105   if ( !tcp_connection_->Connect(remote_addr) ) {
1106     ECONNLOG << "Connect failed for remote address: " << remote_addr;
1107     delete tcp_connection_;
1108     tcp_connection_ = NULL;
1109     set_state(DISCONNECTED);
1110     return false;
1111   }
1112   // The next thing will be:
1113   //  - TcpConnectionConnectHandler: TCP is connected and SSL handshake should start
1114   //  - TcpConnectionCloseHandler: TCP broken, clear everything.
1115   return true;
1116 }
1117 void SslConnection::FlushAndClose() {
1118   SslShutdown();
1119   tcp_connection_->FlushAndClose();
1120 }
1121 void SslConnection::ForceClose() {
1122   SslClear();
1123   tcp_connection_->ForceClose();
1124 }
1125 bool SslConnection::SetSendBufferSize(int size) {
1126   CHECK_NOT_NULL(tcp_connection_);
1127   return tcp_connection_->SetSendBufferSize(size);
1128 }
1129 bool SslConnection::SetRecvBufferSize(int size) {
1130   CHECK_NOT_NULL(tcp_connection_);
1131   return tcp_connection_->SetRecvBufferSize(size);
1132 }
1133 void SslConnection::RequestReadEvents(bool enable) {
1134   CHECK_NOT_NULL(tcp_connection_);
1135   return tcp_connection_->RequestReadEvents(enable);
1136 }
1137 void SslConnection::RequestWriteEvents(bool enable) {
1138   CHECK_NOT_NULL(tcp_connection_);
1139   return tcp_connection_->RequestWriteEvents(enable);
1140 }
1141 const HostPort& SslConnection::local_address() const {
1142   static const HostPort empty_address;
1143   return tcp_connection_ == NULL ? empty_address :
1144                                    tcp_connection_->local_address();
1145 }
1146 const HostPort& SslConnection::remote_address() const {
1147   static const HostPort empty_address;
1148   return tcp_connection_ == NULL ? empty_address :
1149                                    tcp_connection_->remote_address();
1150 }
1151 string SslConnection::PrefixInfo() const {
1152   CHECK_NOT_NULL(tcp_connection_);
1153   return tcp_connection_->PrefixInfo() +
1154          "[SSL: " + StateName() + "]: ";
1155 }
1156
1157
1158 bool SslConnection::HandleReadEvent(int events) {
1159   FCONNLOG << "TODO(cosmin): this function should never be called";
1160   return true;
1161 }
1162 bool SslConnection::HandleWriteEvent(int events) {
1163   FCONNLOG << "TODO(cosmin): this function should never be called";
1164   return true;
1165 }
1166 bool SslConnection::HandleErrorEvent(int events) {
1167   FCONNLOG << "TODO(cosmin): this function should never be called";
1168   return true;
1169 }
1170 void SslConnection::Close() {
1171   CHECK_NOT_NULL(tcp_connection_);
1172   tcp_connection_->ForceClose();
1173 }
1174 int SslConnection::GetFd() const {
1175   return INVALID_FD_VALUE;
1176 }
1177
1178
1179 void SslConnection::TcpConnectionConnectHandler() {
1180   AICONNLOG << "TcpConnection established, initializing SSL layer..";
1181   // initialize SSL structures here
1182   if ( !SslInitialize(is_server_side_) ) {
1183     ECONNLOG << "SslInitialize failed, closing underlying TCP connection..";
1184     ForceClose();
1185     return;
1186   }
1187   // Our state is still CONNECTING, the next thing is:
1188   //  - TCP invokes TcpConnectionWriteHandler, and the SSL handshake will begin
1189 }
1190 bool SslConnection::TcpConnectionReadHandler() {
1191   // Read from TCP --> write to SSL
1192   while ( !tcp_connection_->inbuf()->IsEmpty() ) {
1193     char buf[1024];
1194     int32 read = tcp_connection_->inbuf()->Read(buf, sizeof(buf));
1195     int32 write = BIO_write(p_bio_read_, buf, read);
1196     if ( write < read ) {
1197       // we use memory BIO, no reason for BIO_write to fail
1198       ECONNLOG << "BIO_write failed, closing connection";
1199       ForceClose();
1200       return false;
1201     }
1202
1203     ssl_in_count_ += write;
1204     //WCONNLOG << "BIO write: " << write << " bytes"
1205     //            ", BIO total: in " << ssl_in_count_
1206     //         << " / out " << ssl_out_count_
1207     //         << " TCP buffers: in " << tcp_connection_->inbuf()->Size()
1208     //         << " / out " << tcp_connection_->outbuf()->Size();
1209     DCONNLOG << "TCP >>>> " << write << " bytes >>>> SSL";
1210   }
1211   if ( state() != CONNECTED ) {
1212     // still in handshake
1213     SslHandshake();
1214     return true;
1215   }
1216
1217   if ( write_blocked_on_read_ ) {
1218     // an SSL_write is in progress, we cannot SSL_read.
1219     RequestWriteEvents(true);
1220     return true;
1221   }
1222
1223   //NOTE: SSL_pending looks only inside SSL layer, and not into BIO buffer.
1224   //      So even if you have tons of data in BIO, SSL_pending still returns 0.
1225
1226   // Read from SSL --> write to inbuf()
1227   while ( BIO_pending(p_bio_read_) ) {
1228     // If there is no data in p_bio_read_ then avoid calling SSL_read because
1229     // it would return WANT_READ and we'll get read_blocked.
1230     //WCONNLOG << "Going to SSL_read from bio_data: " << SslPrintableBio(p_bio_read_);
1231     char buf[1024];
1232     int32 read = SSL_read(p_ssl_, buf, sizeof(buf));
1233     //WCONNLOG << "SSL read: " << read << " bytes"
1234     //            " => " << SslErrorName(SSL_get_error(p_ssl_, read));
1235     //WCONNLOG << "After SSL_read remaining bio_data: " << SslPrintableBio(p_bio_read_);
1236     read_blocked_ = false;
1237     read_blocked_on_write_ = false;
1238     if ( read < 0 ) {
1239       int error = SSL_get_error(p_ssl_, read);
1240       switch(error) {
1241         case SSL_ERROR_NONE:
1242           break;
1243         case SSL_ERROR_WANT_READ:
1244           read_blocked_ = true;
1245           break;
1246         case SSL_ERROR_WANT_WRITE:
1247           read_blocked_on_write_ = true;
1248           RequestWriteEvents(true);
1249           break;
1250         case SSL_ERROR_ZERO_RETURN:
1251           // End of data. We need to SSL_shutdown.
1252           FlushAndClose();
1253           return true;
1254         default:
1255           // TODO(cosmin): make error, non fatal
1256           ECONNLOG << "SSL_read fatal, SSL_get_error => "
1257                    << error << " " << SslErrorName(error)
1258                    << " , " << SslLastError()
1259                    << " , closing connection";
1260           ForceClose();
1261           return false;
1262       };
1263       break;
1264     }
1265     // SSL_read was successful
1266     int32 write = inbuf()->Write(buf, read);
1267     CHECK_EQ(write, read);
1268     DCONNLOG << "SSL >>>> " << read << " bytes >>>> APP";
1269   }
1270
1271   if ( !read_blocked_ && !outbuf()->IsEmpty() ) {
1272     // the write has been stopped due to read_blocked_
1273     RequestWriteEvents(true);
1274   }
1275
1276   // skip InvokeReadHandler if inbuf() is empty
1277   if ( inbuf()->IsEmpty() ) {
1278     return true;
1279   }
1280
1281   // ask application to read data from our inbuf()
1282   return InvokeReadHandler();
1283 }
1284 bool SslConnection::TcpConnectionWriteHandler() {
1285   bool success = true;
1286
1287   if ( state() != CONNECTED ) {
1288     SslHandshake();
1289   } else if (read_blocked_ || read_blocked_on_write_) {
1290     // A partial SSL_read is in progress. DON'T use SSL_write! or it will
1291     //  corrupt internal ssl structures.
1292     // If we don't write anything to TCP, the write event will be stopped.
1293     // The ReadHandler will test outbuf non empty and re-enable write.
1294   } else {
1295     // ask application to write something in our outbuf()
1296     success = InvokeWriteHandler();
1297
1298     // Read from outbuf() --> write to SSL
1299     while ( !outbuf()->IsEmpty() ) {
1300       outbuf()->MarkerSet();
1301       char buf[1024];
1302       int32 read = outbuf()->Read(buf, sizeof(buf));
1303       WCONNLOG << "APP read: " << read << " bytes";
1304       CHECK_GT(read, 0); // SSL_write() behavior is undefined on 0 bytes
1305                          // besides, we've already checked !outbuf()->IsEmpty()
1306       int write = SSL_write(p_ssl_, buf, read);
1307       // write = the number of encrypted bytes written in BIO, always > read
1308       //WCONNLOG << "SSL write: " << write << " bytes"
1309       //            " => " << SslErrorName(SSL_get_error(p_ssl_, write));
1310       //WCONNLOG << "After SSL_write BIO contains data: " << SslPrintableBio(p_bio_write_);
1311       DCONNLOG << "SSL <<<< " << write << " bytes <<<< APP";
1312       write_blocked_on_read_ = false;
1313       if ( write <= 0 ) {
1314         outbuf()->MarkerRestore();
1315         int error = SSL_get_error(p_ssl_, write);
1316         ECONNLOG << "SSL_write failed, SSL_get_error => "
1317                  << error << " " << SslErrorName(error)
1318                  << " , " << SslLastError();
1319         switch(error) {
1320           case SSL_ERROR_WANT_READ:
1321             write_blocked_on_read_ = true;
1322             // we need more data in p_bio_read_ so we're just gonna wait for
1323             // ReadHandler to happen
1324             return true;
1325           case SSL_ERROR_WANT_WRITE:
1326             // p_bio_write_ is probably full.
1327             // But we use memory BIO, so it should never happen.
1328             break;
1329           default:
1330             ECONNLOG << "SSL_write fatal, closing connection";
1331             ForceClose();
1332             return false;
1333         };
1334         break;
1335       }
1336       // SSL_write was successful
1337       outbuf()->MarkerClear();
1338     }
1339   }
1340
1341   //WCONNLOG << "After all SSL_write BIO contains data: " << SslPrintableBio(p_bio_write_);
1342
1343   // Read from SSL --> write to TCP
1344   while ( BIO_pending(p_bio_write_) > 0 ) {
1345     char buf[1024];
1346     int32 read = BIO_read(p_bio_write_, buf, sizeof(buf));
1347     ssl_out_count_ += (read < 0 ? 0 : read);
1348     //WCONNLOG << "BIO read: " << read << " bytes, BIO total:"
1349     //            " in " << ssl_in_count_ << " / out " << ssl_out_count_;
1350     if ( read <= 0 ) {
1351       // should never happen, we use plain memory BIO
1352       ECONNLOG << "BIO_read failed, closing connection";
1353       ForceClose();
1354       return false;
1355     }
1356     int32 write = tcp_connection_->outbuf()->Write(buf, read);
1357     CHECK_EQ(write, read) << "Memory stream should be unlimited";
1358     DCONNLOG << "TCP <<<< " << read << " bytes <<<< SSL";
1359   }
1360   return success;
1361 }
1362 void SslConnection::TcpConnectionCloseHandler(int err, CloseWhat what) {
1363   if ( what != CLOSE_READ_WRITE ) {
1364     // ignore partial close of TCP layer
1365     SslShutdown();
1366     return;
1367   }
1368   // TCP completely closed
1369   WCONNLOG << "Underlying TcpConnection closed."
1370               " err: " << err << " , what: " << CloseWhatName(what);
1371   set_state(DISCONNECTED);
1372   InvokeCloseHandler(err, what);
1373 }
1374
1375
1376 void SslConnection::HandleTimeoutEvent(int64 timeout_id) {
1377 }
1378 //static
1379 const std::string& SslConnection::SslErrorName(int err) {
1380   switch ( err ) {
1381     case SSL_ERROR_NONE: {
1382       static const std::string str("SSL_ERROR_NONE"); return str; }
1383     case SSL_ERROR_SSL: {
1384       static const std::string str("SSL_ERROR_SSL"); return str; }
1385     case SSL_ERROR_WANT_READ: {
1386       static const std::string str("SSL_ERROR_WANT_READ"); return str; }
1387     case SSL_ERROR_WANT_WRITE: {
1388       static const std::string str("SSL_ERROR_WANT_WRITE"); return str; }
1389     case SSL_ERROR_WANT_X509_LOOKUP: {
1390       static const std::string str("SSL_ERROR_WANT_X509_LOOKUP"); return str; }
1391     case SSL_ERROR_SYSCALL: {
1392       static const std::string str("SSL_ERROR_SYSCALL"); return str; }
1393     case SSL_ERROR_ZERO_RETURN: {
1394       static const std::string str("SSL_ERROR_ZERO_RETURN"); return str; }
1395     case SSL_ERROR_WANT_CONNECT: {
1396       static const std::string str("SSL_ERROR_WANT_CONNECT"); return str; }
1397     case SSL_ERROR_WANT_ACCEPT: {
1398       static const std::string str("SSL_ERROR_WANT_ACCEPT"); return str; }
1399     default: {
1400       static const std::string str("UNKNOWN"); return str; }
1401   }
1402 }
1403 //static
1404 const std::string& SslConnection::SslWantName(int want) {
1405   switch ( want ) {
1406     case SSL_NOTHING: {
1407       static const std::string str("SSL_NOTHING"); return str; }
1408     case SSL_WRITING: {
1409       static const std::string str("SSL_WRITING"); return str; }
1410     case SSL_READING: {
1411       static const std::string str("SSL_READING"); return str; }
1412     case SSL_X509_LOOKUP: {
1413       static const std::string str("SSL_X509_LOOKUP"); return str; }
1414     default: {
1415       static const std::string str("UNKNOWN"); return str; }
1416   }
1417 }
1418 //static
1419 std::string SslConnection::SslLastError() {
1420   std::ostringstream oss;
1421   oss << "Error stack:" << std::endl;
1422   while ( true ) {
1423     int line;
1424     const char* file;
1425     const int e = ERR_get_error_line(&file, &line);
1426     if ( e == 0 ) {
1427       break;
1428     }
1429     char text[512] = {0,};
1430     ERR_error_string_n(e, text, sizeof(text));
1431     oss << "  " << text << ":" << file << ":" << line << std::endl;
1432   }
1433   oss << "And errno is " << GetLastSystemErrorDescription() << std::endl;
1434   return oss.str();
1435 }
1436 //static
1437 const void SslConnection::SslLibraryInit() {
1438   if ( !g_ssl_initialized ) {
1439     SSL_library_init();                      /* initialize library */
1440     SSL_load_error_strings();                /* readable error messages */
1441     ERR_load_SSL_strings();
1442     ERR_load_CRYPTO_strings();
1443     ERR_load_crypto_strings();
1444     //actions_to_seed_PRNG();
1445     g_ssl_initialized = true;
1446   }
1447 }
1448 //static
1449 X509* SslConnection::SslLoadCertificateFile(const string& filename) {
1450   // Load certificate file.
1451   FILE * f = ::fopen(filename.c_str(), "r");
1452   if ( f == NULL ) {
1453     LOG_ERROR << "Cannot find certificate file: [" << filename << "]";
1454     return NULL;
1455   }
1456   X509* certificate = NULL;
1457   if ( NULL == PEM_read_X509(f, &certificate, NULL, NULL) ) {
1458     LOG_ERROR << "PEM_read_X509 failed to load certificate from file: ["
1459               << filename << "]";
1460     fclose(f);
1461     return NULL;
1462   }
1463   fclose(f);
1464   CHECK_NOT_NULL(certificate);
1465   LOG_INFO << "SSL Loaded certificate file: [" << filename << "]";
1466   return certificate;
1467 }
1468 //static
1469 EVP_PKEY* SslConnection::SslLoadPrivateKeyFile(const string& filename) {
1470   // Load private key file.
1471   FILE * f = ::fopen(filename.c_str(),"r");
1472   if ( f == NULL ) {
1473     LOG_ERROR << "Cannot find key file: [" << filename << "]";
1474     return NULL;
1475   }
1476   EVP_PKEY* key = NULL;
1477   if ( NULL == PEM_read_PrivateKey(f, &key, NULL, NULL) ) {
1478     LOG_ERROR << "PEM_read_PrivateKey failed to load key from file: ["
1479              << filename << "]";
1480     fclose(f);
1481     return NULL;
1482   }
1483   fclose(f);
1484   CHECK_NOT_NULL(key);
1485   LOG_INFO << "SSL Loaded private key file: [" << filename << "]";
1486   return key;
1487 }
1488
1489 //static
1490 X509* SslConnection::SslDuplicateX509(const X509& src) {
1491   X509* s = const_cast<X509*>(&src);
1492   X509* d = X509_dup(s);
1493   CHECK_NOT_NULL(d);
1494   return d;
1495 }
1496 //static
1497 EVP_PKEY* SslConnection::SslDuplicateEVP_PKEY(const EVP_PKEY& src) {
1498   // TODO(cosmin): code copied from:
1499   //  http://www.mail-archive.com/openssl-users@openssl.org/msg17614.html
1500   EVP_PKEY* k = const_cast<EVP_PKEY*>(&src);
1501   k->references++;
1502   return k;
1503
1504   // TODO(cosmin): code copied from:
1505   //  http://www.mail-archive.com/openssl-users@openssl.org/msg17680.html
1506   /*
1507   EVP_PKEY* pKey = const_cast<EVP_PKEY*>(&src);
1508   EVP_PKEY* pDupKey = EVP_PKEY_new();
1509   RSA* pRSA = EVP_PKEY_get1_RSA(pKey);
1510   RSA* pRSADupKey = NULL;
1511   if( eKeyType == eKEY_PUBLIC ) // Determine the type of the "source" EVP_PKEY
1512     pRSADupKey = RSAPublicKey_dup(pRSA);
1513   else
1514     pRSADupKey = RSAPrivateKey_dup(pRSA);
1515   RSA_free(pRSA);
1516   EVP_PKEY_set1_RSA(pDupKey, pRSADupKey);
1517   RSA_free(pRSADupKey);
1518   return pDupKey;
1519   */
1520 }
1521 //static
1522 string SslConnection::SslPrintableBio(BIO* bio) {
1523   char * bio_data = NULL;
1524   long bio_data_size = BIO_get_mem_data(bio, &bio_data);
1525   return strutil::PrintableDataBufferHexa(bio_data, bio_data_size);
1526 }
1527 //static
1528 SSL_CTX* SslConnection::SslCreateContext(const string& certificate_filename,
1529                                          const string& key_filename) {
1530   SslLibraryInit();
1531   X509* ssl_certificate = NULL;
1532   if ( certificate_filename != "" ) {
1533     ssl_certificate = SslLoadCertificateFile(certificate_filename);
1534     if ( ssl_certificate == NULL ) {
1535       LOG_ERROR << "SslLoadCertificateFile failed for file: ["
1536                 << certificate_filename << "]";
1537       return NULL;
1538     }
1539   }
1540   EVP_PKEY* ssl_key = NULL;
1541   if ( key_filename != "" ) {
1542     ssl_key = SslLoadPrivateKeyFile(key_filename);
1543     if ( ssl_key == NULL ) {
1544       LOG_ERROR << "SslLoadPrivateKeyFile failed for file: ["
1545                 << key_filename << "]";
1546       X509_free(ssl_certificate);
1547       return NULL;
1548     }
1549   }
1550   SSL_CTX* ssl_ctx = SSL_CTX_new(SSLv23_method());
1551   if ( ssl_ctx == NULL ) {
1552     LOG_ERROR << "SSL_CTX_new failed: " << SslLastError();
1553     X509_free(ssl_certificate);
1554     EVP_PKEY_free(ssl_key);
1555     return NULL;
1556   }
1557   const long ssl_ctx_mode = SSL_CTX_get_mode(ssl_ctx);
1558   const long ssl_new_ctx_mode = ssl_ctx_mode |
1559                                 SSL_MODE_ENABLE_PARTIAL_WRITE |
1560                                 SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER;
1561   const long result = SSL_CTX_set_mode(ssl_ctx, ssl_new_ctx_mode);
1562   CHECK_EQ(result, ssl_new_ctx_mode);
1563
1564   // The server needs certificate and key.
1565   // The client may optionally use certificate and key.
1566   if ( ssl_certificate != NULL ) {
1567     if ( SSL_CTX_use_certificate(ssl_ctx, ssl_certificate) <= 0 ) {
1568       LOG_ERROR << "SSL_CTX_use_certificate failed: " << SslLastError();
1569       X509_free(ssl_certificate);
1570       EVP_PKEY_free(ssl_key);
1571       SSL_CTX_free(ssl_ctx);
1572       return NULL;
1573     }
1574     // Now the 'ssl_certificate' is part of 'context'.
1575     // It will get freed when the 'context' is freed.
1576     ssl_certificate = NULL;
1577   }
1578   if ( ssl_key != NULL ) {
1579     if ( SSL_CTX_use_PrivateKey(ssl_ctx, ssl_key) <= 0 ) {
1580       LOG_ERROR << "SSL_CTX_use_PrivateKey failed: " << SslLastError();
1581       X509_free(ssl_certificate);
1582       EVP_PKEY_free(ssl_key);
1583       SSL_CTX_free(ssl_ctx);
1584       return NULL;
1585     }
1586     // Now the 'ssl_key' is part of 'context'.
1587     // It will get freed when the 'context' is freed.
1588     ssl_key = NULL;
1589   }
1590
1591   return ssl_ctx;
1592 }
1593 //static
1594 void SslConnection::SslDeleteContext(SSL_CTX* ssl_ctx) {
1595   if ( ssl_ctx == NULL ) {
1596     return;
1597   }
1598   SSL_CTX_free(ssl_ctx);
1599 }
1600
1601
1602
1603 bool SslConnection::SslInitialize(bool is_server) {
1604   DCONNLOG << "Initializing SSL ...";
1605
1606   CHECK_NULL(p_ctx_);
1607   p_ctx_ = ssl_params_.ssl_context_;
1608   if ( p_ctx_ == NULL ) {
1609     ECONNLOG << "no SSL_CTX provided";
1610     return false;
1611   }
1612
1613   CHECK_NULL(p_ssl_);
1614   p_ssl_ = SSL_new(p_ctx_);
1615   CHECK_NOT_NULL(p_ssl_);
1616
1617   p_bio_read_ = BIO_new(BIO_s_mem());
1618   CHECK_NOT_NULL(p_bio_read_);
1619   p_bio_write_ = BIO_new(BIO_s_mem());
1620   CHECK_NOT_NULL(p_bio_write_);
1621
1622   SSL_set_bio(p_ssl_, p_bio_read_, p_bio_write_);
1623   if ( is_server ) {
1624     SSL_set_accept_state(p_ssl_);
1625   } else {
1626     SSL_set_connect_state(p_ssl_);
1627   }
1628   return true;
1629 }
1630 void SslConnection::SslClear() {
1631   if ( p_ssl_ ) {
1632     // SSL_free also deletes the associated BIOs
1633     SSL_free(p_ssl_);
1634     p_ssl_ = NULL;
1635     //BIO_free_all(p_bio_read_);
1636     p_bio_read_ = NULL;
1637     //BIO_free_all(p_bio_write_);
1638     p_bio_write_ = NULL;
1639   }
1640   if ( p_bio_read_ ) {
1641     BIO_free_all(p_bio_read_);
1642     p_bio_read_ = NULL;
1643   }
1644   if ( p_bio_write_ ) {
1645     BIO_free_all(p_bio_write_);
1646     p_bio_write_ = NULL;
1647   }
1648   // We do not own the SSL_CTX.
1649   // We only received this pointer by SslConnectionParams.
1650   p_ctx_ = NULL;
1651
1652   CHECK_NULL(p_ssl_);
1653   CHECK_NULL(p_bio_read_);
1654   CHECK_NULL(p_bio_write_);
1655   CHECK_NULL(p_ctx_);
1656   //TODO(cosmin): the SslConnection is not reusable, for the time being.
1657   //handshake_finished_ = false;
1658 }
1659 void SslConnection::SslHandshake() {
1660   if ( handshake_finished_ ) {
1661     return;
1662   }
1663   DCONNLOG << "SslHandshake...";
1664   if ( SSL_is_init_finished(p_ssl_) ) {
1665     // SSL completed the handshake but did we empty the ssl buffers?
1666     if ( BIO_pending(p_bio_write_) > 0 ) {
1667       DCONNLOG << "SslHandshake finished. Delaying connect handler... because"
1668                   " BIO_pending(p_bio_write_): " << BIO_pending(p_bio_write_);
1669       RequestWriteEvents(true);
1670       return;
1671     }
1672     DCONNLOG << "SslHandshake finished. Invoking connect handler.";
1673     handshake_finished_ = true;
1674     set_state(CONNECTED);
1675     selector_->RunInSelectLoop(NewCallback(this, &SslConnection::InvokeConnectHandler));
1676     return;
1677   }
1678   int result = SSL_do_handshake(p_ssl_);
1679   DCONNLOG << "SslHandshake SSL_do_handshake => " << result;
1680   if ( result < 1 ) {
1681     int error = SSL_get_error(p_ssl_, result);
1682     DCONNLOG << "SslHandshake SSL_get_error => "
1683              << error << " " << SslErrorName(error)
1684              << " , BIO_pending(p_bio_write_): " << BIO_pending(p_bio_write_)
1685              << " , BIO_pending(p_bio_read_): " << BIO_pending(p_bio_read_)
1686              << " , error: " << SslLastError();
1687     if ( error != SSL_ERROR_WANT_READ &&
1688          error != SSL_ERROR_WANT_WRITE ) {
1689         ECONNLOG << "SSL_do_handshake failed: " << SslErrorName(error)
1690                  << " , error: " << SslLastError();
1691         ForceClose();
1692         return;
1693     }
1694     // Handshake still in progress..
1695     RequestWriteEvents(true);
1696     // Next thing:
1697     //  - TcpConnectionWriteHandler will read from SSl --> write to TCP and
1698     //                              will call SslHandshake again.
1699     return;
1700   }
1701   // The handshake is completed for this endpoint(SSL_do_handshake returned 1).
1702   // But maybe we need to send some data to the other endpoint.
1703   int ssl_want = SSL_want(p_ssl_);
1704   DCONNLOG << "ssl_want: " << ssl_want << " " << SslWantName(ssl_want)
1705            << " , BIO_pending(p_bio_write_): " << BIO_pending(p_bio_write_)
1706            << " , BIO_pending(p_bio_read_): " << BIO_pending(p_bio_read_);
1707   RequestWriteEvents(true);
1708   // Next thing:
1709   //  - TcpConnectionWriteHandler will read from SSl --> write to TCP and
1710   //                              will call SslHandshake again.
1711 }
1712 void SslConnection::SslShutdown() {
1713   if ( p_ssl_ == NULL ) {
1714     return;
1715   }
1716   int result = SSL_shutdown(p_ssl_);
1717   if ( result < 0 ) {
1718     int error = SSL_get_error(p_ssl_, result);
1719     ECONNLOG << "SSL_shutdown => " << SslErrorName(error)
1720              << " , error: " << SslLastError();
1721   }
1722 }
1723 }
1724
1725 //////////////////////////////////////////////////////////////////////
Note: See TracBrowser for help on using the browser.