| 1 |
// Copyright (c) 2009, Whispersoft s.r.l. |
|---|
| 2 |
// All rights reserved. |
|---|
| 3 |
// |
|---|
| 4 |
// Redistribution and use in source and binary forms, with or without |
|---|
| 5 |
// modification, are permitted provided that the following conditions are |
|---|
| 6 |
// met: |
|---|
| 7 |
// |
|---|
| 8 |
// * Redistributions of source code must retain the above copyright |
|---|
| 9 |
// notice, this list of conditions and the following disclaimer. |
|---|
| 10 |
// * Redistributions in binary form must reproduce the above |
|---|
| 11 |
// copyright notice, this list of conditions and the following disclaimer |
|---|
| 12 |
// in the documentation and/or other materials provided with the |
|---|
| 13 |
// distribution. |
|---|
| 14 |
// * Neither the name of Whispersoft s.r.l. nor the names of its |
|---|
| 15 |
// contributors may be used to endorse or promote products derived from |
|---|
| 16 |
// this software without specific prior written permission. |
|---|
| 17 |
// |
|---|
| 18 |
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
|---|
| 19 |
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
|---|
| 20 |
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
|---|
| 21 |
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
|---|
| 22 |
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|---|
| 23 |
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|---|
| 24 |
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
|---|
| 25 |
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
|---|
| 26 |
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|---|
| 27 |
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
|---|
| 28 |
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|---|
| 29 |
// |
|---|
| 30 |
// Author: Catalin Popescu |
|---|
| 31 |
|
|---|
| 32 |
#include "common/base/errno.h" |
|---|
| 33 |
#include "net/rpc/lib/server/rpc_http_server.h" |
|---|
| 34 |
#include "common/io/file/file_input_stream.h" |
|---|
| 35 |
|
|---|
| 36 |
////////////////////////////////////////////////////////////////////// |
|---|
| 37 |
|
|---|
| 38 |
// TODO(cpopescu): make this a parameter rather than a flag ... |
|---|
| 39 |
DEFINE_string(rpc_js_form_path, |
|---|
| 40 |
"", |
|---|
| 41 |
"If specified, we export RPC accesing via auto-generated forms " |
|---|
| 42 |
"and we read the extra js sources from here (we need files " |
|---|
| 43 |
"from //depot/.../libs/net/rpc/js/"); |
|---|
| 44 |
|
|---|
| 45 |
DEFINE_bool(rpc_enable_http_get, |
|---|
| 46 |
false, |
|---|
| 47 |
"If enabled we process HTTP GET requests. By default only" |
|---|
| 48 |
" HTTP POST requests are enabled."); |
|---|
| 49 |
|
|---|
| 50 |
////////////////////////////////////////////////////////////////////// |
|---|
| 51 |
|
|---|
| 52 |
namespace rpc { |
|---|
| 53 |
// Builds the RPC-over-HTTP front end and registers it with |server|.
//
//  selector: select loop used when dispatching calls and replies.
//  server: HTTP server on which ProcessRequest becomes the processor
//          for |path|.
//  authenticator: handed to the request authentication step.
//  path: URL prefix under which the RPC services are exported.
//  auto_js_forms: when true (and --rpc_js_form_path is non-empty) the
//          JavaScript support files are loaded so "__form" pages can be
//          served.
//  is_public: forwarded unchanged to http::Server::RegisterProcessor.
//  max_concurrent_requests: threshold compared against current_requests_.
//  ip_class_restriction: when non-empty, an IP classifier is created from
//          it; otherwise no IP filtering is performed.
HttpServer::HttpServer(net::Selector* selector,
                       http::Server* server,
                       net::UserAuthenticator* authenticator,
                       const string& path,
                       bool auto_js_forms,
                       bool is_public,
                       int max_concurrent_requests,
                       const string& ip_class_restriction)
    : selector_(selector),
      authenticator_(authenticator),
      accepted_clients_(
          ip_class_restriction.empty()
          ? NULL
          : net::IpClassifier::CreateClassifier(ip_class_restriction)),
      path_(path),
      max_concurrent_requests_(max_concurrent_requests),
      current_requests_(0) {
  const bool load_js_prolog =
      auto_js_forms && !FLAGS_rpc_js_form_path.empty();
  if ( load_js_prolog ) {
    // Both files are mandatory: ReadFileOrDie is used to load them.
    const string base_js_file(FLAGS_rpc_js_form_path + "/rpc_base.js");
    const string base_js =
        io::FileInputStream::ReadFileOrDie(base_js_file.c_str());
    const string standard_js_file(FLAGS_rpc_js_form_path + "/rpc_standard.js");
    const string standard_js =
        io::FileInputStream::ReadFileOrDie(standard_js_file.c_str());

    // The prolog is one inline <script> element holding both sources.
    js_prolog_ = "<script language=\"JavaScript1.1\">\n";
    js_prolog_ += base_js;
    js_prolog_ += "\n";
    js_prolog_ += standard_js;
    js_prolog_ += "\n";
    js_prolog_ += "</script>\n";
  }
  server->RegisterProcessor(
      path,
      NewPermanentCallback(this, &HttpServer::ProcessRequest),
      is_public);
}
|---|
| 84 |
|
|---|
| 85 |
// Frees the destination sets owned by mirror_map_ and the optional IP
// classifier, then logs when requests are still accounted as in processing.
HttpServer::~HttpServer() {
  for ( MirrorMap::const_iterator it_mirror = mirror_map_.begin();
        it_mirror != mirror_map_.end();
        ++it_mirror ) {
    delete it_mirror->second;
  }
  delete accepted_clients_;
  // Stream the counter's value, not just its name, so the message is useful.
  LOG_INFO_IF(current_requests_ > 0)
      << " Exiting the RPC server with " << current_requests_
      << " requests in processing !!";
}
|---|
| 96 |
|
|---|
| 97 |
void HttpServer::RegisterServiceMirror(const string& sub_path_src, |
|---|
| 98 |
const string& sub_path_dest) { |
|---|
| 99 |
string normalized_sub_path_src(strutil::NormalizePath(sub_path_src + '/')); |
|---|
| 100 |
string normalized_sub_path_dest(strutil::NormalizePath(sub_path_dest + '/')); |
|---|
| 101 |
MirrorMap::iterator it_mirror = mirror_map_.find(normalized_sub_path_src); |
|---|
| 102 |
if ( it_mirror != mirror_map_.end() ) { |
|---|
| 103 |
it_mirror->second->insert(normalized_sub_path_dest); |
|---|
| 104 |
} else { |
|---|
| 105 |
hash_set<string>* s = new hash_set<string>; |
|---|
| 106 |
s->insert(normalized_sub_path_dest); |
|---|
| 107 |
mirror_map_.insert(make_pair(normalized_sub_path_src, s)); |
|---|
| 108 |
} |
|---|
| 109 |
reverse_mirror_map_[normalized_sub_path_dest] = normalized_sub_path_src; |
|---|
| 110 |
} |
|---|
| 111 |
|
|---|
| 112 |
bool HttpServer::RegisterService(const string& sub_path, |
|---|
| 113 |
rpc::ServiceInvoker* invoker) { |
|---|
| 114 |
const string full_path(sub_path + invoker->GetName()); |
|---|
| 115 |
ServicesMap::const_iterator it = services_.find(full_path); |
|---|
| 116 |
if ( it != services_.end() ) { |
|---|
| 117 |
LOG_WARNING << " Tried to double register " << full_path; |
|---|
| 118 |
return false; |
|---|
| 119 |
} |
|---|
| 120 |
LOG_INFO << " Registering: " << invoker->ToString() << " on path: " |
|---|
| 121 |
<< full_path; |
|---|
| 122 |
services_.insert(make_pair(full_path, invoker)); |
|---|
| 123 |
return true; |
|---|
| 124 |
} |
|---|
| 125 |
bool HttpServer::UnregisterService(const string& sub_path, |
|---|
| 126 |
rpc::ServiceInvoker* invoker) { |
|---|
| 127 |
const string full_path(sub_path + invoker->GetName()); |
|---|
| 128 |
ServicesMap::iterator it = services_.find(full_path); |
|---|
| 129 |
if ( it == services_.end() ) { |
|---|
| 130 |
LOG_WARNING << " Tried to unregister " << full_path |
|---|
| 131 |
<< " not found."; |
|---|
| 132 |
return false; |
|---|
| 133 |
} |
|---|
| 134 |
if ( invoker != it->second ) { |
|---|
| 135 |
LOG_WARNING << " Different service registered under " << full_path |
|---|
| 136 |
<< invoker->ToString() << " vs. " << it->second->ToString() |
|---|
| 137 |
<< " --> " << invoker << " vs. " << it->second; |
|---|
| 138 |
return false; |
|---|
| 139 |
} |
|---|
| 140 |
string normalized_sub_path(strutil::NormalizePath(sub_path + '/')); |
|---|
| 141 |
MirrorMap::iterator it_mirror = mirror_map_.find(normalized_sub_path); |
|---|
| 142 |
if ( it_mirror != mirror_map_.end() ) { |
|---|
| 143 |
for ( hash_set<string>::const_iterator it_rev = it_mirror->second->begin(); |
|---|
| 144 |
it_rev != it_mirror->second->end(); ++it_rev ) { |
|---|
| 145 |
reverse_mirror_map_.erase(*it_rev); |
|---|
| 146 |
} |
|---|
| 147 |
delete it_mirror->second; |
|---|
| 148 |
mirror_map_.erase(it_mirror); |
|---|
| 149 |
LOG_INFO << " RPC forward mapping removed for " << normalized_sub_path; |
|---|
| 150 |
} |
|---|
| 151 |
ReverseMirrorMap::iterator |
|---|
| 152 |
it_rev = reverse_mirror_map_.find(normalized_sub_path); |
|---|
| 153 |
if ( it_rev != reverse_mirror_map_.end() ) { |
|---|
| 154 |
it_mirror = mirror_map_.find(it_rev->second); |
|---|
| 155 |
if ( it_mirror != mirror_map_.end() ) { |
|---|
| 156 |
it_mirror->second->erase(normalized_sub_path); |
|---|
| 157 |
if ( it_mirror->second->empty() ) { |
|---|
| 158 |
delete it_mirror->second; |
|---|
| 159 |
mirror_map_.erase(it_mirror); |
|---|
| 160 |
} |
|---|
| 161 |
} |
|---|
| 162 |
LOG_INFO << " RPC backward mapping removed for " << normalized_sub_path |
|---|
| 163 |
<< " from " << it_rev->second; |
|---|
| 164 |
reverse_mirror_map_.erase(it_rev); |
|---|
| 165 |
} |
|---|
| 166 |
services_.erase(full_path); |
|---|
| 167 |
return true; |
|---|
| 168 |
} |
|---|
| 169 |
// Convenience overload: registers |invoker| with an empty sub-path, i.e.
// under its own name only. Returns what the two-argument overload returns.
bool HttpServer::RegisterService(rpc::ServiceInvoker* invoker) {
  const bool registered = RegisterService("", invoker);
  return registered;
}
|---|
| 172 |
|
|---|
| 173 |
// Convenience overload: unregisters |invoker| from the empty sub-path.
// Returns what the two-argument overload returns.
bool HttpServer::UnregisterService(rpc::ServiceInvoker* invoker) {
  const bool unregistered = UnregisterService("", invoker);
  return unregistered;
}
|---|
| 176 |
|
|---|
| 177 |
// Small helper |
|---|
| 178 |
void InvokeCall(rpc::ServiceInvoker* invoker, rpc::Query* q) { |
|---|
| 179 |
CHECK(invoker->Call(q)) << " We do not accept failing calls here"; |
|---|
| 180 |
} |
|---|
| 181 |
|
|---|
| 182 |
void HttpServer::ProcessRequest(http::ServerRequest* req) { |
|---|
| 183 |
const string req_id = |
|---|
| 184 |
req->request()->client_header()->FindField(http::kHeaderXRequestId); |
|---|
| 185 |
if ( !req_id.empty() ) { |
|---|
| 186 |
req->request()->server_header()->AddField( |
|---|
| 187 |
http::kHeaderXRequestId, req_id, true); |
|---|
| 188 |
} |
|---|
| 189 |
if ( current_requests_ >= max_concurrent_requests_ ) { |
|---|
| 190 |
LOG_ERROR << " Too many concurrent requests: " << current_requests_; |
|---|
| 191 |
// TODO(cpopescu): stats |
|---|
| 192 |
req->ReplyWithStatus(http::INTERNAL_SERVER_ERROR); |
|---|
| 193 |
return; |
|---|
| 194 |
} |
|---|
| 195 |
if ( accepted_clients_ != NULL && |
|---|
| 196 |
!accepted_clients_->IsInClass(req->remote_address().ip_object()) ) { |
|---|
| 197 |
// TODO(cpopescu): stats |
|---|
| 198 |
req->request()->server_data()->Write("Bad IP"); |
|---|
| 199 |
req->ReplyWithStatus(http::UNAUTHORIZED); |
|---|
| 200 |
return; |
|---|
| 201 |
} |
|---|
| 202 |
|
|---|
| 203 |
|
|---|
| 204 |
/////////////////////////////////////////////////////////////// |
|---|
| 205 |
// |
|---|
| 206 |
// Split url into: service_path / service_name / method_name |
|---|
| 207 |
// |
|---|
| 208 |
const string url_path = URL::UrlUnescape(req->request()->url()->path()); |
|---|
| 209 |
|
|---|
| 210 |
std::string sub_path = url_path.substr(path_.size()); |
|---|
| 211 |
sub_path = strutil::StrTrimChars(sub_path, "/"); |
|---|
| 212 |
LOG_DEBUG << "url: [" << url_path << "] , path_: [" << path_ |
|---|
| 213 |
<< "], sub_path: [" << sub_path << "]"; |
|---|
| 214 |
|
|---|
| 215 |
// sub_path should be "a/b/c/service_name/method_name" |
|---|
| 216 |
// |
|---|
| 217 |
// We extract: |
|---|
| 218 |
// service_full_path = "a/b/c/service_name" |
|---|
| 219 |
// service_path = "a/b/c"; |
|---|
| 220 |
// service_name = "service_name"; |
|---|
| 221 |
// method_name = "method_name"; |
|---|
| 222 |
// |
|---|
| 223 |
string service_full_path; |
|---|
| 224 |
string* method_name = new string(); |
|---|
| 225 |
int last_slash_index = sub_path.rfind('/'); |
|---|
| 226 |
if ( last_slash_index == std::string::npos ) { |
|---|
| 227 |
service_full_path = sub_path; |
|---|
| 228 |
*method_name = ""; |
|---|
| 229 |
} else { |
|---|
| 230 |
service_full_path = sub_path.substr(0, last_slash_index); |
|---|
| 231 |
*method_name = sub_path.substr(last_slash_index + 1); |
|---|
| 232 |
} |
|---|
| 233 |
|
|---|
| 234 |
string service_path; |
|---|
| 235 |
string* service_name = new string(); |
|---|
| 236 |
last_slash_index = service_full_path.rfind('/'); |
|---|
| 237 |
if ( last_slash_index == std::string::npos ) { |
|---|
| 238 |
service_path = ""; |
|---|
| 239 |
*service_name = service_full_path; |
|---|
| 240 |
} else { |
|---|
| 241 |
service_path = service_full_path.substr(0, last_slash_index); |
|---|
| 242 |
*service_name = service_full_path.substr(last_slash_index + 1); |
|---|
| 243 |
} |
|---|
| 244 |
|
|---|
| 245 |
//////////////////////////////////////////////////////////////////// |
|---|
| 246 |
|
|---|
| 247 |
// maybe process auto-forms request |
|---|
| 248 |
do { |
|---|
| 249 |
if ( js_prolog_.empty() || |
|---|
| 250 |
url_path.length() <= path_.length() ) { |
|---|
| 251 |
break; |
|---|
| 252 |
} |
|---|
| 253 |
|
|---|
| 254 |
if ( !strutil::StrStartsWith(*method_name, "__form") ) { |
|---|
| 255 |
break; |
|---|
| 256 |
} |
|---|
| 257 |
|
|---|
| 258 |
ServicesMap::const_iterator it = services_.find(service_full_path); |
|---|
| 259 |
rpc::ServiceInvoker* service = (it == services_.end() ? NULL : it->second); |
|---|
| 260 |
if ( !service ) { |
|---|
| 261 |
req->request()->server_data()->Write( |
|---|
| 262 |
"<h1>404 Not Found</h1>" |
|---|
| 263 |
" could not find RPC service: [" + service_full_path + "] on " |
|---|
| 264 |
"suburl: [" + sub_path +"]"); |
|---|
| 265 |
req->request()->server_data()->Write( |
|---|
| 266 |
"<br/> Known services are: {"); |
|---|
| 267 |
for ( ServicesMap::const_iterator it = services_.begin(); |
|---|
| 268 |
it != services_.end(); ) { |
|---|
| 269 |
req->request()->server_data()->Write(it->first.c_str()); |
|---|
| 270 |
++it; |
|---|
| 271 |
if ( it != services_.end() ) { |
|---|
| 272 |
req->request()->server_data()->Write(", "); |
|---|
| 273 |
} |
|---|
| 274 |
} |
|---|
| 275 |
req->request()->server_data()->Write("}"); |
|---|
| 276 |
req->ReplyWithStatus(http::NOT_FOUND); |
|---|
| 277 |
delete service_name; |
|---|
| 278 |
delete method_name; |
|---|
| 279 |
return; |
|---|
| 280 |
} |
|---|
| 281 |
|
|---|
| 282 |
const string auto_form_data = service->GetTurntablePage( |
|---|
| 283 |
strutil::JoinPaths(path_, service_path), *method_name); |
|---|
| 284 |
req->request()->server_data()->Write("<html>\n"); |
|---|
| 285 |
req->request()->server_data()->Write(js_prolog_); |
|---|
| 286 |
req->request()->server_data()->Write("\n<body>\n"); |
|---|
| 287 |
req->request()->server_data()->Write(auto_form_data); |
|---|
| 288 |
req->request()->server_data()->Write("\n</body></html>\n"); |
|---|
| 289 |
req->Reply(); |
|---|
| 290 |
delete service_name; |
|---|
| 291 |
delete method_name; |
|---|
| 292 |
return; |
|---|
| 293 |
} while ( false ); |
|---|
| 294 |
|
|---|
| 295 |
req->AuthenticateRequest( |
|---|
| 296 |
authenticator_, |
|---|
| 297 |
NewCallback(this, &HttpServer::ProcessAuthenticatedRequest, |
|---|
| 298 |
req, service_name, method_name)); |
|---|
| 299 |
} |
|---|
| 300 |
|
|---|
| 301 |
// Continuation of ProcessRequest, invoked with the authentication verdict.
//
//  req: the HTTP request being served.
//  service_name, method_name: heap strings owned by this function; they are
//      copied and deleted on entry, so no exit path has to free them.
//  auth_answer: anything other than Authenticated produces an
//      "unauthorized" answer.
//
// For an authenticated request the RPC call message is built either from the
// URL (HTTP GET, JSON codec, only when --rpc_enable_http_get is set) or by
// decoding the request body (HTTP POST, codec chosen by the codec-id header,
// JSON when the header is missing). Any other HTTP method is answered with
// BAD_REQUEST. The call is then replayed on every registered mirror path and
// finally executed for the request itself.
void HttpServer::ProcessAuthenticatedRequest(
    http::ServerRequest* req,
    string* service_name, string* method_name,
    net::UserAuthenticator::Answer auth_answer) {
  const string call_service(*service_name);
  const string call_method(*method_name);
  delete service_name;
  delete method_name;

  if ( auth_answer != net::UserAuthenticator::Authenticated ) {
    LOG_INFO << "Unauthenticated request: " << req->ToString()
             << " answer: " << auth_answer;
    req->AnswerUnauthorizedRequest(authenticator_);
    return;
  }
  // Authenticated - OK !

  // receives the RPC packet
  rpc::Message p;

  // will be set to appropriate codec
  rpc::Codec* codec = NULL;

  URL* url = req->request()->url();
  const string url_path = URL::UrlUnescape(req->request()->url()->path());
  // substr() throws std::out_of_range when the offset is past the end of the
  // string, so only strip the prefix when the URL is actually longer than it.
  std::string sub_path;
  if ( url_path.size() > path_.size() ) {
    sub_path = url_path.substr(path_.size());
    sub_path = strutil::StrTrimChars(sub_path, "/");
  }

  ////////////////////////////////////////////////////////////////////
  //
  // read RPC packet from HTTP GET
  //
  if ( req->request()->client_header()->method() == http::METHOD_GET ) {
    // is HTTP GET enabled for rpc ?
    //
    if ( !FLAGS_rpc_enable_http_get ) {
      LOG_ERROR << "Bad request method: "
                << req->request()->client_header()->method()
                << ". RPC HTTP Get not enabled.";
      req->ReplyWithStatus(http::BAD_REQUEST);
      return;
    }

    // codec is always JSON
    //
    codec = &json_codec_;

    // params are encoded inside the URL; they are written directly into the
    // call body of the message.
    io::MemoryStream& params = p.cbody_.params_;

    // GET parameters should be: ?params=[...]
    //
    std::vector< std::pair<std::string, std::string> > http_get_params;
    url->GetQueryParameters(&http_get_params, true);
    for ( std::vector< std::pair<std::string, std::string> >::const_iterator
              it = http_get_params.begin(); it != http_get_params.end(); ++it) {
      const std::string& key = (*it).first;
      const std::string& value = (*it).second;
      if ( key == RPC_HTTP_FIELD_PARAMS ) {
        params.Write(value);
        continue;
      }
      LOG_ERROR << "Ignoring unknown parameter: [" << key << "]"
                   " , value: [" << value << "]";
    }
    if ( params.IsEmpty() ) {
      LOG_ERROR << "Cannot find parameter: [" << RPC_HTTP_FIELD_PARAMS << "]."
                   " in url: [" << url->path() << "]";
      req->request()->server_data()->Write("Cannot find parameter: ["
          + string(RPC_HTTP_FIELD_PARAMS) + "] in url: [" + url->path() + "]");
      req->ReplyWithStatus(http::BAD_REQUEST);
      return;
    }

    p.header_.msgType_ = RPC_CALL;
    p.header_.xid_ = 0;
    p.cbody_.service_ = call_service;
    p.cbody_.method_ = call_method;
    // p.cbody_.params_ were filled directly
  }

  /////////////////////////////////////////////////////////////////
  //
  // read RPC packet from HTTP POST
  //
  if ( req->request()->client_header()->method() == http::METHOD_POST ) {
    // find codec ID in HTTP header (every request should specify the rpc codec)
    //
    string strCodecID;
    const bool has_codec_id = req->request()->client_header()->FindField(
        string(RPC_HTTP_FIELD_CODEC_ID), &strCodecID);
    if ( !has_codec_id ) {
      // A missing codec id is tolerated: fall back to JSON.
      LOG_ERROR << "Cannot find field '" << RPC_HTTP_FIELD_CODEC_ID
                << "'. Assuming rpc::CID_JSON(" << rpc::CID_JSON << ").";
      codec = &json_codec_;
    } else {
      // strtol reports overflow through errno, but returns 0 without
      // touching errno when no digits could be parsed at all; that case is
      // detected by the end pointer not having advanced.
      errno = 0;  // required, to detect ::strtol failure
      char* parse_end = NULL;
      const long parsed_id = ::strtol(strCodecID.c_str(), &parse_end, 10);
      if ( errno != 0 || parse_end == strCodecID.c_str() ) {
        LOG_ERROR << "invalid codec_id, not a number: [" << strCodecID << "]";
        req->ReplyWithStatus(http::BAD_REQUEST);
        return;
      }
      switch ( static_cast<uint32>(parsed_id) ) {
        case rpc::CID_BINARY:
          codec = &binary_codec_;
          break;
        case rpc::CID_JSON:
          codec = &json_codec_;
          break;
        default:
          LOG_ERROR << "invalid codec_id value: [" << strCodecID << "]";
          req->ReplyWithStatus(http::BAD_REQUEST);
          return;
      }
    }

    // HTTP client data should contain the RPC packet
    //
    // decode rpc message
    //
    DECODE_RESULT result = codec->DecodePacket(*req->request()->client_data(),
                                               p);
    if ( result == DECODE_RESULT_ERROR ) {
      LOG_ERROR << "Error decoding RPC message. Bad data.";
      req->ReplyWithStatus(http::BAD_REQUEST);
      return;
    }
    if ( result == DECODE_RESULT_NOT_ENOUGH_DATA ) {
      LOG_ERROR << "Incomplete RPC message. Bad data.";
      req->ReplyWithStatus(http::BAD_REQUEST);
      return;
    }
    CHECK_EQ(result, DECODE_RESULT_SUCCESS);

    if ( p.header_.msgType_ != RPC_CALL ) {
      LOG_ERROR << "Received a non-CALL message! ignoring: " << p;
      req->request()->server_data()->Write("Ignoring no-CALL message!");
      req->ReplyWithStatus(http::BAD_REQUEST);
      return;
    }
  }

  // No codec means the request was neither GET nor POST. This depends on
  // client input, so answer with an HTTP error instead of aborting the
  // whole process on a failed CHECK.
  if ( codec == NULL ) {
    LOG_ERROR << "Bad request method: "
              << req->request()->client_header()->method()
              << ". RPC accepts only HTTP GET and POST.";
    req->ReplyWithStatus(http::BAD_REQUEST);
    return;
  }
  // "p" should be filled: both branches above leave a CALL message in it.
  CHECK_EQ(p.header_.msgType_, RPC_CALL);

  ///////////////////////////////////////////////////////////
  //
  // handle RPC message
  //
  LOG_DEBUG << "Handle received packet: " << p;

  // extract transport information
  //
  rpc::Transport transport(rpc::Transport::HTTP,
                           net::HostPort(),
                           req->remote_address());
  if ( authenticator_ != NULL ) {
    string user, passwd;
    if ( req->request()->client_header()->GetAuthorizationField(&user,
                                                                &passwd) ) {
      transport.set_user_passwd(user, passwd);
    }
  }

  // Any timeouts ?
  string timeout_str;
  int timeout = 0;
  if ( req->request()->client_header()->FindField("Timeout", &timeout_str) ) {
    errno = 0;  // essential as strtol would not set a 0 errno
    timeout = strtol(timeout_str.c_str(), NULL, 10);
    // Out-of-range and negative values both mean "no timeout".
    if ( errno != 0 || timeout < 0 ) {
      timeout = 0;
    }
  }
  // TODO(cosmin): support timeouts in the rpc processing !

  // Now see if secondary paths are involved
  string normalized_sub_path(strutil::NormalizePath(sub_path + '/'));
  MirrorMap::const_iterator it_mirror = mirror_map_.find(normalized_sub_path);
  if ( it_mirror != mirror_map_.end() ) {
    // WELL -- this is important - the called mirror request should not
    // cause the removal of the service
    for ( hash_set<string>::const_iterator it_dup = it_mirror->second->begin();
          it_dup != it_mirror->second->end(); ++it_dup ) {
      LOG_DEBUG << " RPC mirror processing for : " << *it_dup << " / "
                << p.cbody_.service_.StdStr()
                << " per request on " << sub_path;
      // A NULL request marks the call as a mirror (secondary) call.
      ProcessRequestCore(NULL, *it_dup, transport, p, codec, timeout);
    }
  }

  ProcessRequestCore(req, sub_path, transport, p, codec, timeout);
}
|---|
| 523 |
|
|---|
| 524 |
|
|---|
| 525 |
// Looks up the service addressed by |p| below |sub_path| and hands it a
// freshly created rpc::Query.
//
//  req: the HTTP request to answer, or NULL for a mirror (secondary) call,
//       in which case nobody is replied to.
//  sub_path: joined with the service name from |p| to form the lookup key.
//  transport, codec: passed on to the query.
//  p: the decoded call message (xid, service, method, params).
//  timeout: not used by this function.
//
// For a real request the query reads the message's own params stream and
// completes through RpcCallback. For a mirror call the params are copied to
// a heap stream that RpcSecondaryCallback deletes on completion.
//
// NOTE(review): current_requests_ is decremented here (on a failed Call for
// a real request, and always for a mirror call) but no increment is visible
// in this file -- confirm the intended accounting.
void HttpServer::ProcessRequestCore(http::ServerRequest* req,
                                    const string& sub_path,
                                    const rpc::Transport& transport,
                                    const rpc::Message& p,
                                    rpc::Codec* codec,
                                    int timeout) {
  // extract call: service, method and arguments
  //
  const uint32 xid = p.header_.xid_;
  string service = p.cbody_.service_.StdStr();
  const string method = p.cbody_.method_.StdStr();

  ServicesMap::const_iterator found =
      services_.find(strutil::JoinPaths(sub_path, service));
  if ( found == services_.end() ) {
    LOG_ERROR << "Cannot serve service: " << service;
    if ( req != NULL ) {
      req->ReplyWithStatus(http::NOT_FOUND);
    }
    return;
  }
  rpc::ServiceInvoker* const invoker = found->second;
  // Always continue with the invoker's own name (the requested one may
  // miss a '/').
  service = invoker->GetName();

  io::MemoryStream* params = NULL;
  if ( req == NULL ) {
    // Mirror call: work on a private copy of the parameters.
    params = new io::MemoryStream(common::kByteOrder,
                                  p.cbody_.params_.Size());
    params->AppendStreamNonDestructive(&p.cbody_.params_);
  } else {
    params = const_cast<io::MemoryStream*>(&p.cbody_.params_);

    // The request mutex stays locked until the end of this function, unless
    // the request turns out to be orphaned.
    req->request_mutex()->Lock();
    if ( req->is_orphaned() ) {
      req->request_mutex()->Unlock();
      return;
    }
    // TODO(cpopescu): parametrize this one !!
    req->request()->set_server_use_gzip_encoding(false);
  }

  // create an internal query.
  rpc::Query* const query =
      new rpc::Query(transport, xid, service, method, *params, *codec, 0);
  if ( req == NULL ) {
    query->SetCompletionCallback(
        NewCallback(this, &HttpServer::RpcSecondaryCallback, params));
  } else {
    query->SetCompletionCallback(
        NewCallback(this, &HttpServer::RpcCallback, req));
  }

  const bool dispatch_to_select_loop =
      selector_ != NULL && !selector_->IsInSelectThread();
  if ( dispatch_to_select_loop ) {
    selector_->RunInSelectLoop(NewCallback(&InvokeCall, invoker, query));
  } else if ( !invoker->Call(query) ) {
    delete query;
    if ( req != NULL ) {
      current_requests_--;
      // TODO(cpopescu): stats
      req->ReplyWithStatus(http::INTERNAL_SERVER_ERROR);
    }
  }

  if ( req == NULL ) {
    current_requests_--;
  } else {
    req->request_mutex()->Unlock();
  }
}
|---|
| 599 |
|
|---|
| 600 |
// Completion callback for mirror (secondary) calls: no reply is sent to
// anyone, the call is just logged and the parameter copy that
// ProcessRequestCore allocated for it is released.
void HttpServer::RpcSecondaryCallback(
    io::MemoryStream* params,
    const rpc::Query& query) {
  // Log first: |params| is freed only once the query has been described.
  LOG_DEBUG << " Secondary rpc call completed for: "
            << query.ToString();
  delete params;
}
|---|
| 605 |
|
|---|
| 606 |
// Completion callback for calls made on behalf of an HTTP request.
//
// An RPC_UNAUTHORIZED status is translated directly into an HTTP
// UNAUTHORIZED answer (with a Basic-auth challenge when a realm is
// configured). Any other status is wrapped in an RPC reply message, encoded
// with the query's codec and sent through RpcReply, which takes ownership
// of the encoded stream.
void HttpServer::RpcCallback(http::ServerRequest* req,
                             const rpc::Query& query) {
  // On several errors we want to reply HTTP directly
  if ( query.status() == RPC_UNAUTHORIZED ) {
    // This is safe from any thread..
    // authenticator_ is treated as optional elsewhere in this file, so it
    // must not be dereferenced here without a check either.
    if ( authenticator_ != NULL && !authenticator_->realm().empty() ) {
      req->request()->server_header()->AddField(
          http::kHeaderWWWAuthenticate,
          strutil::StringPrintf("Basic realm=\"%s\"",
                                authenticator_->realm().c_str()),
          true);
    }
    // NOTE(review): this path does not go through RpcReply, so
    // current_requests_ is not decremented for it -- confirm intended.
    req->ReplyWithStatus(http::UNAUTHORIZED);
    return;
  }
  // create a RPC result message, and serialize it in
  // http request -> server_data
  //
  rpc::Message p;
  rpc::Message::Header& header = p.header_;
  header.xid_ = query.qid();
  header.msgType_ = RPC_REPLY;

  rpc::Message::ReplyBody& body = p.rbody_;
  body.replyStatus_ = query.status();
  body.result_.AppendStreamNonDestructive(&query.result());

  // encode the RPC result message
  //
  io::MemoryStream* reply = new io::MemoryStream();
  query.codec().EncodePacket(*reply, p);

  RpcReply(req, reply);

  // TODO(cpopescu): stats
  //   -- including req->is_orphaned()
  //   -- TODO(cpopescu): break request on orphaned requests
}
|---|
| 644 |
|
|---|
| 645 |
// Sends the encoded RPC |reply| as the body of the HTTP response for |req|
// and deletes |reply|.
//
// When a selector is configured and the caller is not on its select thread,
// the call re-posts itself to the select loop and returns. The actual reply,
// together with the current_requests_ decrement, then happens on that thread.
void HttpServer::RpcReply(http::ServerRequest* req,
                          io::MemoryStream* reply) {
  // selector_ is treated as optional when calls are dispatched, so guard
  // against NULL here as well instead of dereferencing it blindly.
  if ( selector_ != NULL && !selector_->IsInSelectThread() ) {
    selector_->RunInSelectLoop(NewCallback(this, &HttpServer::RpcReply,
                                           req, reply));
    return;
  }
  current_requests_--;
  req->request()->server_data()->AppendStreamNonDestructive(reply);
  req->Reply();

  delete reply;
}
|---|
| 658 |
} |
|---|