root/trunk/whisperlib/net/rpc/lib/server/rpc_http_server.cc

Revision 7, 22.7 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "common/base/errno.h"
33 #include "net/rpc/lib/server/rpc_http_server.h"
34 #include "common/io/file/file_input_stream.h"
35
36 //////////////////////////////////////////////////////////////////////
37
38 // TODO(cpopescu): make this a parameter rather than a flag ...
39 DEFINE_string(rpc_js_form_path,
40               "",
41               "If specified, we export RPC accesing via auto-generated forms "
42               "and we read the extra js sources from here (we need files "
43               "from //depot/.../libs/net/rpc/js/");
44
45 DEFINE_bool(rpc_enable_http_get,
46             false,
47             "If enabled we process HTTP GET requests. By default only"
48             " HTTP POST requests are enabled.");
49
50 //////////////////////////////////////////////////////////////////////
51
52 namespace rpc {
53 HttpServer::HttpServer(net::Selector* selector,
54                        http::Server* server,
55                        net::UserAuthenticator* authenticator,
56                        const string& path,
57                        bool auto_js_forms,
58                        bool is_public,
59                        int max_concurrent_requests,
60                        const string& ip_class_restriction)
61     : selector_(selector),
62       authenticator_(authenticator),
63       accepted_clients_(
64           ip_class_restriction.empty()
65           ? NULL
66           : net::IpClassifier::CreateClassifier(ip_class_restriction)),
67       path_(path),
68       max_concurrent_requests_(max_concurrent_requests),
69       current_requests_(0) {
70   if ( auto_js_forms && !FLAGS_rpc_js_form_path.empty() ) {
71     const string rpc_base_js = io::FileInputStream::ReadFileOrDie(
72         (FLAGS_rpc_js_form_path + "/rpc_base.js").c_str());
73     const string rpc_standard_js = io::FileInputStream::ReadFileOrDie(
74         (FLAGS_rpc_js_form_path + "/rpc_standard.js").c_str());
75     js_prolog_ = ("<script language=\"JavaScript1.1\">\n" +
76                   rpc_base_js + "\n" +
77                   rpc_standard_js + "\n" +
78                   "</script>\n");
79   }
80   server->RegisterProcessor(
81       path,
82       NewPermanentCallback(this, &HttpServer::ProcessRequest), is_public);
83 }
84
85 HttpServer::~HttpServer() {
86   for ( MirrorMap::const_iterator it_mirror = mirror_map_.begin();
87         it_mirror != mirror_map_.end();
88         ++it_mirror ) {
89     delete it_mirror->second;
90   }
91   delete accepted_clients_;
92   LOG_INFO_IF(current_requests_ > 0)
93       << " Exiting the RPC server with current_requests_ "
94       << "requests in processing !!";
95 }
96
97 void HttpServer::RegisterServiceMirror(const string& sub_path_src,
98                                        const string& sub_path_dest) {
99   string normalized_sub_path_src(strutil::NormalizePath(sub_path_src + '/'));
100   string normalized_sub_path_dest(strutil::NormalizePath(sub_path_dest + '/'));
101   MirrorMap::iterator it_mirror = mirror_map_.find(normalized_sub_path_src);
102   if ( it_mirror != mirror_map_.end() ) {
103     it_mirror->second->insert(normalized_sub_path_dest);
104   } else {
105     hash_set<string>* s = new hash_set<string>;
106     s->insert(normalized_sub_path_dest);
107     mirror_map_.insert(make_pair(normalized_sub_path_src, s));
108   }
109   reverse_mirror_map_[normalized_sub_path_dest] = normalized_sub_path_src;
110 }
111
112 bool HttpServer::RegisterService(const string& sub_path,
113                                  rpc::ServiceInvoker* invoker) {
114   const string full_path(sub_path + invoker->GetName());
115   ServicesMap::const_iterator it = services_.find(full_path);
116   if ( it != services_.end() ) {
117     LOG_WARNING << " Tried to double register " << full_path;
118     return false;
119   }
120   LOG_INFO << " Registering: " << invoker->ToString() << " on path: "
121            << full_path;
122   services_.insert(make_pair(full_path, invoker));
123   return true;
124 }
125 bool HttpServer::UnregisterService(const string& sub_path,
126                                    rpc::ServiceInvoker* invoker) {
127   const string full_path(sub_path + invoker->GetName());
128   ServicesMap::iterator it = services_.find(full_path);
129   if ( it == services_.end() ) {
130     LOG_WARNING << " Tried to unregister " << full_path
131                 << " not found.";
132     return false;
133   }
134   if ( invoker != it->second ) {
135     LOG_WARNING << " Different service registered under " << full_path
136                 << invoker->ToString() << " vs. " << it->second->ToString()
137                 << " --> " << invoker << " vs. " << it->second;
138     return false;
139   }
140   string normalized_sub_path(strutil::NormalizePath(sub_path + '/'));
141   MirrorMap::iterator it_mirror = mirror_map_.find(normalized_sub_path);
142   if ( it_mirror != mirror_map_.end() ) {
143     for ( hash_set<string>::const_iterator it_rev = it_mirror->second->begin();
144           it_rev != it_mirror->second->end(); ++it_rev ) {
145       reverse_mirror_map_.erase(*it_rev);
146     }
147     delete it_mirror->second;
148     mirror_map_.erase(it_mirror);
149     LOG_INFO << " RPC forward mapping removed for " << normalized_sub_path;
150   }
151   ReverseMirrorMap::iterator
152       it_rev = reverse_mirror_map_.find(normalized_sub_path);
153   if ( it_rev != reverse_mirror_map_.end() ) {
154     it_mirror = mirror_map_.find(it_rev->second);
155     if ( it_mirror != mirror_map_.end() ) {
156       it_mirror->second->erase(normalized_sub_path);
157       if ( it_mirror->second->empty() ) {
158         delete it_mirror->second;
159         mirror_map_.erase(it_mirror);
160       }
161     }
162     LOG_INFO << " RPC backward mapping removed for " << normalized_sub_path
163              << " from " << it_rev->second;
164     reverse_mirror_map_.erase(it_rev);
165   }
166   services_.erase(full_path);
167   return true;
168 }
169 bool HttpServer::RegisterService(rpc::ServiceInvoker* invoker) {
170   return RegisterService("", invoker);
171 }
172
173 bool HttpServer::UnregisterService(rpc::ServiceInvoker* invoker) {
174   return UnregisterService("", invoker);
175 }
176
177 // Small helper
178 void InvokeCall(rpc::ServiceInvoker* invoker, rpc::Query* q) {
179   CHECK(invoker->Call(q)) << " We do not accept failing calls here";
180 }
181
182 void HttpServer::ProcessRequest(http::ServerRequest* req) {
183   const string req_id =
184       req->request()->client_header()->FindField(http::kHeaderXRequestId);
185   if ( !req_id.empty() ) {
186     req->request()->server_header()->AddField(
187         http::kHeaderXRequestId, req_id, true);
188   }
189   if ( current_requests_ >= max_concurrent_requests_ ) {
190     LOG_ERROR << " Too many concurrent requests: " << current_requests_;
191     // TODO(cpopescu): stats
192     req->ReplyWithStatus(http::INTERNAL_SERVER_ERROR);
193     return;
194   }
195   if ( accepted_clients_ != NULL &&
196        !accepted_clients_->IsInClass(req->remote_address().ip_object()) ) {
197     // TODO(cpopescu): stats
198     req->request()->server_data()->Write("Bad IP");
199     req->ReplyWithStatus(http::UNAUTHORIZED);
200     return;
201   }
202
203
204   ///////////////////////////////////////////////////////////////
205   //
206   // Split url into: service_path / service_name / method_name
207   //
208   const string url_path = URL::UrlUnescape(req->request()->url()->path());
209
210   std::string sub_path = url_path.substr(path_.size());
211   sub_path = strutil::StrTrimChars(sub_path, "/");
212   LOG_DEBUG << "url: [" << url_path << "] , path_: [" << path_
213             << "], sub_path: [" << sub_path << "]";
214
215   // sub_path should be "a/b/c/service_name/method_name"
216   //
217   // We extract:
218   //   service_full_path = "a/b/c/service_name"
219   //   service_path = "a/b/c";
220   //   service_name = "service_name";
221   //   method_name = "method_name";
222   //
223   string service_full_path;
224   string* method_name = new string();
225   int last_slash_index = sub_path.rfind('/');
226   if ( last_slash_index == std::string::npos ) {
227     service_full_path = sub_path;
228     *method_name = "";
229   } else {
230     service_full_path = sub_path.substr(0, last_slash_index);
231     *method_name = sub_path.substr(last_slash_index + 1);
232   }
233
234   string service_path;
235   string* service_name = new string();
236   last_slash_index = service_full_path.rfind('/');
237   if ( last_slash_index == std::string::npos ) {
238     service_path = "";
239     *service_name = service_full_path;
240   } else {
241     service_path = service_full_path.substr(0, last_slash_index);
242     *service_name = service_full_path.substr(last_slash_index + 1);
243   }
244
245   ////////////////////////////////////////////////////////////////////
246
247   // maybe process auto-forms request
248   do {
249     if ( js_prolog_.empty() ||
250          url_path.length() <= path_.length() ) {
251       break;
252     }
253
254     if ( !strutil::StrStartsWith(*method_name, "__form") ) {
255       break;
256     }
257
258     ServicesMap::const_iterator it = services_.find(service_full_path);
259     rpc::ServiceInvoker* service = (it == services_.end() ? NULL : it->second);
260     if ( !service ) {
261       req->request()->server_data()->Write(
262           "<h1>404 Not Found</h1>"
263           " could not find RPC service: [" + service_full_path + "] on "
264           "suburl: [" + sub_path +"]");
265       req->request()->server_data()->Write(
266           "<br/> Known services are: {");
267       for ( ServicesMap::const_iterator it = services_.begin();
268             it != services_.end(); ) {
269         req->request()->server_data()->Write(it->first.c_str());
270         ++it;
271         if ( it != services_.end() ) {
272           req->request()->server_data()->Write(", ");
273         }
274       }
275       req->request()->server_data()->Write("}");
276       req->ReplyWithStatus(http::NOT_FOUND);
277       delete service_name;
278       delete method_name;
279       return;
280     }
281
282     const string auto_form_data = service->GetTurntablePage(
283         strutil::JoinPaths(path_, service_path), *method_name);
284     req->request()->server_data()->Write("<html>\n");
285     req->request()->server_data()->Write(js_prolog_);
286     req->request()->server_data()->Write("\n<body>\n");
287     req->request()->server_data()->Write(auto_form_data);
288     req->request()->server_data()->Write("\n</body></html>\n");
289     req->Reply();
290     delete service_name;
291     delete method_name;
292     return;
293   } while ( false );
294
295   req->AuthenticateRequest(
296       authenticator_,
297       NewCallback(this, &HttpServer::ProcessAuthenticatedRequest,
298                   req, service_name, method_name));
299 }
300
301 void HttpServer::ProcessAuthenticatedRequest(
302     http::ServerRequest* req,
303     string* service_name, string* method_name,
304     net::UserAuthenticator::Answer auth_answer) {
305   if ( auth_answer != net::UserAuthenticator::Authenticated ) {
306     LOG_INFO << "Unauthenticated request: " << req->ToString()
307              << " answer: " << auth_answer;
308     req->AnswerUnauthorizedRequest(authenticator_);
309     delete service_name;
310     delete method_name;
311     return;
312   }
313   // Authenticated - OK !
314
315   // receives the RPC packet
316   rpc::Message p;
317
318   // will be set to appropriate codec
319   rpc::Codec* codec = NULL;
320
321   URL * url = req->request()->url();
322   const string url_path = URL::UrlUnescape(req->request()->url()->path());
323   std::string sub_path = url_path.substr(path_.size());
324   sub_path = strutil::StrTrimChars(sub_path, "/");
325
326   ////////////////////////////////////////////////////////////////////
327   //
328   // read RPC packet from HTTP GET
329   //
330   if ( req->request()->client_header()->method() == http::METHOD_GET ) {
331
332     // is HTTP GET enabled for rpc ?
333     //
334     if ( !FLAGS_rpc_enable_http_get ) {
335       LOG_ERROR << "Bad request method: "
336                 << req->request()->client_header()->method()
337                 << ". RPC HTTP Get not enabled.";
338       req->ReplyWithStatus(http::BAD_REQUEST);
339       delete service_name;
340       delete method_name;
341       return;
342     }
343
344     // codec is always JSON
345     //
346     codec = &json_codec_;
347
348     // params are encode inside URL
349     //
350
351     // receives call parameters
352     io::MemoryStream& params = p.cbody_.params_;
353
354     // GET parameters should be: ?params=[...]
355     //
356     std::vector< std::pair<std::string, std::string> > http_get_params;
357     url->GetQueryParameters(&http_get_params, true);
358     for ( std::vector< std::pair<std::string, std::string> >::const_iterator
359         it = http_get_params.begin(); it != http_get_params.end(); ++it) {
360       const std::string& key = (*it).first;
361       const std::string& value = (*it).second;
362       if ( key == RPC_HTTP_FIELD_PARAMS ) {
363         params.Write(value);
364         continue;
365       }
366       LOG_ERROR << "Ignoring unknown parameter: [" << key << "]"
367                    " , value: [" << value << "]";
368     }
369     if ( params.IsEmpty() ) {
370       LOG_ERROR << "Cannot find parameter: [" << RPC_HTTP_FIELD_PARAMS << "]."
371                    " in url: [" << url->path() << "]";
372       req->request()->server_data()->Write("Cannot find parameter: ["
373           + string(RPC_HTTP_FIELD_PARAMS) + "] in url: [" + url->path() + "]");
374       req->ReplyWithStatus(http::BAD_REQUEST);
375       delete service_name;
376       delete method_name;
377       return;
378     }
379
380     p.header_.msgType_ = RPC_CALL;
381     p.header_.xid_ = 0;
382     p.cbody_.service_ = *service_name;
383     p.cbody_.method_ = *method_name;
384     // p.cbody_.params_ were filled directly
385   }
386   delete service_name;
387   service_name = NULL;
388   delete method_name;
389   method_name = NULL;
390
391
392   /////////////////////////////////////////////////////////////////
393   //
394   // read RPC packet from HTTP POST
395   //
396   if ( req->request()->client_header()->method() == http::METHOD_POST ) {
397     // find codec ID in HTTP header (every request should specify the rpc codec)
398     //
399     do {
400       string strCodecID;
401       bool success = req->request()->client_header()->FindField(
402           string(RPC_HTTP_FIELD_CODEC_ID), &strCodecID);
403       if ( !success ) {
404         //  LOG_ERROR << "Cannot find field '" << RPC_HTTP_FIELD_CODEC_ID
405         //            << "'. Cannot decode rpc query from http request.";
406         //  req->ReplyWithStatus(http::BAD_REQUEST);
407         //  return;
408         LOG_ERROR << "Cannot find field '" << RPC_HTTP_FIELD_CODEC_ID
409                   << "'. Assuming rpc::CID_JSON(" << rpc::CID_JSON << ").";
410         codec = &json_codec_;
411         break;
412       }
413       // create codec
414       //
415       errno = 0; // required, to detect ::strtol failure
416       uint32 nCodecID = ::strtol(strCodecID.c_str(), NULL, 10);
417       if ( errno != 0 ) {
418         LOG_ERROR << "invalid codec_id, not a number: [" << strCodecID << "]";
419         req->ReplyWithStatus(http::BAD_REQUEST);
420         return;
421       }
422       switch ( nCodecID ) {
423         case rpc::CID_BINARY:
424           codec = &binary_codec_;
425           break;
426         case rpc::CID_JSON:
427           codec = &json_codec_;
428           break;
429         default:
430           LOG_ERROR << "invalid codec_id value: [" << strCodecID << "]";
431           req->ReplyWithStatus(http::BAD_REQUEST);
432           return;
433       }
434     } while( false );
435
436     // HTTP client data should contain the RPC packet
437     //
438     // decode rpc message
439     //
440     DECODE_RESULT result = codec->DecodePacket(*req->request()->client_data(),
441                                                p);
442     if ( result == DECODE_RESULT_ERROR ) {
443       LOG_ERROR << "Error decoding RPC message. Bad data.";
444       req->ReplyWithStatus(http::BAD_REQUEST);
445       return;
446     }
447     if ( result == DECODE_RESULT_NOT_ENOUGH_DATA ) {
448       LOG_ERROR << "Incomplete RPC message. Bad data.";
449       req->ReplyWithStatus(http::BAD_REQUEST);
450       return;
451     }
452     CHECK_EQ(result, DECODE_RESULT_SUCCESS);
453
454     if ( p.header_.msgType_ != RPC_CALL ) {
455       LOG_ERROR << "Received a non-CALL message! ignoring: " << p;
456       req->request()->server_data()->Write("Ignoring no-CALL message!");
457       req->ReplyWithStatus(http::BAD_REQUEST);
458       return;
459     }
460   }
461
462   // "codec" should be set
463   CHECK_NOT_NULL(codec);
464   // "p" should be filled
465   CHECK_EQ(p.header_.msgType_, RPC_CALL);
466
467
468   ///////////////////////////////////////////////////////////
469   //
470   // handle RPC message
471   //
472   LOG_DEBUG << "Handle received packet: " << p;
473
474   if ( p.header_.msgType_ != RPC_CALL ) {
475     // TODO(cpopescu): stats
476     LOG_ERROR << "Received a non-CALL message! ignoring: " << p;
477     req->ReplyWithStatus(http::BAD_REQUEST);
478     return;
479   }
480
481   // extract transport information
482   //
483   rpc::Transport transport(rpc::Transport::HTTP,
484                            net::HostPort(),
485                            req->remote_address());
486   if ( authenticator_ != NULL ) {
487     string user, passwd;
488     if ( req->request()->client_header()->GetAuthorizationField(&user,
489                                                                 &passwd) ) {
490       transport.set_user_passwd(user, passwd);
491     }
492   }
493
494   // Any timeouts ?
495   string timeout_str;
496   int timeout = 0;
497   if ( req->request()->client_header()->FindField("Timeout", &timeout_str) ) {
498     errno = 0;   // essential as strtol would not set a 0 errno
499     timeout = strtol(timeout_str.c_str(), NULL, 10);
500     if ( timeout < 0 ) {
501       timeout = 0;
502     }
503   }
504   // TODO(cosmin): support timeouts in the rpc processing !
505
506   // Now see if secondary paths are involved
507   string normalized_sub_path(strutil::NormalizePath(sub_path + '/'));
508   MirrorMap::const_iterator it_mirror =  mirror_map_.find(normalized_sub_path);
509   if ( it_mirror != mirror_map_.end() ) {
510     // WELL -- this is importnant - the called mirror request should not
511     //         cause the removal of the service
512     for ( hash_set<string>::const_iterator it_dup = it_mirror->second->begin();
513           it_dup != it_mirror->second->end(); ++it_dup ) {
514       LOG_DEBUG << " RPC mirror processing for : " << *it_dup << " / "
515                 << p.cbody_.service_.StdStr()
516                 << " per request on " << sub_path;
517       ProcessRequestCore(NULL, *it_dup, transport, p, codec, timeout);
518     }
519   }
520
521   ProcessRequestCore(req, sub_path, transport, p, codec, timeout);
522 }
523
524
525 void HttpServer::ProcessRequestCore(http::ServerRequest* req,
526                                     const string& sub_path,
527                                     const rpc::Transport& transport,
528                                     const rpc::Message& p,
529                                     rpc::Codec* codec,
530                                     int timeout) {
531   // extract call: service, method and arguments
532   //
533   const uint32 xid = p.header_.xid_;
534   string service = p.cbody_.service_.StdStr();
535   const string method = p.cbody_.method_.StdStr();
536
537
538   ServicesMap::const_iterator it =
539       services_.find(strutil::JoinPaths(sub_path, service));
540   if ( it == services_.end() ) {
541     LOG_ERROR << "Cannot serve service: " << service;
542     if ( req ) {
543       req->ReplyWithStatus(http::NOT_FOUND);
544     }
545     return;
546   }
547   rpc::ServiceInvoker* const invoker = it->second;
548   if ( service != invoker->GetName() ) {
549     service = invoker->GetName();  // may miss a '/'
550   }
551   // check if we can serve this one:
552   io::MemoryStream* params = NULL;
553
554   if ( req ) {
555     params = const_cast<io::MemoryStream*>(&p.cbody_.params_);
556   } else {
557     params = new io::MemoryStream(common::kByteOrder,
558                                   p.cbody_.params_.Size());
559     params->AppendStreamNonDestructive(&p.cbody_.params_);
560   }
561
562   // create an internal query.
563   if ( req ) {
564     req->request_mutex()->Lock();
565     if ( req->is_orphaned() ) {
566       req->request_mutex()->Unlock();
567       return;
568     }
569     // TODO(cpopescu): parametrize this one !!
570     req->request()->set_server_use_gzip_encoding(false);
571   }
572
573   rpc::Query* const query =
574       new rpc::Query(transport, xid, service, method, *params, *codec, 0);
575   if ( req ) {
576     query->SetCompletionCallback(
577         NewCallback(this, &HttpServer::RpcCallback, req));
578   } else {
579     query->SetCompletionCallback(
580         NewCallback(this, &HttpServer::RpcSecondaryCallback, params));
581   }
582
583   if ( selector_ != NULL && !selector_->IsInSelectThread() ) {
584     selector_->RunInSelectLoop(NewCallback(&InvokeCall, invoker, query));
585   } else if ( !invoker->Call(query) ) {
586     delete query;
587     if ( req ) {
588       current_requests_--;
589       // TODO(cpopescu): stats
590       req->ReplyWithStatus(http::INTERNAL_SERVER_ERROR);
591     }
592   }
593   if ( req ) {
594     req->request_mutex()->Unlock();
595   } else {
596     current_requests_--;
597   }
598 }
599
600 void HttpServer::RpcSecondaryCallback(io::MemoryStream* params,
601                                       const rpc::Query& query) {
602   LOG_DEBUG << " Secondary rpc call completed for: " << query.ToString();
603   delete params;
604 }
605
606 void HttpServer::RpcCallback(http::ServerRequest* req,
607                              const rpc::Query& query) {
608   // On several errors we want to reply HTTP directly
609   if ( query.status() == RPC_UNAUTHORIZED ) {
610     // This is safe from any thread..
611     if ( !authenticator_->realm().empty() ) {
612       req->request()->server_header()->AddField(
613           http::kHeaderWWWAuthenticate,
614           strutil::StringPrintf("Basic realm=\"%s\"",
615                                 authenticator_->realm().c_str()),
616           true);
617     }
618     req->ReplyWithStatus(http::UNAUTHORIZED);
619     return;
620   }
621   // create a RPC result message, and serialize it in
622   // http request -> server_data
623   //
624   rpc::Message p;
625   rpc::Message::Header& header = p.header_;
626   header.xid_ = query.qid();
627   header.msgType_ = RPC_REPLY;
628
629   rpc::Message::ReplyBody& body = p.rbody_;
630   body.replyStatus_ = query.status();
631   body.result_.AppendStreamNonDestructive(&query.result());
632
633   // encode the RPC result message
634   //
635   io::MemoryStream * reply = new io::MemoryStream();
636   query.codec().EncodePacket(*reply, p);
637
638   RpcReply(req, reply);
639
640   // TODO(cpopescu): stats
641   // -- including req->is_orphaned()
642   // -- TODO(cpopescu): break request on orphaned requests
643 }
644
645 void HttpServer::RpcReply(http::ServerRequest* req,
646                           io::MemoryStream* reply) {
647   if ( !selector_->IsInSelectThread() ) {
648     selector_->RunInSelectLoop(NewCallback(this, &HttpServer::RpcReply,
649                                            req, reply));
650     return;
651   }
652   current_requests_--;
653   req->request()->server_data()->AppendStreamNonDestructive(reply);
654   req->Reply();
655
656   delete reply;
657 }
658 }
Note: See TracBrowser for help on using the browser.