root/trunk/whisperer/stream_poster.cc

Revision 7, 6.6 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Mihai Ianculescu
31
32 #include <netdb.h>
33
34 #include <whisperlib/common/base/common.h>
35 #include "stream_poster.h"
36
37 #include "stream_manager.h"
38
39 // our namespace
40 using namespace media;
41
42 DEFINE_int32(publishing_retry_timeout, 1000,
43     "The default stream publishing request timeout");
44
45 StreamPoster::StreamPoster(StreamManager *manager,
46     const http::ClientParams *http_client_params) :
47   manager_(manager),
48   http_client_params_(http_client_params),
49   done_callback_(NULL),
50   request_(NULL),
51   started_(false),
52   stopped_(false) {
53   CHECK_NOT_NULL(manager_);
54   CHECK_NOT_NULL(http_client_params_);
55 }
56 StreamPoster::~StreamPoster() {
57   if (done_callback_)
58     done_callback_->Run();
59 }
60
61 bool StreamPoster::Start(const URL& url,
62                          int retry_timeout,
63                          const string& user,
64                          const string& pass,
65                          Closure* done_callback) {
66   url_ = url;
67   retry_timeout_ = retry_timeout;
68   user_ = user;
69   pass_ = pass;
70
71   if (!StreamManager::ParseQueryString(
72       url_,
73       relay_,
74       wrapped_,
75       encoder_type_,
76       audio_params_,
77       video_params_,
78       audio_port_,
79       video_port_)) {
80     LOG_ERROR << "Couldn't parse the parameters from [" << url_ << "].";
81     return false;
82   }
83
84   // run the request
85   LOG_DEBUG << "Scheduling a publishing POST to [" << url_ << "].";
86   manager_->selector()->RunInSelectLoop(
87       NewCallback(this, &StreamPoster::Run));
88   started_ = true;
89
90   done_callback_ = done_callback;
91   return true;
92 }
93 void StreamPoster::Stop() {
94   if (started_) {
95     stopped_ = true;
96     if (request_) {
97       request_->Stop();
98     }
99   } else {
100     delete this;
101   }
102 }
103
104 void StreamPoster::Run() {
105   net::HostPort server(url_.host().c_str(),
106                        url_.IntPort() == -1 ? 80 : url_.IntPort());
107
108   CHECK_NULL(request_);
109   request_ = new PosterRequest(url_, server,
110                                user_, pass_,
111                                http_client_params_,
112                                NewCallback(this, &StreamPoster::Done));
113
114   LOG_INFO << "Running a publishing POST to [" << url_ << "].";
115   manager_->RunRequest(
116       request_,
117       url_.ExtractFileName().c_str(),
118       relay_, wrapped_, encoder_type_, &audio_params_, &video_params_,
119       audio_port_, video_port_);
120 }
121 void StreamPoster::Done() {
122   CHECK_NOT_NULL(request_);
123   request_ = NULL;
124
125   LOG_INFO << "The publishing POST to [" << url_ << "] has finished.";
126   if (stopped_) {
127     delete this;
128     return;
129   }
130
131   int retry_timeout = (retry_timeout_ == INT_MIN) ?
132   FLAGS_publishing_retry_timeout : retry_timeout_;
133
134   if (retry_timeout > 0) {
135     LOG_INFO <<
136     "Restarting the publishing POST to [" <<
137     url_ <<
138     "] in " << retry_timeout << " miliseconds.";
139
140     manager_->selector()->RegisterAlarm(
141         NewCallback(this, &StreamPoster::Run), retry_timeout);
142   } else {
143     LOG_INFO <<
144     "Restarting the publishing POST to [" << url_ << "].";
145     manager_->selector()->RunInSelectLoop(
146         NewCallback(this, &StreamPoster::Run));
147   }
148 }
149
150 // Implementation of StreamPoster::PosterRequest
151
152 void StreamPoster::PosterRequest::Start() {
153   CHECK_NULL(request_);
154   request_ = new http::ClientRequest(http::METHOD_POST, &url_);
155
156   CHECK_NULL(protocol_);
157   protocol_ = new http::ClientStreamingProtocol(http_client_params_,
158     new http::SimpleClientConnection(selector_, net_factory_, net_protocol_),
159     server_);
160
161   http::Header* const hs = request_->request()->client_header();
162   hs->SetChunkedTransfer(true);
163   if ( !user_.empty() || !pass_.empty() ) {
164     hs->SetAuthorizationField(user_, pass_);
165   }
166
167   if (wrapped_) {
168     hs->AddField(http::kHeaderContentType, streaming::kInternalMimeType, true);
169   } else {
170     hs->AddField(http::kHeaderContentType, mime_type_.c_str(), true);
171   }
172
173   protocol_->BeginStreaming(request_,
174       NewPermanentCallback(this, &StreamPoster::PosterRequest::RequestProcess));
175 }
176 void StreamPoster::PosterRequest::Fail(http::HttpReturnCode result) {
177   selector_->DeleteInSelectLoop(this);
178 }
179
180 void StreamPoster::PosterRequest::ReaderProcess(io::MemoryStream* inbuf) {
181   protocol_->UnpauseWriting();
182 }
183 void StreamPoster::PosterRequest::ReaderClose() {
184   reader_ = NULL;
185 }
186
187 bool StreamPoster::PosterRequest::RequestProcess(int32 available_out) {
188   DLOG_DEBUG << "RequestProcess() '" << url_ << "'...";
189   if (available_out < 0) {  // terminated
190     if (reader_ != NULL) {
191       reader_->Close();
192     }
193     selector_->DeleteInSelectLoop(this);
194     return false;
195   }
196
197   if (reader_ == NULL) {
198     selector_->DeleteInSelectLoop(this);
199     return false;
200   }
201
202   CHECK_NOT_NULL(processor_);
203
204   while (available_out > 0) {
205     // TODO(mihai): allow specification of chunk size
206     int32 to_send = min(8192, available_out);
207
208     bool send_frame;
209     if ( !processor_->Process(request_->request()->client_data(),
210                               reader_->inbuf(), to_send, send_frame) ) {
211       LOG_ERROR << "The processor returned failure for the publishing "
212                 << " POST to [" << url_.spec() << "], terminating the request.";
213       reader_->Close();
214       return false;
215     }
216
217     available_out -= to_send;
218   }
219   return true;
220 }
Note: See TracBrowser for help on using the browser.