ACE_Select_Reactor_Notify Class Reference

Unblock the ACE_Select_Reactor from its event loop.

#include <Select_Reactor_Base.h>

Public Member Functions

 ACE_Select_Reactor_Notify (void)
 Constructor.
virtual ~ACE_Select_Reactor_Notify (void)
 Destructor.
virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=ACE_DISABLE_NOTIFY_PIPE_DEFAULT)
 Initialize.
virtual int close (void)
 Destroy.
virtual int notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
virtual ACE_HANDLE notify_handle (void)
virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.
virtual int handle_input (ACE_HANDLE handle)
virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.

Protected Attributes

ACE_Select_Reactor_Impl * select_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Detailed Description

Unblock the ACE_Select_Reactor from its event loop.

This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur in a thread other than the main ACE_Select_Reactor thread. To do this, we write to a notification pipe that the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and ACE_Reactor_Mask are passed to notify(), the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.
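
The wakeup mechanism described above can be illustrated outside of ACE. The following is a minimal sketch that uses plain POSIX calls; the WakeupPipe class and its member names are hypothetical and are not part of the ACE API. It only shows how a write from one thread makes a descriptor readable so that a blocked select() call returns.

```cpp
#include <cassert>
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper, not part of ACE: a pipe whose read end is watched
// by the event loop and whose write end is used by other threads.
class WakeupPipe
{
public:
  WakeupPipe ()
  {
    int const rc = ::pipe (fds_);
    assert (rc == 0);
    (void) rc;
  }
  ~WakeupPipe () { ::close (fds_[0]); ::close (fds_[1]); }
  WakeupPipe (const WakeupPipe &) = delete;
  WakeupPipe &operator= (const WakeupPipe &) = delete;

  // Called from any thread: makes the read end readable.
  bool notify ()
  {
    char const byte = 1;
    return ::write (fds_[1], &byte, 1) == 1;
  }

  // Called from the event-loop thread: blocks in select() for at most
  // timeout_ms milliseconds and reports whether a wakeup arrived.
  bool wait (int timeout_ms)
  {
    fd_set readable;
    FD_ZERO (&readable);
    FD_SET (fds_[0], &readable);
    timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;
    int const ready = ::select (fds_[0] + 1, &readable, nullptr, nullptr, &tv);
    if (ready <= 0)
      return false;                          // timeout or error
    char byte;
    return ::read (fds_[0], &byte, 1) == 1;  // consume the wakeup byte
  }

private:
  int fds_[2];
};
```

In this sketch a thread would call notify() while the loop thread sits in wait(); the class documented here additionally carries a handler pointer and a mask through the pipe rather than a single byte.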

Definition at line 133 of file Select_Reactor_Base.h.


Constructor & Destructor Documentation

ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify ( void   ) 

Constructor.

Definition at line 537 of file Select_Reactor_Base.cpp.

00538   : max_notify_iterations_ (-1)
00539 {
00540 }

ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify ( void   )  [virtual]

Destructor.

Definition at line 542 of file Select_Reactor_Base.cpp.

00543 {
00544 }


Member Function Documentation

int ACE_Select_Reactor_Notify::close ( void   )  [virtual]

Destroy.

Implements ACE_Reactor_Notify.

Definition at line 649 of file Select_Reactor_Base.cpp.

00650 {
00651   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00652 
00653 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00654   notification_queue_.reset();
00655 #else
00656   if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00657     {
00658       // Please see Bug 2820, if we just close the pipe then we break
00659       // the reference counting rules.  Basically, all the event
00660       // handlers "stored" in the pipe had their reference counts
00661       // increased.  We need to decrease them before closing the
00662       // pipe....
00663       ACE_Notification_Buffer b;
00664       for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00665            r > 0;
00666            r = read_notify_pipe(notification_pipe_.read_handle(), b))
00667         {
00668           if (b.eh_ != 0)
00669             {
00670               b.eh_->remove_reference();
00671             }
00672         }
00673     }
00674 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00675 
00676   return this->notification_pipe_.close ();
00677 }

int ACE_Select_Reactor_Notify::dispatch_notifications ( int &  number_of_active_handles,
ACE_Handle_Set &  rd_mask 
) [virtual]

Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor.

Implements ACE_Reactor_Notify.

Definition at line 733 of file Select_Reactor_Base.cpp.

00735 {
00736   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00737 
00738   ACE_HANDLE const read_handle =
00739     this->notification_pipe_.read_handle ();
00740 
00741   if (read_handle != ACE_INVALID_HANDLE
00742       && rd_mask.is_set (read_handle))
00743     {
00744       --number_of_active_handles;
00745       rd_mask.clr_bit (read_handle);
00746       return this->handle_input (read_handle);
00747     }
00748   else
00749     return 0;
00750 }

int ACE_Select_Reactor_Notify::dispatch_notify ( ACE_Notification_Buffer &  buffer  )  [virtual]

Handle one of the notify calls on the handle. This could be because a thread is trying to unblock the Reactor_Impl.

Implements ACE_Reactor_Notify.

Definition at line 787 of file Select_Reactor_Base.cpp.

00788 {
00789   int result = 0;
00790 
00791 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00792   // Dispatch one message from the notify queue, and put another in
00793   // the pipe if one is available.  Remember, the idea is to keep
00794   // exactly one message in the pipe at a time.
00795 
00796   bool more_messages_queued = false;
00797   ACE_Notification_Buffer next;
00798 
00799   result = notification_queue_.pop_next_notification(buffer,
00800                                                      more_messages_queued,
00801                                                      next);
00802 
00803   if (result == 0 || result == -1)
00804     {
00805       return result;
00806     }
00807 
00808   if(more_messages_queued)
00809     {
00810       (void) ACE::send(this->notification_pipe_.write_handle(),
00811             (char *)&next, sizeof(ACE_Notification_Buffer));
00812     }
00813 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00814 
00815   // If eh == 0 then another thread is unblocking the
00816   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00817   // internal structures.  Otherwise, we need to dispatch the
00818   // appropriate handle_* method on the <ACE_Event_Handler> pointer
00819   // we've been passed.
00820   if (buffer.eh_ != 0)
00821     {
00822       ACE_Event_Handler *event_handler = buffer.eh_;
00823 
00824       bool const requires_reference_counting =
00825         event_handler->reference_counting_policy ().value () ==
00826         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00827 
00828       switch (buffer.mask_)
00829         {
00830         case ACE_Event_Handler::READ_MASK:
00831         case ACE_Event_Handler::ACCEPT_MASK:
00832           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00833           break;
00834         case ACE_Event_Handler::WRITE_MASK:
00835           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00836           break;
00837         case ACE_Event_Handler::EXCEPT_MASK:
00838           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00839           break;
00840         case ACE_Event_Handler::QOS_MASK:
00841           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00842           break;
00843         case ACE_Event_Handler::GROUP_QOS_MASK:
00844           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00845           break;
00846         default:
00847           // Should we bail out if we get an invalid mask?
00848           ACE_ERROR ((LM_ERROR,
00849                       ACE_TEXT ("invalid mask = %d\n"),
00850                       buffer.mask_));
00851         }
00852 
00853       if (result == -1)
00854         event_handler->handle_close (ACE_INVALID_HANDLE,
00855                                      ACE_Event_Handler::EXCEPT_MASK);
00856 
00857       if (requires_reference_counting)
00858         {
00859           event_handler->remove_reference ();
00860         }
00861     }
00862 
00863   return 1;
00864 }
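
The mask-to-callback mapping in the listing above can be reduced to a small standalone model. This is a sketch only: Mask, Handler, Recorder and dispatch are hypothetical stand-ins for ACE_Reactor_Mask, ACE_Event_Handler and the member function, and the notification queue and reference counting are left out.

```cpp
#include <cassert>

// Hypothetical stand-ins for ACE_Reactor_Mask and ACE_Event_Handler.
enum Mask { READ_MASK = 1, WRITE_MASK = 2, EXCEPT_MASK = 4 };

struct Handler
{
  virtual ~Handler () {}
  virtual int handle_input () { return 0; }
  virtual int handle_output () { return 0; }
  virtual int handle_exception () { return 0; }
  virtual void handle_close () {}
};

// Returns 1 once the notification has been processed, as the listing does.
// A null handler models a pure wakeup: nothing is called.
int dispatch (Handler *handler, Mask mask)
{
  if (handler != nullptr)
    {
      int result = 0;
      switch (mask)
        {
        case READ_MASK: result = handler->handle_input (); break;
        case WRITE_MASK: result = handler->handle_output (); break;
        case EXCEPT_MASK: result = handler->handle_exception (); break;
        }
      // A callback that returns -1 asks to be closed.
      if (result == -1)
        handler->handle_close ();
    }
  return 1;
}

// Example handler that records which callbacks ran.
struct Recorder : Handler
{
  int inputs = 0;
  int closes = 0;
  int handle_input () override { ++inputs; return -1; }
  void handle_close () override { ++closes; }
};
```

Dispatching READ_MASK to a Recorder runs handle_input() and then handle_close(), because the callback returned -1; dispatching WRITE_MASK runs only the default handle_output().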

void ACE_Select_Reactor_Notify::dump ( void   )  const [virtual]

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 586 of file Select_Reactor_Base.cpp.

00587 {
00588 #if defined (ACE_HAS_DUMP)
00589   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00590 
00591   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00592   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00593   this->notification_pipe_.dump ();
00594   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00595 #endif /* ACE_HAS_DUMP */
00596 }

int ACE_Select_Reactor_Notify::handle_input ( ACE_HANDLE  handle  )  [virtual]

Called back by the ACE_Select_Reactor when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 923 of file Select_Reactor_Base.cpp.

00924 {
00925   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00926   // Precondition: this->select_reactor_.token_.current_owner () ==
00927   // ACE_Thread::self ();
00928 
00929   int number_dispatched = 0;
00930   int result = 0;
00931   ACE_Notification_Buffer buffer;
00932 
00933   // If there is only one buffer in the pipe, this will loop and call
00934   // read_notify_pipe() twice.  The first time will read the buffer, and
00935   // the second will read the fact that the pipe is empty.
00936   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00937     {
00938       // Dispatch the buffer
00939       // NOTE: We count only if we made any dispatches ie. upcalls.
00940       if (this->dispatch_notify (buffer) > 0)
00941         ++number_dispatched;
00942 
00943       // Bail out if we've reached <max_notify_iterations_>.  Note that
00944       // by default <max_notify_iterations_> is -1, so we'll loop until all
00945       // the notifications in the pipe have been dispatched.
00946       if (number_dispatched == this->max_notify_iterations_)
00947         break;
00948     }
00949 
00950   // Reassign number_dispatched to -1 if things have gone seriously
00951   // wrong.
00952   if (result < 0)
00953     number_dispatched = -1;
00954 
00955   // Enqueue ourselves into the list of waiting threads.  When we
00956   // reacquire the token we'll be off and running again with ownership
00957   // of the token.  The postcondition of this call is that
00958   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00959   this->select_reactor_->renew ();
00960   return number_dispatched;
00961 }

int ACE_Select_Reactor_Notify::is_dispatchable ( ACE_Notification_Buffer &  buffer  )  [virtual]

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 763 of file Select_Reactor_Base.cpp.

00764 {
00765 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00766   ACE_UNUSED_ARG(buffer);
00767   return 1;
00768 #else
00769   // If eh == 0 then another thread is unblocking the
00770   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00771   // internal structures.  Otherwise, we need to dispatch the
00772   // appropriate handle_* method on the <ACE_Event_Handler>
00773   // pointer we've been passed.
00774   if (buffer.eh_ != 0)
00775     {
00776       return 1;
00777     }
00778   else
00779     {
00780       // has no dispatchable buffer
00781       return 0;
00782     }
00783 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00784 }

int ACE_Select_Reactor_Notify::max_notify_iterations ( void   )  [virtual]

Get the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop.

Implements ACE_Reactor_Notify.

Definition at line 557 of file Select_Reactor_Base.cpp.

00558 {
00559   return this->max_notify_iterations_;
00560 }

void ACE_Select_Reactor_Notify::max_notify_iterations ( int  iterations  )  [virtual]

Set the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 547 of file Select_Reactor_Base.cpp.

00548 {
00549   // Must always be > 0 or < 0 to optimize the loop exit condition.
00550   if (iterations == 0)
00551     iterations = 1;
00552 
00553   this->max_notify_iterations_ = iterations;
00554 }
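
The effect of the iteration limit can be seen in a small model of the dispatch loop. This is a sketch under stated assumptions: normalize_iterations and drain are hypothetical names, and a std::deque stands in for the notify pipe.

```cpp
#include <cassert>
#include <deque>

// Mirrors the setter above: 0 is promoted to 1 so the limit is never zero.
int normalize_iterations (int iterations)
{
  return iterations == 0 ? 1 : iterations;
}

// Hypothetical model of the bounded dispatch loop; `pending` stands in for
// the notify pipe and each element for one queued notification.
int drain (std::deque<int> &pending, int max_iterations)
{
  int dispatched = 0;
  while (!pending.empty ())
    {
      pending.pop_front ();   // "dispatch" one notification
      ++dispatched;
      // A negative limit never equals the count, so the loop runs until
      // the queue is empty.
      if (dispatched == max_iterations)
        break;
    }
  return dispatched;
}
```

With five pending notifications, a limit of 2 dispatches two and leaves three for the next pass of the event loop, while a limit of -1 dispatches everything that is pending.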

int ACE_Select_Reactor_Notify::notify ( ACE_Event_Handler *  event_handler = 0,
ACE_Reactor_Mask  mask = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value *  timeout = 0 
) [virtual]

Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakes up the ACE_Select_Reactor if it is currently blocked in select/poll. Pass both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the ACE_Select_Reactor will invoke. The ACE_Time_Value indicates how long to block while trying to notify the ACE_Select_Reactor. If timeout == 0, the caller will block until action is possible; otherwise it will wait until the relative time specified in *timeout elapses.

Implements ACE_Reactor_Notify.

Definition at line 680 of file Select_Reactor_Base.cpp.

00683 {
00684   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00685 
00686   // Just consider this method a "no-op" if there's no
00687   // <ACE_Select_Reactor> configured.
00688   if (this->select_reactor_ == 0)
00689     return 0;
00690 
00691   ACE_Event_Handler_var safe_handler (event_handler);
00692 
00693   if (event_handler)
00694     event_handler->add_reference ();
00695 
00696   ACE_Notification_Buffer buffer (event_handler, mask);
00697 
00698 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00699   int const notification_required =
00700     notification_queue_.push_new_notification(buffer);
00701 
00702   if (notification_required == -1)
00703     {
00704       return -1;
00705     }
00706 
00707   if (notification_required == 0)
00708     {
00709       // No failures, the handler is now owned by the notification queue
00710       safe_handler.release ();
00711 
00712       return 0;
00713     }
00714 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00715 
00716   ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00717                                (char *) &buffer,
00718                                sizeof buffer,
00719                                timeout);
00720   if (n == -1)
00721     return -1;
00722 
00723   // No failures.
00724   safe_handler.release ();
00725 
00726   return 0;
00727 }
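
The listing above takes an extra reference on the handler and gives it back only if the notification could not be posted. A minimal standalone sketch of that pattern follows; CountedHandler, ReferenceGuard and post are hypothetical names, and the send_ok flag merely simulates whether the write to the pipe succeeded.

```cpp
#include <cassert>

// Hypothetical reference-counted handler, standing in for an
// ACE_Event_Handler with reference counting enabled.
struct CountedHandler
{
  int refs = 1;
  void add_reference () { ++refs; }
  void remove_reference () { --refs; }
};

// Hypothetical guard: drops one reference on scope exit unless released.
class ReferenceGuard
{
public:
  explicit ReferenceGuard (CountedHandler *h) : handler_ (h) {}
  ~ReferenceGuard ()
  {
    if (handler_ != nullptr)
      handler_->remove_reference ();
  }
  ReferenceGuard (const ReferenceGuard &) = delete;
  ReferenceGuard &operator= (const ReferenceGuard &) = delete;
  void release () { handler_ = nullptr; }

private:
  CountedHandler *handler_;
};

// `send_ok` simulates whether writing the notification succeeded.
int post (CountedHandler *handler, bool send_ok)
{
  ReferenceGuard guard (handler);
  if (handler != nullptr)
    handler->add_reference ();   // reference carried by the notification
  if (!send_ok)
    return -1;                   // guard undoes the extra reference
  guard.release ();              // the pending notification now owns it
  return 0;
}
```

After a failed post the count is back at its starting value; after a successful one it stays incremented until the notification is dispatched or drained.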

ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle ( void   )  [virtual]

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications, so that other threads can unblock the Select_Reactor.

Implements ACE_Reactor_Notify.

Definition at line 754 of file Select_Reactor_Base.cpp.

00755 {
00756   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00757 
00758   return this->notification_pipe_.read_handle ();
00759 }

int ACE_Select_Reactor_Notify::open ( ACE_Reactor_Impl *  r,
ACE_Timer_Queue *  = 0,
int  disable_notify_pipe = ACE_DISABLE_NOTIFY_PIPE_DEFAULT 
) [virtual]

Initialize.

Implements ACE_Reactor_Notify.

Definition at line 599 of file Select_Reactor_Base.cpp.

00602 {
00603   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00604 
00605   if (disable_notify_pipe == 0)
00606     {
00607       this->select_reactor_ =
00608         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00609 
00610       if (select_reactor_ == 0)
00611         {
00612           errno = EINVAL;
00613           return -1;
00614         }
00615 
00616       if (this->notification_pipe_.open () == -1)
00617         return -1;
00618 #if defined (F_SETFD)
00619       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00620       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00621 #endif /* F_SETFD */
00622 
00623 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00624       if (notification_queue_.open() == -1)
00625         {
00626           return -1;
00627         }
00628 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00629 
00630       // There seems to be a Win32 bug with this...  Set this into
00631       // non-blocking mode.
00632       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00633                           ACE_NONBLOCK) == -1)
00634         return -1;
00635       else
00636         return this->select_reactor_->register_handler
00637           (this->notification_pipe_.read_handle (),
00638            this,
00639            ACE_Event_Handler::READ_MASK);
00640     }
00641   else
00642     {
00643       this->select_reactor_ = 0;
00644       return 0;
00645     }
00646 }
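
The descriptor setup performed in the listing above (close-on-exec on both ends, non-blocking mode on the read end) can be sketched with plain POSIX calls. The helper name open_notify_pipe is hypothetical and is not an ACE function; the sketch uses the symbolic FD_CLOEXEC flag where the listing passes a literal value, and it omits registration with a reactor.

```cpp
#include <cassert>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper, not an ACE function: creates a pipe, marks both
// ends close-on-exec, and puts the read end into non-blocking mode.
// Returns 0 on success and -1 on failure (both ends are closed again).
int open_notify_pipe (int fds[2])
{
  if (::pipe (fds) == -1)
    return -1;

  bool ok = ::fcntl (fds[0], F_SETFD, FD_CLOEXEC) != -1
    && ::fcntl (fds[1], F_SETFD, FD_CLOEXEC) != -1;

  // Only the read end becomes non-blocking; writers may still block.
  int const flags = ::fcntl (fds[0], F_GETFL);
  ok = ok && flags != -1
    && ::fcntl (fds[0], F_SETFL, flags | O_NONBLOCK) != -1;

  if (!ok)
    {
      ::close (fds[0]);
      ::close (fds[1]);
      return -1;
    }
  return 0;
}
```

Leaving the write end in blocking mode matches the behaviour described for notify(), where the caller may block until the notification can be written.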

int ACE_Select_Reactor_Notify::purge_pending_notifications ( ACE_Event_Handler *  sh,
ACE_Reactor_Mask  mask = ACE_Event_Handler::ALL_EVENTS_MASK 
) [virtual]

Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 569 of file Select_Reactor_Base.cpp.

00571 {
00572   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00573 
00574 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00575 
00576   return notification_queue_.purge_pending_notifications(eh, mask);
00577 
00578 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00579   ACE_UNUSED_ARG (eh);
00580   ACE_UNUSED_ARG (mask);
00581   ACE_NOTSUP_RETURN (-1);
00582 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00583 }

int ACE_Select_Reactor_Notify::read_notify_pipe ( ACE_HANDLE  handle,
ACE_Notification_Buffer &  buffer 
) [virtual]

Read one of the notify calls on the handle into the buffer. This could be because a thread is trying to unblock the Reactor_Impl.

Return value semantics for this are:
-1: nothing read, fatal, unrecoverable error
0: nothing read at all
1: complete buffer read

Implements ACE_Reactor_Notify.

Definition at line 867 of file Select_Reactor_Base.cpp.

00869 {
00870   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00871 
00872   // This is kind of a weird, fragile beast.  We first read with a
00873   // regular read.  The read side of this socket is non-blocking, so
00874   // the read may end up being short.
00875   //
00876   // If the read is short, then we do a recv_n to ensure that we block
00877   // and read the rest of the buffer.
00878   //
00879   // Now, you might be tempted to say, "why don't we just replace the
00880   // first recv with a recv_n?"  I was, too.  But that doesn't work
00881   // because of how the calling code in handle_input() works.  In
00882   // handle_input, the event will only be dispatched if the return
00883   // value from read_notify_pipe() is > 0.  That means that we can't
00884   // return zero from this func unless it's an EOF condition.
00885   //
00886   // Thus, the return value semantics for this are:
00887   // -1: nothing read, fatal, unrecoverable error
00888   // 0: nothing read at all
00889   // 1: complete buffer read
00890 
00891   ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00892 
00893   if (n > 0)
00894     {
00895       // Check to see if we've got a short read.
00896       if (n != sizeof buffer)
00897         {
00898           ssize_t const remainder = sizeof buffer - n;
00899 
00900           // If so, try to recover by reading the remainder.  If this
00901           // doesn't work we're in big trouble since the input stream
00902           // won't be aligned correctly.  I'm not sure quite what to
00903           // do at this point.  It's probably best just to return -1.
00904           if (ACE::recv_n (handle,
00905                            ((char *) &buffer) + n,
00906                            remainder) != remainder)
00907             return -1;
00908         }
00909 
00910 
00911       return 1;
00912     }
00913 
00914   // Return -1 if things have gone seriously  wrong.
00915   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00916     return -1;
00917 
00918   return 0;
00919 }
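
The read strategy in the listing above (one non-blocking read, followed by a blocking completion if the read was short) can be sketched with POSIX calls. Record and read_record are hypothetical names; Record stands in for ACE_Notification_Buffer, and treating end-of-file on the first read as "nothing read" is a choice made for this sketch rather than something the listing guarantees.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

// Hypothetical fixed-size record, standing in for ACE_Notification_Buffer.
struct Record
{
  void *handler;
  long mask;
};

// Returns 1 for a complete record, 0 if nothing was available, -1 on error.
int read_record (int fd, Record &out)
{
  char *const bytes = reinterpret_cast<char *> (&out);
  ssize_t n = ::read (fd, bytes, sizeof out);
  if (n < 0)
    return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
  if (n == 0)
    return 0;

  // Short read: wait for and collect the rest so the stream stays aligned.
  std::size_t got = static_cast<std::size_t> (n);
  while (got < sizeof out)
    {
      pollfd pfd = { fd, POLLIN, 0 };
      if (::poll (&pfd, 1, -1) == -1 && errno != EINTR)
        return -1;
      n = ::read (fd, bytes + got, sizeof out - got);
      if (n == 0)
        return -1;               // writer vanished mid-record
      if (n < 0)
        {
          if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            continue;
          return -1;
        }
      got += static_cast<std::size_t> (n);
    }
  return 1;
}
```

A caller can loop on read_record() while it returns 1 and stop at 0, which is the shape of the loop in handle_input().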


Member Data Documentation

ACE_Select_Reactor_Notify::ACE_ALLOC_HOOK_DECLARE

Declare the dynamic allocation hooks.

Definition at line 233 of file Select_Reactor_Base.h.

int ACE_Select_Reactor_Notify::max_notify_iterations_ [protected]

Keeps track of the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 257 of file Select_Reactor_Base.h.

ACE_Pipe ACE_Select_Reactor_Notify::notification_pipe_ [protected]

Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to.

Definition at line 248 of file Select_Reactor_Base.h.

ACE_Select_Reactor_Impl * ACE_Select_Reactor_Notify::select_reactor_ [protected]

Keep a back pointer to the ACE_Select_Reactor. If this value is NULL, then the ACE_Select_Reactor has been initialized with disable_notify_pipe.

Definition at line 241 of file Select_Reactor_Base.h.


The documentation for this class was generated from the following files:
Select_Reactor_Base.h
Select_Reactor_Base.cpp

Generated on Fri Nov 6 23:26:00 2009 for ACE by  doxygen 1.6.1