root/trunk/whispercast/media_mapper.cc

Revision 7, 40.5 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Authors: Catalin Popescu
31
32 // TODO(cpopescu): config re-loading
33
34 #include "media_mapper.h"
35 #include <whisperlib/net/rpc/lib/codec/json/rpc_json_encoder.h>
36 #include <whisperlib/net/rpc/lib/codec/json/rpc_json_decoder.h>
37 #include <whisperstreamlib/elements/util/media_date.h>
38 #include <whisperlib/common/io/file/file.h>
39 #include <whisperlib/common/io/file/file_output_stream.h>
40 #include <whisperlib/common/io/file/file_input_stream.h>
41 #include <whisperlib/common/io/ioutil.h>
42 #include <whisperlib/common/io/buffer/io_memory_stream.h>
43 #include <whisperlib/common/base/ref_counted.h>
44
45 //////////////////////////////////////////////////////////////////////
46
47 DEFINE_string(saver_dir_prefix,
48               "saved",
49               "We write saved media under this directory - under the "
50               "one pointed by -base_media_dir");
51
52 DECLARE_string(saver_file_prefix);
53 DECLARE_string(saver_file_suffix);
54
55 DEFINE_string(saver_description_file,
56               "description.txt",
57               "Where to put the description of our saves");
58
59
60 DEFINE_int64(max_default_save_duration_sec,
61              10800,  // 3h
62              "When startin on command a save, we stop automatically "
63              "after these many seconds (even when no explicit stop command "
64              "is givven (to prevent `forgoten` save operations)");
65
66 //////////////////////////////////////////////////////////////////////
67
68 DEFINE_int32(media_state_checkpoint_interval_sec,
69              1200,
70              "We checkpoint the state every so often..");
71
72 //////////////////////////////////////////////////////////////////////
73
74 DEFINE_int32(media_aio_block_size,
75              4096,
76              "Alignment of aio memory blocks - should me multiple "
77              "of disk block size");
78 DEFINE_int32(media_aio_buffer_size,
79              64,
80              "We read media from disk in chunks of this size -- "
81              "IN BLOCKS !!");
82 DEFINE_int32(media_aio_buffer_pool_size,
83              1000,
84              "Basically this is the number of blocks that we can "
85              "have allocated at one time");
86 DEFINE_string(disk_devices,
87               "",
88               "Comma sepparated list of disk devices "
89               "(we create one aio manager)"
90               "per disk instance :)");
91
92 //////////////////////////////////////////////////////////////////////
93
94 DECLARE_string(element_libraries_dir);
95
96 //////////////////////////////////////////////////////////////////////
97
98 MediaMapper::MediaMapper(net::Selector* selector,
99                          http::Server* http_server,
100                          rpc::HttpServer* rpc_server,
101                          rtmp::StreamManager* rtmp_manager,
102                          net::UserAuthenticator* admin_authenticator,
103                          const string& config_dir,
104                          const string& base_media_dir,
105                          const string& media_state_directory,
106                          const string& media_state_name,
107                          const string& local_media_state_name)
108   : ServiceInvokerMediaElementService(
109       ServiceInvokerMediaElementService::GetClassName()),
110     selector_(selector),
111     http_server_(http_server),
112     rpc_server_(rpc_server),
113     admin_authenticator_(admin_authenticator),
114     freelist_(FLAGS_media_aio_buffer_size,
115               FLAGS_media_aio_block_size,
116               FLAGS_media_aio_buffer_pool_size,
117               // allocate all buffers initially - per fragmentation issue:
118               true),
119     state_keeper_(media_state_directory.c_str(),
120                   media_state_name.c_str()),
121     local_state_keeper_(media_state_directory.c_str(),
122                         local_media_state_name.c_str()),
123     config_dir_(config_dir),
124     config_file_(config_dir + "/whispercast.config"),
125     hosts_aliases_config_file_(config_dir + "/hosts_aliases.config"),
126     factory_(&media_mapper_,
127              selector,
128              http_server,
129              rpc_server,
130              &aio_managers_,
131              rtmp_manager,
132              &freelist_,
133              &host_aliases_,
134              base_media_dir.c_str(),
135              &state_keeper_,
136              &local_state_keeper_),
137     media_mapper_(selector, &factory_,
138                   new io::StateKeepUser(&state_keeper_, "a/", 0)),
139     loaded_(false),
140     checkpointing_alarm_(
141         NewPermanentCallback(this, &MediaMapper::CheckpointState)),
142     admin_password_(),
143     is_deleting_(false) {
144   aio_managers_[""] = new io::AioManager("GLOBAL", selector);
145   if ( !FLAGS_disk_devices.empty() ) {
146     vector<string> devices;
147     strutil::SplitString(FLAGS_disk_devices, ",", &devices);
148       for ( int i = 0; i < devices.size(); ++i ) {
149         const string name(strutil::NormalizePath(devices[i]));
150         aio_managers_[name] = new io::AioManager(name.c_str(), selector);
151       }
152   }
153   if ( !state_keeper_.Initialize() ) {
154     LOG_ERROR << " ============= ERROR - cannot initialize the state keeper !";
155   }
156   if ( !local_state_keeper_.Initialize() ) {
157     LOG_ERROR << " ============= ERROR - cannot initialize the state keeper !";
158   }
159
160   selector_->RegisterAlarm(checkpointing_alarm_,
161                           FLAGS_media_state_checkpoint_interval_sec * 1000);
162   CHECK(factory_.InitializeLibraries(FLAGS_element_libraries_dir,
163                                      &rpc_library_paths_));
164
165
166   CHECK(http_server_->RegisterProcessor(
167             "/__admin__", NewPermanentCallback(this,
168                                                &MediaMapper::ConfigRootPage),
169             false));   // TODO(cpopescu) - make this protected..
170 }
171
172 MediaMapper::~MediaMapper() {
173   is_deleting_ = true;
174   LOG_INFO << "Deleting media mapper........";
175
176   selector_->UnregisterAlarm(checkpointing_alarm_);
177   delete checkpointing_alarm_;
178
179   LOG_INFO << "Deleting savers";
180   for ( SaverMap::const_iterator it_saver = savers_.begin();
181         it_saver != savers_.end(); ++it_saver ) {
182     delete it_saver->second;
183   }
184   for ( SaverStopAlarmsMap::const_iterator it_stop = savers_stoppers_.begin();
185         it_stop != savers_stoppers_.end(); ++it_stop ) {
186     selector_->UnregisterAlarm(it_stop->second);
187     delete it_stop->second;
188   }
189   LOG_INFO << " Deleting saver specs";
190   for ( SaverSpecMap::const_iterator it_saver = saver_specs_.begin();
191         it_saver != saver_specs_.end(); ++it_saver ) {
192     delete it_saver->second;
193   }
194   for ( AioManagersMap::const_iterator it_managers = aio_managers_.begin();
195         it_managers != aio_managers_.end(); ++it_managers ) {
196     delete it_managers->second;
197   }
198   is_deleting_ = false;
199 }
200
201 //////////////////////////////////////////////////////////////////////
202
203 // Various loading parts :)
204
205 MediaMapper::LoadError MediaMapper::LoadElements() {
206   // Loading and decoding
207   string config_str;
208   if ( !io::FileInputStream::TryReadFile(config_file_.c_str(),
209                                          &config_str) ) {
210     LOG_ERROR << " Cannot open config file: " << config_file_;
211     return LOAD_FILE_ERROR;
212   }
213   ElementConfigurationSpecs specs;
214   io::MemoryStream iomis;
215   iomis.Write(config_str);
216   rpc::JsonDecoder decoder(iomis);
217
218   LoadError err = LOAD_OK;
219   const rpc::DECODE_RESULT config_error = decoder.Decode(specs);
220   if ( config_error != rpc::DECODE_RESULT_SUCCESS ) {
221     LOG_ERROR << "Error decoding ElementConfigurationSpecs from file: "
222               << config_file_;
223     err = LOAD_DATA_ERROR;
224   }
225   rpc::Array<ElementExportSpec> exports;
226   const rpc::DECODE_RESULT exports_error = decoder.Decode(exports);
227   if ( exports_error != rpc::DECODE_RESULT_SUCCESS ) {
228     LOG_ERROR << "Error decoding rpc::Array<ElementExportSpec> from file: "
229               << config_file_;
230     err = LOAD_DATA_ERROR;
231   }
232   rpc::Array<MediaSaverSpec> saves;
233   const rpc::DECODE_RESULT saves_error = decoder.Decode(saves);
234   if ( saves_error != rpc::DECODE_RESULT_SUCCESS ) {
235     LOG_ERROR << "Error decoding rpc::Array<MediaSaverSpec> from file: "
236               << config_file_;
237     err = LOAD_DATA_ERROR;
238   }
239
240   // Creation
241   vector<streaming::ElementFactory::ErrorData> errors;
242   if ( !factory_.AddSpecs(specs, &errors) ) {
243     LOG_ERROR << " Errors encountered while adding specs to media factory: ";
244     for ( int i = 0; i < errors.size(); ++i ) {
245       LOG_ERROR << "ERROR: " << errors[i].description_;
246       err = LOAD_DATA_ERROR;
247     }
248   }
249   for ( int i = 0; i < exports.Size(); ++i ) {
250     string error;
251     if ( !ExportElement(&exports.Get(i), &error) ) {
252       LOG_ERROR << "Error exporting element: " << exports.Get(i) << " : "
253                 << error;
254       err = LOAD_DATA_ERROR;
255     }
256   }
257   for ( int i = 0; i < saves.Size(); ++i ) {
258     LOG_INFO << " Adding save: " << saves.Get(i);
259     saver_specs_.insert(make_pair(saves.Get(i).name_.Get().StdStr(),
260                                   new MediaSaverSpec(saves.Get(i))));
261   }
262   return err;
263 }
264
265 void MediaMapper::InitializeSavers() {
266   for ( SaverSpecMap::const_iterator it = saver_specs_.begin();
267         it != saver_specs_.end(); ++it ) {
268     const string key_prefix(
269         strutil::StringPrintf("savers/%s/", it->first.c_str()));
270     bool started_on_command;
271     int64 last_start_time;
272     if ( local_state_keeper_.GetValue(key_prefix + "started_on_command",
273                                       &started_on_command,
274                                       sizeof(started_on_command)) &&
275          local_state_keeper_.GetValue(key_prefix + "last_start_time",
276                                       &last_start_time,
277                                       sizeof(last_start_time)) ) {
278       if ( started_on_command ) {
279         string error;
280         if ( !StartSaverInternal(it->first, "",
281                                  0, last_start_time, &error) ) {
282           LOG_ERROR << "Cannot restart a previously started saver: "
283                     << it->first << ". Error: " << error;
284         }
285       } else {
286         StartSaverAlarm(it->first, last_start_time);
287       }
288     } else {
289       StartSaverAlarm(it->first, 0);
290     }
291   }
292 }
293
294 MediaMapper::LoadError MediaMapper::LoadHostAliases() {
295   // Loading and decoding the current aliases..
296   string aliases_str;
297   if ( !io::FileInputStream::TryReadFile(hosts_aliases_config_file_.c_str(),
298                                          &aliases_str) ) {
299     LOG_ERROR << " Cannot open aliases config file: "
300               << hosts_aliases_config_file_;
301     return LOAD_FILE_ERROR;
302   }
303   rpc::Array<MediaHostAliasSpec> current_aliases;
304   MediaMapper::LoadError err = LOAD_OK;
305
306   if ( !rpc::JsonDecoder::DecodeObject(aliases_str, &current_aliases) ) {
307     LOG_ERROR << "Error decoding rpc::Array<MediaHostAlias> from file: ["
308               << hosts_aliases_config_file_
309               << "]\nDecoded so far: " << current_aliases;
310     err = LOAD_DATA_ERROR;
311   }
312   for ( int i = 0; i < current_aliases.Size(); ++i ) {
313     const string& name = current_aliases.Get(i).alias_name_.Get().StdStr();
314     const string& ip = current_aliases.Get(i).alias_ip_.Get().StdStr();
315     host_aliases_[name] = ip;
316     LOG_INFO << "Adding alias: [" << name << "] => [" << ip << "]";
317   }
318   return err;
319 }
320
321
322 // This loads the config file and creates the associated structures
323 MediaMapper::LoadError MediaMapper::Load() {
324   MediaMapper::LoadError err_aliases = LoadHostAliases();
325   MediaMapper::LoadError err_elements = LoadElements();
326
327   media_mapper_.Initialize();
328   InitializeSavers();
329
330   if ( err_elements == LOAD_OK ) {
331     return err_aliases;
332   }
333   return err_elements;
334 }
335
336 //////////////////////////////////////////////////////////////////////
337
338 // Saves the current configuration in the config file (safely :)
339 bool MediaMapper::Save() const {
340   const string tmp_file(config_file_ + "__temp__");
341   io::File* const f = new io::File();
342   if ( !f->Open(tmp_file.c_str(),
343                 io::File::GENERIC_READ_WRITE,
344                 io::File::CREATE_ALWAYS) ) {
345     LOG_ERROR << " Cannot open temporarely config file for writing: ["
346               << tmp_file << "]";
347     return false;
348   }
349   {
350     io::FileOutputStream fos(f);
351     {
352       io::MemoryStream iostream;
353       ElementConfigurationSpecs* specs = factory_.GetSpec();
354       rpc::JsonEncoder encoder_specs(iostream);
355       encoder_specs.Encode(*specs);
356       delete specs;
357       string s;
358       iostream.ReadString(&s);
359       if ( fos.WriteString(s) != s.size() ) {
360         LOG_ERROR << " Error writing config to: " << tmp_file;
361         return false;
362       }
363     }
364
365     {
366       io::MemoryStream iostream;
367       rpc::JsonEncoder encoder_exports(iostream);
368       rpc::Array< ElementExportSpec > ret;
369       GetElementExports(&ret);
370       encoder_exports.Encode(ret);
371       string s;
372       iostream.ReadString(&s);
373       if ( fos.WriteString(s) != s.size() ) {
374         LOG_ERROR << " Error writing config to: " << tmp_file;
375         return false;
376       }
377     }
378
379     {
380       io::MemoryStream iostream;
381       rpc::JsonEncoder encoder_saves(iostream);
382       rpc::Array<MediaSaverSpec> ret;
383       GetSavesConfig(&ret);
384       encoder_saves.Encode(ret);
385       string s;
386       iostream.ReadString(&s);
387       if ( fos.WriteString(s) != s.size() ) {
388         LOG_ERROR << " Error writing config to: " << tmp_file;
389         return false;
390       }
391     }
392   }
393   if ( rename(tmp_file.c_str(), config_file_.c_str()) ) {
394     LOG_ERROR << "Cannot move the temp config to the main config file";
395     return false;
396   }
397   return true;
398 }
399
400 bool MediaMapper::SaveHostAliases() const {
401   const string tmp_file(hosts_aliases_config_file_ + "__temp__");
402   io::File* const f = new io::File();
403   if ( !f->Open(tmp_file.c_str(),
404                 io::File::GENERIC_READ_WRITE,
405                 io::File::CREATE_ALWAYS) ) {
406     LOG_ERROR << " Cannot open temporarely config file for writing: ["
407               << tmp_file << "]";
408     return false;
409   }
410   {
411     io::FileOutputStream fos(f);
412     {
413       io::MemoryStream iostream;
414       rpc::Array <MediaHostAliasSpec> aliases;
415       GetHostAliases(&aliases);
416       rpc::JsonEncoder encoder_specs(iostream);
417       encoder_specs.Encode(aliases);
418       string s;
419       iostream.ReadString(&s);
420       if ( fos.WriteString(s) != s.size() ) {
421         LOG_ERROR << " Error writing config to: " << tmp_file;
422         return false;
423       }
424     }
425   }
426   if ( rename(tmp_file.c_str(), hosts_aliases_config_file_.c_str()) ) {
427     LOG_ERROR << "Cannot move the temp hosts aliases config to "
428               << "the main config aliasesfile";
429     return false;
430   }
431   return true;
432 }
433
434 void MediaMapper::CheckpointState() {
435   selector_->ReregisterAlarm(checkpointing_alarm_,
436                              FLAGS_media_state_checkpoint_interval_sec * 1000);
437   CHECK(state_keeper_.Checkpoint());
438   CHECK(local_state_keeper_.Checkpoint());
439 }
440
441
442 void MediaMapper::ConfigRootPage(http::ServerRequest* req) {
443   if ( !req->AuthenticateRequest(admin_authenticator_) ) {
444     // TODO: we may want to authenticate asynchronously
445     req->AnswerUnauthorizedRequest(admin_authenticator_);
446     return;
447   }
448   req->request()->server_data()->Write(
449       "<html><body><h2>Select your command path:</h2>");
450   req->request()->server_data()->Write(
451       strutil::StringPrintf(
452           "<a href=\"%s/MediaElementService/__forms\">Global Config</a><br/>",
453           rpc_server_->path().c_str()));
454   for ( int i = 0; i < rpc_library_paths_.size(); ++i ) {
455     req->request()->server_data()->Write(
456         strutil::StringPrintf(
457             "<a href=\"%s/__forms\">"
458             "Library [%s] Config</a><br/>",
459             rpc_library_paths_[i].second.c_str(),
460             rpc_library_paths_[i].first.c_str()));
461   }
462   req->request()->server_data()->Write("</body></html>");
463   req->Reply();
464 }
465
466   //////////////////////////////////////////////////////////////////////
467
468
469 void MediaMapper::AddPolicySpec(
470   rpc::CallContext< MediaOperationErrorData >* call,
471   const PolicySpecs* spec) {
472   MediaOperationErrorData ret;
473   streaming::ElementFactory::ErrorData err_data;
474   PolicySpecs* new_spec = new PolicySpecs(*spec);
475   const bool success = factory_.AddPolicySpec(new_spec, &err_data);
476   if ( !success ) {
477     delete new_spec;
478     ret.error_.Ref() = 1;
479     ret.description_.Ref() = err_data.description_;
480   } else {
481     ret.error_.Ref() = 0;
482   }
483   call->Complete(ret);
484 }
485
486 void MediaMapper::DeletePolicySpec(
487   rpc::CallContext< MediaOperationErrorData >* call,
488   const rpc::String* name) {
489   MediaOperationErrorData ret;
490   streaming::ElementFactory::ErrorData err_data;
491   const bool success = factory_.DeletePolicySpec(name->StdStr(), &err_data);
492   if ( !success ) {
493     ret.error_.Ref() = 1;
494     ret.description_.Ref() = err_data.description_;
495   } else {
496     // We delete just the specs .. we leave the created policies around ..
497     ret.error_.Ref() = 0;
498   }
499   call->Complete(ret);
500 }
501
502 void MediaMapper::AddAuthorizerSpec(
503   rpc::CallContext< MediaOperationErrorData >* call,
504   const AuthorizerSpecs* spec) {
505   MediaOperationErrorData ret;
506   streaming::ElementFactory::ErrorData err_data;
507   AuthorizerSpecs* new_spec = new AuthorizerSpecs(*spec);
508   const bool success = factory_.AddAuthorizerSpec(new_spec, &err_data);
509   if ( !success ) {
510     delete new_spec;
511     ret.error_.Ref() = 1;
512     ret.description_.Ref() = err_data.description_;
513   } else {
514     if ( !media_mapper_.AddAuthorizer(spec->name_.Get().StdStr()) ) {
515       factory_.DeleteAuthorizerSpec(spec->name_.Get().StdStr(), &err_data);
516       call->Complete(MediaOperationErrorData(
517                          1, "Failed to create authorizer; "
518                          "check whispercast log."));
519       return;
520     }
521     ret.error_.Ref() = 0;
522   }
523   call->Complete(ret);
524 }
525
526 void MediaMapper::DeleteAuthorizerSpec(
527   rpc::CallContext< MediaOperationErrorData >* call,
528   const rpc::String* name) {
529   MediaOperationErrorData ret;
530   streaming::ElementFactory::ErrorData err_data;
531   const bool success = factory_.DeleteAuthorizerSpec(name->StdStr(), &err_data);
532   if ( !success ) {
533     ret.error_.Ref() = 1;
534     ret.description_.Ref() = err_data.description_;
535   } else {
536     media_mapper_.RemoveAuthorizer(name->StdStr());
537     ret.error_.Ref() = 0;
538   }
539   call->Complete(ret);
540 }
541
542
543 void MediaMapper::AddElementSpec(
544     rpc::CallContext< MediaOperationErrorData >* call,
545     const MediaElementSpecs* spec) {
546   streaming::ElementFactory::ErrorData err_data;
547   MediaElementSpecs* new_spec = new MediaElementSpecs(*spec);
548   bool success = factory_.AddElementSpec(new_spec,
549                                          &err_data);
550   if ( !success ) {
551     delete new_spec;
552     call->Complete(MediaOperationErrorData(1, err_data.description_));
553     return;
554   }
555
556   const string name(spec->name_.Get().StdStr());
557   if ( !media_mapper_.AddElement(name, spec->is_global_.Get().Get()) ) {
558     factory_.DeleteElementSpec(new_spec->name_.Get().StdStr(), &err_data);
559     call->Complete(MediaOperationErrorData(
560                        1, "failed to create element; check whispercast log."));
561     return;
562   }
563   call->Complete(MediaOperationErrorData(0, ""));
564 }
565
566 void MediaMapper::DeleteElementSpec(
567   rpc::CallContext< MediaOperationErrorData >* call,
568   const rpc::String* name) {
569   MediaOperationErrorData ret;
570   streaming::ElementFactory::ErrorData err_data;
571   const bool success = factory_.DeleteElementSpec(name->StdStr(), &err_data);
572   if ( !success ) {
573     ret.error_.Ref() = 1;
574     ret.description_.Ref() = err_data.description_;
575   } else {
576     media_mapper_.RemoveElement(name->StdStr());
577     ret.error_.Ref() = 0;
578   }
579   call->Complete(ret);
580 }
581
582 void MediaMapper::AddElementSaver(
583   rpc::CallContext< MediaOperationErrorData >* call,
584   const MediaSaverSpec* spec) {
585   MediaOperationErrorData ret;
586   streaming::ElementFactory::ErrorData err_data;
587   const string saver_name(spec->name_.Get().StdStr());
588   const string media_name(spec->media_name_.Get().StdStr());
589
590   if ( saver_specs_.find(saver_name) != saver_specs_.end() ) {
591     ret.error_.Ref() = 1;
592     ret.description_.Ref() = "Saver already exists:" + saver_name;
593   } else if ( media_mapper_.HasMedia(media_name.c_str()).is_invalid() ) {
594     ret.error_.Ref() = 1;
595     ret.description_.Ref() = "Unknown Media::" + media_name;
596   } else {
597     CHECK(saver_specs_.insert(
598             make_pair(saver_name, new MediaSaverSpec(*spec))).second);
599     ret.error_.Ref() = 0;
600     StartSaverAlarm(saver_name, 0);
601   }
602
603   call->Complete(ret);
604 }
605
606 void MediaMapper::DeleteElementSaver(
607   rpc::CallContext< MediaOperationErrorData >* call,
608   const rpc::String* name) {
609   const string saver_name(name->StdStr());
610   MediaOperationErrorData ret;
611   SaverSpecMap::iterator it = saver_specs_.find(saver_name);
612   if ( saver_specs_.find(saver_name) == saver_specs_.end() ) {
613     ret.error_.Ref() = 1;
614     ret.description_.Ref() = "Saver does not exist:" + saver_name;
615   } else {
616     if ( savers_.find(saver_name) != savers_.end() ) {
617       ret.description_.Ref() = "Warning: the saver is still active "
618         "(deleting the spec though)";
619     }
620     delete it->second;
621     saver_specs_.erase(it);
622     ret.error_.Ref() = 0;
623   }
624   call->Complete(ret);
625 }
626
627 //////////////////////////////////////////////////////////////////////
628
629 bool MediaMapper::ExportElement(const ElementExportSpec* spec,
630                                 string* error) {
631   const string media_name(spec->media_name_.Get().StdStr());
632   if ( spec->protocol_.Get().StdStr() != "http" &&
633        spec->protocol_.Get().StdStr() != "rtmp" ) {
634     *error = "Invalid protocol specified";
635     return false;
636   }
637   LOG_INFO << "=======> Exporting spec: " << spec->ToString();
638   streaming::RequestServingInfo* serving_info =
639       new streaming::RequestServingInfo();
640   serving_info->media_name_ = media_name;
641   if ( spec->extra_headers_.IsSet() ) {
642     for ( int i = 0; i < spec->extra_headers_.Get().Size(); ++i ) {
643       serving_info->extra_headers_.push_back(
644           make_pair(spec->extra_headers_.Get().Get(i).name_.Get().StdStr(),
645                     spec->extra_headers_.Get().Get(i).value_.Get().StdStr()));
646     }
647   }
648   if ( spec->content_type_.IsSet() ) {
649     serving_info->tag_type_ = streaming::GetStreamType(
650         spec->content_type_.Get().StdStr());
651     if ( serving_info->tag_type_ != streaming::UNKNOWN_STREAM_TYPE ) {
652       // Short content type specified
653       serving_info->content_type_ = GetContentTypeFromStreamType(
654           serving_info->tag_type_);
655     } else {
656       // Full content type specified
657       serving_info->tag_type_ = streaming::GetStreamTypeFromContentType (
658           spec->content_type_.Get().StdStr());
659       serving_info->content_type_ = spec->content_type_.Get().StdStr();
660     }
661    }
662   if ( spec->authorizer_name_.IsSet() ) {
663     serving_info->authorizer_name_ = spec->authorizer_name_.Get().StdStr();
664   }
665   if ( spec->enable_buffer_flow_control_.IsSet() ) {
666     serving_info->enable_buffer_flow_control_ =
667         spec->enable_buffer_flow_control_.Get().Get();
668   }
669   if ( spec->flavour_mask_.IsSet() ) {
670     serving_info->flavour_mask_is_set_ = true;
671     serving_info->flavour_mask_ = static_cast<uint32>(
672         spec->flavour_mask_.Get().Get());
673   }
674   if ( !media_mapper_.AddServingInfo(spec->protocol_.Get().StdStr(),
675                                      spec->path_.Get().StdStr(),
676                                      serving_info) ) {
677     *error = "Protocol:Path already registered";
678     delete serving_info;
679     return false;
680   }
681   return true;
682 }
683
684 void MediaMapper::StartExportElement(
685   rpc::CallContext< MediaOperationErrorData >* call,
686   const ElementExportSpec* spec) {
687   MediaOperationErrorData ret;
688   string error;
689   if ( !ExportElement(spec, &error) ) {
690     ret.error_.Ref() = 1;
691     ret.description_.Ref() = error;
692   } else {
693     ret.error_.Ref() = 0;
694   }
695   call->Complete(ret);
696 }
697
698 void MediaMapper::StopExportElement(
699   rpc::CallContext< MediaOperationErrorData > * call,
700   const rpc::String * protocol, const rpc::String * path) {
701   MediaOperationErrorData ret;
702   if ( !media_mapper_.RemoveServingInfo(protocol->StdStr(),
703                                         path->StdStr()) ) {
704     ret.error_.Ref() = 1;
705     ret.description_.Ref() = "Proto/path not exported: " + path->StdStr();
706   } else {
707     ret.error_.Ref() = 0;
708   }
709   call->Complete(ret);
710 }
711
712 //////////////////////////////////////////////////////////////////////
713
714 void MediaMapper::StartSaver(
715   rpc::CallContext< MediaOperationErrorData >* call,
716   const rpc::String* name,
717   const rpc::String* description) {
718   const string saver_name(name->StdStr());
719   MediaOperationErrorData ret;
720
721   if ( saver_specs_.find(saver_name) == saver_specs_.end() ) {
722     ret.error_.Ref() = 1;
723     ret.description_.Ref() = "Saver does not exist:" + saver_name;
724   } else if ( savers_.find(saver_name) != savers_.end() ) {
725     ret.error_.Ref() = 1;
726     ret.description_.Ref() = "Saver already started";
727   } else {
728     string error;
729     if ( !StartSaverInternal(saver_name, description->StdStr(),
730                              0, 0, &error) ) {
731       ret.error_.Ref() = 1;
732       ret.description_.Ref() = "Error starting saver: " + error;
733     } else {
734       ret.error_.Ref() = 0;
735     }
736   }
737   call->Complete(ret);
738 }
739
740 void MediaMapper::StopSaver(
741   rpc::CallContext< MediaOperationErrorData >* call,
742   const rpc::String* name) {
743   const string saver_name(name->StdStr());
744   MediaOperationErrorData ret;
745   SaverMap::iterator it_saver = savers_.find(saver_name);
746   if ( it_saver == savers_.end() ) {
747     ret.error_.Ref() = 1;
748     ret.description_.Ref() = "No active saver: " + saver_name;
749   } else {
750     ret.error_.Ref() = 0;
751     // will cause deletion and erasing from map:
752     LOG_INFO << " Stopping saver: " << saver_name;
753     it_saver->second->StopSaving();
754   }
755   call->Complete(ret);
756 }
757
758 //////////////////////////////////////////////////////////////////////
759
760 void MediaMapper::OnStartSaver(string* name,
761                                int duration_in_seconds,
762                                int64 last_start_time) {
763   string error;
764   if ( !StartSaverInternal(*name, "",
765                            duration_in_seconds, last_start_time,
766                            &error) ) {
767     LOG_WARNING << "OnStartSaver: " << error;
768   }
769   delete name;
770 }
771
772 void MediaMapper::StartSaverAlarm(const string& name,
773                                   int64 last_start_time) {
774   SaverSpecMap::const_iterator it = saver_specs_.find(name);
775   if ( it == saver_specs_.end() ) {
776     LOG_WARNING << "Cannot schedule a saver start for: " << name
777                 << " as we don't have the saver spec..";
778     return;
779   }
780   const timer::Date now(false);
781   int64 min_next_happening = kMaxInt64;
782   int duration_in_seconds = 0;
783   for ( int i = 0; i < it->second->timespecs_.Get().Size(); ++i ) {
784     const int64 crt = streaming::NextHappening(
785       it->second->timespecs_.Get().Get(i), now);
786     LOG_INFO << " Crt Next Happening for: " << name << ": " << crt;
787     if ( crt > min_next_happening ) {
788       continue;
789     }
790     min_next_happening = crt;
791     duration_in_seconds =
792       it->second->timespecs_.Get().Get(i).duration_in_seconds_.Get();
793     if ( 0 > min_next_happening ) {
794       if ( -min_next_happening <= duration_in_seconds * 1000 ) {
795         duration_in_seconds += min_next_happening / 1000;
796       }
797     }
798     LOG_INFO << " Min Next Happening for: " << name
799              << ": " << min_next_happening
800              << " with duration: " << duration_in_seconds;
801   }
802   if ( duration_in_seconds < 0 ||
803        min_next_happening == kMaxInt64 ) {
804     LOG_WARNING << "Cannot schedule a saver start for: " << name
805                 << " as it is manual or has no valid time spec.";
806     return;
807   }
808   const int64 to_wait = max(min_next_happening, static_cast<int64>(0));
809   CHECK_GE(to_wait, 0);
810   selector_->RegisterAlarm(NewCallback(this, &MediaMapper::OnStartSaver,
811                                        new string(name),
812                                        duration_in_seconds,
813                                        last_start_time),
814                            to_wait);
815 }
816
817 bool MediaMapper::StartSaverInternal(const string& name,
818                                      const string& description,
819                                      int duration_sec,
820                                      int64 last_start_time,
821                                      string* error) {
822   const SaverMap::const_iterator it_saver = savers_.find(name);
823   timer::Date now(false);
824   if ( it_saver != savers_.end() ) {
825     *error = strutil::StringPrintf(
826       "Cannot start start saver '%s', as it is already running for %lld ms",
827       name.c_str(),
828       static_cast<long long int>(now.GetTime() -
829                                  it_saver->second->start_time()));
830     return false;
831   }
832   const SaverSpecMap::const_iterator it_spec = saver_specs_.find(name);
833   if ( it_spec == saver_specs_.end() ) {
834     *error = ("Cannot start saver '" + name
835               + "' as we don't have an associated saver spec.");
836     return false;
837   }
838   const string media_name = it_spec->second->media_name_.Get().StdStr();
839   string dirname;
840   if ( last_start_time == 0 ) {
841     dirname = now.ToShortString();
842     LOG_INFO << " Started new save at: " << now.ToString();
843   } else {
844     timer::Date lt(last_start_time, false);
845     LOG_INFO << " Using retreived last start time: "
846              << lt.ToString();
847     dirname = lt.ToShortString();
848   }
849   const string saver_dir(
850     strutil::StringPrintf("%s/%s/%s/%s/",
851                           factory_.base_media_dir().c_str(),
852                           FLAGS_saver_dir_prefix.c_str(),
853                           name.c_str(),
854                           dirname.c_str()));
855   streaming::Saver* const saver = new streaming::Saver(
856       name,
857       &media_mapper_,
858       streaming::UNKNOWN_STREAM_TYPE,
859       media_name,
860       saver_dir,
861       FLAGS_saver_file_prefix,
862       FLAGS_saver_file_suffix,
863       last_start_time == 0 ? now.GetTime() : last_start_time,
864       duration_sec == 0,
865       NewCallback(this, &MediaMapper::OnSaveStopped));
866   if ( !saver->StartSaving() ) {
867     *error = ("Cannot seem to be able to start saver '"+name +
868         "'(most probably a directory error).");
869     delete saver;
870     return false;
871   }
872   if ( !description.empty() ) {
873     if ( !io::FileOutputStream::TryWriteFile(
874            (saver_dir + "/" + FLAGS_saver_description_file).c_str(),
875            description) ) {
876       LOG_ERROR << "Cannot write saver description file to: " << saver_dir;
877     }
878   }
879   if ( duration_sec > 0 ) {
880     Closure* stopper = NewPermanentCallback(saver,
881                                             &streaming::Saver::StopSaving);
882     selector_->RegisterAlarm(stopper,
883                              duration_sec * 1000);
884     CHECK(savers_stoppers_.insert(make_pair(name, stopper)).second);
885   } else if ( FLAGS_max_default_save_duration_sec > 0 ) {
886     Closure* stopper = NewPermanentCallback(saver,
887                                             &streaming::Saver::StopSaving);
888     selector_->RegisterAlarm(stopper,
889                              FLAGS_max_default_save_duration_sec * 1000);
890     CHECK(savers_stoppers_.insert(make_pair(name, stopper)).second);
891   }
892   CHECK(savers_.insert(make_pair(name, saver)).second);
893   const string key_prefix(strutil::StringPrintf("savers/%s/", name.c_str()));
894   const bool started_on_command = saver->started_on_command();
895   const int64 saver_last_start_time = saver->start_time();
896   local_state_keeper_.SetValue(
897       key_prefix + "started_on_command",
898       &started_on_command, sizeof(started_on_command));
899   local_state_keeper_.SetValue(
900       key_prefix + "last_start_time",
901       &saver_last_start_time, sizeof(saver_last_start_time));
902   return true;
903 }
904
905 void MediaMapper::OnSaveStopped(streaming::Saver* saver) {
906   const SaverMap::iterator it_saver = savers_.find(saver->name());
907   if ( it_saver != savers_.end() ) {
908     savers_.erase(it_saver);
909   } else {
910     LOG_ERROR << "Saver " << saver->name()
911               << " stopped - but I cannot find it in "
912               << "the active saver list.";
913   }
914   LOG_INFO << " Saver " << saver->name() << " Stopped..";
915   const SaverStopAlarmsMap::iterator
916     it_stop = savers_stoppers_.find(saver->name());
917   if ( it_stop != savers_stoppers_.end() ) {
918     selector_->UnregisterAlarm(it_stop->second);
919     // Deleting this way as we may be in it ..
920     selector_->DeleteInSelectLoop(it_stop->second);
921     savers_stoppers_.erase(it_stop);
922   } else {
923     LOG_ERROR << "Bad - no save stop callback found for: " << saver->name();
924   }
925   if ( !is_deleting_ && !selector_->IsExiting() ) {
926     saver->CreateSignalingFile(".save_done", "");
927     const string key_prefix(strutil::StringPrintf("savers/%s/",
928                                                   saver->name().c_str()));
929     local_state_keeper_.DeletePrefix(key_prefix);
930
931     // Schedule the next moment of saving
932     StartSaverAlarm(saver->name(), 0);
933   } else {
934     LOG_INFO << " Saver " << saver->name()
935              << " stopped on deleting - keeping its state.. ";
936   }
937   selector_->DeleteInSelectLoop(saver);
938 }
939
940 //////////////////////////////////////////////////////////////////////
941
942 void MediaMapper::GetSavesConfig(
943   rpc::Array< MediaSaverSpec >* saves) const {
944   for ( SaverSpecMap::const_iterator it = saver_specs_.begin();
945         it != saver_specs_.end(); ++it ) {
946     saves->PushBack(*it->second);
947   }
948 }
949
950 void MediaMapper::GetSavesConfig(
951   rpc::CallContext< rpc::Array< MediaSaverSpec > >* call) {
952   rpc::Array<MediaSaverSpec> ret;
953   GetSavesConfig(&ret);
954   call->Complete(ret);
955 }
956
957 ///////
958
959 void MediaMapper::GetCurrentSaves(
960   rpc::Array< MediaSaverState >* saves) const {
961   for ( SaverMap::const_iterator it = savers_.begin();
962         it != savers_.end(); ++it ) {
963     saves->PushBack(MediaSaverState(it->second->name(),
964                                     it->second->start_time(),
965                                     it->second->started_on_command()));
966   }
967 }
968
969 void MediaMapper::GetCurrentSaves(
970   rpc::CallContext< rpc::Array< MediaSaverState > >* call) {
971   rpc::Array<MediaSaverState> ret;
972   GetCurrentSaves(&ret);
973   call->Complete(ret);
974 }
975
976 ///////
977
978 void MediaMapper::GetHttpExportRoot(
979   rpc::CallContext< rpc::String >* call) {
980   call->Complete(rpc::String(factory_.base_media_dir()));
981 }
982
983 void MediaMapper::GetElementConfig(
984   rpc::CallContext< ElementConfigurationSpecs >* call) {
985   ElementConfigurationSpecs* specs = factory_.GetSpec();
986   call->Complete(*specs);
987   delete specs;
988 }
989
990 ///////
991
992 void MediaMapper::GetElementExports(
993   rpc::Array< ElementExportSpec >* exports) const {
994   const streaming::FactoryBasedElementMapper::ServingInfoMap&
995       serving_paths = media_mapper_.serving_paths();
996   for ( streaming::FactoryBasedElementMapper::ServingInfoMap::const_iterator
997             it = serving_paths.begin(); it  != serving_paths.end(); ++it ) {
998     size_t column_pos = it->first.find(':');
999     CHECK(column_pos != string::npos);
1000
1001     ElementExportSpec src_spec;
1002     src_spec.media_name_.Ref() = it->second->media_name_;
1003     src_spec.protocol_.Ref() = it->first.substr(0, column_pos);
1004     src_spec.path_.Ref() = it->first.substr(column_pos + 2);
1005
1006     if ( !it->second->content_type_.empty() ) {
1007       src_spec.content_type_.Ref() = it->second->content_type_;
1008           streaming::GetSmallTypeFromContentType(
1009               streaming::GetContentTypeFromStreamType(it->second->tag_type_));
1010     }
1011     for ( int i = 0; i < it->second->extra_headers_.size(); ++i ) {
1012       src_spec.extra_headers_.Ref().PushBack(
1013           ExtraHeaders(it->second->extra_headers_[i].first,
1014                        it->second->extra_headers_[i].second));
1015     }
1016     if ( !it->second->authorizer_name_.empty() ) {
1017       src_spec.authorizer_name_.Set(it->second->authorizer_name_);
1018     }
1019     src_spec.enable_buffer_flow_control_.Set(
1020         it->second->enable_buffer_flow_control_);
1021     if ( it->second->flavour_mask_is_set_ ) {
1022       src_spec.flavour_mask_.Set(
1023           it->second->flavour_mask_);
1024     }
1025
1026     exports->PushBack(src_spec);
1027   }
1028 }
1029
1030 void MediaMapper::GetElementExports(
1031   rpc::CallContext< rpc::Array< ElementExportSpec > >* call) {
1032   rpc::Array< ElementExportSpec > ret;
1033   GetElementExports(&ret);
1034   call->Complete(ret);
1035 }
1036
1037 ///////
1038
1039 void MediaMapper::SaveConfig(
1040   rpc::CallContext< rpc::Bool >* call) {
1041   rpc::Bool ret(Save());
1042   call->Complete(ret);
1043 }
1044
1045 void MediaMapper::ListMedia(rpc::CallContext< rpc::Array<rpc::String> >* call,
1046                             const rpc::String* media_name) {
1047   streaming::ElementDescriptions medias;
1048   media_mapper_.ListMedia(media_name->StdStr().c_str(), &medias);
1049   rpc::Array<rpc::String> ret;
1050   for ( int i = 0; i < medias.size(); ++i ) {
1051     ret.PushBack(rpc::String(medias[i].first));
1052   }
1053   call->Complete(ret);
1054 }
1055
1056 //////////////////////////////////////////////////////////////////////
1057
1058 void MediaMapper::AddHostAlias(rpc::CallContext<MediaOperationErrorData>* call,
1059                                const rpc::String* alias_name,
1060                                const rpc::String* alias_ip) {
1061   MediaOperationErrorData ret;
1062   net::IpAddress ip(alias_ip->StdStr().c_str());
1063   if ( ip.IsInvalid() ) {
1064     ret.error_.Ref() = 1;
1065     ret.description_.Ref() = "Invalid IPV4 provided.";
1066   } else {
1067     Host2IpMap::iterator it = host_aliases_.find(alias_name->StdStr());
1068     if ( it != host_aliases_.end() ) {
1069       ret.description_.Ref() = string("Replacing old alias: [") + it->second + "]";
1070       it->second = alias_ip->StdStr();
1071     } else {
1072       host_aliases_.insert(make_pair(alias_name->StdStr(),
1073                                      alias_ip->StdStr()));
1074     }
1075     if ( !SaveHostAliases() ) {
1076       ret.error_.Ref() = 1;
1077       ret.description_.Ref() = "Error savind host aliases file!";
1078     } else {
1079       ret.error_.Ref() = 0;
1080     }
1081   }
1082   call->Complete(ret);
1083 }
1084
1085 void MediaMapper::DeleteHostAlias(rpc::CallContext<MediaOperationErrorData>* call,
1086                                   const rpc::String* alias_name) {
1087   MediaOperationErrorData ret;
1088   Host2IpMap::iterator it = host_aliases_.find(alias_name->StdStr());
1089   if ( it != host_aliases_.end() ) {
1090     host_aliases_.erase(it);
1091     if ( !SaveHostAliases() ) {
1092       ret.error_.Ref() = 1;
1093       ret.description_.Ref() = "Error savind host aliases file!";
1094     } else {
1095       ret.error_.Ref() = 0;
1096     }
1097   } else {
1098     ret.error_.Ref() = 1;
1099     ret.description_.Ref() = "Cannot find the given alias!";
1100   }
1101   call->Complete(ret);
1102 }
1103
1104 void MediaMapper::GetHostAliases(rpc::Array <MediaHostAliasSpec>* aliases) const {
1105   for ( Host2IpMap::const_iterator it = host_aliases_.begin();
1106         it != host_aliases_.end(); ++it ) {
1107     MediaHostAliasSpec alias;
1108     alias.alias_name_.Ref() = it->first;
1109     alias.alias_ip_.Ref() = it->second;
1110     aliases->PushBack(alias);
1111   }
1112 }
1113
1114 void MediaMapper::GetHostAliases(
1115   rpc::CallContext< rpc::Array <MediaHostAliasSpec> >* call) {
1116   rpc::Array <MediaHostAliasSpec> aliases;
1117   GetHostAliases(&aliases);
1118   call->Complete(aliases);
1119 }
1120
1121 //////////////////////////////////////////////////////////////////////
1122
1123 void MediaMapper::GetAllMediaAliases(
1124     rpc::CallContext< rpc::Array<MediaAliasSpec> >* call) {
1125   vector< pair<string, string> > vec_aliases;
1126   media_mapper_.GetAllMediaAliases(&vec_aliases);
1127   rpc::Array <MediaAliasSpec> aliases;
1128   for ( int i = 0; i < vec_aliases.size(); ++i ) {
1129     aliases.PushBack(MediaAliasSpec(vec_aliases[i].first,
1130                                     vec_aliases[i].second));
1131   }
1132   call->Complete(aliases);
1133 }
1134 void MediaMapper::SetMediaAlias(rpc::CallContext<MediaOperationErrorData>* call,
1135                                 const rpc::String* alias_name,
1136                                 const rpc::String* media_name) {
1137   MediaOperationErrorData ret;
1138   string error;
1139   if ( media_mapper_.SetMediaAlias(alias_name->StdStr(),
1140                                    media_name->StdStr(),
1141                                    &error) ) {
1142     ret.error_.Ref() = 0;
1143   } else {
1144     ret.error_.Ref() = 1;
1145     ret.description_.Ref() = error;
1146   }
1147   call->Complete(ret);
1148 }
1149 void MediaMapper::GetMediaAlias(rpc::CallContext<rpc::String>* call,
1150                                 const rpc::String* alias_name) {
1151   string ret;
1152   if ( media_mapper_.GetMediaAlias(alias_name->StdStr(), &ret) ) {
1153     call->Complete(rpc::String(ret));
1154   } else {
1155     call->Complete(rpc::String(""));
1156   }
1157 }
Note: See TracBrowser for help on using the browser.