root/trunk/whispercast/stream_server.cc

Revision 7, 26.0 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Catalin Popescu
31 //
32 // This is the main file of the whispercast server.
33 //
34
35 #include <whisperlib/common/app/app.h>
36
37 #include <whisperlib/common/base/types.h>
38 #include <whisperlib/common/base/log.h>
39 #include <whisperlib/common/base/system.h>
40 #include <whisperlib/common/base/gflags.h>
41 #include <whisperlib/common/base/errno.h>
42 #include <whisperlib/common/base/strutil.h>
43 #include <whisperlib/common/sync/thread.h>
44 #include <whisperlib/common/base/callback.h>
45
46 #include <whisperlib/common/io/file/file_input_stream.h>
47 #include <whisperlib/net/base/selector.h>
48
49 #include <whisperlib/net/rpc/lib/server/rpc_http_server.h>
50
51 #include <whisperstreamlib/stats2/stats_collector.h>
52 #include <whisperstreamlib/stats2/log_stats_saver.h>
53 #include <whisperstreamlib/base/element.h>
54 #include <whisperlib/net/util/ipclassifier.h>
55
56 #include <whisperlib/net/base/user_authenticator.h>
57 #include <whisperlib/net/http/http_server_protocol.h>
58
59 #include <whisperstreamlib/rtmp/rtmp_connection.h>
60 #include <whisperstreamlib/elements/factory.h>
61
62 #include <whisperlib/common/io/ioutil.h>
63 #include "media_mapper.h"
64 #include "stream_request.h"
65
66 //////////////////////////////////////////////////////////////////////
67
68 DEFINE_int32(http_port,
69              8080,
70              "The port on which we accept HTTP connections");
71 DEFINE_string(http_local_address,
72               "0.0.0.0",
73               "The local address to bind to when accepting HTTP connections");
74 DEFINE_int32(https_port,
75              8443,
76              "The port on which we accept HTTPS connections");
77 DEFINE_string(https_local_address,
78               "0.0.0.0",
79               "The local address to bind to when accepting HTTPS connections");
80 DEFINE_int32(http_connection_max_media_outbuf_size,
81              65536 + 65536 + 16384,
82              "Sending buffer per HTTP connection for all media data");
83 DEFINE_int32(http_connection_read_timeout,
84              20000,
85              "Timeout for reading the HTTP client request");
86 DEFINE_int32(http_connection_write_timeout,
87              20000,
88              "Timeout for a HTTP client connection before disconnecting");
89 DEFINE_int64(http_connection_write_ahead_ms,
90              4000,
91              "How much HTTP data to write ahead of the element time, "
92              "in miliseconds");
93 DEFINE_bool(http_connection_dlog_level,
94             false,
95             "Turn on deep debugging messages on HTTP connections");
96 DEFINE_int32(http_max_num_connections,
97              1000,
98              "Accept at most these many concurrent HTTP connections");
99
100 DEFINE_string(http_default_content_type,
101               "text/html",
102               "We send this content type when we don't know what "
103               "to send");
104 DEFINE_string(http_extensions_to_content_types_file,
105              "",
106              "If set we read mappings from extensions to "
107              "content types from here");
108 DEFINE_string(http_ssl_certificate,
109               "",
110               "Path to an SSL certificate file. We need both certificate & key"
111               " to use HTTP over SSL; else we use regular HTTP over TCP");
112 DEFINE_string(http_ssl_key,
113               "",
114               "Path to an SSL key file. We need both certificate & key"
115               " to use HTTP over SSL; else we use regular HTTP over TCP");
116
117 //////////////////////////////////////////////////////////////////////
118
119 DEFINE_int32(rtmp_port,
120              1935,
121              "The port on which we accept RTMP connections");
122 DEFINE_string(rtmp_local_address,
123               "0.0.0.0",
124               "The local address to bind to when accepting RTMP connections");
125 DEFINE_string(rtmp_publishing_application,
126               "live",
127               "If this is set we allow client publishing under "
128               "this rtmp application");
129
130 //////////////////////////////////////////////////////////////////////
131
132 DEFINE_string(ip_classifiers,
133               "",
134               "Comma sepparated list of classifiers - first class 0, etc");
135
136 //////////////////////////////////////////////////////////////////////
137
138
139 DEFINE_string(log_stats_dir,
140               "",
141               "The directory where the stats log files will be written.");
142 DEFINE_string(log_stats_file_base,
143               "",
144               "The base name for stats log files.");
145
146 //////////////////////////////////////////////////////////////////////
147
148 DEFINE_string(server_id,
149               "",
150               "The ID of the server, as it will be logged");
151
152 //////////////////////////////////////////////////////////////////////
153
154 DEFINE_string(media_config_dir,
155               "",
156               "Where to load / save the configs");
157 DEFINE_string(base_media_dir,
158               "",
159               "Media is under this directory");
160 DEFINE_string(media_path,
161               "/media",
162               "We export our media elements under this path for http");
163 DEFINE_string(media_state_dir,
164               "",
165               "Where we may save our state");
166 DEFINE_string(media_state_name,
167               "whispercast",
168               "We save the state under this name");
169 DEFINE_string(local_media_state_name,
170               "local_whispercast",
171               "We save non config state under this name");
172
173 //////////////////////////////////////////////////////////////////////
174
175 DEFINE_string(admin_passwd,
176               "",
177               "An admin password for server administration - put this in a flag file");
178 DEFINE_string(authorization_realm,
179               "",
180               "If you set an admin password, you also need this");
181 DEFINE_string(rpc_config_allow_classifier,
182               "",
183               "If not empty we allow config rpc-s only from guys who "
184               "qualify in the classifier created by this IpClassifier (e.g. "
185               "'OR(IPFILTER(201.1.2.240/26), "
186               "IPFILTERFILE(/etc/whispercast/good_rpc_ips))' )");
187 DEFINE_string(rpc_stats_allow_classifier,
188               "",
189               "If not empty we allow stat rpc-s only from guys who qualify "
190               "in the classifier created by this IpClassifier (e.g. "
191               "'OR(IPFILTER(201.1.2.240/26), "
192               "IPFILTERFILE(/etc/whispercast/good_rpc_ips))' )");
193
194 DECLARE_int32(rtmp_max_num_connections);
195
196 //////////////////////////////////////////////////////////////////////
197
198 // We don't want to get crawled - disable all robots
199 void RobotsProcessor(http::ServerRequest* req) {
200   req->request()->server_header()->AddField(
201     http::kHeaderContentType, "text/plain", true);
202   req->request()->server_data()->Write(
203     "User-agent: *\n"
204     "Disallow: /\n");
205   req->Reply();
206 }
207
208 void QuickStats(
209   streaming::StatsCollector* collector,
210   http::ServerRequest* req) {
211   req->request()->server_header()->AddField(
212     http::kHeaderContentType, "text/plain", true);
213   // TODO [cpopescu]:
214   req->Reply();
215 }
216
217 string DecodeAdminPasswd(const string& s) {
218   const string tmp = URL::UrlUnescape(s);
219   string ret;
220   for ( int i = 0; i < tmp.size(); ++i ) {
221     ret += int8(tmp[i]) ^ 0xb6;
222   }
223   DLOG_INFO << " Decoded passwd: [" << ret << "]";
224   return ret;
225 }
226
227 //////////////////////////////////////////////////////////////////////
228
229 class Whispercast : public app::App {
230 public:
231   Whispercast(int &argc, char **&argv);
232   virtual ~Whispercast();
233
234 protected:
235   int Initialize();
236
237   void Run() {
238     selector_->Loop();
239   }
240   void Stop() {
241     selector_->RunInSelectLoop(NewCallback(selector_,
242                                            &net::Selector::MakeLoopExit));
243   }
244   void Shutdown();
245
246   void ProcessMediaRequest(http::ServerRequest* req);
247   void ProcessAuthorizedMediaRequest(
248       http::ServerRequest* req,
249       streaming::Request* mreq,
250       streaming::Authorizer* auth);
251
252  private:
253   net::Selector* selector_;
254   MediaMapper* media_mapper_;
255
256   SSL_CTX* http_ssl_ctx_;
257   net::NetFactory* http_net_factory_;
258   http::Server* http_server_;
259   http::ServerParams* http_server_params_;
260
261   rtmp::StandardStreamManager* rtmp_stream_manager_;
262   rtmp::ServerAcceptor* rtmp_server_;
263   rtmp::ProtocolFlags rtmp_flags_;
264
265   net::SimpleUserAuthenticator* admin_authenticator_;
266   rpc::HttpServer* rpc_stat_processor_;
267   rpc::HttpServer* rpc_processor_;
268
269   //////////////////////////////////////////////////////////////////////
270
271   streaming::StatsCollector* stats_collector_;
272
273   map<string, string> ext2ct_;   // maps extensions to content type
274
275   vector<const net::IpClassifier*>* classifiers_;
276   int64 connection_id_;
277 };
278
279 //////////////////////////////////////////////////////////////////////
280
281 Whispercast::Whispercast(int &argc, char **&argv)
282   : app::App(argc, argv),
283     selector_(NULL),
284     media_mapper_(NULL),
285     http_ssl_ctx_(NULL),
286     http_net_factory_(NULL),
287     http_server_(NULL),
288     http_server_params_(NULL),
289     rtmp_stream_manager_(NULL),
290     rtmp_server_(NULL),
291     admin_authenticator_(NULL),
292     rpc_stat_processor_(NULL),
293     rpc_processor_(NULL),
294     stats_collector_(NULL),
295     classifiers_(NULL),
296     connection_id_(0) {
297   google::SetUsageMessage("Whispercast - stream broadcasting server.");
298 }
299
300 Whispercast::~Whispercast() {
301   CHECK_NULL(selector_);
302   CHECK_NULL(media_mapper_);
303   CHECK_NULL(http_ssl_ctx_);
304   CHECK_NULL(http_net_factory_);
305   CHECK_NULL(http_server_);
306   CHECK_NULL(http_server_params_);
307   CHECK_NULL(rtmp_stream_manager_);
308   CHECK_NULL(rtmp_server_);
309   CHECK_NULL(rpc_stat_processor_);
310   CHECK_NULL(rpc_processor_);
311   CHECK_NULL(stats_collector_);
312   CHECK_NULL(classifiers_);
313 }
314
315
316 int Whispercast::Initialize() {
317   common::Init(argc_, argv_);
318
319   //////////////////////////////////////////////////////////////////////
320
321   selector_ = new net::Selector();
322
323   //////////////////////////////////////////////////////////////////////
324
325   // Create the classifiers
326   if ( !FLAGS_ip_classifiers.empty() ) {
327       classifiers_ = new vector<const net::IpClassifier*>();
328       vector<string> components;
329       CHECK(strutil::SplitBracketedString(FLAGS_ip_classifiers.c_str(),
330                                           ',', '(', ')',  &components))
331                                             << " Invalid classifier spec: ["
332                                             << FLAGS_ip_classifiers << "]";
333       for ( int i = 0; i < components.size(); ++i ) {
334         classifiers_->push_back(
335           net::IpClassifier::CreateClassifier(components[i]));
336       }
337     }
338
339   //////////////////////////////////////////////////////////////////////
340
341   // Initialize the stats collection
342   vector<streaming::StatsSaver*> stats_savers;
343   if(FLAGS_log_stats_dir != "") {
344     stats_savers.push_back(new streaming::LogStatsSaver(
345                                 FLAGS_log_stats_dir.c_str(),
346                                 FLAGS_log_stats_file_base.c_str()));
347   }
348   stats_collector_ = new streaming::StatsCollector(
349       selector_, FLAGS_server_id, timer::Date::Now(), &stats_savers);
350   CHECK(stats_collector_->Start());
351
352   //////////////////////////////////////////////////////////////////////
353
354   // Create the HTTP server
355
356   // TODO : add http_connection_write_ahead
357   http_net_factory_ = new net::NetFactory(selector_);
358   if ( FLAGS_http_ssl_certificate != "" &&
359        FLAGS_http_ssl_key != "" ) {
360     http_ssl_ctx_ = net::SslConnection::SslCreateContext(
361         FLAGS_http_ssl_certificate, FLAGS_http_ssl_key);
362     if ( http_ssl_ctx_ != NULL ) {
363       net::SslConnectionParams ssl_connection_params;
364       ssl_connection_params.ssl_context_ = http_ssl_ctx_;
365       net::SslAcceptorParams ssl_acceptor_params(ssl_connection_params);
366       http_net_factory_->SetSslParams(ssl_acceptor_params,
367                                       ssl_connection_params);
368     }
369   }
370
371   http_server_params_ = new http::ServerParams();
372   http::ServerParams* const p = http_server_params_;  // shortcut
373   p->max_reply_buffer_size_ = FLAGS_http_connection_max_media_outbuf_size;
374   p->reply_write_timeout_ms_ = FLAGS_http_connection_write_timeout;
375   p->request_read_timeout_ms_ = FLAGS_http_connection_read_timeout;
376   p->dlog_level_ = FLAGS_http_connection_dlog_level;
377   p->max_concurrent_connections_ = FLAGS_http_max_num_connections;
378   // Is essential to have the keep-alive active as our rpc-s should keep-alive
379   // The loss for streaming is not big deal..
380   //
381   // p->keep_alive_timeout_sec_ = 0;  // no keep-alive for us
382
383   http_server_ = new http::Server(
384     "Whispercast 0.03",
385     selector_,
386     http_net_factory_,
387     http_server_params_,
388     NewPermanentCallback(&http::SimpleServerConnectionFactory),
389     0);  // no thread pool -- all processing in thread :)
390
391   http_server_->RegisterProcessor(FLAGS_media_path,
392                                   NewPermanentCallback(
393                                       this, &Whispercast::ProcessMediaRequest),
394                                   true);
395
396   //////////////////////////////////////////////////////////////////////
397
398   // Add RPC statistics mapping to HTTP server
399
400   if ( !FLAGS_authorization_realm.empty() ) {
401     admin_authenticator_ =
402         new net::SimpleUserAuthenticator(FLAGS_authorization_realm);
403     admin_authenticator_->set_user_password(string("admin"), FLAGS_admin_passwd);
404   } else {
405     admin_authenticator_ = NULL;
406   }
407   rpc_stat_processor_ = new rpc::HttpServer(selector_,
408                                             http_server_,
409                                             NULL,  // authenticator_,
410                                             "/rpc-stats", true,
411                                             true, 10,
412                                             FLAGS_rpc_stats_allow_classifier);
413   rpc_processor_ = new rpc::HttpServer(selector_,
414                                        http_server_,
415                                        admin_authenticator_,
416                                        "/rpc-config", true,
417                                        true, 10,
418                                        FLAGS_rpc_config_allow_classifier);
419
420   rtmp_stream_manager_ = new rtmp::StandardStreamManager(
421       selector_,
422       NULL,  // will set later..
423       stats_collector_,
424       FLAGS_rtmp_publishing_application);
425
426   //////////////////////////////////////////////////////////////////////
427
428   CHECK(!FLAGS_media_config_dir.empty() &&
429         io::IsDir(FLAGS_media_config_dir.c_str()))
430     << " [" << FLAGS_media_config_dir << "]";
431   CHECK(FLAGS_base_media_dir.empty() ||
432         io::IsDir(FLAGS_base_media_dir.c_str()))
433     << " [" << FLAGS_base_media_dir << "]";
434   media_mapper_ = new MediaMapper(selector_,
435                                   http_server_,
436                                   rpc_processor_,
437                                   rtmp_stream_manager_,
438                                   admin_authenticator_,
439                                   FLAGS_media_config_dir,
440                                   FLAGS_base_media_dir,
441                                   FLAGS_media_state_dir,
442                                   FLAGS_media_state_name,
443                                   FLAGS_local_media_state_name);
444   const MediaMapper::LoadError load_err = media_mapper_->Load();
445   if ( load_err != MediaMapper::LOAD_OK ) {
446     // TODO email
447     LOG_ERROR << "\n=========================== BAD ERROR: ==================="
448               << "\nSome error loading config file - this may render some "
449               << "elements unavailable!"
450               << "\n=========================================================";
451     CHECK_EQ(load_err, MediaMapper::LOAD_FILE_ERROR)
452       << " We encountered some data consistency error loading configuration "
453       << " - failing !!";
454   }
455   rtmp_stream_manager_->set_element_mapper(media_mapper_->mapper());
456
457   //////////////////////////////////////////////////////////////////////
458
459   // TODO: make private
460   CHECK(rpc_processor_->RegisterService(media_mapper_));
461   CHECK(rpc_stat_processor_->RegisterService(stats_collector_));
462
463   //////////////////////////////////////////////////////////////////////
464
465   // TODO: Add Stream elements
466   //       Add exported streams
467
468   //////////////////////////////////////////////////////////////////////
469
470   // Create the RTMP server
471   rtmp::GetProtocolFlags(&rtmp_flags_);
472   rtmp_server_ = new rtmp::ServerAcceptor(selector_,
473                                           rtmp_stream_manager_,
474                                           "rtmp",
475                                           stats_collector_,
476                                           classifiers_,
477                                           &rtmp_flags_);
478
479   //////////////////////////////////////////////////////////////////////
480
481   // Register various path mappings
482
483   // Register robots porcessor
484   http_server_->RegisterProcessor(
485     "/robots.txt", NewPermanentCallback(&RobotsProcessor), true);
486   // Register stats processor
487   http_server_->RegisterProcessor(
488     "/__stats__", NewPermanentCallback(QuickStats,  stats_collector_),
489     true);  // public -> todo make private ..
490
491   //////////////////////////////////////////////////////////////////////
492
493   if ( !FLAGS_http_extensions_to_content_types_file.empty() ) {
494     const string s = io::FileInputStream::ReadFileOrDie(
495         FLAGS_http_extensions_to_content_types_file.c_str());
496     vector< pair<string, string> > exts;
497     strutil::SplitPairs(s, "\n",  ":", &exts);
498     for ( int i = 0; i < exts.size(); ++i ) {
499       ext2ct_[strutil::StrTrim(exts[i].first)] =
500           strutil::StrTrim(exts[i].second);
501     }
502   }
503
504   // Start http serving
505   if ( FLAGS_http_port != 0 ) {
506     http_server_->AddAcceptor(net::PROTOCOL_TCP,
507                               net::HostPort(FLAGS_http_local_address.c_str(),
508                                             FLAGS_http_port));
509   }
510   if ( FLAGS_https_port != 0 && http_ssl_ctx_ != NULL ) {
511     http_server_->AddAcceptor(net::PROTOCOL_SSL,
512                               net::HostPort(FLAGS_https_local_address.c_str(),
513                                             FLAGS_https_port));
514   }
515   selector_->RunInSelectLoop(NewCallback(http_server_,
516                                          &http::Server::StartServing));
517   // Start rtmp serving
518   selector_->RunInSelectLoop(NewCallback(rtmp_server_,
519                                          &rtmp::ServerAcceptor::StartServing,
520                                          FLAGS_rtmp_port,
521                                          FLAGS_rtmp_local_address.c_str()));
522
523   return 0;
524 }
525
526 void Whispercast::Shutdown() {
527   //////////////////////////////////////////////////////////////////////
528
529   delete media_mapper_;
530   media_mapper_ = NULL;
531
532   LOG_INFO << " Media mapper deleted !";
533
534   //////////////////////////////////////////////////////////////////////
535
536
537   stats_collector_->Stop();
538   delete stats_collector_;
539   stats_collector_ = NULL;
540
541   //////////////////////////////////////////////////////////////////////
542
543   // Delete the classifiers
544   if ( classifiers_ != NULL ) {
545     for ( int i = 0; i < classifiers_->size(); ++i ) {
546       delete (*classifiers_)[i];
547     }
548     delete classifiers_;
549     classifiers_ = NULL;
550   }
551
552   // Delete the HTTP server
553   delete http_server_;
554   http_server_ = NULL;
555   delete http_server_params_;
556   http_server_params_ = NULL;
557   delete http_net_factory_;
558   http_net_factory_ = NULL;
559   net::SslConnection::SslDeleteContext(http_ssl_ctx_);
560   http_ssl_ctx_ = NULL;
561
562   //////////////////////////////////////////////////////////////////////
563
564   // Close the RTMP server
565
566   delete rtmp_stream_manager_;
567   rtmp_stream_manager_ = NULL;
568   rtmp_server_ = NULL;  // auto-deleted...
569
570   //////////////////////////////////////////////////////////////////////
571
572   // Stop RPC
573   delete rpc_processor_;
574   rpc_processor_ = NULL;
575   delete rpc_stat_processor_;
576   rpc_stat_processor_ = NULL;
577   delete admin_authenticator_;
578   admin_authenticator_ = NULL;
579
580   //////////////////////////////////////////////////////////////////////
581
582   delete selector_;
583   selector_ = NULL;
584 }
585
586 void Whispercast::ProcessMediaRequest(http::ServerRequest* req) {
587   URL* const url = req->request()->url();
588   CHECK(url != NULL);
589   const string url_path(url->UrlUnescape(url->path().c_str(),
590                                            url->path().size()));
591   const string media_path(url_path.substr(FLAGS_media_path.size()));
592   int ip_class = -1;
593   if ( classifiers_ != NULL ) {
594     for ( int i = 0; i < classifiers_->size(); ++i ) {
595       if ( (*classifiers_)[i]->IsInClass(
596                req->remote_address().ip_object()) ) {
597         ip_class = i;
598         break;
599       }
600     }
601   }
602   streaming::Request* mreq = new streaming::Request(*url);
603
604   mreq->mutable_info()->remote_address_ =
605       req->remote_address();
606   mreq->mutable_info()->local_address_ =
607       net::HostPort("127.0.0.1", FLAGS_http_port);
608   mreq->mutable_info()->ip_class_ = ip_class;
609
610   mreq->mutable_info()->auth_req_.net_address_ =
611       req->remote_address().ToString();
612   mreq->mutable_info()->auth_req_.resource_ = url->spec();
613   mreq->mutable_info()->auth_req_.action_ = streaming::kActionView;
614
615   if ( mreq->mutable_info()->auth_req_.token_.empty() ) {
616     mreq->mutable_info()->auth_req_.token_ =
617         req->request()->client_header()->FindField(http::kHeaderCookie);
618   }
619   if ( mreq->mutable_info()->auth_req_.user_.empty() ) {
620     req->request()->client_header()->GetAuthorizationField(
621         &mreq->mutable_info()->auth_req_.user_,
622         &mreq->mutable_info()->auth_req_.passwd_);
623   }
624
625   if ( !media_mapper_->mapper()->GetMediaDetails("http",
626                                                  media_path,
627                                                  mreq) ) {
628     delete mreq;
629     req->ReplyWithStatus(http::NOT_FOUND);
630     return;
631   }
632   if ( mreq->serving_info().flavour_mask_is_set_ ) {
633     mreq->mutable_caps()->flavour_mask_ = mreq->serving_info().flavour_mask_;
634   }
635   if ( mreq->serving_info().authorizer_name_.empty() ) {
636     mreq->mutable_auth_reply()->allowed_ = true;
637     ProcessAuthorizedMediaRequest(req, mreq, NULL);
638   } else {
639     streaming::Authorizer* const auth = media_mapper_->mapper()->GetAuthorizer(
640         mreq->serving_info().authorizer_name_);
641     if ( auth == NULL ) {
642       mreq->mutable_auth_reply()->allowed_ = true;
643       ProcessAuthorizedMediaRequest(req, mreq, NULL);
644     } else {
645       auth->IncRef();
646       auth->Authorize(mreq->info().auth_req_,
647                       mreq->mutable_auth_reply(),
648                       NewCallback(this,
649                                   &Whispercast::ProcessAuthorizedMediaRequest,
650                                   req, mreq, auth));
651     }
652   }
653 }
654
655 void Whispercast::ProcessAuthorizedMediaRequest(
656     http::ServerRequest* req,
657     streaming::Request* mreq,
658     streaming::Authorizer* auth) {
659   if ( !mreq->auth_reply().allowed_ ) {
660     LOG_INFO << "Unauthorized request: " << req->ToString();
661     req->request()->server_data()->Write("Unauthorized");
662     req->ReplyWithStatus(http::UNAUTHORIZED);
663     auth->DecRef();
664     delete mreq;
665     return;
666   }
667   streaming::AuthorizeHelper* auth_helper = NULL;
668   if ( auth != NULL ) {
669     if ( mreq->auth_reply().reauthorize_interval_ms_ > 0 ) {
670       auth_helper = new streaming::AuthorizeHelper(auth);
671       *(auth_helper->mutable_req()) = mreq->info().auth_req_;
672     }
673     auth->DecRef();
674   }
675
676   URL* const url = req->request()->url();
677   CHECK(url != NULL);
678   const string url_path(url->UrlUnescape(url->path().c_str(),
679                                          url->path().size()));
680   // send the custom headers, if any
681   const vector< pair<string, string> >&
682       extra = mreq->serving_info().extra_headers_;
683   if ( !extra.empty() ) {
684     for ( int i = 0; i < extra.size(); ++i ) {
685       req->request()->server_header()->AddField(extra[i].first,
686                                                 extra[i].second,
687                                                 true);
688     }
689   }
690
691   // content type setting
692   if ( mreq->serving_info().content_type_.empty() ) {
693     const size_t dot_pos = url_path.rfind('.');
694     if ( dot_pos != string::npos ) {
695       const string ext(url_path.substr(dot_pos));
696       map<string, string>::const_iterator it_ext = ext2ct_.find(ext);
697       if ( it_ext != ext2ct_.end() ) {
698         req->request()->server_header()->AddField(
699             http::kHeaderContentType, it_ext->second, true);
700       } else {
701         req->request()->server_header()->AddField(
702             http::kHeaderContentType, FLAGS_http_default_content_type, true);
703       }
704     } else {
705       req->request()->server_header()->AddField(
706           http::kHeaderContentType, FLAGS_http_default_content_type, true);
707     }
708   } else {
709     req->request()->server_header()->AddField(
710         http::kHeaderContentType, mreq->serving_info().content_type_, true);
711   }
712
713   // Will autodelete if StartRequest succeeds
714   StreamRequest* const sreq = new StreamRequest(selector_,
715                                                 connection_id_++,
716                                                 stats_collector_,
717                                                 media_mapper_->mapper(),
718                                                 mreq, req, auth_helper);
719   if ( !sreq->StartRequest() ) {
720     delete sreq;
721     req->ReplyWithStatus(http::NOT_FOUND);
722     return;
723   }
724   // else, will auto-delete
725 }
726
727 int main(int argc, char* argv[]) {
728   Whispercast app(argc, argv);
729   return app.Main();
730 }
Note: See TracBrowser for help on using the browser.