root/trunk/whisperlib/net/base/selector.cc

Revision 7, 13.6 kB (checked in by whispercastorg, 2 years ago)

version 0.2.0

Line 
1 // Copyright (c) 2009, Whispersoft s.r.l.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above
11 // copyright notice, this list of conditions and the following disclaimer
12 // in the documentation and/or other materials provided with the
13 // distribution.
14 // * Neither the name of Whispersoft s.r.l. nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Authors: Cosmin Tudorache & Catalin Popescu
31
32 #include "net/base/selector.h"
33
34 #include <pthread.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37
38 #include "common/base/log.h"
39 #include "common/base/errno.h"
40 #include "common/base/timer.h"
41
42 #include "net/base/selectable.h"
43
44 //////////////////////////////////////////////////////////////////////
45
46 DEFINE_bool(selector_high_alarm_precission,
47             false,
48             "Loose some CPU time and gain that extra milisecond precission "
49             "for selector alarms..");
50
51 DEFINE_int32(debug_check_long_callbacks_ms,
52              500,
53              "If greater than zero, we check (in debug mode only !) "
54              "that processing functions, callbacks and alarm functions take "
55              "less then this amount of time, in miliseconds");
56
57 //////////////////////////////////////////////////////////////////////
58
59 namespace net {
60
61 Selector::Selector()
62   : tid_(0),
63     should_end_(false),
64     epfd_(INVALID_FD_VALUE),
65     now_(timer::TicksMsec()) {
66   epfd_ = epoll_create(10);
67   CHECK_GE(epfd_, 0) << "epoll_create() failed: "
68                      << GetLastSystemErrorDescription();
69   CHECK(!pipe(signal_pipe_)) <<  "pipe() failed:  "
70                              << GetLastSystemErrorDescription();
71   for ( int i = 0; i < NUMBEROF(signal_pipe_); ++i ) {
72     const int flags = fcntl(signal_pipe_[i], F_GETFL, 0);
73     CHECK_GE(flags, 0) << " fcntl fail: " << GetLastSystemErrorDescription();
74     CHECK_GE(fcntl(signal_pipe_[i], F_SETFL, flags | O_NONBLOCK), 0)
75       << "fcntl fail: " << GetLastSystemErrorDescription();
76   }
77   CHECK(EpollCtlAdd(signal_pipe_[0], NULL, EPOLLIN | EPOLLRDHUP | EPOLLHUP));
78 }
79
80 Selector::~Selector() {
81   CHECK_EQ(tid_, 0);
82   CHECK(registered_.empty());
83 }
84
85 void Selector::Loop() {
86   CHECK_EQ(tid_, 0) << "Loop already started -- bad !";
87   should_end_ = false;
88   tid_ = pthread_self();
89   LOG_INFO << "Starting selector loop";
90   while ( !should_end_ ) {
91     struct epoll_event event = { 0, };
92     int32 to_sleep = kStandardWakeUpTimeMs;
93     if ( FLAGS_selector_high_alarm_precission ) {
94       now_ = timer::TicksMsec();
95     }
96     if ( !alarms_.empty() ) {
97       if ( alarms_.begin()->first < now_ + to_sleep ) {
98         to_sleep = alarms_.begin()->first - now_;
99       }
100     }
101     const int num_events = epoll_wait(epfd_, &event, 1, to_sleep);
102     if ( num_events == -1 && errno != EINTR ) {
103       LOG_ERROR << "epoll_wait() error: " << GetLastSystemErrorDescription();
104       break;
105     }
106     now_ = timer::TicksMsec();
107     if ( num_events > 0 ) {
108       DCHECK_EQ(num_events, 1);
109       Selectable* const s = reinterpret_cast<Selectable *>(event.data.ptr);
110 #ifdef _DEBUG
111       const int64 processing_begin =
112           FLAGS_debug_check_long_callbacks_ms > 0 ? timer::TicksMsec() : 0;
113 #endif
114       if ( s != NULL ) {
115         // During HandleXEvent the obj may be closed loosing so track of
116         // it's fd value.
117         bool keep_processing = true;
118         if ( event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP) ) {
119           keep_processing = s->HandleErrorEvent(event.events) &&
120                             s->GetFd() != INVALID_FD_VALUE;
121         }
122         if ( keep_processing && (event.events & (EPOLLIN | EPOLLPRI)) ) {
123           keep_processing = s->HandleReadEvent(event.events) &&
124                             s->GetFd() != INVALID_FD_VALUE;
125         }
126         if ( keep_processing && (event.events & EPOLLOUT) ) {
127           s->HandleWriteEvent(event.events);
128         }
129       }  // else was probably a wake signal..
130 #ifdef _DEBUG
131       if ( FLAGS_debug_check_long_callbacks_ms > 0 ) {
132         const int64 processing_end = timer::TicksMsec();
133         if ( processing_end - processing_begin >
134              FLAGS_debug_check_long_callbacks_ms ) {
135           LOG_ERROR << " ====>> Unexpectedly long event processing: "
136                     << " event: " << event.events
137                     << " time spent:  "
138                     << processing_end - processing_begin;
139         }
140       }
141 #endif
142     }  // else, was probably a timeout
143     // Runs some closures..
144     bool are_some_closures = false;
145     if ( FLAGS_selector_high_alarm_precission ) {
146       now_ = timer::TicksMsec();
147     }
148     do {
149       mutex_.Lock();
150       are_some_closures = !to_run_.empty();
151       mutex_.Unlock();
152       RunAllClosures();
153     } while ( are_some_closures );
154     // Run the alarms
155     if ( FLAGS_selector_high_alarm_precission ) {
156       now_ = timer::TicksMsec();
157     }
158     while ( !alarms_.empty() ) {
159       if ( alarms_.begin()->first <= now_ ) {
160         Closure* closure = alarms_.begin()->second;
161         const bool erased = reverse_alarms_.erase(closure);
162         alarms_.erase(alarms_.begin());
163 #ifdef _DEBUG
164       const int64 processing_begin =
165           FLAGS_debug_check_long_callbacks_ms > 0 ? timer::TicksMsec() : 0;
166 #endif
167       if ( erased ) {
168         closure->Run();
169       } else {
170         LOG_ERROR << " Strange alarm mismatching: " << closure;
171       }
172 #ifdef _DEBUG
173       if ( FLAGS_debug_check_long_callbacks_ms > 0 ) {
174         const int64 processing_end = timer::TicksMsec();
175         if ( processing_end - processing_begin >
176              FLAGS_debug_check_long_callbacks_ms ) {
177           LOG_ERROR << " ====>> Unexpectedly long alarm processing: "
178                     << " callback: " << closure
179                     << " time spent:  "
180                     << processing_end - processing_begin;
181         }
182       }
183 #endif
184       } else {
185         break;
186       }
187     }
188   }
189
190   LOG_INFO << "Closing all the active connections in the selector...";
191   CleanAndCloseAll();
192
193   // Run all the remaining closures...
194   bool are_some_closures = false;
195   int32 are_some_closures_count = 0;
196   do {
197     LOG_INFO << "Running the selector closures on shutdown, iteration "
198              << are_some_closures_count++;
199     RunAllClosures();
200     mutex_.Lock();
201     are_some_closures = !to_run_.empty();
202     mutex_.Unlock();
203   } while ( are_some_closures );
204
205   // cleanup epoll
206   ::close(epfd_);
207   epfd_ = INVALID_FD_VALUE;
208   tid_ = 0;
209
210   LOG_INFO << "Selector loop terminated";
211 }
212
213 void Selector::MakeLoopExit() {
214   CHECK(IsInSelectThread());
215   should_end_ = true;
216 }
217
218 bool Selector::IsInSelectThread() const {
219   return tid_ == pthread_self();
220 }
221
222 bool Selector::RunInSelectLoop(Closure* callback) {
223   #ifdef _DEBUG
224   callback->set_selector_registered(true);
225   #endif
226   mutex_.Lock();
227   to_run_.push_back(callback);
228   mutex_.Unlock();
229   if ( !IsInSelectThread() ) {
230     SendWakeSignal();
231   }
232   return true;
233 }
234 int64 Selector::RegisterAlarm(Closure* callback, int64 timeout_in_ms) {
235   CHECK(callback != NULL);
236   CHECK(reverse_alarms_.find(callback) == reverse_alarms_.end())
237     << "Same alarm added multiple times -- BAD";
238   // const int64 now = timer::TicksMsec();
239   const int64 wake_up_time = now_ + timeout_in_ms;
240   CHECK(timeout_in_ms < 0 || timeout_in_ms <= wake_up_time)
241     << "Overflow, timeout_in_ms: " << timeout_in_ms << " is too big";
242   alarms_.insert(make_pair(wake_up_time, callback));
243   reverse_alarms_[callback] = wake_up_time;
244
245   // We do not need to wake .. we are in the select loop :)
246   return wake_up_time;
247 }
248 bool Selector::UnregisterAlarm(Closure* callback) {
249   ReverseAlarmsMap::iterator it = reverse_alarms_.find(callback);
250   if ( it == reverse_alarms_.end() ) {
251     return false;
252   }
253   alarms_.erase(make_pair(it->second, callback));
254   reverse_alarms_.erase(it);
255   return true;
256 }
257
258 bool Selector::ReregisterAlarm(Closure* callback, int64 timeout_in_ms) {
259   CHECK(callback != NULL);
260   ReverseAlarmsMap::iterator it = reverse_alarms_.find(callback);
261   if ( it != reverse_alarms_.end() ) {
262     alarms_.erase(make_pair(it->second, callback));
263     const int64 wake_up_time = now_ + timeout_in_ms;
264     alarms_.insert(make_pair(wake_up_time, callback));
265     it->second = wake_up_time;
266     return true;
267   } else {
268     RegisterAlarm(callback, timeout_in_ms);
269     return false;
270   }
271 }
272
273 //////////////////////////////////////////////////////////////////////
274
275 void Selector::CleanAndCloseAll() {
276   // It is some discussion, whether to do some CHECK if connections are
277   // left at this point or to close them or to just skip it. The ideea is
278   // that we preffer forcing a close on them for the server case and also
279   // client connections when we end a program.
280   if ( !registered_.empty() ) {
281     DLOG_DEBUG << "Select loop ended with " << registered_.size()
282               << " registered connections.";
283     // We just close the fd and care about nothing ..
284     while ( !registered_.empty() ) {
285       (*registered_.begin())->Close();
286     }
287   }
288 }
289
290 void Selector::RunAllClosures() {
291   char buffer[1024];
292   int cb;
293   while ( (cb = ::read(signal_pipe_[0], buffer, sizeof(buffer))) > 0 ) {
294     DLOG(10) << " Cleaned some " << cb << " bytes from signal pipe.";
295   }
296   mutex_.Lock();
297   list<Closure*> to_run(to_run_.size());
298   copy(to_run_.begin(), to_run_.end(), to_run.begin());
299   to_run_.clear();
300   mutex_.Unlock();
301   while ( !to_run.empty() ) {
302 #ifdef _DEBUG
303     const int64 processing_begin =
304         FLAGS_debug_check_long_callbacks_ms > 0 ? timer::TicksMsec() : 0;
305     to_run.front()->set_selector_registered(false);
306 #endif
307     to_run.front()->Run();
308 #ifdef _DEBUG
309     if ( FLAGS_debug_check_long_callbacks_ms > 0 ) {
310       const int64 processing_end = timer::TicksMsec();
311       if ( processing_end - processing_begin >
312            FLAGS_debug_check_long_callbacks_ms ) {
313         LOG_ERROR << " ====>> Unexpectedly long callback processing: "
314                   << " callback: " << to_run.front()
315                   << " time spent:  "
316                   << processing_end - processing_begin;
317       }
318     }
319 #endif
320     to_run.pop_front();
321   }
322 }
323
324 void Selector::SendWakeSignal() {
325   int8 byte = 0;
326   if ( sizeof(byte) != ::write(signal_pipe_[1], &byte, sizeof(byte)) ) {
327     LOG_ERROR << "Error writing a wake-up byte in selector signal pipe";
328   }
329 }
330
331 //////////////////////////////////////////////////////////////////////
332
333 bool Selector::Register(Selectable* s) {
334   DCHECK(IsInSelectThread() || tid_ == 0);
335   CHECK(s->selector() == this);
336   const int fd = s->GetFd();
337
338   SelectableSet::const_iterator it = registered_.find(s);
339   if ( it != registered_.end() ) {
340     // selectable obj already registered
341     CHECK_EQ((*it)->GetFd(), fd);
342     return true;
343   }
344   // Insert in the local set of registered objs
345   registered_.insert(s);
346
347   // Insert in epoll
348   if ( fd != INVALID_FD_VALUE ) {
349     if ( !EpollCtlAdd(fd, s, s->GetDesiredEvents()) ) {
350       return false;
351     }
352   }
353   return true;
354 }
355
356 void Selector::Unregister(Selectable* s) {
357   DCHECK(IsInSelectThread() || tid_ == 0);
358   CHECK(s->selector() == this);
359   const int fd = s->GetFd();
360
361   const SelectableSet::iterator it = registered_.find(s);
362   DCHECK(it != registered_.end()) << " Cannot unregister : " << fd;
363
364   // remove from epoll and internal lists
365   if ( fd != INVALID_FD_VALUE ) {
366     EpollCtlDel(fd);
367   }
368   registered_.erase(it);
369 }
370
371 //////////////////////////////////////////////////////////////////////
372
373 bool Selector::EpollCtlAdd(int fd, void* user_data, int events) {
374   epoll_event event = { events, };
375   event.data.ptr = user_data;
376
377   DLOG_DEBUG << "  Adding to epoll: " << fd;
378   if ( epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &event) < 0 ) {
379     LOG_ERROR  << "System error on epoll_ctl: "
380                << GetLastSystemErrorDescription()
381                << " for events: " << hex << events << dec;
382     return false;
383   }
384   return true;
385 }
386
387 bool Selector::EpollCtlDel(int fd) {
388   epoll_event event = { 0, };
389   DLOG_DEBUG << "  Removing from epoll: " << fd;
390   if ( epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, &event) < 0 ) {
391     LOG_ERROR  << "System error on epoll_ctl: "
392                << GetLastSystemErrorDescription()
393                << " for fd: " << fd;
394     return false;
395   }
396   return true;
397 }
398
399 void Selector::UpdateDesire(Selectable* s, bool enable, int32 desire) {
400   // DCHECK(registered_.find(s) != registered_.end()) << fd;
401   if ( ((s->desire_ & desire) && enable) ||
402        ((~s->desire_ & desire) && !enable) ) {
403     return;  // already set ..
404   }
405   if ( enable ) {
406     s->desire_ |= desire;
407   } else {
408     s->desire_ &= ~desire;
409   }
410   const int fd = s->GetFd();
411   DCHECK_NE(fd, INVALID_FD_VALUE);
412   epoll_event event = { s->GetDesiredEvents(), };
413   event.data.ptr = s;
414   if ( epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &event) ) {
415     LOG_ERROR  << "Error in epoll_ctl: "  << GetLastSystemErrorDescription();
416   }
417 }
418
419 //////////////////////////////////////////////////////////////////////
420 }
Note: See TracBrowser for help on using the browser.