root/trunk/whisperlib/net/http/http_proxy.cc

Revision 7, 7.9 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31
32 #include "net/http/http_proxy.h"
33
34 #define LOG_PROXY LOG_INFO_IF(dlog_level_) <<  ": "
35
36 namespace http {
37
38 Proxy::Proxy(net::Selector* selector,
39              net::NetFactory* net_factory,
40              net::PROTOCOL net_protocol,
41              http::Server* server,
42              bool dlog_level)
43   : selector_(selector),
44     net_factory_(net_factory),
45     net_protocol_(net_protocol),
46     server_(server),
47     dlog_level_(dlog_level) {
48 }
49
50 Proxy::~Proxy() {
51   for ( ReqSet::const_iterator it = pending_requests_.begin();
52         it != pending_requests_.end(); ++it ) {
53     FinalizeRequest(*it);
54   }
55   pending_requests_.clear();
56   for ( ProcMap::const_iterator it = registered_processors_.begin();
57         it != registered_processors_.end(); ++it ) {
58     server_->UnregisterProcessor(it->first);
59   }
60   registered_processors_.clear();
61 }
62
63 bool Proxy::RegisterPath(const string& path,
64                          net::HostPort remote_server,
65                          const ClientParams* params) {
66   http::Server::ServerCallback* callback = NewPermanentCallback(
67     this, &Proxy::ProcessRequest, remote_server, params);
68   if ( !server_->RegisterProcessor(path, callback, true) ) {
69     delete callback;
70     return false;
71   }
72   registered_processors_.insert(make_pair(path, callback));
73   return true;
74 }
75
76 bool Proxy::UnregisterPath(const string& path) {
77   ProcMap::iterator it = registered_processors_.find(path);
78   if ( it == registered_processors_.end() ) {
79     return false;
80   }
81   if ( !server_->UnregisterProcessor(path) ) {
82     return false;
83   }
84   registered_processors_.erase(it);
85   return true;
86 }
87
88 void Proxy::ProcessRequest(net::HostPort remote_server,
89                            const ClientParams* params,
90                            ServerRequest* sreq) {
91   ProxyReqStruct* preq = new ProxyReqStruct();
92   preq->creq_ = new http::ClientRequest(
93       sreq->request()->client_header()->method(),
94       sreq->request()->url());
95   preq->creq_->request()->client_header()->Clear();
96   preq->creq_->request()->client_header()->CopyHeaders(
97     *(sreq->request()->client_header()), true);
98   preq->creq_->request()->client_header()->AddField(
99     "X-Forwarded-For", sreq->remote_address().ip_object().ToString(), true);
100
101   preq->creq_->request()->client_data()->AppendStream(
102     sreq->request()->client_data());
103   preq->creq_->set_is_pure_dumping(true);
104
105   preq->proto_ = new ClientStreamReceiverProtocol(params,
106       new http::SimpleClientConnection(selector_, net_factory_, net_protocol_),
107       remote_server);
108
109   preq->sreq_ = sreq;
110   preq->callback_ = NewPermanentCallback(this, &Proxy::StreamingCallback, preq);
111   preq->done_callback_ = NewPermanentCallback(this, &Proxy::ClearRequest, preq);
112   pending_requests_.insert(preq);
113   LOG_PROXY << " PROXY: Starting request for: " << *sreq->request()->url()
114             << " [" << preq << "].. Header: \n"
115             << preq->creq_->request()->client_header()->ToString();
116
117   selector_->RunInSelectLoop(
118     NewCallback(preq->proto_,
119                 &ClientStreamReceiverProtocol::BeginStreamReceiving,
120                 preq->creq_, preq->callback_));
121 }
122
123 void Proxy::StreamingCallback(ProxyReqStruct* preq) {
124   if ( !selector_->IsInSelectThread() ) {
125     selector_->RunInSelectLoop(NewCallback(this,
126                                            &Proxy::StreamingCallback, preq));
127     return;
128   }
129   if ( preq->creq_->is_finalized() &&
130        preq->creq_->request()->server_data()->IsEmpty() ) {
131     FinalizeRequest(preq);
132     return;
133   }
134   int32 size_to_send = 0;
135   {
136     synch::MutexLocker l(preq->sreq_->request_mutex());
137     size_to_send = preq->sreq_->free_output_bytes();
138     if ( preq->sreq_->is_orphaned() ) {
139       LOG_PROXY << " PROXY: orphaned request !";
140       ClearRequest(preq);
141       return;
142     }
143     if ( !preq->http_header_sent_ ) {
144       preq->http_header_sent_ = true;
145       preq->sreq_->request()->server_header()->CopyHeaders(
146         *(preq->creq_->request()->server_header()), true);
147       // These will be set from the request sending part ..
148       preq->sreq_->request()->server_header()->SetContentEncoding(NULL);
149       const bool try_chunked =
150         preq->sreq_->request()->client_header()->http_version() >= VERSION_1_1;
151       if ( try_chunked ) {
152         preq->sreq_->request()->server_header()->ClearField(
153             kHeaderContentLength);
154       }
155       // We clear this anyway ..
156       preq->sreq_->request()->server_header()->SetChunkedTransfer(false);
157       preq->sreq_->BeginStreamingData(
158         preq->sreq_->request()->server_header()->status_code(),
159         NULL, preq->done_callback_, try_chunked);
160       LOG_PROXY << "PROXY:  Remote server header: \n"
161                 << preq->sreq_->request()->server_header()->ToString();
162     }
163   }
164   const int32 size_to_copy = min(size_to_send,
165                                  preq->creq_->request()->server_data()->Size());
166   if ( size_to_copy > 0 ) {
167     preq->sreq_->request()->server_data()->AppendStreamNonDestructive(
168       preq->creq_->request()->server_data(), size_to_copy);
169     preq->creq_->request()->server_data()->Skip(size_to_copy);
170   }
171   if ( !preq->creq_->request()->server_data()->IsEmpty() ) {
172     preq->proto_->PauseReading();
173     preq->paused_ = true;
174     preq->sreq_->ContinueStreamingData(preq->callback_);
175   } else {
176     if ( preq->paused_ ) {
177       preq->proto_->UnpauseReading();
178       preq->paused_ = false;
179     }
180     if ( !preq->creq_->request()->server_data()->IsEmpty() ) {
181       preq->sreq_->ContinueStreamingData(preq->callback_);
182     } else {
183       preq->sreq_->ContinueStreamingData(NULL);
184     }
185   }
186   if ( preq->creq_->is_finalized() &&
187        preq->creq_->request()->server_data()->IsEmpty() ) {
188     FinalizeRequest(preq);
189     return;
190   }
191 }
192
193 void Proxy::FinalizeRequest(ProxyReqStruct* preq) {
194   if ( !preq->http_header_sent_ ) {
195     preq->http_header_sent_ = true;
196     const http::Header* cli_head = preq->sreq_->request()->server_header();
197     if ( cli_head->status_code() == http::UNKNOWN ) {
198       preq->sreq_->ReplyWithStatus(BAD_GATEWAY);
199     } else {
200       preq->sreq_->request()->server_header()->CopyHeaders(*cli_head, true);
201       preq->sreq_->ReplyWithStatus(cli_head->status_code());
202     }
203   } else {
204     preq->sreq_->EndStreamingData();
205   }
206 }
207
208 void Proxy::ClearRequest(ProxyReqStruct* preq) {
209   selector_->DeleteInSelectLoop(preq->proto_);
210   selector_->DeleteInSelectLoop(preq->creq_);
211   delete preq->callback_;
212   preq->sreq_->ClearClosedCallback();
213   delete preq;
214   pending_requests_.erase(preq);
215 }
216 }
Note: See TracBrowser for help on using the browser.