root/trunk/whisperlib/net/http/http_server_protocol.cc

Revision 7, 38.9 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "net/http/http_server_protocol.h"
33 #include "common/io/ioutil.h"
34
// Debug-logging shorthand: streams a record prefixed with name() through
// LOG_INFO_IF(dlog_level_) (defined elsewhere; presumably it only logs when
// dlog_level_ is true). Both identifiers are resolved at the point of use, so
// the macro works only where name() and dlog_level_ are in scope.
#define LOG_HTTP LOG_INFO_IF(dlog_level_) << name() << ": "
36
37 namespace http {
38
39 ServerParams::ServerParams()
40     : root_url_("http://localhost/"),
41       dlog_level_(false),
42       strict_request_headers_(true),
43       allow_all_methods_(false),
44       max_header_size_(2048),
45       max_body_size_(1 << 20),
46       max_chunk_size_(1 << 18),
47       max_num_chunks_(20),
48       worst_accepted_header_error_(Header::READ_NO_STATUS_REASON),
49       max_concurrent_connections_(800),
50       max_concurrent_requests_(10000),
51       max_concurrent_requests_per_connection_(10),
52       request_read_timeout_ms_(10000),
53       keep_alive_timeout_sec_(15),
54       reply_write_timeout_ms_(20000),
55       max_reply_buffer_size_(1 << 20),
56       reply_full_buffer_policy_(ServerParams::POLICY_CLOSE),
57       default_content_type_("text/html; charset=UTF-8"),
58       ignore_different_http_hosts_(true),
59       ready_signal_limit_(1<<15) {
60 }
61
// Fully explicit configuration: every argument is copied verbatim into the
// member of the same name (with a trailing underscore). No validation is
// performed on any of the values.
ServerParams::ServerParams(
    const char* root_url,
    bool dlog_level,
    bool strict_request_headers,
    bool allow_all_methods,
    int32 max_header_size,
    int32 max_body_size,
    int32 max_chunk_size,
    int64 max_num_chunks,
    http::Header::ParseError worst_accepted_header_error,
    int32 max_concurrent_connections,
    int32 max_concurrent_requests,
    int32 max_concurrent_requests_per_connection,
    int32 request_read_timeout_ms,
    int32 keep_alive_timeout_sec,
    int32 reply_write_timeout_ms,
    int32 max_reply_buffer_size,
    FlowControlPolicy reply_full_buffer_policy,
    const char* default_content_type,
    bool ignore_different_http_hosts,
    int32 ready_signal_limit)
    : root_url_(root_url),
      dlog_level_(dlog_level),
      strict_request_headers_(strict_request_headers),
      allow_all_methods_(allow_all_methods),
      max_header_size_(max_header_size),
      max_body_size_(max_body_size),
      max_chunk_size_(max_chunk_size),
      max_num_chunks_(max_num_chunks),
      worst_accepted_header_error_(worst_accepted_header_error),
      max_concurrent_connections_(max_concurrent_connections),
      max_concurrent_requests_(max_concurrent_requests),
      max_concurrent_requests_per_connection_(
          max_concurrent_requests_per_connection),
      request_read_timeout_ms_(request_read_timeout_ms),
      keep_alive_timeout_sec_(keep_alive_timeout_sec),
      reply_write_timeout_ms_(reply_write_timeout_ms),
      max_reply_buffer_size_(max_reply_buffer_size),
      reply_full_buffer_policy_(reply_full_buffer_policy),
      default_content_type_(default_content_type),
      ignore_different_http_hosts_(ignore_different_http_hosts),
      ready_signal_limit_(ready_signal_limit) {
}
105
106
107 //////////////////////////////////////////////////////////////////////
108 //
109 // http::ServerAcceptor
110 //
111 ServerAcceptor::ServerAcceptor(net::Selector* selector,
112                                net::NetFactory* net_factory,
113                                net::PROTOCOL net_protocol,
114                                const net::HostPort& local_addr,
115                                http::Server* server,
116                                FactoryCallback* factory_callback)
117     : selector_(selector),
118       acceptor_(net_factory->CreateAcceptor(net_protocol)),
119       local_addr_(local_addr),
120       server_(server),
121       factory_callback_(factory_callback) {
122   CHECK(factory_callback_->is_permanent());
123   acceptor_->SetFilterHandler(NewPermanentCallback(
124       this, &ServerAcceptor::AcceptorFilterHandler), true);
125   acceptor_->SetAcceptHandler(NewPermanentCallback(
126       this, &ServerAcceptor::AcceptorAcceptHandler), true);
127 }
128
// Destroys the network acceptor created in the constructor. The factory
// callback, the selector and the server are not touched here.
ServerAcceptor::~ServerAcceptor() {
  delete acceptor_;
  acceptor_ = NULL;
}
133
// Starts listening on the local address given at construction.
// Returns whatever the underlying acceptor's Listen() reports.
bool ServerAcceptor::Listen() {
  return acceptor_->Listen(local_addr_);
}
// Closes the underlying acceptor.
void ServerAcceptor::Close() {
  acceptor_->Close();
}
140
141 bool ServerAcceptor::AcceptorFilterHandler(const net::HostPort& peer_address) {
142   LOG_INFO << acceptor_->PrefixInfo()
143            << "Filtering an accept from: " << peer_address
144            << " with: " << server_->num_connections() << " active connections"
145            << " and a max of: "
146            << server_->protocol_params()->max_concurrent_connections_;
147   if ( server_->num_connections() >=
148        server_->protocol_params()->max_concurrent_connections_ ) {
149     LOG_ERROR << acceptor_->PrefixInfo() << "Too many connections ! - refusing";
150     return false;
151   }
152   return true;
153 }
154 void ServerAcceptor::AcceptorAcceptHandler(net::NetConnection* net_connection) {
155   http::ServerProtocol* const protocol = new ServerProtocol(selector_, server_);
156   http::BaseServerConnection* const http_connection =
157     factory_callback_->Run(selector_, net_connection, protocol);
158   protocol->set_connection(http_connection);
159   server_->AddClient(protocol);
160 }
161
// Wraps an accepted network connection and routes its read / write / close
// events to the member handlers below. The connection object is deleted in
// the destructor, so this object owns net_connection from here on.
BaseServerConnection::BaseServerConnection(net::Selector* selector,
                                           net::NetConnection* net_connection,
                                           http::ServerProtocol* protocol)
    : selector_(selector),
      net_connection_(net_connection),
      protocol_(protocol) {
  // The trailing 'true' is presumably an ownership flag for the callbacks -
  // TODO confirm against net::NetConnection.
  net_connection_->SetReadHandler(NewPermanentCallback(
      this, &BaseServerConnection::ConnectionReadHandler), true);
  net_connection_->SetWriteHandler(NewPermanentCallback(
      this, &BaseServerConnection::ConnectionWriteHandler), true);
  net_connection_->SetCloseHandler(NewPermanentCallback(
      this, &BaseServerConnection::ConnectionCloseHandler), true);
}
// Destroys the wrapped network connection.
BaseServerConnection::~BaseServerConnection() {
  delete net_connection_;
  net_connection_ = NULL;
}
// Asks the network connection to start delivering write events.
void BaseServerConnection::NotifyWrite() {
  net_connection_->RequestWriteEvents(true);
}
182 bool BaseServerConnection::ConnectionReadHandler() {
183   if ( !protocol_->ProcessMoreData() ) {
184     ForceClose();
185   }
186   return true;
187 }
// Write-event handler: forwards the notification to the protocol.
// Always returns true.
bool BaseServerConnection::ConnectionWriteHandler() {
  protocol_->NotifyConnectionWrite();
  return true;
}
192 void BaseServerConnection::ConnectionCloseHandler(
193     int err, net::NetConnection::CloseWhat what) {
194   if ( what != net::NetConnection::CLOSE_READ_WRITE ) {
195     net_connection_->FlushAndClose();
196     return;
197   }
198   // We have to call NotifyConnectionDeletion here because we need to stick to
199   // this original pattern:
200   //   [protocol] calls
201   //     -> BaseServerConnection::ForceClose
202   //        ->BaseServerConnection::ConnectionCloseHandler
203   //          ->[procotol]::NotifyConnectionDeletion
204   // When ForceClose completes the NotifyConnectionDeletion must be already run.
205   if ( protocol_ != NULL ) {
206     protocol_->NotifyConnectionDeletion();
207     protocol_ = NULL;
208   }
209   LOG_INFO << "HTTP SimpleServerConnection completely closed,"
210               " deleting in select loop..";
211   selector_->DeleteInSelectLoop(this);
212 }
213
214 //////////////////////////////////////////////////////////////////////
215 //
216 // http::Server
217 //
218
219 Server::Server(const char* name,
220                net::Selector* selector,
221                net::NetFactory* net_factory,
222                const ServerParams* protocol_params,
223                ServerAcceptor::FactoryCallback* conn_factory_callback,
224                int thread_pool_size)
225     : name_(name),
226       selector_(selector),
227       net_factory_(net_factory),
228       protocol_params_(protocol_params),
229       conn_factory_callback_(conn_factory_callback),
230       thread_pool_size_(thread_pool_size),
231       thread_pool_(NULL),
232       //port_(0),
233       acceptors_(),
234       default_processor_(NewPermanentCallback(
235                              this, &Server::DefaultRequestProcessor)),
236       error_processor_(NewPermanentCallback(
237                            this, &Server::ErrorRequestProcessor)) {
238   CHECK(conn_factory_callback_->is_permanent());
239 }
240
241 Server::~Server() {
242   for ( uint32 i = 0; i < acceptors_.size(); i++ ) {
243     delete acceptors_[i];
244   }
245   acceptors_.clear();
246
247   delete conn_factory_callback_;
248
249   // TODO(cpopescu): force close all -
250   //                 see how this interacts w/ processor stuff..
251   // CHECK(!protocols_.empty());
252   //
253   for ( ProcessorMap::const_iterator it = processors_.begin();
254         it != processors_.end(); ++it ) {
255     delete it->second;
256   }
257 }
258
259 void Server::AddAcceptor(net::PROTOCOL net_protocol,
260                          const net::HostPort& local_address) {
261   if ( thread_pool_size_ > 0 ) {
262     thread_pool_ = new thread::ThreadPool(
263         thread_pool_size_,
264         protocol_params_->max_concurrent_requests_);
265   }
266   acceptors_.push_back(new ServerAcceptor(selector_,
267                                           net_factory_,
268                                           net_protocol,
269                                           local_address,
270                                           this,
271                                           conn_factory_callback_));
272 }
273 void Server::StartServing() {
274   if ( thread_pool_size_ > 0 ) {
275     thread_pool_ = new thread::ThreadPool(
276         thread_pool_size_,
277         protocol_params_->max_concurrent_requests_);
278   }
279   for ( uint32 i = 0; i < acceptors_.size(); i++ ) {
280     ServerAcceptor& acceptor = *acceptors_[i];
281     CHECK(acceptor.Listen());
282     LOG_INFO << "HTTP Turned on listening on: " << acceptor.local_addr();
283   }
284 }
285 void Server::StopServing() {
286   for ( uint32 i = 0; i < acceptors_.size(); i++ ) {
287     ServerAcceptor& acceptor = *acceptors_[i];
288     acceptor.Close();
289   }
290 }
291
292 bool Server::RegisterProcessor(const string& path,
293                                ServerCallback* callback,
294                                bool is_public) {
295   const string reg_path(strutil::NormalizeUrlPath(path));
296   const ProcessorMap::iterator it = processors_.find(reg_path);
297   bool replaced = true;
298   if ( it != processors_.end() ) {
299     delete it->second;
300     replaced = true;
301     if ( callback == NULL ) {
302       LOG_INFO << "HTTP processor deleted for path: " << reg_path;
303       processors_.erase(it);
304     } else {
305       LOG_INFO << "HTTP processor replaced for path: " << reg_path;
306       CHECK(callback->is_permanent());
307       it->second = callback;
308     }
309   } else if ( callback == NULL ) {
310     LOG_INFO << "No HTTP processor found to be deleted for path: " << reg_path;
311   } else {
312     LOG_INFO << "HTTP listening on path: " << reg_path;
313     CHECK(callback->is_permanent());
314     processors_[reg_path] = callback;
315     LOG_INFO << "HTTP processor registered for path: " << reg_path;
316   }
317   const AllowedIpsMap::iterator it_ips = allowed_ips_.find(reg_path);
318   if ( it_ips != allowed_ips_.end() ) {
319     if ( callback == NULL || !is_public ) {
320       allowed_ips_.erase(it_ips);
321     } else {
322       it_ips->second = NULL;  // all alowed
323     }
324   } else if ( is_public ) {
325     allowed_ips_[reg_path] = NULL;  // all alowed
326   }
327   return replaced;
328 }
329
330 void Server::RegisterAllowedIpAddresses(const string& path,
331                                         const net::IpV4Filter* ips) {
332   const string reg_path(strutil::NormalizeUrlPath(path));
333   const AllowedIpsMap::iterator it_ips = allowed_ips_.find(reg_path);
334   if ( it_ips != allowed_ips_.end() ) {
335     it_ips->second = ips;
336   } else {
337     allowed_ips_[reg_path] = ips;
338   }
339 }
340
341 void Server::RegisterClientStreaming(const string& path,
342                                      bool is_client_streaming) {
343   const string reg_path(strutil::NormalizeUrlPath(path));
344   is_streaming_client_map_[reg_path] = is_client_streaming;
345 }
346
347 void Server::AddClient(ServerProtocol* proto) {
348   LOG_INFO << name() << " HTTP Adding client: " << proto->name()
349            << " to: " << protocols_.size() << " clients already serving";
350   const ProtocolSet::const_iterator it = protocols_.find(proto);
351   CHECK(it == protocols_.end());
352   protocols_.insert(proto);
353 }
354
// Removes a protocol from the set of active clients. The protocol object is
// not deleted here. Aborts (CHECK) if it was not registered.
void Server::DeleteClient(ServerProtocol* proto) {
  LOG_INFO << name() << " HTTP Deleting client: " << proto->name()
           << " from: " << protocols_.size() << " clients already serving";
  // erase() returns the number of removed elements; zero trips the CHECK.
  CHECK(protocols_.erase(proto));
}
360
// Fallback processor, used when no registered processor matches the request
// path: writes a small HTML body and replies with 404 Not Found.
void Server::DefaultRequestProcessor(ServerRequest* req) {
  req->request()->server_data()->Write(
      "<h1>404 Not Found</h1>"
      "The server has not found anything matching the Request-URI.");
  req->ReplyWithStatus(http::NOT_FOUND);
}
367
368 void Server::ErrorRequestProcessor(ServerRequest* req) {
369   // See if we already got a status - else set a 500
370   http::HttpReturnCode code = req->request()->server_header()->status_code();
371   if ( code == http::UNKNOWN ) {
372     code = http::INTERNAL_SERVER_ERROR;
373   }
374   req->request()->server_data()->Write(
375       strutil::StringPrintf("<h1>%d %s</h1>",
376                             static_cast<int>(code),
377                             GetHttpReturnCodeDescription(code)));
378   req->ReplyWithStatus(code);
379 }
380
381
382 //////////////////////////////////////////////////////////////////////
383 //
// Some utilities:
385 namespace {
386 static bool VerifyHost(http::Header* client_header, URL* url) {
387   string host;
388   if ( client_header->FindField(kHeaderHost, &host) ) {
389     if ( url->has_host() &&
390          !strutil::StrCaseEqual(host, url->host()) ) {
391       return false;
392     }
393   }
394   return true;
395 }
396 }
397
398 void Server::SetSpecificProtocolParams(http::ServerRequest* req) {
399   req->request()->InitializeUrlFromClientRequest(&protocol_params_->root_url_);
400   URL* const url = req->request()->url();
401   if ( url ) {
402     string url_path(url->UrlUnescape(url->path().c_str(),
403                                      url->path().size()));
404     req->is_client_streaming_ = io::FindPathBased(&is_streaming_client_map_,
405                                                   url_path);
406   }
407   req->is_initialized_ = true;
408 }
409
410 void Server::ProcessRequest(http::ServerRequest* req) {
411   if ( req->server_callback_ != NULL ) {
412     PerformProcessing(req);
413     return;
414   }
415
416   Header* const hc = req->request()->client_header();
417   Header* const hs = req->request()->server_header();
418   URL* const url = req->request()->url();
419
420   // Should come from a registered protocol
421   DCHECK(protocols_.find(req->protocol()) != protocols_.end());
422
423   // See if we already got a status set - and reply
424   if ( hs->status_code() == http::UNKNOWN ) {
425     if ( hc->http_version() != VERSION_1_1 &&
426          hc->http_version() != VERSION_1_0 ) {
427       // Bad version
428       hs->set_status_code(HTTP_VERSION_NOT_SUPPORTED);
429     } else if ( hc->method() == METHOD_UNKNOWN ||
430                 url == NULL || !url->is_valid() ) {
431       // Bad request
432       hs->set_status_code(BAD_REQUEST);
433     } else if ( !protocol_params_->allow_all_methods_ &&
434                 hc->method() != METHOD_GET &&
435                 hc->method() != METHOD_HEAD &&
436                 hc->method() != METHOD_POST ) {
437       // Bad method.
438       hs->set_status_code(METHOD_NOT_ALLOWED);
439     } else if ( !protocol_params_->ignore_different_http_hosts_ &&
440                 !VerifyHost(hc, url) ) {
441       hs->set_status_code(BAD_REQUEST);
442     }
443   }
444   if ( hs->status_code() != http::UNKNOWN ) {
445     error_processor_->Run(req);
446     return;
447   }
448
449   // Accepted request - looks OK !
450   string url_path(url->UrlUnescape(url->path().c_str(),
451                                    url->path().size()));
452   string url_path2(url_path);
453   ServerCallback* const proc = io::FindPathBased(&processors_, url_path);
454   if ( proc == NULL ) {
455     req->server_callback_ = default_processor_;
456   } else {
457     // Check if the ip is authorized
458     const net::IpV4Filter* const
459         ipfilter = io::FindPathBased(&allowed_ips_, url_path2);
460     if ( ipfilter != NULL &&
461          !ipfilter->Matches(
462              net::IpAddress(
463                  req->protocol()->remote_address().ip_object())) ) {
464       hs->set_status_code(FORBIDDEN);
465       req->server_callback_ = error_processor_;
466       return;
467     } else {
468       req->server_callback_ = proc;
469     }
470   }
471   PerformProcessing(req);
472 }
473
474 void Server::PerformProcessing(http::ServerRequest* req) {
475   if ( thread_pool_ == NULL ) {
476     req->server_callback_->Run(req);
477   } else {
478     Closure* closure = NewCallback(req, &http::ServerRequest::ProcessRequest);
479     if ( !thread_pool_->jobs()->Put(closure, 0) ) {
480       delete closure;
481       Header* const hs = req->request()->server_header();
482       hs->set_status_code(SERVICE_UNAVAILABLE);
483       error_processor_->Run(req);
484       return;
485     }
486   }
487 }
488
489 //////////////////////////////////////////////////////////////////////
490 //
491 // http::ServerProtocol
492 //
493
// Creates the protocol state for one client connection. The connection itself
// is attached later through set_connection().
// The parser limits (header / body / chunk sizes, chunk count, tolerated
// header error) are taken from the server's protocol parameters; timeouts
// fire ServerProtocol::HandleTimeout.
ServerProtocol::ServerProtocol(net::Selector* selector, Server* server)
    : dlog_level_(server->protocol_params()->dlog_level_),
      selector_(selector),
      server_(server),
      protocol_params_(server->protocol_params()),
      timeouter_(selector, NewPermanentCallback(
                     this, &ServerProtocol::HandleTimeout)),
      // The parser name is filled in by set_connection(). The meaning of the
      // two 'false' flags is defined by the parser's constructor (not visible
      // here).
      parser_("",
              server->protocol_params()->max_header_size_,
              server->protocol_params()->max_body_size_,
              server->protocol_params()->max_chunk_size_,
              server->protocol_params()->max_num_chunks_,
              false, false,
              server->protocol_params()->worst_accepted_header_error_),
      connection_(NULL),
      crt_recv_(NULL),
      crt_send_(NULL) {
}
512
// Must only run once the protocol is fully quiescent: no active requests, no
// attached connection and no reply in progress (all enforced with CHECK).
// Cancels pending timeouts and frees a partially received request, if any.
ServerProtocol::~ServerProtocol() {
  CHECK(active_requests_.empty());
  CHECK(connection_ == NULL);
  timeouter_.UnsetAllTimeouts();
  delete crt_recv_;
  CHECK(crt_send_ == NULL);
}
520
521 void ServerProtocol::set_connection(BaseServerConnection* conn) {
522   CHECK(connection_ == NULL);
523   connection_ = conn;
524   local_address_ = conn->local_address();
525   remote_address_ = conn->remote_address();
526   name_ = strutil::StringPrintf(
527       "HTTP[%d](%s:%d)",
528       static_cast<int>(conn->local_address().port()),
529       conn->remote_address().ip_object().ToString().c_str(),
530       static_cast<int>(conn->remote_address().port()));
531   parser_.set_name(name_);
532   timeouter_.SetTimeout(kRequestTimeout,
533                         protocol_params_->request_read_timeout_ms_);
534 }
535
// Hands the underlying network connection over to the caller and retires this
// protocol.
//   req - must belong to this protocol and must be its only active request
//         (both enforced with CHECK).
// Returns the net connection obtained from the connection's DetachFromFd().
// Afterwards the request no longer references this protocol and
// NotifyConnectionDeletion() has run.
// NOTE(review): connection_ is dereferenced without a NULL check (the
// commented-out original below did check) - confirm this cannot be reached
// after the connection was already closed.
net::NetConnection* ServerProtocol::DetachFromFd(ServerRequest* req) {
  LOG_INFO << name() << " - Detaching FD.";
  CHECK(req->protocol() == this);
  active_requests_.erase(req);
  CHECK(active_requests_.empty())
      << "Cannot detach if you accept multiple concurrent requests!";
  //  TODO(cosmin): fix, original code is commented
  // int fd = INVALID_FD_VALUE;
  // if ( connection_ != NULL ) {
  //   fd = connection_->GetFd();
  //   connection_->DetachFromFd();
  //   delete connection_;
  // }
  net::NetConnection * net_connection = connection_->DetachFromFd();
  req->ResetProtocol();
  NotifyConnectionDeletion();
  return net_connection;
}
554
555 void ServerProtocol::NotifyConnectionDeletion() {
556   LOG_HTTP << "Client closed on us - closing all stuff.. ";
557   connection_ = NULL;
558   timeouter_.UnsetAllTimeouts();
559   server_->DeleteClient(this);
560   if ( active_requests_.empty() ) {
561     LOG_INFO << "HTTP :" << name()
562              << " No active requests - deleting client";
563     // LOG_HTTP << " No active requests - deleting client";
564     selector_->DeleteInSelectLoop(this);
565   } else {
566     LOG_INFO << "HTTP :" << name()
567              << " Waking all " << active_requests_.size()
568              << " pending requests.";
569     selector_->RunInSelectLoop(
570         NewCallback(this, &ServerProtocol::CloseAllActiveRequests));
571   }
572 }
573
574 void ServerProtocol::CloseAllActiveRequests() {
575   CHECK(connection_ == NULL);
576   vector<http::ServerRequest*> reqs;
577   for ( RequestSet::const_iterator it = active_requests_.begin();
578         it != active_requests_.end(); ++it ) {
579     reqs.push_back(*it);
580   }
581   for ( int i = 0; i < reqs.size(); ++i ) {
582     reqs[i]->SignalClosed();
583   }
584   if ( !active_requests_.empty() ) {
585     LOG_WARNING << " WARNING : one of the requests did not close correctly"
586                 << " so far -- keep an eye if they save it somewhere";
587   }
588 }
589
590 void ServerProtocol::HandleTimeout(int64 timeout_id) {
591   LOG_HTTP << "Timeout encountered - closing: " << timeout_id;
592   if ( connection_ ) {
593     connection_->ForceClose();
594     connection_ = NULL;
595   }
596 }
597
// Feeds newly received bytes to the request parser and dispatches the request
// once it is complete.
// Returns false when the connection should be closed (the read handler calls
// ForceClose() on false); true otherwise, including when more data is needed.
bool ServerProtocol::ProcessMoreData() {
  if ( crt_recv_ == NULL ) {
    // Start of a new request: fresh parser state and request object.
    parser_.Clear();
    parser_.set_max_num_chunks(protocol_params_->max_num_chunks_);
    crt_recv_ = new ServerRequest(this);
  } else {
    // Continuing a request: it must not have been completed already.
    CHECK(!parser_.InFinalState());
  }
  int read_state = 0;
  // Keep parsing while the parser asks to continue and has not reached a
  // final state.
  do {
    read_state = parser_.ParseClientRequest(connection_->inbuf(),
                                            crt_recv_->request());
    LOG_HTTP << "In state: " << read_state << " - "
             << parser_.ParseStateName();
    crt_recv_->is_parsing_finished_ = parser_.InFinalState();

    if ( read_state & http::RequestParser::HEADER_READ ) {
      // First time the header is available: per-request setup.
      if ( !crt_recv_->is_initialized_ )
        server_->SetSpecificProtocolParams(crt_recv_);

      // Client-streaming requests are dispatched as soon as the header is in,
      // with the body / chunk limits set to -1 (presumably "no limit" -
      // TODO confirm against the parser).
      if ( !parser_.InFinalState() && crt_recv_->is_client_streaming() ) {
        parser_.set_max_num_chunks(-1);
        parser_.set_max_body_size(-1);
        active_requests_.insert(crt_recv_);
        server_->ProcessRequest(crt_recv_);
        // No request or no processor assigned: ask for the connection to be
        // closed.
        if ( crt_recv_ == NULL || crt_recv_->server_callback_ == NULL ) {
          return false;
        }
        timeouter_.UnsetTimeout(kRequestTimeout);
      }
    }
    if ( parser_.InFinalState() ) {
      break;
    }
  } while ( read_state & http::RequestParser::CONTINUE );

  // We need more data (and no error happened)
  if ( !parser_.InFinalState() ) {
    return true;
  }
  timeouter_.UnsetTimeout(kRequestTimeout);
  if ( parser_.InErrorState() ) {
    LOG_HTTP << "Fully parsed an error request " << parser_.ParseStateName();
    // NOTE(review): on this path crt_recv_ is neither reset to NULL nor added
    // to active_requests_ - confirm what happens to it after the error reply
    // and on the next call.
    PrepareErrorRequest(crt_recv_);
    return true;
  } else {
    LOG_HTTP << "Fully parsed an OK request ["
             << strutil::StrTrim(
                 crt_recv_->request()->client_header()->ComposeFirstLine())
             << "]";
  }
  active_requests_.insert(crt_recv_);
  // Over the per-connection limit: pre-set a 503 status, which makes
  // Server::ProcessRequest route the request to the error processor.
  if ( active_requests_.size() >
       protocol_params_->max_concurrent_requests_per_connection_ ) {
      crt_recv_->request()->server_header()->PrepareStatusLine(
          SERVICE_UNAVAILABLE);
      crt_recv_->request()->server_data()->Write(
          "Too many concurrent requests.");
  }
  server_->ProcessRequest(crt_recv_);
  // The request is now tracked through active_requests_; the next call starts
  // a new one.
  crt_recv_ = NULL;
  return true;
}
661
662 void ServerProtocol::PrepareErrorRequest(ServerRequest* req) {
663   CHECK(parser_.InErrorState());
664   HttpReturnCode code = UNKNOWN;
665   switch ( parser_.parse_state() ) {
666     case RequestParser::ERROR_HEADER_TOO_LONG:
667     case RequestParser::ERROR_CHUNK_HEADER_TOO_LONG:
668       code = http::REQUEST_URI_TOO_LARGE;
669       break;
670     case RequestParser::ERROR_CONTENT_TOO_LONG:
671     case RequestParser::ERROR_CONTENT_GZIP_TOO_LONG:
672     case RequestParser::ERROR_CHUNK_TOO_LONG:
673     case RequestParser::ERROR_CHUNK_TOO_MANY:
674     case RequestParser::ERROR_CHUNK_CONTENT_GZIP_TOO_LONG:
675     case RequestParser::ERROR_CHUNK_TRAILER_TOO_LONG:
676       code = http::REQUEST_ENTITY_TOO_LARGE;
677       break;
678     case RequestParser::ERROR_HEADER_BAD:
679     case RequestParser::ERROR_HEADER_LINE:
680     case RequestParser::ERROR_CONTENT_GZIP_ERROR:
681     case RequestParser::ERROR_CONTENT_GZIP_UNFINISHED:
682     case RequestParser::ERROR_CHUNK_TRAIL_HEADER:
683     case RequestParser::ERROR_CHUNK_BAD_CHUNK_TERMINATION:
684     case RequestParser::ERROR_CHUNK_BIGGER_THEN_DECLARED:
685     case RequestParser::ERROR_CHUNK_UNFINISHED_GZIP_CONTENT:
686     case RequestParser::ERROR_CHUNK_CONTINUED_FINISHED_GZIP_CONTENT:
687     case RequestParser::ERROR_CHUNK_CONTENT_GZIP_ERROR:
688       code = http::BAD_REQUEST;
689       break;
690     case RequestParser::ERROR_TRANSFER_ENCODING_UNKNOWN:
691     case RequestParser::ERROR_CONTENT_ENCODING_UNKNOWN:
692       code = http::INTERNAL_SERVER_ERROR;
693       break;
694     case RequestParser::ERROR_CHUNK_BAD_CHUNK_LENGTH:
695     case RequestParser::ERROR_HEADER_BAD_CONTENT_LEN:
696       code = http::LENGTH_REQUIRED;
697       break;
698     default:
699       code = http::INTERNAL_SERVER_ERROR;
700   }
701   req->request()->server_header()->PrepareStatusLine(code);
702   req->request()->server_data()->Write(
703       strutil::StringPrintf("<p>Request Error: %d [detail: %s].</p>",
704                             static_cast<int>(code),
705                             parser_.ParseStateName()));
706   req->ReplyWithStatus(code);
707 }
708
709
// Builds the reply header for req and appends the reply to the connection's
// output buffer.
//   req    - request being answered; becomes crt_send_ if none is set.
//   status - HTTP status for the status line.
// Returns true when the caller should close after this reply: either there is
// no connection any more, or the output buffer is over its limit and the
// policy is POLICY_CLOSE. Returns false otherwise.
// NOTE(review): the body mixes 'req' and 'crt_send_'. They are the same
// object when called from ReplyForRequestLocked (which sets crt_send_ = req
// first) - confirm for any other caller.
bool ServerProtocol::PrepareResponseLocked(ServerRequest* req,
                                           HttpReturnCode status) {
  if ( crt_send_ == NULL ) {
    crt_send_ = req;
  }
  bool should_close = false;
  // We never orphan a connection here - causes loads of trouble ..
  if ( connection_ == NULL )
    return true;
  // Prepare the reply parameters
  http::Header* const hs = req->request()->server_header();  // shortcut
  hs->PrepareStatusLine(
      status, req->request()->client_header()->http_version());
  // Chunked transfer only for HTTP/1.1 (or later) clients, and only when the
  // request is server-streaming in chunks.
  hs->SetChunkedTransfer(
      req->request()->client_header()->http_version() >= VERSION_1_1 &&
      req->is_server_streaming() &&
      req->is_server_streaming_chunks());

  // Set some necessary headers *if not set*
  if ( !hs->HasField(http::kHeaderDate) ) {
    time_t crt_time;
    hs->SetDateField(http::kHeaderDate, time(&crt_time));
  }
  if ( !hs->HasField(http::kHeaderServer) ) {
    hs->AddField(http::kHeaderServer, server_->name(), true);
  }
  if ( !hs->HasField(http::kHeaderContentType) ) {
    hs->AddField(http::kHeaderContentType,
                 req->protocol()->protocol_params()->default_content_type_,
                 true);
  }
  // Determine keep-alive stuff: granted only when keep-alive is enabled in
  // the parameters and the client sent a Keep-Alive header field.
  if ( protocol_params_->keep_alive_timeout_sec_ > 0 &&
       crt_send_->request()->client_header()->HasField(
           http::kHeaderKeepAlive) ) {
    hs->AddField(http::kHeaderConnection, "Keep-Alive", true);
    hs->AddField(
        http::kHeaderKeepAlive,
        strutil::IntToString(protocol_params_->keep_alive_timeout_sec_),
        true);
    crt_send_->is_keep_alive_ = true;
  } else {
    hs->AddField(http::kHeaderConnection, "Close", true);
    crt_send_->is_keep_alive_ = false;
  }
  // Write the data out, applying the flow-control policy when the output
  // buffer is already over its limit.
  if ( connection_->outbuf()->Size() >
       protocol_params_->max_reply_buffer_size_ ) {
    switch ( protocol_params_->reply_full_buffer_policy_ ) {
      case ServerParams::POLICY_CLOSE:
        // Nothing is appended; the caller is told to close.
        LOG_HTTP << "connection outbuf full -> Closing";
        should_close = true;
        crt_send_->is_keep_alive_ = false;
        break;
      case ServerParams::POLICY_DROP_OLD_DATA:
        // Discard what is still buffered, then append this reply.
        LOG_HTTP << "connection outbuf full -> Dropping old data";
        connection_->outbuf()->Clear();
        crt_send_->request()->AppendServerReply(
            connection_->outbuf(),
            req->is_server_streaming(),
            req->is_server_streaming_chunks());
        break;
      case ServerParams::POLICY_DROP_NEW_DATA:
        // This reply is not appended at all.
        LOG_HTTP << "connection outbuf full -> Dropping new data";
        break;
    }
  } else {
    crt_send_->request()->AppendServerReply(
        connection_->outbuf(),
        req->is_server_streaming(),
        req->is_server_streaming_chunks());
  }
  return should_close;
}
784
// Starts the reply for req with the given status.
// If another reply is already in progress (crt_send_ set), req is logged as
// invalid and scheduled for deletion in the select loop instead. Otherwise
// req becomes the current reply; with no connection left it is marked
// orphaned, else the response is prepared and buffered. Request processing is
// then ended - with the second argument true when the request is not
// server-streaming or when a close was requested.
void ServerProtocol::ReplyForRequestLocked(ServerRequest* req,
                                           HttpReturnCode status) {
  bool should_close = false;
  if ( crt_send_ != NULL ) {
    LOG_INFO << " Dumping invalid request received : " << req;
    // TODO(miancule): This seems to fix the persistent HTTP bugs,
    // but I'm pretty sure that this is somehow incorrect...
    if ( req == crt_send_ ) {
      crt_send_ = NULL;
    }
    selector_->DeleteInSelectLoop(req);
    return;
  }
  crt_send_ = req;
  if ( connection_ == NULL ) {
    // Orphaned request:
    crt_send_->is_orphaned_ = true;
    should_close = true;
  } else {
    should_close = PrepareResponseLocked(req, status);
  }
  EndRequestProcessing(crt_send_,
                       !req->is_server_streaming() || should_close);
}
809
810 void ServerProtocol::StreamDataLocked(ServerRequest* req, bool is_eos) {
811   if ( crt_send_ != req ) {
812     LOG_INFO << " Dumping invalid request received : " << req;
813     selector_->DeleteInSelectLoop(req);
814     return;
815   }
816   CHECK(crt_send_ == req);
817   if ( connection_ == NULL ) {
818     // Orphaned request:
819     req->is_orphaned_ = true;
820   } else {
821     // We need this protection to insure that we don't output empty chunks
822     // (which signal EOS) when we don't want. Because of multithreading,
823     // two calls to ContinueStreamingData from a worker thread
824     // can determine two calls to this function. In the first we output all,
825     // while in the second we will have nothing to output..
826     LOG_HTTP << "Preparing to stream: "
827              << req->request()->server_data()->Size();
828     if ( !req->request()->server_data()->IsEmpty() ) {
829       // Write the data out
830       if ( connection_->outbuf()->Size() >
831            protocol_params_->max_reply_buffer_size_ ) {
832         switch ( protocol_params_->reply_full_buffer_policy_ ) {
833           case ServerParams::POLICY_CLOSE:
834             LOG_HTTP <<" Buffer full - closing connection.";
835             is_eos = true;
836             req->is_keep_alive_ = false;
837             break;
838           case ServerParams::POLICY_DROP_OLD_DATA:
839             LOG_HTTP << "Buffer full - dropping "
840                      << connection_->outbuf()->Size()
841                      << " bytes from connection.";
842             connection_->outbuf()->Clear();
843             req->request()->AppendServerChunk(
844                 connection_->outbuf(), req->is_server_streaming_chunks());
845             break;
846           case ServerParams::POLICY_DROP_NEW_DATA:
847             break;
848         }
849       } else {
850         req->request()->AppendServerChunk(
851             connection_->outbuf(), req->is_server_streaming_chunks());
852       }
853       connection_->NotifyWrite();
854     }
855     if ( is_eos ) {
856       // The finishing touch
857       req->request()->AppendServerChunk(
858           connection_->outbuf(), req->is_server_streaming_chunks());
859       connection_->NotifyWrite();
860     }
861   }
862   EndRequestProcessing(req, is_eos);
863 }
864
865 void ServerProtocol::EndRequestProcessing(ServerRequest* req, bool is_eos) {
866   CHECK(crt_send_ == req);
867   if ( is_eos ) {
868     crt_send_ = NULL;
869   }
870   const bool should_close = is_eos && (!req->is_keep_alive() ||
871                                        req->is_orphaned());
872   if ( connection_ != NULL ) {
873     req->free_output_bytes_ = (protocol_params()->max_reply_buffer_size_ -
874                                connection_->outbuf()->Size() -
875                                protocol_params()->max_header_size_);
876     if ( !connection_->outbuf()->IsEmpty() ) {
877       timeouter_.SetTimeout(kWriteTimeout,
878                             protocol_params_->reply_write_timeout_ms_);
879       connection_->NotifyWrite();
880     }
881   }
882   if ( is_eos || req->is_orphaned() ) {
883     req->SignalClosed();
884   } else if ( connection_->outbuf()->IsEmpty() ) {
885     selector_->RunInSelectLoop(
886         NewCallback(req, &ServerRequest::SignalReady));
887   }
888
889   //////////////////////////////////////////////////////////////////////
890   //
891   // NOTE(cosmin) HERE WAS A BUG:
892   // ServerProtocol was deleted twice !
893   //   See the two DeleteInSelectLoop(this) lines below!
894   // Solution:
895   //   The first DeleteInSelectLoop is executed on conditions:
896   //      is_eos==true, connection==NULL, active_requests_.empty()==true
897   //   The second DeleteInSelectLoop is executed on conditions:
898   //      should_close==true(implies is_eos==true),
899   //      connection==NULL, active_requests_.empty()==true
900   //
901   //   So if the second DeleteInSelectLoop is to be executed then the
902   //   first DeleteInSelectLoop has surely been executed.
903   //
904   //   We Removed the second DeleteInSelectLoop (which was here)
905   //
906   //////////////////////////////////////////////////////////////////////
907
908   if ( is_eos ) {
909     LOG_HTTP
910         << "Request completed: ["
911         << strutil::StrTrim(
912             req->request()->client_header()->ComposeFirstLine()) << "] => ["
913         << strutil::StrTrim(
914             req->request()->server_header()->ComposeFirstLine()) << "]"
915         << " conn sent bytes so far: "
916         << (connection_ != NULL ?
917             connection_->count_bytes_written() : -1)
918         << " conn received bytes so far: "
919         << (connection_ != NULL ?
920             connection_->count_bytes_read() : -1)
921         << " should_close: " << should_close;
922     active_requests_.erase(req);
923     if ( crt_recv_ == req ) {
924       crt_recv_ = NULL;
925     }
926     selector_->DeleteInSelectLoop(req);
927     if ( active_requests_.empty() && connection_ == NULL ) {
928       selector_->DeleteInSelectLoop(this);
929     }
930   }
931   if ( should_close ) {
932     if ( connection_ != NULL ) {
933       LOG_HTTP << "Closing connection.";
934       connection_->FlushAndClose();
935     }
936     // [COSMIN] Removed: see BUG comment above.
937     // else if ( active_requests_.empty() ) {
938     //   LOG_HTTP << "Deleting client and protocol "
939     //           << "as connection is closed already.";
940     //   selector_->DeleteInSelectLoop(this);
941     // }
942   } else if ( is_eos && connection_ != NULL ) {
943     timeouter_.SetTimeout(kRequestTimeout,
944                           protocol_params_->keep_alive_timeout_sec_ * 1000);
945   }
946 }
947
948 void ServerProtocol::NotifyConnectionWrite() {
949   if ( crt_send_ != NULL ) {
950     bool is_ready = false;
951     {
952       synch::MutexLocker l(&crt_send_->request_mutex_);
953       crt_send_->free_output_bytes_ =
954           protocol_params()->max_reply_buffer_size_
955           - connection_->outbuf()->Size() - protocol_params()->max_header_size_;
956       is_ready = (crt_send_->free_output_bytes_ >
957                   protocol_params_->ready_signal_limit_);
958     }
959     if ( is_ready ) {
960       crt_send_->SignalReady();
961     }
962   }
963   if ( !connection_->outbuf()->IsEmpty() ) {
964     timeouter_.SetTimeout(kWriteTimeout,
965                           protocol_params_->reply_write_timeout_ms_);
966   } else {
967     timeouter_.UnsetTimeout(kWriteTimeout);
968   }
969 }
970
971 ///////////////////////////////////////////////////////////////////////
972 //
973 // SimpleServerConnection
974 //
975 SimpleServerConnection::SimpleServerConnection(
976     net::Selector* selector,
977     net::NetConnection * net_connection,
978     http::ServerProtocol* protocol)
979     : BaseServerConnection(selector, net_connection, protocol) {
980 }
981
982 SimpleServerConnection::~SimpleServerConnection() {
983 }
984
985 http::BaseServerConnection*
986 SimpleServerConnectionFactory(net::Selector* selector,
987                               net::NetConnection* net_connection,
988                               http::ServerProtocol* proto) {
989   return new SimpleServerConnection(selector, net_connection, proto);
990 }
991
992 //////////////////////////////////////////////////////////////////////
993
994 void ServerRequest::ReplyWithStatus(HttpReturnCode status) {
995   CHECK(!is_server_streaming_);
996   CHECK(protocol_ != NULL);
997   // As this can come from another thread, we need to move it all
998   // to the main thread.
999   if ( !protocol_->selector()->IsInSelectThread() ) {
1000     protocol_->selector()->RunInSelectLoop(
1001         NewCallback(this, &ServerRequest::ReplyWithStatus, status));
1002     return;
1003   }
1004   // We do not lock on this one - as the reply happens once
1005   synch::MutexLocker l(&request_mutex_);
1006   CHECK(!is_server_streaming_);
1007   is_server_streaming_ = false;
1008   protocol_->ReplyForRequestLocked(this, status);
1009 }
1010
1011 void ServerRequest::BeginStreamingData(HttpReturnCode status,
1012                                        Closure* ready_callback,
1013                                        Closure* closed_callback,
1014                                        bool chunked) {
1015   // As this can come from another thread, we need to move it all
1016   // to the main thread.
1017   if ( !protocol_->selector()->IsInSelectThread() ) {
1018     protocol_->selector()->RunInSelectLoop(
1019         NewCallback(this, &ServerRequest::BeginStreamingData,
1020                     status, ready_callback, closed_callback, chunked));
1021     return;
1022   }
1023   synch::MutexLocker l(&request_mutex_);
1024   CHECK(!is_server_streaming_)
1025       << "Bug - Multiple calls to BeginStreamingData";
1026   is_server_streaming_ = true;
1027   is_server_streaming_chunks_ = chunked;
1028   ready_callback_ = ready_callback;
1029   closed_callback_ = closed_callback;
1030   // We never orhpan a connection here - causes loads of trouble ..
1031   protocol_->PrepareResponseLocked(this, status);
1032 }
1033
1034 void ServerRequest::ContinueStreamingData(Closure* ready_callback) {
1035   // As this can come from another thread, we need to move it all
1036   // to the main thread.
1037   if ( !protocol_->selector()->IsInSelectThread() ) {
1038     protocol_->selector()->RunInSelectLoop(
1039         NewCallback(this, &ServerRequest::ContinueStreamingData,
1040                     ready_callback));
1041     return;
1042   }
1043   synch::MutexLocker l(&request_mutex_);
1044   if ( ready_callback_ != ready_callback &&
1045        ready_callback_ != NULL && !ready_callback_->is_permanent() ) {
1046     delete ready_callback_;
1047   }
1048   ready_callback_ = ready_callback;
1049   CHECK(is_server_streaming_)
1050       << "Bug - verify that your request is not orphaned.";
1051   protocol_->StreamDataLocked(this, false);
1052 }
1053
1054 void ServerRequest::EndStreamingData() {
1055   // As this can come from another thread, we need to move it all
1056   // to the main thread.
1057   if ( !protocol_->selector()->IsInSelectThread() ) {
1058     protocol_->selector()->RunInSelectLoop(
1059         NewCallback(this, &ServerRequest::EndStreamingData));
1060     return;
1061   }
1062   synch::MutexLocker l(&request_mutex_);
1063   CHECK(is_server_streaming_)
1064       << "Bug - verify that your request is not orphaned.";
1065   protocol_->StreamDataLocked(this, true);
1066 }
1067
1068 void ServerRequest::AnswerUnauthorizedRequest(
1069     net::UserAuthenticator* authenticator) {
1070   request()->server_header()->AddField(
1071       http::kHeaderWWWAuthenticate,
1072       strutil::StringPrintf("Basic realm=\"%s\"",
1073                             authenticator->realm().c_str()),
1074       true);
1075   ReplyWithStatus(http::UNAUTHORIZED);
1076 }
1077
1078 bool ServerRequest::AuthenticateRequest(net::UserAuthenticator* authenticator) {
1079   if ( authenticator == NULL ) {
1080     return true;
1081   }
1082   string user, passwd;
1083   if ( !request()->client_header()->GetAuthorizationField(
1084            &user, &passwd) ||
1085        (authenticator->Authenticate(user, passwd) !=
1086         net::UserAuthenticator::Authenticated) ) {
1087     return false;
1088   }
1089   return true;
1090 }
1091
1092 void ServerRequest::AuthenticateRequest(
1093     net::UserAuthenticator* authenticator,
1094     net::UserAuthenticator::AnswerCallback* answer_callback) {
1095   if ( authenticator == NULL ) {
1096     answer_callback->Run(net::UserAuthenticator::Authenticated);
1097     return;
1098   }
1099   string user, passwd;
1100   if ( !request()->client_header()->GetAuthorizationField(&user, &passwd) ) {
1101     answer_callback->Run(net::UserAuthenticator::MissingCredentials);
1102     return;
1103   }
1104   authenticator->Authenticate(user, passwd, answer_callback);
1105 }
1106 }
Note: See TracBrowser for help on using the browser.