root/trunk/whisperer/stream_manager.cc

Revision 7, 25.4 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Mihai Ianculescu
31
32 #include <whisperlib/common/base/common.h>
33 #include <whisperlib/common/io/file/file.h>
34 #include <whisperlib/common/io/file/file_output_stream.h>
35 #include <whisperlib/common/io/file/file_input_stream.h>
36
37 #include "stream_manager.h"
38
39 //////////////////////////////////////////////////////////////////////
40
41 DEFINE_int32(max_lag,
42              1024*1024,
43              "The default maximum lag on the providers");
44
45 //////////////////////////////////////////////////////////////////////
46
47 // our namespace
48 using namespace media;
49
50 static
51 http::HttpReturnCode ProviderResult2HTTPCode(
52   media::StreamProvider::Result result) {
53   switch (result) {
54   case media::StreamProvider::NoError:
55     return http::OK;
56   case media::StreamProvider::PipelineParametersError:
57     return http::BAD_REQUEST;
58   default:
59     return http::INTERNAL_SERVER_ERROR;
60   }
61 }
62
63 bool StreamManager::ParseQueryString(
64     const URL& url,
65     bool& relay,
66     bool& wrapped,
67     media::Stream::Type& encoder_type,
68     media::Encoder::Audio::Params& audio_params,
69     media::Encoder::Video::Params& video_params,
70     int& audio_port,
71     int& video_port) {
72   vector< pair<string, string> > query;
73   url.GetQueryParameters(&query, true);
74
75   parsable::Boolean relay_;
76   if (!media::ParseFromValues(&relay_, "relay", query)) {
77     LOG_ERROR << "Couldn't parse the relay flag from '" << url.spec() << "'.";
78     return false;
79   }
80   if (!relay_.IsSet()) {
81     relay = false;
82   } else {
83     relay = relay_;
84   }
85   parsable::Boolean wrapped_;
86   if (!media::ParseFromValues(&wrapped_, "wrapped", query)) {
87     LOG_ERROR << "Couldn't parse the wrapped flag from '" << url.spec() << "'.";
88     return false;
89   }
90   if (!wrapped_.IsSet()) {
91     wrapped = false;
92   } else {
93     wrapped = wrapped_;
94   }
95
96   encoder_type = media::Stream::Unknown;
97
98   if (!ParseEncoderParameters(query, encoder_type,
99                               audio_params, video_params,
100                               audio_port, video_port)) {
101     LOG_ERROR << "Couldn't parse the encoding parameters from '"
102               << url.spec() << "'.";
103     return false;
104   }
105   return true;
106 }
107
108 StreamManager::StreamManager(net::Selector* selector,
109                              net::NetFactory* http_net_factory,
110                              net::PROTOCOL http_net_protocol,
111                              io::StateKeeper* state_keeper,
112                              http::Server* http_server,
113                              int http_server_port,
114                              const http::ClientParams* http_client_params,
115                              const char* media_root,
116                              rpc::HttpServer* rpc_server,
117                              const char* config_file) :
118   ServiceInvokerMediaProviderService(
119       ServiceInvokerMediaProviderService::GetClassName()),
120   selector_(selector),
121   http_net_factory_(http_net_factory),
122   http_net_protocol_(http_net_protocol),
123   state_keeper_(state_keeper),
124   stream_server_(NULL),
125   http_server_(http_server),
126   rpc_server_(rpc_server),
127   http_server_port_(http_server_port),
128   http_client_params_(http_client_params),
129   provider_media_root_((media_root == NULL) ? "" : media_root),
130   config_file_((config_file == NULL) ? "" : config_file),
131   running_(false) {
132   CHECK_NOT_NULL(selector_);
133 }
134
135 StreamManager::~StreamManager() {
136   CHECK_NULL(stream_server_);
137 }
138
139 void StreamManager::AddProvider(media::StreamProvider* provider) {
140   CHECK(stream_providers_.find(provider->name()) == stream_providers_.end());
141   stream_providers_[provider->name()] = provider;
142
143   provider->Initialize(selector_);
144   provider->SetMessageQueueCallback(
145     NewPermanentCallback(this, &StreamManager::PostMessageCallback, provider));
146
147   if (running_) {
148     CHECK(provider->Start());
149   }
150
151   CHECK(stream_rpc_.find(provider) == stream_rpc_.end());
152
153   if (rpc_server_ != NULL) {
154     CHECK(rpc_server_->RegisterService(
155         (std::string(provider->name()) + "/"), provider));
156   }
157
158   RPC rpc;
159   stream_rpc_[provider] = rpc;
160   LOG_INFO << "Provider '" << provider->name() << "' was added.";
161 }
162
163 void StreamManager::RemoveProvider(const char* name) {
164   ProviderMap::iterator it = stream_providers_.find(name);
165   if (it != stream_providers_.end()) {
166     StreamProvider* provider = it->second;
167     stream_providers_.erase(it);
168
169     stream_providers_removed_.insert(provider);
170
171     RemoveRPC(provider);
172     provider->Stop();
173   }
174   AuthenticatorMap::iterator it_auth = authenticators_.find(name);
175   if ( it_auth != authenticators_.end() ) {
176     delete it_auth->second;
177     authenticators_.erase(it_auth);
178   }
179   AuthenticatorUserMap::iterator it_auth_user = authenticators_users_.find(name);
180   if ( it_auth_user != authenticators_users_.end() ) {
181     delete it_auth_user->second;
182     authenticators_users_.erase(it_auth_user);
183   }
184 }
185
186 bool StreamManager::Publish(const URL& url,
187                             int retry_timeout,
188                             const string& user,
189                             const string& pass) {
190   CHECK(url.is_valid());
191
192   StreamPoster* poster = new StreamPoster(this, http_client_params_);
193   if ( poster->Start(
194            url,
195            retry_timeout,
196            user, pass,
197            NewCallback(this, &StreamManager::PosterDone, poster)) ) {
198     published_[url.spec()] = poster;
199     return true;
200   }
201
202   poster->Stop(); // will also auto-delete itself
203   return false;
204 }
205 void StreamManager::Unpublish(const URL& url) {
206   CHECK(url.is_valid());
207
208   PosterMap::iterator poster = published_.find(url.spec());
209   if (poster != published_.end()) {
210     poster->second->Stop();
211   }
212 }
213
214 bool StreamManager::RunRequest(
215     StreamRequest* request,
216     const char* name,
217     bool relay,
218     bool wrapped,
219     media::Stream::Type encoder_type,
220     const media::Encoder::Audio::Params* const audio_params,
221     const media::Encoder::Video::Params* const video_params,
222     int audio_port,
223     int video_port) {
224   request->selector_ = selector_;
225   request->net_factory_ = http_net_factory_;
226   request->net_protocol_ = http_net_protocol_;
227
228   AuthenticatorMap::iterator it_auth = authenticators_.find(name);
229   if ( it_auth != authenticators_.end() ) {
230     // TODO(cpopescu): use the async interface at some point
231     if ( !request->Authenticate(it_auth->second) ) {
232       return false;
233     }
234   }
235
236   ProviderMap::iterator provider = stream_providers_.find(name);
237   if (provider == stream_providers_.end()) {
238     request->Fail(http::BAD_REQUEST);
239     return false;
240   }
241
242   media::StreamProvider::Result result;
243   if ((result = provider->second->CreateSource()) !=
244       media::StreamProvider::NoError) {
245     request->Fail(ProviderResult2HTTPCode(result));
246     return false;
247   }
248
249   int fd;
250   const char* mime_type;
251
252   if (!relay) {
253     media::StreamProvider::Result result;
254     if ((result = provider->second->CreateEncoder(encoder_type,
255         audio_params, video_params, audio_port, video_port,
256         wrapped, fd, mime_type)) != media::StreamProvider::NoError) {
257       request->Fail(ProviderResult2HTTPCode(result));
258       return false;
259     }
260   } else {
261     // we need to get the source's mime-type, as we cannot determine it
262     mime_type = provider->second->source_mime_type().c_str();
263
264     if ((result = provider->second->CreateRelay(wrapped, fd)) !=
265         media::StreamProvider::NoError) {
266       request->Fail(ProviderResult2HTTPCode(result));
267       return false;
268     }
269   }
270
271   request->mime_type_ = mime_type;
272
273   if (provider->second->max_lag() == INT_MIN)
274     request->max_lag_ = FLAGS_max_lag;
275   else
276     request->max_lag_ = provider->second->max_lag();
277   CHECK_GT(request->max_lag_, 0);
278
279   CHECK_NULL(request->reader_);
280   request->reader_ = new net::SelectableFilereader(selector_);
281
282   request->wrapped_ = wrapped;
283   if (wrapped) {
284     request->processor_ = new StreamFrameProcessor();
285   } else {
286     request->processor_ = new StreamCopyProcessor();
287   }
288
289   if (!request->reader_->InitializeFd(
290         fd,
291         NewPermanentCallback(request, &StreamRequest::ReaderProcess_),
292         NewCallback(request, &StreamRequest::ReaderClose_)) ) {
293     ::close(fd);
294
295     delete request->reader_;
296     request->reader_ = NULL;
297     delete request->processor_;
298     request->processor_ = NULL;
299
300     request->Fail(http::INTERNAL_SERVER_ERROR);
301     return false;
302   }
303
304   request->Start();
305   return true;
306 }
307
308 void StreamManager::Run() {
309   running_ = true;
310   LoadConfig();
311
312   stream_server_ = new StreamServer(this, http_server_, provider_media_root_.c_str());
313
314   http_server_->AddAcceptor(http_net_protocol_,
315                             net::HostPort(0, http_server_port_));
316   selector_->RunInSelectLoop(
317     NewCallback(http_server_, &http::Server::StartServing));
318
319   selector_->Loop();
320   CHECK(!running_);
321
322   CHECK(stream_rpc_.empty());
323   CHECK(stream_providers_.empty());
324   CHECK(authenticators_.empty());
325   CHECK(authenticators_users_.empty());
326 }
327
328 void StreamManager::Stop() {
329   running_ = false;
330   if (!CheckTerminate()) {
331     LOG_INFO << "Stopping the published streams...";
332     for ( PosterMap::iterator poster = published_.begin();
333          poster != published_.end(); ++poster ) {
334       poster->second->Stop();
335     }
336     LOG_INFO << "Stopping the providers...";
337     for ( ProviderMap::iterator provider = stream_providers_.begin();
338           provider != stream_providers_.end(); ++provider ) {
339
340       RemoveRPC(provider->second);
341       provider->second->Stop();
342     }
343   }
344
345   http_server_->StopServing();
346
347   delete stream_server_;
348   stream_server_ = NULL;
349
350   // The manager will asynchronously stop and shutdown when
351   // all the providers/posts are terminated...
352 }
353
354 bool StreamManager::CheckTerminate() {
355   if (!running_) {
356     if ( stream_providers_.empty() &&
357          stream_providers_removed_.empty() &&
358          published_.empty() ) {
359       LOG_INFO << "No providers or publishing requests are running, "
360                << "stopping selector";
361       selector_->RunInSelectLoop(
362         NewCallback(selector_, &net::Selector::MakeLoopExit));
363       return true;
364     }
365   }
366   return false;
367 }
368
369 void StreamManager::ProviderDone(StreamProvider* provider) {
370   // make sure we're removed...
371   RemoveRPC(provider);
372
373   AuthenticatorMap::iterator it_auth =
374       authenticators_.find(provider->name());
375   if ( it_auth != authenticators_.end() ) {
376     delete it_auth->second;
377     authenticators_.erase(it_auth);
378   }
379   AuthenticatorUserMap::iterator it_auth_user =
380       authenticators_users_.find(provider->name());
381   if ( it_auth_user != authenticators_users_.end() ) {
382     delete it_auth_user->second;
383     authenticators_users_.erase(it_auth_user);
384   }
385
386   stream_providers_.erase(provider->name());
387   const ProviderSet::iterator it = stream_providers_removed_.find(provider);
388   if ( it != stream_providers_removed_.end() ) {
389     provider->DeleteState();
390     stream_providers_removed_.erase(it);
391   }
392
393   LOG_INFO << "Provider '" << provider->name() << "' is done.";
394   delete provider;
395   CheckTerminate();
396 }
397
398 void StreamManager::PosterDone(StreamPoster* poster) {
399   published_.erase(poster->url().spec());
400   LOG_INFO << "Stopped publishing to [" << poster->url() << "].";
401
402   CheckTerminate();
403 }
404
405 void StreamManager::RemoveRPC(StreamProvider* provider) {
406   // remove RPC
407   RpcMap::iterator rpc = stream_rpc_.find(provider);
408   if (rpc != stream_rpc_.end()) {
409     CHECK_NOT_NULL(rpc_server_);
410     rpc_server_->UnregisterService(
411       (std::string(provider->name()) + "/"), provider);
412     stream_rpc_.erase(rpc);
413   }
414 }
415
416 bool StreamManager::LoadConfig() {
417   if (config_file_.empty()) {
418     LOG_ERROR << "Config file was not set, state loading failed...";
419     return false;
420   }
421   string config_str;
422   if ( !io::FileInputStream::TryReadFile(config_file_.c_str(),
423                                          &config_str) ) {
424     LOG_ERROR << "Cannot open the config file [" << config_file_ << "]";
425     return false;
426   }
427   io::MemoryStream iomis;
428   iomis.Write(config_str);
429   rpc::JsonDecoder decoder(iomis);
430
431   bool result = true;
432   rpc::Array< ProviderSpec > providers;
433
434   if (decoder.Decode(providers) == rpc::DECODE_RESULT_SUCCESS) {
435     for (uint32 i = 0; i < providers.Size(); ++i) {
436       MediaServerResult ret;
437       if (!AddProvider(&providers.Get(i), ret)) {
438         LOG_ERROR
439           << "Couldn't add a provider while loading the configuration from ["
440           << config_file_ << "], error: \""
441           << ret.description_.Get().StdStr() << "\"...";
442       }
443     }
444   } else {
445     LOG_ERROR << "Couldn't load the providers from ["
446               << config_file_ << "]...";
447     result = false;
448   }
449
450   rpc::Array< PublishSpec > published;
451   if (decoder.Decode(published) == rpc::DECODE_RESULT_SUCCESS) {
452     for (uint32 i = 0; i < published.Size(); ++i) {
453       MediaServerResult ret;
454       if (!AddPublished(&published.Get(i), ret)) {
455         LOG_ERROR
456           << "Couldn't publish a stream while loading the configuration from ["
457           << config_file_ << "], error: \""
458           << ret.description_.Get().StdStr() << "\"...";
459       }
460     }
461   } else {
462     LOG_ERROR << "Couldn't load the published streams from ["
463               << config_file_ << "]...";
464     result = false;
465   }
466   return result;
467 }
468 // Saves the state into the config file.
469 bool StreamManager::SaveConfig() {
470   if (config_file_.empty()) {
471     LOG_ERROR << "Config file was not set, state saving failed...";
472     return false;
473   }
474
475   const string tmp_file(config_file_ + "__temp__");
476
477   io::File* const f = new io::File();
478   if (!f->Open(tmp_file.c_str(),
479                 io::File::GENERIC_READ_WRITE,
480                 io::File::CREATE_ALWAYS)) {
481     LOG_ERROR << "Cannot open the temporary config file ["
482               << tmp_file << "] for writing...";
483     delete f;
484     return false;
485   }
486
487   {
488     io::FileOutputStream fos(f);
489     {
490       rpc::Array< ProviderSpec > providers;
491       ListProviders(providers);
492       fos.WriteString(rpc::JsonEncoder::EncodeObject(providers));
493     }
494     {
495       rpc::Array< PublishSpec > published;
496       ListPublished(published);
497       fos.WriteString(rpc::JsonEncoder::EncodeObject(published));
498     }
499   }
500
501   if (rename(tmp_file.c_str(), config_file_.c_str())) {
502     LOG_ERROR << "Cannot move the temporary config file ["
503               << tmp_file << "] over the configuration file ["
504               << config_file_ << "]...";
505     return false;
506   }
507   return true;
508 }
509
510 void StreamManager::ProcessMessageCallback(
511   media::StreamProvider* provider) {
512   DLOG_DEBUG << "Iterating message for '" << provider->name() << "'.";
513   if (!provider->Iterate()) {
514     ProviderDone(provider);
515   }
516 }
517 void StreamManager::PostMessageCallback(
518   media::StreamProvider* provider,
519   Message* message) {
520   DLOG_DEBUG << "Scheduling message iteration for '"
521              << provider->name() << "' - " << message->m_kind << ".";
522   selector_->RunInSelectLoop(
523     NewCallback(this, &StreamManager::ProcessMessageCallback, provider));
524 }
525
526 bool StreamManager::AddProvider(const ProviderSpec* spec,
527                                 MediaServerResult& ret) {
528   if (!running_) {
529     ret.result_.Ref() = -1;
530     ret.description_.Ref() = "Cannot add a provider when the server is not running.";
531     return false;
532   }
533   if (spec->name_.Get().StdStr().empty() ||
534       spec->description_.Get().StdStr().empty()) {
535     ret.result_.Ref() = -4;
536     ret.description_.Ref() = "Both the name and the description must be provided.";
537     return false;
538   }
539
540   if (stream_providers_.find(spec->name_.Get().StdStr()) !=
541     stream_providers_.end()) {
542     ret.result_.Ref() = -3;
543     ret.description_.Ref() = "The provider already exists.";
544     return false;
545   }
546
547   // convert the controlled element to what we need
548   vector<string> controlled_elements;
549   if (spec->controlled_elements_.IsSet()) {
550     for (uint32 i = 0; i < spec->controlled_elements_.Get().Size(); i++)
551     controlled_elements.push_back(
552         spec->controlled_elements_.Get().Get(i).StdStr());
553   }
554   if ( spec->access_data_.IsSet() && spec->access_data_.Get().Size() > 0 ) {
555     net::SimpleUserAuthenticator*
556         user_authenticator = new net::SimpleUserAuthenticator(
557             spec->name_.Get().StdStr());
558     map<string, string>* const users = new map<string, string>;
559     for ( int i = 0; i < spec->access_data_.Get().Size(); ++i ) {
560       user_authenticator->set_user_password(
561           spec->access_data_.Get().Get(i).remote_user_.Get().StdStr(),
562           spec->access_data_.Get().Get(i).remote_password_.Get().StdStr());
563       users->insert(
564           make_pair(
565               spec->access_data_.Get().Get(i).remote_user_.Get().StdStr(),
566               spec->access_data_.Get().Get(i).remote_password_.Get().StdStr()));
567     }
568     authenticators_.insert(make_pair(spec->name_.Get().StdStr(),
569                                      user_authenticator));
570     authenticators_users_.insert(make_pair(spec->name_.Get().StdStr(),
571                                            users));
572   }
573
574   // perform
575   AddProvider(
576     new StreamProvider(
577       // name
578       spec->name_.Get().StdStr().c_str(),
579       // description
580       spec->description_.Get().StdStr().c_str(),
581       // audio ports
582       spec->audio_ports_.IsSet() ?
583       spec->audio_ports_.Get().Get() : 1,
584       // video ports
585       spec->video_ports_.IsSet() ?
586       spec->video_ports_.Get().Get() : 1,
587       // mime type
588       spec->mime_type_.IsSet() ?
589       spec->mime_type_.Get().StdStr().c_str() : "",
590       // source fireup timeout
591       spec->source_fireup_timeout_.IsSet() ?
592       spec->source_fireup_timeout_.Get().Get() : INT_MIN,
593       // encoder fireup timeout
594       spec->encoder_fireup_timeout_.IsSet() ?
595       spec->encoder_fireup_timeout_.Get().Get() : INT_MIN,
596       // source idle timeout
597       spec->source_idle_timeout_.IsSet() ?
598       spec->source_idle_timeout_.Get().Get() : INT_MIN,
599       // encoder idle timeout
600       spec->encoder_idle_timeout_.IsSet() ?
601       spec->encoder_idle_timeout_.Get().Get() : INT_MIN,
602       // maximum acceptable lag
603       spec->max_lag_.IsSet() ?
604       spec->max_lag_.Get().Get() : INT_MIN,
605       &controlled_elements,
606       state_keeper_));
607   ret.result_.Ref() = 0;
608   return true;
609 }
610
611 void StreamManager::ListProviders(rpc::Array<ProviderSpec>& providers) {
612   for ( ProviderMap::iterator provider = stream_providers_.begin();
613        provider != stream_providers_.end(); ++provider ) {
614     ProviderSpec spec;
615
616     spec.name_.Ref() = provider->first;
617     spec.description_.Ref() = provider->second->source_pipeline_description();
618
619     if (provider->second->source_mime_type() != "")
620       spec.mime_type_.Ref() = provider->second->source_mime_type();
621
622     if (provider->second->source_fireup_timeout() != INT_MIN)
623       spec.source_fireup_timeout_.Ref() =
624         provider->second->source_fireup_timeout();
625     if (provider->second->encoder_fireup_timeout() != INT_MIN)
626       spec.encoder_fireup_timeout_.Ref() =
627         provider->second->encoder_fireup_timeout();
628
629     if (provider->second->source_idle_timeout() != INT_MIN)
630       spec.source_idle_timeout_.Ref() =
631         provider->second->source_idle_timeout();
632     if (provider->second->encoder_idle_timeout() != INT_MIN)
633       spec.encoder_idle_timeout_.Ref() =
634         provider->second->encoder_idle_timeout();
635
636     if (provider->second->max_lag() != INT_MIN)
637       spec.max_lag_.Ref() =
638         provider->second->max_lag();
639
640     const media::StreamProvider::ControllerMap&
641       controllers = provider->second->controllers();
642     if (!controllers.empty()) {
643       rpc::Array<rpc::String>& c = spec.controlled_elements_.Ref();
644       for ( media::StreamProvider::ControllerMap::const_iterator
645               it = controllers.begin(); it != controllers.end(); ++it ) {
646         c.Append(it->first);
647       }
648     }
649     const AuthenticatorUserMap::const_iterator
650         it_auth = authenticators_users_.find(provider->second->name());
651     if ( it_auth != authenticators_users_.end() ) {
652       rpc::Array<WhispererSpecAccessStruct>& acc = spec.access_data_.Ref();
653       for ( map<string, string>::const_iterator it_user = it_auth->second->begin();
654             it_user != it_auth->second->end(); ++it_user ) {
655         acc.Append(WhispererSpecAccessStruct(it_user->first, it_user->second));
656       }
657     }
658     providers.PushBack(spec);
659   }
660 }
661
662 bool StreamManager::AddPublished(
663   const PublishSpec* spec,
664   MediaServerResult& ret) {
665   if ( !running_ ) {
666     ret.result_.Ref() = -1;
667     ret.description_.Ref() = "Cannot publish when the server is not running.";
668     return false;
669   }
670   if (spec->url_.Get().StdStr().empty()) {
671     ret.result_.Ref() = -4;
672     ret.description_.Ref() = "The URL to publish to must be provided.";
673     return false;
674   }
675   URL url(spec->url_.Get().StdStr());
676   if (!url.is_valid()) {
677     ret.result_.Ref() = -4;
678     ret.description_.Ref() = "The URL to publish to is invalid.";
679     return false;
680   }
681   if (published_.find(url.spec()) !=
682     published_.end()) {
683     ret.result_.Ref() = -3;
684     ret.description_.Ref() = "Publishing to the given URL is already running.";
685     return false;
686   }
687   const string user(spec->user_.IsSet()
688                     ? spec->user_.Get().remote_user_.Get().CStr() : "" );
689   const string pass(spec->user_.IsSet()
690                     ? spec->user_.Get().remote_password_.Get().CStr() : "" );
691   if ( !Publish(url,  // encoder fireup timeout
692                 spec->retry_timeout_.IsSet() ?
693                 spec->retry_timeout_.Get().Get() : INT_MIN,
694                 user, pass) ) {
695     ret.result_.Ref() = -4;
696     ret.description_.Ref() = ("Couldn't publish to the given URL "
697                               "(probably the URL is invalid).");
698     return false;
699   }
700   ret.result_.Ref() = 0;
701   return true;
702 }
703
704 void StreamManager::ListPublished(rpc::Array<PublishSpec>& published) {
705   for ( PosterMap::iterator poster = published_.begin();
706         poster != published_.end(); ++poster ) {
707     PublishSpec spec;
708     spec.url_.Ref() = poster->first;
709     if (poster->second->retry_timeout() != INT_MIN) {
710       spec.retry_timeout_.Ref() =
711         poster->second->retry_timeout();
712     }
713     if ( !poster->second->user().empty() || !poster->second->pass().empty() ) {
714       spec.user_.Ref() = WhispererSpecAccessStruct(poster->second->user(),
715                                                    poster->second->pass());
716     }
717
718     published.PushBack(spec);
719   }
720 }
721
722 // Implementation of ServiceInvokerMediaProviderService
723 void StreamManager::AddProvider(
724   rpc::CallContext<MediaServerResult>* call,
725   const ProviderSpec* spec) {
726   MediaServerResult ret;
727   AddProvider(spec, ret);
728   call->Complete(ret);
729 }
730
731 void StreamManager::RemoveProvider(
732     rpc::CallContext< MediaServerResult >* call,
733     const rpc::String* name) {
734   MediaServerResult ret;
735   if ( !running_ ) {
736     ret.result_.Ref() = -1;
737     ret.description_.Ref() =
738       "Cannot remove a provider while the server is not running.";
739     call->Complete(ret);
740     return;
741   }
742   if ( stream_providers_.find(name->StdStr()) == stream_providers_.end() ) {
743     ret.result_.Ref() = -2;
744     ret.description_.Ref() = "The provider does not exist.";
745     call->Complete(ret);
746     return;
747   }
748   RemoveProvider(name->StdStr().c_str());
749   ret.result_.Ref() = 0;
750   call->Complete(ret);
751 }
752
753 void StreamManager::ListProviders(
754   rpc::CallContext< rpc::Array< ProviderSpec > >* call) {
755   rpc::Array< ProviderSpec > ret;
756   ListProviders(ret);
757   call->Complete(ret);
758 }
759 void StreamManager::Publish(
760   rpc::CallContext<MediaServerResult>* call, const PublishSpec* spec) {
761   MediaServerResult ret;
762   AddPublished(spec, ret);
763   call->Complete(ret);
764 }
765 void StreamManager::Unpublish(
766   rpc::CallContext<MediaServerResult>* call, const rpc::String* url) {
767   MediaServerResult ret;
768   if (!running_) {
769     ret.result_.Ref() = -1;
770     ret.description_.Ref() =
771         "Cannot unpublish while the server is not running.";
772     call->Complete(ret);
773     return;
774   }
775   URL gurl(url->StdStr());
776   if (!gurl.is_valid()) {
777     ret.result_.Ref() = -4;
778     ret.description_.Ref() = "The URL to stop publishing to is invalid.";
779     return;
780   }
781   if (published_.find(gurl.spec()) == published_.end()) {
782     ret.result_.Ref() = -2;
783     ret.description_.Ref() =
784       "There is no publishing performed to the given URL.";
785     call->Complete(ret);
786     return;
787   }
788   Unpublish(gurl);
789   ret.result_.Ref() = 0;
790   call->Complete(ret);
791 }
792
793 void StreamManager::ListPublished(
794     rpc::CallContext< rpc::Array< PublishSpec > >* call) {
795   rpc::Array< PublishSpec > ret;
796   ListPublished(ret);
797   call->Complete(ret);
798 }
799
800 void StreamManager::SaveConfig(rpc::CallContext<rpc::Bool>* call) {
801   call->Complete(SaveConfig());
802 }
Note: See TracBrowser for help on using the browser.