root/trunk/whisperlib/net/rpc/lib/client/rpc_failsafe_client_connection_http.cc

Revision 7, 6.3 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "common/base/strutil.h"
33 #include "common/base/common.h"
34 #include "common/base/scoped_ptr.h"
35 #include "net/rpc/lib/client/rpc_failsafe_client_connection_http.h"
36 #include "net/rpc/lib/rpc_constants.h"
37
38 namespace rpc {
39
40 FailsafeClientConnectionHTTP::FailsafeClientConnectionHTTP(
41     net::Selector* selector,
42     rpc::CODEC_ID codec_id,
43     http::FailSafeClient* failsafe_client,
44     const string& http_request_path,
45     const string& auth_user,
46     const string& auth_pass)
47     : IClientConnection(*selector, rpc::CONNECTION_FAILSAFE_HTTP, codec_id),
48       failsafe_client_(failsafe_client),
49       http_request_path_(http_request_path),
50       auth_user_(auth_user),
51       auth_pass_(auth_pass) {
52 }
53
54 FailsafeClientConnectionHTTP::~FailsafeClientConnectionHTTP() {
55   Close();
56   delete failsafe_client_;
57   CHECK(queries_.empty());
58 }
59
60 const char* FailsafeClientConnectionHTTP::Error() const {
61   return "no_error";
62 }
63
64 void FailsafeClientConnectionHTTP::Close() {
65   delete failsafe_client_;
66   failsafe_client_ = NULL;
67
68   CHECK(queries_.empty()) << "#" << queries_.size() << " queries pending.";
69 }
70
71 void FailsafeClientConnectionHTTP::Send(const rpc::Message* p) {
72   // Create a http request containing:
73   //  - the codec id
74   //  - the encoded RPC message
75   //
76
77   DLOG_DEBUG << "Sending request on http path: [" << http_request_path_ << "]";
78   // req will be deleted on CallbackRequestDone
79   http::ClientRequest* req = new http::ClientRequest(
80       http::METHOD_POST, http_request_path_);
81   //strutil::StringPrintf("%s/%s/%s",
82   //                           http_request_path_.c_str(),
83   //                          p->cbody_.service_.StdStr().c_str(),
84   //                          p->cbody_.method_.StdStr().c_str()));
85   CHECK_NOT_NULL(req);
86
87   // write codec ID
88   req->request()->client_header()->AddField(
89       string(RPC_HTTP_FIELD_CODEC_ID),
90       strutil::StringPrintf("%d", GetCodec().GetCodecID()), false);
91
92   if ( !auth_user_.empty() ) {
93     req->request()->client_header()->SetAuthorizationField(auth_user_,
94                                                            auth_pass_);
95   }
96
97   // write RPC message
98   GetCodec().EncodePacket(*req->request()->client_data(), *p);
99
100   QueryStruct* qs = new QueryStruct(req, p);
101   queries_.insert(make_pair(qs->xid_, qs));
102
103   // NOTE is essential to interpose a selector loop before calling -
104   //  async RPC callbacks should not return immediatly
105   Closure* const req_done_callback =
106       NewCallback(
107           this,
108           &rpc::FailsafeClientConnectionHTTP::CallbackRequestDone,
109           qs);
110   selector_.RunInSelectLoop(
111       NewCallback(failsafe_client_,
112                   &http::FailSafeClient::StartRequest,
113                   qs->req_, req_done_callback));
114 }
115
116 void FailsafeClientConnectionHTTP::Cancel(uint32 xid) {
117   QueryMap::const_iterator it = queries_.find(xid);
118   if ( it != queries_.end() ) {
119     it->second->cancelled_ = true;
120   }
121 }
122
123 void FailsafeClientConnectionHTTP::CallbackRequestDone(QueryStruct* qs) {
124   DCHECK(selector_.IsInSelectThread());
125   CHECK(queries_.erase(qs->xid_));
126   if ( !qs->cancelled_ ) {
127     const http::ClientError cli_error = qs->req_->error();
128     const http::HttpReturnCode ret_code =
129         qs->req_->request()->server_header()->status_code();
130     if ( cli_error != http::CONN_OK || ret_code != http::OK ) {
131       LOG_ERROR << "Error: req[" << qs->req_->name() << ": "
132                 << http::ClientErrorName(cli_error)
133                 << " http: " << http::GetHttpReturnCodeName(ret_code);
134       NotifySendFailed(qs->msg_, RPC_CONN_ERROR);
135     } else {
136       // wrap request buffer
137       io::MemoryStream* const in = qs->req_->request()->server_data();
138       in->MarkerSet(); // to be able to restore data on error & print
139                        // it for debug
140       // decode RPC message from inside http request
141       rpc::Message* msg = new rpc::Message();
142       DECODE_RESULT result = GetCodec().DecodePacket(*in, *msg);
143       if ( result == DECODE_RESULT_ERROR ) {
144         LOG_ERROR << "Error in RPC encountered: "
145                   << qs->msg_->cbody_.service_ << "::"
146                   << qs->msg_->cbody_.method_;
147         in->MarkerRestore();
148         DLOG_DEBUG << "Received message: " << in->DebugString();
149         delete msg;
150       } else if ( result == DECODE_RESULT_NOT_ENOUGH_DATA ) {
151         LOG_ERROR << "RPC Decode Message insufficient data, in http reply for "
152                   << qs->msg_->cbody_.service_ << "::"
153                   << qs->msg_->cbody_.method_;
154         in->MarkerRestore();
155         DLOG_DEBUG << "Received message: " << in;
156         delete msg;
157       } else {
158         DCHECK_EQ(result, DECODE_RESULT_SUCCESS);
159         in->MarkerClear();
160         // send msg to the IConnection which will send RPC response back to
161         // application
162         HandleResponse(msg);
163       }
164     }
165   }
166   delete qs;
167 }
168 }
Note: See TracBrowser for help on using the browser.