00001
00002
00003 #include "ace/Select_Reactor_Base.h"
00004 #include "ace/Reactor.h"
00005 #include "ace/Thread.h"
00006 #include "ace/SOCK_Acceptor.h"
00007 #include "ace/SOCK_Connector.h"
00008 #include "ace/Timer_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/Signal.h"
00011 #include "ace/OS_NS_fcntl.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.inl"
00015 #endif
00016
00017 #ifndef ACE_WIN32
00018 # include <algorithm>
00019 #endif
00020
00021 ACE_RCSID (ace,
00022 Select_Reactor_Base,
00023 "$Id: Select_Reactor_Base.cpp 87255 2009-10-29 08:09:29Z olli $")
00024
00025
00026 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 template<typename iterator>
00029 inline ACE_Event_Handler *
00030 ACE_SELECT_REACTOR_EVENT_HANDLER (iterator i)
00031 {
00032 #ifdef ACE_WIN32
00033 return (*i).item ();
00034 #else
00035 return (*i);
00036 #endif
00037 }
00038
00039
00040
00041 bool
00042 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00043 {
00044 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00045 #if defined (ACE_WIN32)
00046
00047
00048 if (handle == ACE_INVALID_HANDLE)
00049 #else
00050 if (handle < 0
00051 || static_cast<size_type> (handle) >= this->event_handlers_.size ())
00052 #endif
00053 {
00054 errno = EINVAL;
00055 return true;
00056 }
00057
00058 return false;
00059 }
00060
00061
00062
00063 bool
00064 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00065 {
00066 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00067 #if defined (ACE_WIN32)
00068
00069
00070 if (handle != ACE_INVALID_HANDLE)
00071 #else
00072 if (handle >= 0 && handle < this->max_handlep1_)
00073 #endif
00074 {
00075 return true;
00076 }
00077
00078
00079
00080
00081
00082 return false;
00083 }
00084
00085 int
00086 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00087 {
00088 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00089
00090 #if defined (ACE_WIN32)
00091 if (this->event_handlers_.open (size) == -1)
00092 return -1;
00093 #else
00094 if (this->event_handlers_.size (size) == -1)
00095 return -1;
00096
00097
00098 std::fill (this->event_handlers_.begin (),
00099 this->event_handlers_.end (),
00100 static_cast<ACE_Event_Handler *> (0));
00101
00102 this->max_handlep1_ = 0;
00103 #endif
00104
00105
00106
00107 return ACE::set_handle_limit (static_cast<int> (size), 1);
00108 }
00109
00110
00111
00112 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00113 : select_reactor_ (select_reactor),
00114 #ifndef ACE_WIN32
00115 max_handlep1_ (0),
00116 #endif
00117 event_handlers_ ()
00118 {
00119 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00120 }
00121
00122 int
00123 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00124 {
00125
00126 #ifdef ACE_WIN32
00127 map_type::iterator const end = this->event_handlers_.end ();
00128 for (map_type::iterator pos = this->event_handlers_.begin ();
00129 pos != end;
00130 )
00131 {
00132
00133
00134
00135 map_type::iterator const the_pos (pos++);
00136
00137 ACE_HANDLE const handle = (*the_pos).key ();
00138 (void) this->unbind (handle,
00139 the_pos,
00140 ACE_Event_Handler::ALL_EVENTS_MASK);
00141 }
00142 #else
00143
00144
00145
00146 map_type::iterator pos =
00147 this->event_handlers_.begin ();
00148
00149 for (ACE_HANDLE handle = 0;
00150 handle < this->max_handlep1_;
00151 ++handle)
00152 {
00153 (void) this->unbind (handle,
00154 pos,
00155 ACE_Event_Handler::ALL_EVENTS_MASK);
00156 ++pos;
00157 }
00158 #endif
00159
00160 return 0;
00161 }
00162
00163 int
00164 ACE_Select_Reactor_Handler_Repository::close (void)
00165 {
00166 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00167
00168 return this->unbind_all ();
00169 }
00170
00171 ACE_Select_Reactor_Handler_Repository::map_type::iterator
00172 ACE_Select_Reactor_Handler_Repository::find_eh (ACE_HANDLE handle)
00173 {
00174 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find_eh");
00175
00176 map_type::iterator pos (this->event_handlers_.end ());
00177
00178
00179 #if defined (ACE_WIN32)
00180 this->event_handlers_.find (handle, pos);
00181 #else
00182 map_type::iterator const tmp = &this->event_handlers_[handle];
00183
00184 if (*tmp != 0)
00185 pos = tmp;
00186 #endif
00187
00188 return pos;
00189 }
00190
00191
00192 int
00193 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00194 ACE_Event_Handler *event_handler,
00195 ACE_Reactor_Mask mask)
00196 {
00197 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00198
00199 if (event_handler == 0)
00200 return -1;
00201
00202 if (handle == ACE_INVALID_HANDLE)
00203 handle = event_handler->get_handle ();
00204
00205 if (this->invalid_handle (handle))
00206 return -1;
00207
00208
00209 bool existing_handle = false;
00210
00211 #if defined (ACE_WIN32)
00212
00213 map_type::ENTRY * entry = 0;
00214
00215 int const result =
00216 this->event_handlers_.bind (handle, event_handler, entry);
00217
00218 if (result == -1)
00219 {
00220 return -1;
00221 }
00222 else if (result == 1)
00223 {
00224
00225 if (event_handler != entry->item ())
00226 {
00227 return -1;
00228 }
00229 else
00230 {
00231
00232
00233 existing_handle = true;
00234 }
00235 }
00236
00237 #else
00238
00239
00240 ACE_Event_Handler * const current_handler =
00241 this->event_handlers_[handle];
00242
00243 if (current_handler)
00244 {
00245
00246 if (current_handler != event_handler)
00247 return -1;
00248
00249
00250
00251 existing_handle = true;
00252 }
00253
00254 this->event_handlers_[handle] = event_handler;
00255
00256 if (this->max_handlep1_ < handle + 1)
00257 this->max_handlep1_ = handle + 1;
00258
00259 #endif
00260
00261 if (this->select_reactor_.is_suspended_i (handle))
00262 {
00263 this->select_reactor_.bit_ops (handle,
00264 mask,
00265 this->select_reactor_.suspend_set_,
00266 ACE_Reactor::ADD_MASK);
00267 }
00268 else
00269 {
00270 this->select_reactor_.bit_ops (handle,
00271 mask,
00272 this->select_reactor_.wait_set_,
00273 ACE_Reactor::ADD_MASK);
00274
00275
00276
00277
00278
00279 }
00280
00281
00282 if (!existing_handle)
00283 event_handler->add_reference ();
00284
00285 return 0;
00286 }
00287
00288
00289
00290 int
00291 ACE_Select_Reactor_Handler_Repository::unbind (
00292 ACE_HANDLE handle,
00293 map_type::iterator pos,
00294 ACE_Reactor_Mask mask)
00295 {
00296 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00297
00298
00299
00300
00301 ACE_Event_Handler * const event_handler =
00302 (pos == this->event_handlers_.end ()
00303 ? 0
00304 : ACE_SELECT_REACTOR_EVENT_HANDLER (pos));
00305
00306
00307 this->select_reactor_.bit_ops (handle,
00308 mask,
00309 this->select_reactor_.wait_set_,
00310 ACE_Reactor::CLR_MASK);
00311
00312
00313 this->select_reactor_.bit_ops (handle,
00314 mask,
00315 this->select_reactor_.suspend_set_,
00316 ACE_Reactor::CLR_MASK);
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326 bool const has_any_wait_mask =
00327 (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00328 || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00329 || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00330 bool const has_any_suspend_mask =
00331 (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00332 || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00333 || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00334
00335 bool complete_removal = false;
00336
00337 if (!has_any_wait_mask && !has_any_suspend_mask)
00338 {
00339 #if defined (ACE_WIN32)
00340 if (event_handler != 0 && this->event_handlers_.unbind (pos) == -1)
00341 return -1;
00342 #else
00343 this->event_handlers_[handle] = 0;
00344
00345 if (this->max_handlep1_ == handle + 1)
00346 {
00347
00348
00349
00350 ACE_HANDLE const wait_rd_max =
00351 this->select_reactor_.wait_set_.rd_mask_.max_set ();
00352 ACE_HANDLE const wait_wr_max =
00353 this->select_reactor_.wait_set_.wr_mask_.max_set ();
00354 ACE_HANDLE const wait_ex_max =
00355 this->select_reactor_.wait_set_.ex_mask_.max_set ();
00356
00357 ACE_HANDLE const suspend_rd_max =
00358 this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00359 ACE_HANDLE const suspend_wr_max =
00360 this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00361 ACE_HANDLE const suspend_ex_max =
00362 this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00363
00364
00365 this->max_handlep1_ = wait_rd_max;
00366 if (this->max_handlep1_ < wait_wr_max)
00367 this->max_handlep1_ = wait_wr_max;
00368 if (this->max_handlep1_ < wait_ex_max)
00369 this->max_handlep1_ = wait_ex_max;
00370
00371 if (this->max_handlep1_ < suspend_rd_max)
00372 this->max_handlep1_ = suspend_rd_max;
00373 if (this->max_handlep1_ < suspend_wr_max)
00374 this->max_handlep1_ = suspend_wr_max;
00375 if (this->max_handlep1_ < suspend_ex_max)
00376 this->max_handlep1_ = suspend_ex_max;
00377
00378 ++this->max_handlep1_;
00379 }
00380
00381 #endif
00382
00383
00384 complete_removal = true;
00385 }
00386
00387 if (event_handler == 0)
00388 return -1;
00389
00390 bool const requires_reference_counting =
00391 event_handler->reference_counting_policy ().value () ==
00392 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00393
00394
00395
00396 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00397 (void) event_handler->handle_close (handle, mask);
00398
00399
00400
00401 if (complete_removal && requires_reference_counting)
00402 {
00403 (void) event_handler->remove_reference ();
00404 }
00405
00406 return 0;
00407 }
00408
00409 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00410 (ACE_Select_Reactor_Handler_Repository const * s)
00411 : rep_ (s),
00412 current_ (s->event_handlers_.begin ())
00413 {
00414 #ifndef ACE_WIN32
00415
00416
00417 const_base_iterator const end =
00418 &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00419
00420
00421
00422
00423 while (this->current_ != end && (*(this->current_) == 0))
00424 ++this->current_;
00425 #endif
00426 }
00427
00428
00429
00430
00431 bool
00432 ACE_Select_Reactor_Handler_Repository_Iterator::next (
00433 ACE_Event_Handler *&next_item)
00434 {
00435 bool result = true;
00436
00437 if (this->done ())
00438 result = false;
00439 else
00440 next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->current_);
00441
00442 return result;
00443 }
00444
00445
00446
00447 bool
00448 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00449 {
00450 #ifdef ACE_WIN32
00451
00452
00453 const_base_iterator const end = this->rep_->event_handlers_.end ();
00454 #else
00455
00456
00457 const_base_iterator const end =
00458 &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00459 #endif
00460
00461 if (this->current_ != end)
00462 ++this->current_;
00463
00464 #ifndef ACE_WIN32
00465
00466
00467
00468 while (this->current_ != end && (*(this->current_) == 0))
00469 ++this->current_;
00470 #endif
00471
00472 return this->current_ != end;
00473 }
00474
00475
00476
00477 void
00478 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00479 {
00480 #if defined (ACE_HAS_DUMP)
00481 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00482
00483 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00484 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("rep_ = %u"), this->rep_));
00485 # ifdef ACE_WIN32
00486 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = ")));
00487 this->current_.dump ();
00488 # else
00489 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = %@"), this->current_));
00490 # endif
00491 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00492 #endif
00493 }
00494
00495 void
00496 ACE_Select_Reactor_Handler_Repository::dump (void) const
00497 {
00498 #if defined (ACE_HAS_DUMP)
00499 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00500
00501 # ifdef ACE_WIN32
00502 # define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%@")
00503 # define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%u")
00504 # else
00505 # define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%d")
00506 # define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%d")
00507 # endif
00508
00509 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00510 ACE_DEBUG ((LM_DEBUG,
00511 ACE_TEXT ("max_handlep1_ = ")
00512 ACE_MAX_HANDLEP1_FORMAT_SPECIFIER
00513 ACE_TEXT ("\n"),
00514 this->max_handlep1 ()));
00515 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("[")));
00516
00517 ACE_Event_Handler *event_handler = 0;
00518
00519 for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00520 iter.next (event_handler) != 0;
00521 iter.advance ())
00522 ACE_DEBUG ((LM_DEBUG,
00523 ACE_TEXT (" (event_handler = %@,")
00524 ACE_TEXT (" event_handler->handle_ = ")
00525 ACE_HANDLE_FORMAT_SPECIFIER
00526 ACE_TEXT ("\n"),
00527 event_handler,
00528 event_handler->get_handle ()));
00529
00530 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" ]\n")));
00531 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00532 #endif
00533 }
00534
00535 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00536
00537 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00538 : max_notify_iterations_ (-1)
00539 {
00540 }
00541
00542 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
00543 {
00544 }
00545
00546 void
00547 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00548 {
00549
00550 if (iterations == 0)
00551 iterations = 1;
00552
00553 this->max_notify_iterations_ = iterations;
00554 }
00555
00556 int
00557 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00558 {
00559 return this->max_notify_iterations_;
00560 }
00561
00562
00563
00564
00565
00566
00567
00568 int
00569 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00570 ACE_Reactor_Mask mask )
00571 {
00572 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00573
00574 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00575
00576 return notification_queue_.purge_pending_notifications(eh, mask);
00577
00578 #else
00579 ACE_UNUSED_ARG (eh);
00580 ACE_UNUSED_ARG (mask);
00581 ACE_NOTSUP_RETURN (-1);
00582 #endif
00583 }
00584
00585 void
00586 ACE_Select_Reactor_Notify::dump (void) const
00587 {
00588 #if defined (ACE_HAS_DUMP)
00589 ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00590
00591 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00592 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00593 this->notification_pipe_.dump ();
00594 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00595 #endif
00596 }
00597
00598 int
00599 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00600 ACE_Timer_Queue *,
00601 int disable_notify_pipe)
00602 {
00603 ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00604
00605 if (disable_notify_pipe == 0)
00606 {
00607 this->select_reactor_ =
00608 dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00609
00610 if (select_reactor_ == 0)
00611 {
00612 errno = EINVAL;
00613 return -1;
00614 }
00615
00616 if (this->notification_pipe_.open () == -1)
00617 return -1;
00618 #if defined (F_SETFD)
00619 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00620 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00621 #endif
00622
00623 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00624 if (notification_queue_.open() == -1)
00625 {
00626 return -1;
00627 }
00628 #endif
00629
00630
00631
00632 if (ACE::set_flags (this->notification_pipe_.read_handle (),
00633 ACE_NONBLOCK) == -1)
00634 return -1;
00635 else
00636 return this->select_reactor_->register_handler
00637 (this->notification_pipe_.read_handle (),
00638 this,
00639 ACE_Event_Handler::READ_MASK);
00640 }
00641 else
00642 {
00643 this->select_reactor_ = 0;
00644 return 0;
00645 }
00646 }
00647
00648 int
00649 ACE_Select_Reactor_Notify::close (void)
00650 {
00651 ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00652
00653 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00654 notification_queue_.reset();
00655 #else
00656 if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00657 {
00658
00659
00660
00661
00662
00663 ACE_Notification_Buffer b;
00664 for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00665 r > 0;
00666 r = read_notify_pipe(notification_pipe_.read_handle(), b))
00667 {
00668 if (b.eh_ != 0)
00669 {
00670 b.eh_->remove_reference();
00671 }
00672 }
00673 }
00674 #endif
00675
00676 return this->notification_pipe_.close ();
00677 }
00678
00679 int
00680 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
00681 ACE_Reactor_Mask mask,
00682 ACE_Time_Value *timeout)
00683 {
00684 ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00685
00686
00687
00688 if (this->select_reactor_ == 0)
00689 return 0;
00690
00691 ACE_Event_Handler_var safe_handler (event_handler);
00692
00693 if (event_handler)
00694 event_handler->add_reference ();
00695
00696 ACE_Notification_Buffer buffer (event_handler, mask);
00697
00698 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00699 int const notification_required =
00700 notification_queue_.push_new_notification(buffer);
00701
00702 if (notification_required == -1)
00703 {
00704 return -1;
00705 }
00706
00707 if (notification_required == 0)
00708 {
00709
00710 safe_handler.release ();
00711
00712 return 0;
00713 }
00714 #endif
00715
00716 ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00717 (char *) &buffer,
00718 sizeof buffer,
00719 timeout);
00720 if (n == -1)
00721 return -1;
00722
00723
00724 safe_handler.release ();
00725
00726 return 0;
00727 }
00728
00729
00730
00731
00732 int
00733 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00734 ACE_Handle_Set &rd_mask)
00735 {
00736 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00737
00738 ACE_HANDLE const read_handle =
00739 this->notification_pipe_.read_handle ();
00740
00741 if (read_handle != ACE_INVALID_HANDLE
00742 && rd_mask.is_set (read_handle))
00743 {
00744 --number_of_active_handles;
00745 rd_mask.clr_bit (read_handle);
00746 return this->handle_input (read_handle);
00747 }
00748 else
00749 return 0;
00750 }
00751
00752
00753 ACE_HANDLE
00754 ACE_Select_Reactor_Notify::notify_handle (void)
00755 {
00756 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00757
00758 return this->notification_pipe_.read_handle ();
00759 }
00760
00761
00762 int
00763 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00764 {
00765 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00766 ACE_UNUSED_ARG(buffer);
00767 return 1;
00768 #else
00769
00770
00771
00772
00773
00774 if (buffer.eh_ != 0)
00775 {
00776 return 1;
00777 }
00778 else
00779 {
00780
00781 return 0;
00782 }
00783 #endif
00784 }
00785
00786 int
00787 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00788 {
00789 int result = 0;
00790
00791 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00792
00793
00794
00795
00796 bool more_messages_queued = false;
00797 ACE_Notification_Buffer next;
00798
00799 result = notification_queue_.pop_next_notification(buffer,
00800 more_messages_queued,
00801 next);
00802
00803 if (result == 0 || result == -1)
00804 {
00805 return result;
00806 }
00807
00808 if(more_messages_queued)
00809 {
00810 (void) ACE::send(this->notification_pipe_.write_handle(),
00811 (char *)&next, sizeof(ACE_Notification_Buffer));
00812 }
00813 #endif
00814
00815
00816
00817
00818
00819
00820 if (buffer.eh_ != 0)
00821 {
00822 ACE_Event_Handler *event_handler = buffer.eh_;
00823
00824 bool const requires_reference_counting =
00825 event_handler->reference_counting_policy ().value () ==
00826 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00827
00828 switch (buffer.mask_)
00829 {
00830 case ACE_Event_Handler::READ_MASK:
00831 case ACE_Event_Handler::ACCEPT_MASK:
00832 result = event_handler->handle_input (ACE_INVALID_HANDLE);
00833 break;
00834 case ACE_Event_Handler::WRITE_MASK:
00835 result = event_handler->handle_output (ACE_INVALID_HANDLE);
00836 break;
00837 case ACE_Event_Handler::EXCEPT_MASK:
00838 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00839 break;
00840 case ACE_Event_Handler::QOS_MASK:
00841 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00842 break;
00843 case ACE_Event_Handler::GROUP_QOS_MASK:
00844 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00845 break;
00846 default:
00847
00848 ACE_ERROR ((LM_ERROR,
00849 ACE_TEXT ("invalid mask = %d\n"),
00850 buffer.mask_));
00851 }
00852
00853 if (result == -1)
00854 event_handler->handle_close (ACE_INVALID_HANDLE,
00855 ACE_Event_Handler::EXCEPT_MASK);
00856
00857 if (requires_reference_counting)
00858 {
00859 event_handler->remove_reference ();
00860 }
00861 }
00862
00863 return 1;
00864 }
00865
00866 int
00867 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00868 ACE_Notification_Buffer &buffer)
00869 {
00870 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891 ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00892
00893 if (n > 0)
00894 {
00895
00896 if (n != sizeof buffer)
00897 {
00898 ssize_t const remainder = sizeof buffer - n;
00899
00900
00901
00902
00903
00904 if (ACE::recv_n (handle,
00905 ((char *) &buffer) + n,
00906 remainder) != remainder)
00907 return -1;
00908 }
00909
00910
00911 return 1;
00912 }
00913
00914
00915 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00916 return -1;
00917
00918 return 0;
00919 }
00920
00921
00922 int
00923 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
00924 {
00925 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00926
00927
00928
00929 int number_dispatched = 0;
00930 int result = 0;
00931 ACE_Notification_Buffer buffer;
00932
00933
00934
00935
00936 while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00937 {
00938
00939
00940 if (this->dispatch_notify (buffer) > 0)
00941 ++number_dispatched;
00942
00943
00944
00945
00946 if (number_dispatched == this->max_notify_iterations_)
00947 break;
00948 }
00949
00950
00951
00952 if (result < 0)
00953 number_dispatched = -1;
00954
00955
00956
00957
00958
00959 this->select_reactor_->renew ();
00960 return number_dispatched;
00961 }
00962
00963
00964
00965 int
00966 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
00967 ACE_Reactor_Mask mask)
00968 {
00969 if (this->notify_handler_ == 0)
00970 return 0;
00971 else
00972 return this->notify_handler_->purge_pending_notifications (eh, mask);
00973 }
00974
00975
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985
00986 int
00987 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
00988 ACE_Reactor_Mask mask,
00989 ACE_Select_Reactor_Handle_Set &handle_set,
00990 int ops)
00991 {
00992 ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
00993 if (this->handler_rep_.handle_in_range (handle) == 0)
00994 return -1;
00995
00996 #if !defined (ACE_WIN32)
00997 ACE_Sig_Guard sb (0,
00998 this->mask_signals_);
00999 #endif
01000
01001 ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
01002 u_long omask = ACE_Event_Handler::NULL_MASK;
01003
01004
01005
01006 if (handle_set.rd_mask_.is_set (handle))
01007 ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
01008 if (handle_set.wr_mask_.is_set (handle))
01009 ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
01010 if (handle_set.ex_mask_.is_set (handle))
01011 ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
01012
01013 switch (ops)
01014 {
01015 case ACE_Reactor::GET_MASK:
01016
01017
01018 break;
01019 case ACE_Reactor::CLR_MASK:
01020 ptmf = &ACE_Handle_Set::clr_bit;
01021
01022
01023
01024
01025 this->clear_dispatch_mask (handle, mask);
01026
01027 case ACE_Reactor::SET_MASK:
01028
01029 case ACE_Reactor::ADD_MASK:
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01041 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01042 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01043 {
01044 (handle_set.rd_mask_.*ptmf) (handle);
01045 }
01046 else if (ops == ACE_Reactor::SET_MASK)
01047 handle_set.rd_mask_.clr_bit (handle);
01048
01049
01050 if (ACE_BIT_ENABLED (mask,
01051 ACE_Event_Handler::WRITE_MASK)
01052 || ACE_BIT_ENABLED (mask,
01053 ACE_Event_Handler::CONNECT_MASK))
01054 {
01055 (handle_set.wr_mask_.*ptmf) (handle);
01056 }
01057 else if (ops == ACE_Reactor::SET_MASK)
01058 handle_set.wr_mask_.clr_bit (handle);
01059
01060
01061
01062 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01063 #if defined (ACE_WIN32)
01064 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01065 #endif
01066 )
01067 {
01068 (handle_set.ex_mask_.*ptmf) (handle);
01069 }
01070 else if (ops == ACE_Reactor::SET_MASK)
01071 handle_set.ex_mask_.clr_bit (handle);
01072 break;
01073 default:
01074 return -1;
01075 }
01076 return omask;
01077 }
01078
01079 void
01080 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
01081 ACE_Reactor_Mask mask)
01082 {
01083 ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
01105 ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
01106 {
01107 this->dispatch_set_.rd_mask_.clr_bit (handle);
01108 }
01109 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
01110 {
01111 this->dispatch_set_.wr_mask_.clr_bit (handle);
01112 }
01113 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
01114 {
01115 this->dispatch_set_.ex_mask_.clr_bit (handle);
01116 }
01117
01118
01119
01120 this->state_changed_ = true;
01121 }
01122
01123
01124 int
01125 ACE_Select_Reactor_Impl::resumable_handler (void)
01126 {
01127
01128
01129
01130 return 0;
01131 }
01132
01133 ACE_END_VERSIONED_NAMESPACE_DECL