root/trunk/whisperlib/net/rpc/lib/server/rpc_http_processor.cc

Revision 7, 17.1 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #include "common/base/errno.h"
33 #include "common/base/scoped_ptr.h"
34 #include "common/io/file/file_input_stream.h"
35 #include "net/rpc/lib/server/rpc_http_processor.h"
36 #include "net/rpc/lib/server/rpc_service_invoker.h"
37 #include "net/rpc/lib/rpc_constants.h"
38
39 // Defined in rpc_http_server.cc
40 DECLARE_string(rpc_js_form_path);
41 // Defined in rpc_http_server.cc
42 DECLARE_bool(rpc_enable_http_get);
43
44 DEFINE_bool(rpc_enable_http_gzip, true,
45             "If true, RPC Http Responses are gzipped, taking less bandwidth.\n"
46             "False is useful for debugging.");
47
48 namespace rpc {
49
50 //////////////////////////////////////////////////////////////////////
51
52 rpc::HttpProcessor::ExecutingRequest::ExecutingRequest(
53     http::ServerRequest* req, rpc::Codec& codec, uint32 xid)
54     : req_(req),
55       codec_(codec),
56       xid_(xid) {
57   CHECK_NOT_NULL(req);
58 }
59 rpc::HttpProcessor::ExecutingRequest::~ExecutingRequest() {
60 }
61
62 //////////////////////////////////////////////////////////////////////
63
64 HttpProcessor::HttpProcessor(rpc::ServicesManager& manager,
65                              rpc::IAsyncQueryExecutor& executor,
66                              bool enable_auto_forms)
67     : rpc::IResultHandler(),
68       httpServer_(NULL),
69       http_path_(),
70       servicesManager_(manager),
71       asyncQueryExecutor_(executor),
72       registeredToQueryExecutor_(false),
73       requestsInExecution_(),
74       accessRequestsInExecution_(),
75       auto_forms_js_() {
76   asyncQueryExecutor_.RegisterResultHandler(*this);
77   registeredToQueryExecutor_ = true;
78
79   if ( enable_auto_forms && !FLAGS_rpc_js_form_path.empty() ) {
80     const string rpc_base_js = io::FileInputStream::ReadFileOrDie(
81         (FLAGS_rpc_js_form_path + "/rpc_base.js").c_str());
82     const string rpc_standard_js = io::FileInputStream::ReadFileOrDie(
83         (FLAGS_rpc_js_form_path + "/rpc_standard.js").c_str());
84     auto_forms_js_ = ("<script language=\"JavaScript1.1\">\n" +
85                       rpc_base_js + "\n" +
86                       rpc_standard_js + "\n" +
87                       "</script>\n");
88   }
89 }
90
91 HttpProcessor::~HttpProcessor() {
92   if ( registeredToQueryExecutor_ ) {
93     asyncQueryExecutor_.UnregisterResultHandler(*this);
94     registeredToQueryExecutor_ = false;
95   }
96 }
97
98 bool rpc::HttpProcessor::AttachToServer(http::Server* server,
99                                         const string& process_path) {
100   CHECK_NULL(httpServer_) << "Duplicate attach to http server";
101   httpServer_ = server;
102   httpServer_->RegisterProcessor(
103       process_path,
104       NewPermanentCallback(this,
105                            &rpc::HttpProcessor::CallbackProcessHTTPRequest),
106       true);   // TODO(cosmin): what does is_public(=true) mean!?
107
108   if ( strutil::StrEndsWith(process_path, "/") ) {
109     LOG_WARNING << "AttachToServer on a path ending with '/'";
110     http_path_ = process_path.substr(0, process_path.length() - 1);
111   } else {
112     http_path_ = process_path;
113   }
114   return true;
115 }
116
117
118 //////////////////////////////////////////////////////////////////////
119 //
120 //     Methods available only from the selector thread.
121 //
122 //
123 // TODO(cosmin): this function is too long - split it ..
124 void rpc::HttpProcessor::CallbackProcessHTTPRequest(http::ServerRequest* req) {
125
126   // We accept all requests here.
127   // Parallel requests count is limited in IRPCAsyncQueryExecutor !
128
129   ///////////////////////////////////////////////////////////////
130   //
131   // Split url into: service_path / service_name / method_name
132   //
133   const string url_path = URL::UrlUnescape(req->request()->url()->path());
134   LOG_DEBUG << "url: [" << url_path << "]"
135                " , method: " << GetHttpMethodName(
136                    req->request()->client_header()->method())
137             << " , path we're listening: [" << http_path_ << "]";
138
139   std::string sub_path = url_path.substr(http_path_.size());
140   sub_path = strutil::StrTrimChars(sub_path, "/");
141   LOG_DEBUG << "sub_path = " << sub_path;
142
143   // sub_path should be "a/b/c/service_name/method_name"
144   //
145   // We extract:
146   //   service_full_path = "a/b/c/service_name"
147   //   service_path = "a/b/c";
148   //   service_name = "service_name";
149   //   method_name = "method_name";
150   //
151   string service_full_path;
152   string method_name;
153   int last_slash_index = sub_path.rfind('/');
154   if ( last_slash_index == std::string::npos ) {
155     service_full_path = sub_path;
156     method_name = "";
157   } else {
158     service_full_path = sub_path.substr(0, last_slash_index);
159     method_name = sub_path.substr(last_slash_index + 1);
160   }
161
162   string service_path;
163   string service_name;
164   last_slash_index = service_full_path.rfind('/');
165   if ( last_slash_index == std::string::npos ) {
166     service_path = "";
167     service_name = service_full_path;
168   } else {
169     service_path = service_full_path.substr(0, last_slash_index);
170     service_name = service_full_path.substr(last_slash_index + 1);
171   }
172
173   LOG_DEBUG << "service_path: [" << service_path << "] , "
174                "service_name: [" << service_name << "] , "
175                "method_name = [" << method_name << "]";
176   ////////////////////////////////////////////////////////////////////
177
178   // maybe process auto-forms request
179   do {
180     if ( auto_forms_js_.empty() ||
181          url_path.length() <= http_path_.length() ) {
182       break;
183     }
184
185     if ( !strutil::StrStartsWith(method_name, "__form") ) {
186       break;
187     }
188
189     rpc::ServiceInvoker* service = servicesManager_.FindService(service_name);
190     if ( !service ) {
191       req->request()->server_data()->Write(
192           "<h1>404 Not Found</h1>"
193           " could not find RPC service: [" + service_name + "] on "
194           "suburl: [" + sub_path +"]");
195       req->request()->server_data()->Write(
196           "<br/> Known services are: {" +
197           servicesManager_.StrListServices(", ") + "}");
198       req->ReplyWithStatus(http::NOT_FOUND);
199       return;
200     }
201
202     const string auto_form_data =
203         service->GetTurntablePage(http_path_, method_name);
204     req->request()->server_data()->Write("<html>\n");
205     req->request()->server_data()->Write(auto_forms_js_);
206     req->request()->server_data()->Write("\n<body>\n");
207     req->request()->server_data()->Write(auto_form_data);
208     req->request()->server_data()->Write("\n</body></html>\n");
209     req->Reply();
210     return;
211   } while ( false );
212
213   // receives the RPC packet
214   rpc::Message p;
215
216   // will be set to appropriate codec
217   rpc::Codec* codec = NULL;
218
219   ////////////////////////////////////////////////////////////////////
220   //
221   // read RPC packet from HTTP GET
222   //
223   if ( req->request()->client_header()->method() == http::METHOD_GET ) {
224
225     // is HTTP GET enabled for rpc ?
226     //
227     if ( !FLAGS_rpc_enable_http_get ) {
228       LOG_ERROR << "Bad request method: "
229                 << req->request()->client_header()->method()
230                 << ". RPC HTTP Get not enabled.";
231       req->ReplyWithStatus(http::BAD_REQUEST);
232       return;
233     }
234
235     // codec is always JSON
236     //
237     codec = &json_codec_;
238
239     // service, method and params are encode inside URL
240     //
241     URL * url = req->request()->url();
242     CHECK_NOT_NULL(url) << "NULL url on http request";
243
244     // receives call parameters
245     io::MemoryStream& params = p.cbody_.params_;
246
247     // GET parameters should be: ?params=[...]
248     //
249     std::vector< std::pair<std::string, std::string> > http_get_params;
250     url->GetQueryParameters(&http_get_params, true);
251     for ( std::vector< std::pair<std::string, std::string> >::const_iterator
252         it = http_get_params.begin(); it != http_get_params.end(); ++it) {
253       const std::string& key = (*it).first;
254       const std::string& value = (*it).second;
255       if ( key == RPC_HTTP_FIELD_PARAMS ) {
256         params.Write(value);
257         continue;
258       }
259       LOG_ERROR << "Ignoring unknown parameter: [" << key << "]"
260                    " , value: [" << value << "]";
261     }
262     if ( params.IsEmpty() ) {
263       LOG_ERROR << "Cannot find parameter: [" << RPC_HTTP_FIELD_PARAMS << "]."
264                    " in url: [" << url->path() << "]";
265       req->request()->server_data()->Write("Cannot find parameter: ["
266           + string(RPC_HTTP_FIELD_PARAMS) + "] in url: [" + url->path() + "]");
267       req->ReplyWithStatus(http::BAD_REQUEST);
268       return;
269     }
270
271     p.header_.msgType_ = RPC_CALL;
272     p.header_.xid_ = 0;
273     p.cbody_.service_ = service_name;
274     p.cbody_.method_ = method_name;
275     // p.cbody_.params_ were filled directly
276   }
277
278   /////////////////////////////////////////////////////////////////
279   //
280   // read RPC packet from HTTP POST
281   //
282   if ( req->request()->client_header()->method() == http::METHOD_POST ) {
283     // find codec ID in HTTP header (every request should specify the rpc codec)
284     //
285     do {
286       string strCodecID;
287       bool success = req->request()->client_header()->FindField(
288           string(RPC_HTTP_FIELD_CODEC_ID), &strCodecID);
289       if ( !success ) {
290         //  LOG_ERROR << "Cannot find field '" << RPC_HTTP_FIELD_CODEC_ID
291         //            << "'. Cannot decode rpc query from http request.";
292         //  req->ReplyWithStatus(http::BAD_REQUEST);
293         //  return;
294         LOG_ERROR << "Cannot find field '" << RPC_HTTP_FIELD_CODEC_ID
295                   << "'. Assuming rpc::CID_JSON(" << rpc::CID_JSON << ").";
296         codec = &json_codec_;
297         break;
298       }
299       // create codec
300       //
301       errno = 0; // required, to detect ::strtol failure
302       uint32 nCodecID = ::strtol(strCodecID.c_str(), NULL, 10);
303       if ( errno != 0 ) {
304         LOG_ERROR << "invalid codec_id, not a number: [" << strCodecID << "]";
305         req->ReplyWithStatus(http::BAD_REQUEST);
306         return;
307       }
308       switch ( nCodecID ) {
309         case rpc::CID_BINARY:
310           codec = &binary_codec_;
311           break;
312         case rpc::CID_JSON:
313           codec = &json_codec_;
314           break;
315         default:
316           LOG_ERROR << "invalid codec_id value: [" << strCodecID << "]";
317           req->ReplyWithStatus(http::BAD_REQUEST);
318           return;
319       }
320     } while( false );
321
322     // HTTP client data should contain the RPC packet
323     //
324     // decode rpc message
325     //
326     DECODE_RESULT result = codec->DecodePacket(*req->request()->client_data(),
327                                                p);
328     if ( result == DECODE_RESULT_ERROR ) {
329       LOG_ERROR << "Error decoding RPC message. Bad data.";
330       req->ReplyWithStatus(http::BAD_REQUEST);
331       return;
332     }
333     if ( result == DECODE_RESULT_NOT_ENOUGH_DATA ) {
334       LOG_ERROR << "Incomplete RPC message. Bad data.";
335       req->ReplyWithStatus(http::BAD_REQUEST);
336       return;
337     }
338     CHECK_EQ(result, DECODE_RESULT_SUCCESS);
339
340     if ( p.header_.msgType_ != RPC_CALL ) {
341       LOG_ERROR << "Received a non-CALL message! ignoring: " << p;
342       req->request()->server_data()->Write("Ignoring no-CALL message!");
343       req->ReplyWithStatus(http::BAD_REQUEST);
344       return;
345     }
346   }
347
348
349   // "codec" should be set
350   CHECK_NOT_NULL(codec);
351   // "p" should be filled
352   CHECK_EQ(p.header_.msgType_, RPC_CALL);
353
354
355   ///////////////////////////////////////////////////////////
356   //
357   // handle rpc message
358   //
359   LOG_DEBUG << "Handle received packet: " << p;
360
361   // extract transport
362   //
363   net::HostPort local_address(req->local_address());
364   net::HostPort remote_address(req->remote_address());
365   // If there is a HTTP proxy, the X-Forwarded-For (XFF) HTTP header is
366   // de facto standard for identifying the originating IP address.
367   string originating_ip;
368   bool success = req->request()->client_header()->FindField(
369       string("X-Forwarded-For"), &originating_ip);
370   if ( success ) {
371     net::IpAddress orig_ip(originating_ip.c_str());
372     if ( !orig_ip.IsInvalid() ) {
373       LOG_DEBUG << "X-Forwarded-For: " << originating_ip;
374       remote_address.set_ip(orig_ip);
375       remote_address.set_port(0);   // unknown
376     }
377   }
378   rpc::Transport transport(rpc::Transport::HTTP, local_address, remote_address);
379
380   // extract call: service, method and arguments
381   //
382   CHECK_EQ(p.header_.msgType_, RPC_CALL);
383   const uint32 xid = p.header_.xid_;
384   const string service = p.cbody_.service_.StdStr();
385   const string method = p.cbody_.method_.StdStr();
386   io::MemoryStream& params =
387       const_cast<io::MemoryStream&>(p.cbody_.params_);
388
389   // create an internal query.
390   //
391   // DO NOT use xid as qid. Because rpc::HttpProcessor manages multiple
392   // connections(being unique on server).. so xid s may be identical in
393   // different connections.
394   //
395   // TODO(cosmin) : ** possible int64 trouble ** !!
396   intptr_t qid = intptr_t(req);
397   // const int32 qid = static_cast<int32>();
398   rpc::Query* query = new rpc::Query(transport, qid, service, method, params,
399                                      *codec, GetResultHandlerID());
400
401   // put the request on waiting list
402   // RACE NOTE: ! do this before queueing the query for execution. Because the
403   //              execution may happen extremely fast, returning and finding
404   //              no waiting request.
405   //
406   pair<MapOfRequests::iterator, bool> bit;
407   {
408     synch::MutexLocker lock(&accessRequestsInExecution_);
409     bit = requestsInExecution_.insert(
410         make_pair(qid, new ExecutingRequest(req, *codec, xid)));
411     codec = NULL;
412   }
413   CHECK(bit.second);   // the qid must not be already in execution.
414   MapOfRequests::iterator it = bit.first;
415
416   // send the query to the executor. Specify us as the result collector.
417   //
418   success = asyncQueryExecutor_.QueueRPC(query);
419   if ( !success ) {
420     delete query;
421     query = NULL;
422
423     LOG_ERROR << "Async queue execution failed:"
424               << " service=" << service
425               << " method=" << method
426               << " reason=" << GetLastSystemErrorDescription();
427
428     // on error, remove & complete the waiting request
429     //
430     delete it->second;   // delete executing request
431     {
432       synch::MutexLocker lock(&accessRequestsInExecution_);
433       requestsInExecution_.erase(it);
434     }
435     req->ReplyWithStatus(http::INTERNAL_SERVER_ERROR);
436     return;
437   }
438
439   // Query execution is now in progress. We will be notified about
440   // result in HandleRPCResult(..).
441 }
442
443 void rpc::HttpProcessor::CallbackReplyToHTTPRequest(
444     http::ServerRequest* req,
445     http::HttpReturnCode rCode) {
446   CHECK_NOT_NULL(req);
447   req->ReplyWithStatus(rCode);
448 }
449
450 //////////////////////////////////////////////////////////////
451 //
452 //   Methods available to any external thread (worker threads).
453 //
454 void rpc::HttpProcessor::WriteReply(uint32 qid,
455                                     rpc::REPLY_STATUS status,
456                                     const io::MemoryStream& result) {
457   // [external/worker thread here]
458   scoped_ptr<ExecutingRequest> ereq;
459   {
460     synch::MutexLocker lock(&accessRequestsInExecution_);
461     MapOfRequests::iterator it = requestsInExecution_.find(qid);
462     CHECK(it != requestsInExecution_.end());
463     ereq.reset(it->second);
464     requestsInExecution_.erase(it);
465   }
466   CHECK_NOT_NULL(ereq.get());
467   CHECK_NOT_NULL(ereq->req_);
468
469   http::ServerRequest* req = ereq->req_;
470   rpc::Codec& codec = ereq->codec_;
471   uint32 xid = ereq->xid_;
472
473   // create a RPC result message, and serialize it in http
474   // request -> server_data
475   rpc::Message p;
476
477   rpc::Message::Header& header = p.header_;
478   header.xid_ = xid;
479   header.msgType_ = RPC_REPLY;
480
481   rpc::Message::ReplyBody& body = p.rbody_;
482   body.replyStatus_ = status;
483   body.result_.AppendStreamNonDestructive(&result);
484
485   LOG_DEBUG << "WriteReply sending packet: " << p;
486
487   // encode the RPC result message
488   //
489   codec.EncodePacket(*req->request()->server_data(), p);
490
491   // enable/disable gzip compression
492   req->request()->set_server_use_gzip_encoding(FLAGS_rpc_enable_http_gzip);
493
494   // queue a closure that sends the response by replying to the http request
495   //
496   httpServer_->selector()->RunInSelectLoop(
497       NewCallback(this, &rpc::HttpProcessor::CallbackReplyToHTTPRequest,
498                   req, http::OK));
499 }
500
501 void rpc::HttpProcessor::HandleRPCResult(const rpc::Query& q) {
502   WriteReply(q.qid(), q.status(), q.result());
503 }
504 }
Note: See TracBrowser for help on using the browser.