root/trunk/whisperlib/net/http/http_client_protocol.cc

Revision 7, 29.7 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "net/http/http_client_protocol.h"
33 #include "common/base/errno.h"
34
35 #define LOG_HTTP \
36   LOG_INFO_IF(params_->dlog_level_) << " - HTTP[" << server_ << "]: "
37
38 namespace http {
39
40 //////////////////////////////////////////////////////////////////////
41 //
42 // ClientParams
43 //
44
45 ClientParams::ClientParams()
46     : version_(VERSION_1_1),
47       user_agent_("Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.1.6) "
48                   "Gecko/20060601 Firefox/2.0.0.6 (Ubuntu-edgy)"),
49       dlog_level_(false),
50       max_header_size_(1024),
51       max_body_size_(-1),
52       max_chunk_size_(1 << 18),
53       max_num_chunks_(-1),
54       accept_no_content_length_(true),
55       max_concurrent_requests_(1),
56       max_waiting_requests_(100),
57       default_request_timeout_ms_(120000),
58       connect_timeout_ms_(20000),
59       write_timeout_ms_(20000),
60       read_timeout_ms_(20000),
61       max_output_buffer_size_(1<<17),
62       keep_alive_sec_(300000) {
63 }
64
65 ClientParams::ClientParams(const string& user_agent,
66                            bool dlog_level,
67                            int32 max_header_size,
68                            int64 max_body_size,
69                            int64 max_chunk_size,
70                            int64 max_num_chunks,
71                            bool accept_no_content_length,
72                            int32 max_concurrent_requests,
73                            int32 max_waiting_requests,
74                            int32 default_request_timeout_ms,
75                            int32 connect_timeout_ms,
76                            int32 write_timeout_ms,
77                            int32 read_timeout_ms,
78                            int32 max_output_buffer_size,
79                            int32 keep_alive_sec)
80     : version_(VERSION_1_1),
81       user_agent_(user_agent),
82       dlog_level_(dlog_level),
83       max_header_size_(max_header_size),
84       max_body_size_(max_body_size),
85       max_chunk_size_(max_chunk_size),
86       max_num_chunks_(max_num_chunks),
87       accept_no_content_length_(accept_no_content_length),
88       max_concurrent_requests_(max_concurrent_requests),
89       max_waiting_requests_(max_waiting_requests),
90       default_request_timeout_ms_(default_request_timeout_ms),
91       connect_timeout_ms_(connect_timeout_ms),
92       write_timeout_ms_(write_timeout_ms),
93       read_timeout_ms_(read_timeout_ms),
94       max_output_buffer_size_(max_output_buffer_size),
95       keep_alive_sec_(keep_alive_sec) {
96 }
97
98 //////////////////////////////////////////////////////////////////////
99 //
100 // ClientRequest
101 //
102
103 ClientRequest::ClientRequest()
104     : error_(http::CONN_INCOMPLETE),
105       request_timeout_ms_(0),
106       request_id_(0),
107       is_pure_dumping_(false) {
108 }
109
110 ClientRequest::ClientRequest(HttpMethod http_method,
111                              const URL* url)
112     : error_(http::CONN_INCOMPLETE),
113       request_timeout_ms_(0),
114       request_id_(0),
115       is_pure_dumping_(false) {
116   request_.client_header()->PrepareRequestLine(
117       url->PathForRequest().c_str(), http_method);
118   request_.client_header()->AddField(http::kHeaderHost, url->host(), true);
119 }
120
121 ClientRequest::ClientRequest(HttpMethod http_method,
122                              const string& escaped_query_path)
123     : error_(http::CONN_INCOMPLETE),
124       request_timeout_ms_(0),
125       is_pure_dumping_(false) {
126   request_.client_header()->PrepareRequestLine(escaped_query_path.c_str(),
127                                                http_method);
128 }
129 ClientRequest::ClientRequest(
130     HttpMethod http_method,
131     const string& unescaped_path,
132     const vector< pair<string, string> >* unescaped_query_comp)
133     : error_(http::CONN_INCOMPLETE),
134       request_timeout_ms_(0),
135       is_pure_dumping_(false) {
136   string uri(URL::UrlEscape(unescaped_path));
137   if ( unescaped_query_comp != NULL ) {
138     uri += "?";
139     uri += EscapeQueryParameters(*unescaped_query_comp);
140   }
141   request_.client_header()->PrepareRequestLine(uri.c_str(),
142                                                http_method);
143 }
144
145 ClientRequest::ClientRequest(
146     HttpMethod http_method,
147     const string& unescaped_path,
148     const vector< pair<string, string> >* unescaped_query_comp,
149     const string& fragment)
150     : error_(http::CONN_INCOMPLETE),
151       request_timeout_ms_(0),
152       is_pure_dumping_(false) {
153   string uri(URL::UrlEscape(unescaped_path));
154   if ( unescaped_query_comp != NULL ) {
155     uri += "?";
156     uri += EscapeQueryParameters(*unescaped_query_comp);
157   }
158   uri += "#";
159   uri += fragment;
160   request_.client_header()->PrepareRequestLine(uri.c_str(),
161                                                http_method);
162 }
163
164 string ClientRequest::EscapeQueryParameters(
165     const vector< pair<string, string> >& unescaped_query_comp) {
166   string s;
167   for ( int i = 0; i < unescaped_query_comp.size(); ++i ) {
168     if ( i > 0 ) s += "&";
169     s += (URL::UrlEscape(unescaped_query_comp[i].first) + "=" +
170           URL::UrlEscape(unescaped_query_comp[i].second));
171   }
172   return s;
173 }
174
175 //////////////////////////////////////////////////////////////////////
176 //
177 // BaseClientConnection
178 //
179 BaseClientConnection::BaseClientConnection(net::Selector* selector,
180                                            net::NetFactory* net_factory,
181                                            net::PROTOCOL net_protocol)
182     : selector_(selector),
183       net_connection_(NULL),
184       protocol_(NULL) {
185   //net_connection_ = new net::TcpConnection(selector_);
186   net_connection_ = net_factory->CreateConnection(net_protocol);
187   net_connection_->SetReadHandler(NewPermanentCallback(
188       this, &BaseClientConnection::ConnectionReadHandler), true);
189   net_connection_->SetWriteHandler(NewPermanentCallback(
190       this, &BaseClientConnection::ConnectionWriteHandler), true);
191   net_connection_->SetConnectHandler(NewPermanentCallback(
192       this, &BaseClientConnection::ConnectionConnectHandler), true);
193   net_connection_->SetCloseHandler(NewPermanentCallback(
194       this, &BaseClientConnection::ConnectionCloseHandler), true);
195 }
196 BaseClientConnection::~BaseClientConnection() {
197   delete net_connection_;
198   net_connection_ = NULL;
199 }
200
201 void BaseClientConnection::NotifyWrite() {
202   net_connection_->RequestWriteEvents(true);
203 }
204
205 void BaseClientConnection::ConnectionConnectHandler() {
206   if ( !protocol_->NotifyConnected() ) {
207     ForceClose();
208   }
209 }
210
211 bool BaseClientConnection::ConnectionReadHandler() {
212   if ( !protocol_->NotifyConnectionRead() ) {
213     ForceClose();
214   }
215   return true;
216 }
217
218 bool BaseClientConnection::ConnectionWriteHandler() {
219   protocol_->NotifyConnectionWrite();
220   return true;
221 }
222 void BaseClientConnection::ConnectionCloseHandler(
223     int err, net::NetConnection::CloseWhat what) {
224   if ( what != net::NetConnection::CLOSE_READ_WRITE ) {
225     net_connection_->FlushAndClose();
226     return;
227   }
228   if ( protocol_ ) {
229     protocol_->NotifyConnectionDeletion();
230     protocol_ = NULL;
231   }
232   LOG_INFO << "HTTP BaseClientConnection completely closed,"
233               " deleting in select loop..";
234   selector_->DeleteInSelectLoop(this);
235 }
236
237 ///////////////////////////////////////////////////////////////////////
238 //
239 // SimpleClientConnection
240 //
241 SimpleClientConnection::SimpleClientConnection(net::Selector* selector,
242                                                net::NetFactory* net_factory,
243                                                net::PROTOCOL net_protocol)
244     : BaseClientConnection(selector, net_factory, net_protocol) {
245 }
246
247 SimpleClientConnection::~SimpleClientConnection() {
248 }
249
250 //////////////////////////////////////////////////////////////////////
251 //
252 //  BaseClientProtocol
253 //
254 BaseClientProtocol::BaseClientProtocol(
255     const ClientParams* params,
256     http::BaseClientConnection* connection,
257     net::HostPort server)
258     : name_(string("HTTP CLI[") + server.ToString() + "]"),
259       params_(params),
260       server_(server),
261       available_output_size_(params->max_output_buffer_size_),
262       conn_error_(http::CONN_INCOMPLETE),
263       connection_(connection),
264       current_request_(NULL),
265       timeouter_(connection->selector(),
266                  NewPermanentCallback(
267                      this, &BaseClientProtocol::HandleTimeoutEvent)),
268       parser_read_state_(0),
269       parser_(name_.c_str(),
270               params->max_header_size_,
271               params->max_body_size_,
272               params->max_chunk_size_,
273               params->max_num_chunks_,
274               false,   // accept_wrong_method
275               false,   // accept_wrong_version
276               params->accept_no_content_length_) {
277   connection_->set_protocol(this);
278   parser_.set_dlog_level(params->dlog_level_);
279 }
280
281 BaseClientProtocol::~BaseClientProtocol() {
282   if ( connection_ != NULL ) {
283     connection_->set_protocol(NULL);
284     connection_->ForceClose();
285     connection_ = NULL;
286   }
287   CHECK(current_request_ == NULL);
288 }
289
290 void BaseClientProtocol::Clear() {
291   if ( connection_ != NULL ) {
292     connection_->ForceClose();
293     connection_ = NULL;
294   }
295 }
296
297
298 bool BaseClientProtocol::NotifyConnected() {
299   conn_error_ = CONN_OK;
300   timeouter_.UnsetTimeout(kConnectTimeout);
301   SendRequestToServer(current_request_);
302   return true;
303 }
304
305 void BaseClientProtocol::SendRequestToServer(ClientRequest* req) {
306   // Write the first stuff..
307   if ( params_->keep_alive_sec_ > 0 ) {
308     req->request()->client_header()->AddField(
309         kHeaderKeepAlive,
310         strutil::StringPrintf("%d", static_cast<int>(params_->keep_alive_sec_)),
311         true);
312     req->request()->client_header()->AddField(
313         kHeaderConnection, "Keep-Alive", true);
314   }
315
316   if ( req->is_pure_dumping() ) {
317     req->request()->client_header()->AppendToStream(connection_->outbuf());
318     connection_->outbuf()->AppendStream(req->request()->client_data());
319   } else {
320     req->request()->AppendClientRequest(connection_->outbuf());
321   }
322   if ( params_->write_timeout_ms_ > 0 ) {
323     timeouter_.SetTimeout(kWriteTimeout, params_->write_timeout_ms_);
324   }
325   if ( params_->default_request_timeout_ms_ > 0 ||
326        req->request_timeout_ms() > 0 ) {
327     timeouter_.SetTimeout(kRequestTimeout + req->request_id(),
328                           max(params_->default_request_timeout_ms_,
329                               req->request_timeout_ms()));
330   } else if ( params_->read_timeout_ms_ > 0 ) {
331     timeouter_.SetTimeout(kWriteTimeout, params_->read_timeout_ms_);
332   }
333
334   connection_->NotifyWrite();
335   // TODO(cosmin): We should NEVER disable read events.
336   //connection_->RequestReadEvents(true);
337 }
338
339 void BaseClientProtocol::NotifyConnectionDeletion() {
340   LOG_HTTP << "Server closed on us - closing all stuff.. ";
341   connection_ = NULL;
342   timeouter_.UnsetAllTimeouts();
343   if ( conn_error_ == http::CONN_INCOMPLETE ||
344        conn_error_ == http::CONN_OK ) {
345     conn_error_ = http::CONN_CONNECTION_CLOSED;
346   }
347 }
348
349 bool BaseClientProtocol::NotifyConnectionRead() {
350   CHECK(current_request_ != NULL);
351   if ( parser_.InFinalState() ) {
352     LOG_HTTP << "Illegal data after complete response. Closing connection...";
353     connection_->ForceClose();
354     return true;
355   }
356
357   if ( params_->read_timeout_ms_ > 0 ) {
358     timeouter_.SetTimeout(kWriteTimeout, params_->read_timeout_ms_);
359   }
360   do {
361     parser_read_state_ = parser_.ParseServerReply(
362         connection_->inbuf(), current_request_->request());
363   } while ( parser_read_state_ & http::RequestParser::CONTINUE );
364   if ( !parser_.InFinalState() ) {
365     return true;
366   }
367   current_request_->set_error(CONN_OK);
368   timeouter_.UnsetAllTimeouts();
369
370   // We keep connection open anyway ..
371   return true;    // we keep it open anyway..
372 }
373
374 void BaseClientProtocol::NotifyConnectionWrite() {
375   available_output_size_ =
376       (params_->max_output_buffer_size_ - connection_->outbuf()->Size());
377   if ( available_output_size_ < 0 )
378     available_output_size_ = 0;
379   if ( params_->write_timeout_ms_ > 0 ) {
380     if ( connection_->outbuf()->IsEmpty() ) {
381       timeouter_.UnsetTimeout(kWriteTimeout);
382     } else {
383       timeouter_.SetTimeout(kWriteTimeout, params_->write_timeout_ms_);
384     }
385   }
386 }
387
388 void BaseClientProtocol::HandleTimeoutEvent(int64 timeout_id) {
389   if ( HandleTimeout(timeout_id) ) {
390     return;
391   }
392   LOG_HTTP << "Timeout encountered, id: " << timeout_id
393            << " closing connection";
394   connection_->ForceClose();
395 }
396
397 void BaseClientProtocol::PauseReading() {
398   if ( connection_ != NULL ) {
399     connection_->RequestReadEvents(false);
400   }
401 }
402 void BaseClientProtocol::UnpauseReading() {
403   if ( connection_ != NULL && current_request_ != NULL ) {
404     connection_->RequestReadEvents(true);
405   }
406 }
407
408 void BaseClientProtocol::PauseWriting() {
409   if ( connection_ != NULL ) {
410     connection_->RequestWriteEvents(false);
411   }
412 }
413 void BaseClientProtocol::UnpauseWriting() {
414   if ( connection_ != NULL && current_request_ != NULL ) {
415     connection_->RequestWriteEvents(true);
416   }
417 }
418
419
420 bool BaseClientProtocol::HandleTimeout(int64 timeout_id) {
421   if ( timeout_id >= kRequestTimeout ) {
422     conn_error_ = CONN_REQUEST_TIMEOUT;
423   } else {
424     switch ( timeout_id ) {
425       case kConnectTimeout: conn_error_ = CONN_CONNECT_TIMEOUT; break;
426       case kWriteTimeout:   conn_error_ = CONN_WRITE_TIMEOUT; break;
427       case kReadTimeout:    conn_error_ = CONN_READ_TIMEOUT; break;
428       default:              return true;
429     }
430   }
431   return false;
432 }
433
434 //////////////////////////////////////////////////////////////////////
435 //
436 //  ClientStreamingProtocol
437 //
438 ClientStreamingProtocol::ClientStreamingProtocol(
439     const ClientParams* params,
440     http::BaseClientConnection* connection,
441     net::HostPort server)
442     : BaseClientProtocol(params, connection, server),
443       source_stopped_(true),
444       streaming_callback_(NULL) {
445 }
446
447 ClientStreamingProtocol::~ClientStreamingProtocol() {
448   if ( current_request_ != NULL ) {
449     if ( conn_error_ != CONN_INCOMPLETE ) {
450       current_request_->set_error(conn_error_);
451     } else {
452       current_request_->set_error(CONN_CLIENT_CLOSE);
453     }
454     current_request_ = NULL;
455   }
456 }
457
458 void ClientStreamingProtocol::BeginStreaming(
459     ClientRequest* request,
460     StreamingCallback* streaming_callback) {
461   CHECK(current_request_ == NULL);
462   parser_read_state_ = 0;
463   parser_.Clear();
464   source_stopped_ = false;
465   current_request_ = request;
466   streaming_callback_ = streaming_callback;
467   CHECK(streaming_callback->is_permanent());
468   if ( params_->connect_timeout_ms_ > 0 ) {
469     timeouter_.SetTimeout(kConnectTimeout, params_->connect_timeout_ms_);
470   }
471   LOG_HTTP << " Connecting to " << server_;
472   connection_->Connect(server_);
473 }
474
475 void ClientStreamingProtocol::NotifyConnectionWrite() {
476   if ( source_stopped_ ) {
477     return;   // nothing to do ..
478   }
479   DCHECK(current_request_ != NULL);
480   BaseClientProtocol::NotifyConnectionWrite();
481   if ( available_output_size_ > params_->max_output_buffer_size_ / 2 &&
482        streaming_callback_ != NULL ) {
483     source_stopped_ = !streaming_callback_->Run(available_output_size_);
484   }
485   if ( available_output_size_ > 0 &&
486        !current_request_->request()->client_data()->IsEmpty() ) {
487     if ( current_request_->is_pure_dumping() ) {
488       if ( available_output_size_ <=
489            current_request_->request()->client_data()->Size() ) {
490         connection_->outbuf()->AppendStream(
491             current_request_->request()->client_data());
492       } else {
493         connection_->outbuf()->AppendStreamNonDestructive(
494             current_request_->request()->client_data(),
495             available_output_size_);
496         current_request_->request()->client_data()->Skip(
497             available_output_size_);
498       }
499     } else {
500       current_request_->request()->AppendClientChunk(
501           connection_->outbuf(), params_->max_chunk_size_);
502     }
503     if ( params_->write_timeout_ms_ > 0 ) {
504       timeouter_.SetTimeout(kWriteTimeout, params_->write_timeout_ms_);
505     }
506     // TODO(cpopescu): figure this out...
507     // If chunked (or even unchunked) we need to disable the request timeout
508     // as we expect to receive a stream
509     // if (current_request_->request()->server_header()->IsChunkedTransfer()) {
510     timeouter_.UnsetTimeout(kRequestTimeout + current_request_->request_id());
511     // }
512
513     connection_->NotifyWrite();
514   }
515 }
516
517 bool ClientStreamingProtocol::NotifyConnectionRead() {
518   bool ret = BaseClientProtocol::NotifyConnectionRead();
519   if ( current_request_->is_finalized() ) {
520     current_request_ = NULL;
521     LOG_HTTP << " Request is finalized - informing the streamer.";
522     if ( !source_stopped_ ) {
523       source_stopped_ = !streaming_callback_->Run(-1);
524     }
525   }
526   return ret && !source_stopped_;
527 }
528
529 void ClientStreamingProtocol::NotifyConnectionDeletion() {
530   BaseClientProtocol::NotifyConnectionDeletion();
531   if ( current_request_ ) {
532     current_request_->set_error(conn_error_);
533     if ( !source_stopped_ ) {
534       LOG_HTTP << " Connection is finalized - informing the streamer.";
535       source_stopped_ = !streaming_callback_->Run(-1);
536     }
537   }
538 }
539
540 //////////////////////////////////////////////////////////////////////
541 //
542 // ClientStreamReceiverProtocol
543 //
544 ClientStreamReceiverProtocol::ClientStreamReceiverProtocol(
545     const ClientParams* params,
546     http::BaseClientConnection* connection,
547     net::HostPort server)
548     : BaseClientProtocol(params, connection, server),
549       streaming_callback_(NULL) {
550 }
551
552 ClientStreamReceiverProtocol::~ClientStreamReceiverProtocol() {
553   if ( current_request_ != NULL ) {
554     if ( conn_error_ != CONN_INCOMPLETE ) {
555       current_request_->set_error(conn_error_);
556     } else {
557       current_request_->set_error(CONN_CLIENT_CLOSE);
558     }
559     current_request_->set_error(conn_error_);
560     current_request_ = NULL;
561   }
562 }
563
564 void ClientStreamReceiverProtocol::BeginStreamReceiving(
565     ClientRequest* request,
566     Closure* streaming_callback) {
567   CHECK(streaming_callback->is_permanent());
568   CHECK(current_request_ == NULL);
569   current_request_ = request;
570   streaming_callback_ = streaming_callback;
571   CHECK(streaming_callback_->is_permanent());
572   parser_read_state_ = 0;
573   parser_.Clear();
574   if ( params_->connect_timeout_ms_ > 0 ) {
575     timeouter_.SetTimeout(kConnectTimeout, params_->connect_timeout_ms_);
576   }
577   LOG_HTTP << " Connecting to " << server_;
578   connection_->Connect(server_);
579 }
580
581
582 bool ClientStreamReceiverProtocol::NotifyConnectionRead() {
583   const bool ret = BaseClientProtocol::NotifyConnectionRead();
584   if ( current_request_->is_finalized() ) {
585     current_request_ = NULL;
586     streaming_callback_->Run();
587   } else if ( ((parser_read_state_ & http::RequestParser::HEADER_READ) ==
588                http::RequestParser::HEADER_READ) ) {
589     // If chunked (or even unchunked) we need to disable the request timeout
590     // as we expect to receive a stream
591     // if (current_request_->request()->server_header()->IsChunkedTransfer()) {
592     timeouter_.UnsetTimeout(kRequestTimeout + current_request_->request_id());
593     // }
594
595     // Call for a new chunk of data - insure that the first happens
596     // after the header is completed.
597     streaming_callback_->Run();
598   }
599   return ret;
600 }
601
602 void ClientStreamReceiverProtocol::NotifyConnectionDeletion() {
603   BaseClientProtocol::NotifyConnectionDeletion();
604   if ( current_request_ ) {
605     current_request_->set_error(conn_error_);
606     current_request_ = NULL;
607     streaming_callback_->Run();
608   }
609 }
610
611 //////////////////////////////////////////////////////////////////////
612 //
613 // ClientProtocol
614 //
615
616 ClientProtocol::ClientProtocol(
617     const ClientParams* params,
618     http::BaseClientConnection* connection,
619     net::HostPort server)
620     : BaseClientProtocol(params, connection, server),
621       crt_id_(1LL),
622       active_requests_(params->max_concurrent_requests_),
623       callback_map_(params->max_concurrent_requests_ +
624                     params->max_waiting_requests_),
625       reading_request_(NULL) {
626 }
627
628 ClientProtocol::~ClientProtocol() {
629   // Things get cleared via:
630   // BaseClientProtocol::~BaseClientProtocol -->
631   //     --> close connection -->
632   //     --> NotifyConnectionDeletion
633 }
634
635 void ClientProtocol::SendRequest(
636     ClientRequest* request, Closure* done_callback) {
637   if ( connection_ == NULL ) {
638     request->set_error(CONN_CONNECTION_CLOSED);
639     done_callback->Run();
640     return;
641   }
642   request->set_request_id(crt_id_++);
643   if ( waiting_requests_.size() >= params_->max_waiting_requests_ ) {
644     request->set_error(CONN_TOO_MANY_REQUESTS);
645     connection_->selector()->RunInSelectLoop(done_callback);
646     return;
647   } else {
648     callback_map_[request] = done_callback;
649     waiting_requests_.push_back(request);
650   }
651   if ( connection_->state() == net::TcpConnection::DISCONNECTED ) {
652     if ( params_->connect_timeout_ms_ > 0 ) {
653       timeouter_.SetTimeout(kConnectTimeout,
654                             params_->connect_timeout_ms_);
655     }
656     LOG_HTTP << " Connecting to " << server_;
657     connection_->Connect(server_);
658   } else {
659     WriteRequestsToServer();
660   }
661 }
662
663 //////////////////////////////////////////////////////////////////////
664
665 bool ClientProtocol::NotifyConnected() {
666   // [COSMIN] Wouldn't it be nicer to call
667   //          BaseClientProtocol::NotifyConnected ??
668   conn_error_ = CONN_OK;
669   timeouter_.UnsetTimeout(kConnectTimeout);
670   WriteRequestsToServer();
671   return true;
672 }
673
674 bool ClientProtocol::NotifyConnectionRead() {
675   if ( reading_request_  == NULL ) {
676     parser_read_state_ = 0;
677     parser_.Clear();
678     parser_.set_max_num_chunks(params_->max_num_chunks_);
679     if ( params_->max_concurrent_requests_ == 1 &&
680          !active_requests_.empty() ) {
681       CHECK_EQ(active_requests_.size(), 1);
682       reading_request_ = active_requests_.begin()->second;
683     } else {
684       reading_request_ = new ClientRequest();
685     }
686   } else {
687     CHECK(!parser_.InFinalState());
688   }
689
690   do {
691     parser_read_state_ = parser_.ParseServerReply(
692         connection_->inbuf(), reading_request_->request());
693     if ( ((parser_read_state_ & http::RequestParser::HEADER_READ) ==
694           http::RequestParser::HEADER_READ) &&
695          reading_request_->request_id() == 0 ) {
696       IdentifyReadingRequest();
697     }
698   } while ( parser_read_state_ & http::RequestParser::CONTINUE );
699   // We need more data (and no error happened)
700   if ( !parser_.InFinalState() ) {
701     return true;
702   }
703   LOG_HTTP << "Request finished in state: "
704            << parser_.ParseStateName()
705            << " req[ " << reading_request_->name() << " ]";
706   reading_request_->set_error(
707       parser_.InErrorState()
708       ? http::CONN_HTTP_PARSING_ERROR
709       : http::CONN_OK);
710   http::Header::ParseError err =
711       reading_request_->request()->server_header()->parse_error();
712   if ( reading_request_->request_id() > 0 ) {
713     timeouter_.UnsetTimeout(kRequestTimeout +
714                             reading_request_->request_id());
715     CHECK(active_requests_.erase(reading_request_->request_id()));
716     const CallbackMap::iterator
717         it_cb = callback_map_.find(reading_request_);
718     CHECK(it_cb != callback_map_.end());
719     Closure* const closure = it_cb->second;
720     callback_map_.erase(it_cb);
721     closure->Run();
722   } else {
723     LOG_HTTP << "Deleting an orphaned request: "
724              << reading_request_->name();
725     delete reading_request_;
726   }
727   reading_request_ = NULL;
728   if ( parser_.InErrorState() ) {
729     LOG_WARNING << "HTTP[" << server_ << "]: "
730                 << "Exiting the connection due to broken parser state: "
731                 << parser_.ParseStateName()
732                 << " [ header parsing error: "
733                 << http::Header::ParseErrorName(err) << " ]";
734     conn_error_ = CONN_DEPENDENCY_FAILURE;
735     return false;
736   }
737   if ( !waiting_requests_.empty() ) {
738     // Putting more requests on line
739     WriteRequestsToServer();
740   }
741   return true;
742 }
743
744 void ClientProtocol::NotifyConnectionDeletion() {
745   BaseClientProtocol::NotifyConnectionDeletion();
746   ResolveAllRequestsWithError();
747 }
748
749 bool ClientProtocol::HandleTimeout(int64 timeout_id) {
750   if ( timeout_id >= kRequestTimeout ) {
751     const int64 req_id = timeout_id - kRequestTimeout;
752     const RequestMap::iterator it = active_requests_.find(req_id);
753     if ( it != active_requests_.end() ) {
754       const CallbackMap::iterator it_cb = callback_map_.find(it->second);
755       CHECK(it_cb != callback_map_.end());
756       Closure* const closure = it_cb->second;
757       it->second->set_error(CONN_REQUEST_TIMEOUT);
758       active_requests_.erase(it);
759       callback_map_.erase(it_cb);
760       closure->Run();
761     }
762     return true;
763   }
764   return BaseClientProtocol::HandleTimeout(timeout_id);
765 }
766
767 /////////////////////////////////////////////////////////////////////
768
769 bool ClientProtocol::IdentifyReadingRequest() {
770   CHECK(reading_request_ != NULL);
771   CHECK_EQ(reading_request_->request_id(), 0);
772   Header* const hs = reading_request_->request()->server_header();
773   const string req_id = hs->FindField(kHeaderXRequestId);
774   if ( !req_id.empty() ) {
775     char* endptr;
776     errno = 0;  // essential
777     const int64 id = strtoll(req_id.c_str(), &endptr, 10);
778     if ( errno || endptr == NULL || *endptr != '\0' ) {
779       LOG_WARNING << "HTTP[" << server_ << "]: "
780                   << " - Invalid request-id received: \n"
781                   << hs->ToString();
782       reading_request_->set_request_id(-1);
783     } else {
784       const RequestMap::const_iterator it = active_requests_.find(id);
785       if ( it == active_requests_.end() ) {
786         LOG_WARNING  << "HTTP[" << server_ << "]: "
787                      << " Orphaned response received from the server:\n"
788                      << hs->ToString();
789         reading_request_->set_request_id(-1);
790       } else {
791         Header* const hs2 = it->second->request()->server_header();
792         hs2->CopyHeaders(*hs, true);
793         hs2->set_http_version(hs->http_version());
794         hs2->set_status_code(hs->status_code());
795         hs2->set_reason(hs->reason());
796         hs2->set_first_line_type(hs->first_line_type());
797         it->second->request()->server_data()->AppendStream(
798             reading_request_->request()->server_data());
799         delete reading_request_;
800         reading_request_ = it->second;
801       }
802     }
803   } else {
804     LOG_WARNING << "HTTP[" << server_ << "]: "
805                 << " - Expecting a request-id header from "
806                 << " the server and got noting (header:\n"
807                 << hs->ToString();
808     reading_request_->set_request_id(-1);
809   }
810   CHECK_NE(reading_request_->request_id(), 0);
811   return reading_request_->request_id() > 0;
812 }
813
814 void ClientProtocol::ResolveAllRequestsWithError() {
815   timeouter_.UnsetAllTimeouts();
816   vector<Closure*> to_resolve;
817   LOG_HTTP << "Resolving all pending requests [active:"
818            << active_requests_.size()
819            << " waiting:" << waiting_requests_.size()
820            << " with error: "
821            << http::ClientErrorName(conn_error_);
822   if ( !active_requests_.empty() ) {
823     CHECK_NE(conn_error_, http::CONN_INCOMPLETE);
824     for ( RequestMap::const_iterator it = active_requests_.begin();
825           it != active_requests_.end(); ++it ) {
826       it->second->set_error(conn_error_);
827       to_resolve.push_back(callback_map_[it->second]);
828     }
829   }
830   for ( RequestsQueue::const_iterator it = waiting_requests_.begin();
831         it != waiting_requests_.end(); ++it ) {
832     (*it)->set_error(http::CONN_DEPENDENCY_FAILURE);
833     to_resolve.push_back(callback_map_[*it]);
834   }
835   waiting_requests_.clear();
836   active_requests_.clear();
837   callback_map_.clear();
838   for ( int i = 0; i < to_resolve.size(); ++i ) {
839     to_resolve[i]->Run();
840   }
841 }
842
843 void ClientProtocol::WriteRequestsToServer() {
844   while ( !waiting_requests_.empty() &&
845           active_requests_.size() < params_->max_concurrent_requests_ ) {
846     current_request_ = waiting_requests_.front();
847     waiting_requests_.pop_front();
848
849     Header* const hc = current_request_->request()->client_header();
850     CHECK(current_request_ != NULL);
851     hc->AddField(
852         kHeaderXRequestId,
853         strutil::StringPrintf(
854             "%lld", static_cast<long long int>(current_request_->request_id())),
855         true);
856     // TODO(cpopescu): figure out timeouts !!
857     CHECK(!hc->IsChunkedTransfer())
858         << " No chuncked transer encoding for client requests "
859         << " w/ ClientProtocol please.";
860     LOG_HTTP << " Sending request to server: "
861              << current_request_->name();
862     SendRequestToServer(current_request_);
863     CHECK(active_requests_.insert(
864               make_pair(current_request_->request_id(),
865                         current_request_)).second);
866     current_request_ = NULL;
867   }
868 }
869
870 //////////////////////////////////////////////////////////////////////
871
872 const char* ClientErrorName(ClientError err) {
873   switch ( err ) {
874     CONSIDER(CONN_INCOMPLETE);
875     CONSIDER(CONN_OK);
876     CONSIDER(CONN_CONNECT_ERROR);
877     CONSIDER(CONN_CONNECT_TIMEOUT);
878     CONSIDER(CONN_WRITE_TIMEOUT);
879     CONSIDER(CONN_READ_TIMEOUT);
880     CONSIDER(CONN_CONNECTION_CLOSED);
881     CONSIDER(CONN_REQUEST_TIMEOUT);
882     CONSIDER(CONN_DEPENDENCY_FAILURE);
883     CONSIDER(CONN_TOO_MANY_REQUESTS);
884     CONSIDER(CONN_HTTP_PARSING_ERROR);
885     CONSIDER(CONN_CLIENT_CLOSE);
886     CONSIDER(CONN_TOO_MANY_RETRIES);
887   }
888   return "UNKNOWN";
889 }
890 }
Note: See TracBrowser for help on using the browser.