root/trunk/whisperlib/net/rpc/lib/client/rpc_client_connection_http.cc

Revision 7, 8.5 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Cosmin Tudorache
31
32 #include "common/base/strutil.h"
33 #include "common/base/common.h"
34 #include "common/base/scoped_ptr.h"
35 #include "net/rpc/lib/client/rpc_client_connection_http.h"
36 #include "net/rpc/lib/rpc_constants.h"
37
38 namespace rpc {
39
40 ClientConnectionHTTP::ClientConnectionHTTP(
41     net::Selector& selector,
42     net::NetFactory& net_factory,
43     net::PROTOCOL net_protocol,
44     const http::ClientParams& params,
45     const net::HostPort& addr,
46     rpc::CODEC_ID codec_id,
47     const string& http_request_path)
48     : IClientConnection(selector, rpc::CONNECTION_HTTP, codec_id),
49       selector_(selector),
50       net_factory_(net_factory),
51       net_protocol_(net_protocol),
52       remote_addr_(addr),
53       http_params_(new http::ClientParams(params)),
54       http_protocol_(new http::ClientProtocol(
55                          http_params_,
56                          new http::SimpleClientConnection(&selector_,
57                                                           &net_factory_,
58                                                           net_protocol),
59                          remote_addr_)),
60       http_request_path_(http_request_path),
61       disconnect_completed_(false, true),
62       queries_() {
63 }
64
65 ClientConnectionHTTP::~ClientConnectionHTTP() {
66   Close();
67   delete http_params_;
68   http_params_ = NULL;
69   CHECK(queries_.empty());
70 }
71
72 const char* rpc::ClientConnectionHTTP::Error() const {
73   // http::ClientErrorName(http_protocol_->conn_error_);
74   return "no_error";
75 }
76
77 void rpc::ClientConnectionHTTP::Close() {
78   // Run this function no mater what. Maybe the selector just closed the
79   // connection, or a HandleError is in progress.. it doesn't matter.
80   // Testing for socket alive does not assure that the selector is not executing
81   // something in here.
82   //  e.g. [selector] -> rpc::ClientConnectionTCP::Close which closes fd but
83   //                     does not finish execution of Close()
84   //       [external] -> destructor -> Close -> test fd_ is invalid
85   //                     -> go on with destructor
86   //       [selector] -> still in rpc::ClientConnectionTCP::Close ==> Crash
87   //
88   if ( !selector_.IsInSelectThread() && !selector_.IsExiting() ) {
89     disconnect_completed_.Reset();
90     selector_.RunInSelectLoop(
91       NewCallback(this, &rpc::ClientConnectionHTTP::Close));
92     disconnect_completed_.Wait();
93     return;
94   }
95
96   // Notify the HTTP layer, to complete all pending queries with error.
97   http_protocol_->Clear();
98   delete http_protocol_;
99   http_protocol_ = NULL;
100
101   CHECK(queries_.empty()) << "#" << queries_.size() << " queries pending.";
102
103   LOG_INFO << "Disconnect from " << remote_address() << " completed";
104   disconnect_completed_.Signal();
105 }
106
107 //////////////////////////////////////////////////////////////////////
108 //
109 //         IClientConnection interface methods
110 //
111 void rpc::ClientConnectionHTTP::Send(const rpc::Message* p) {
112   // Keep this method [THREAD SAFE] !
113
114   // Create a http request containing:
115   //  - the codec id
116   //  - the encoded RPC message
117   //
118
119   LOG_INFO << "Sending request on http path: [" << http_request_path_ << "]";
120   // req will be deleted on CallbackRequestDone
121   http::ClientRequest* req = new http::ClientRequest(http::METHOD_POST,
122                                                      http_request_path_);
123   CHECK_NOT_NULL(req);
124
125   // write codec ID
126   bool success = req->request()->client_header()->AddField(
127     string(RPC_HTTP_FIELD_CODEC_ID),
128     strutil::StringPrintf("%d", GetCodec().GetCodecID()), false);
129   CHECK(success);
130
131   // write RPC message
132   GetCodec().EncodePacket(*req->request()->client_data(), *p);
133
134   // IMPORTANT:
135   //  The query result (or failure) MUST be delivered on a different call !
136   //  Many times we synchronize the Call and HandleCallResult, so the Call
137   //  should not call HandleCallResult right away.
138   //  We also defend against stack-overflow:
139   //   - call async function
140   //     - call completion handler
141   //       - call another async function
142   //         - ...
143
144   // Send the http request (through a callback from the selector thread)
145   //
146   selector_.RunInSelectLoop(
147     NewCallback(this, &rpc::ClientConnectionHTTP::SendRequest, req, p));
148 }
149
150 void rpc::ClientConnectionHTTP::Cancel(uint32 xid) {
151   // cannot cancel http request
152   UNUSED_ALWAYS(xid);
153 }
154
155 //////////////////////////////////////////////////////////////////////
156
157 void rpc::ClientConnectionHTTP::SendRequest(http::ClientRequest* req,
158                                             const rpc::Message* p) {
159   if ( !selector_.IsInSelectThread() ) {
160     selector_.RunInSelectLoop(
161       NewCallback(this, &rpc::ClientConnectionHTTP::SendRequest, req, p));
162     return;
163   }
164   if ( !http_protocol_->IsAlive() ) {
165     LOG_DEBUG << "http_protocol_ " << (http_protocol_ ? "DEAD" : "NULL")
166               << " , recreating http::ClientProtocol";
167     delete http_protocol_;
168     http_protocol_ = new http::ClientProtocol(
169       http_params_,
170       new http::SimpleClientConnection(&selector_, &net_factory_,
171                                        net_protocol_),
172       remote_addr_);
173   }
174   queries_.insert(make_pair(req, p));
175   http_protocol_->SendRequest(
176     req, NewCallback(this,
177                      &rpc::ClientConnectionHTTP::CallbackRequestDone,
178                      req));
179 }
180
181 void rpc::ClientConnectionHTTP::CallbackRequestDone(http::ClientRequest* req) {
182   CHECK(selector_.IsInSelectThread());
183   CHECK_NOT_NULL(req);
184   scoped_ptr<http::ClientRequest> auto_del_request(req);
185
186   // No need to synchornize queries_. All changes happen in selector thread!
187   QueryMap::iterator it = queries_.find(req);
188   if ( it == queries_.end() ) {
189     LOG_ERROR << "Cannot find active query for http ClientRequest: "
190               << req->name();
191     return;
192   }
193   CHECK_EQ(it->first, req);
194   scoped_ptr<const rpc::Message> q(it->second);
195   queries_.erase(it);
196
197   const http::ClientError cli_error = req->error();
198   const http::HttpReturnCode ret_code =
199     req->request()->server_header()->status_code();
200   if ( cli_error != http::CONN_OK || ret_code != http::OK ) {
201     const string error =
202       string("client_error=") + http::ClientErrorName(cli_error) +
203       string(" http_return_code=") + http::GetHttpReturnCodeName(ret_code);
204     LOG_ERROR << "Error: req[0x" << std::hex << req << std::dec << "]" << error;
205     NotifySendFailed(q.release(), RPC_CONN_ERROR);
206     return;
207   }
208
209   // wrap request buffer
210   io::MemoryStream* const in = req->request()->server_data();
211   in->MarkerSet(); // to be able to restore data on error & print it for debug
212
213   // decode RPC message from inside http request
214   rpc::Message* msg = new rpc::Message();
215   DECODE_RESULT result = GetCodec().DecodePacket(*in, *msg);
216   if ( result == DECODE_RESULT_ERROR ) {
217     in->MarkerRestore();
218     LOG_ERROR << "RPC Decode Message error, in http reply.";
219     delete msg;
220     return;
221   }
222
223   if ( result == DECODE_RESULT_NOT_ENOUGH_DATA ) {
224     in->MarkerRestore();
225     LOG_ERROR << "RPC Decode Message insufficient data, in http reply.";
226     delete msg;
227     return;
228   }
229   CHECK_EQ(result, DECODE_RESULT_SUCCESS);
230   in->MarkerClear();
231
232   // send msg to the IConnection which will send RPC response back to
233   // application
234   HandleResponse(msg);
235 }
236 }
Note: See TracBrowser for help on using the browser.