ACE_Notification_Queue Class Reference

Implements a user-space queue to send Reactor notifications.

#include <Notification_Queue.h>

Inheritance diagram for ACE_Notification_Queue: [graph not shown]
Collaboration diagram for ACE_Notification_Queue: [graph not shown]


Public Member Functions

 ACE_Notification_Queue ()
 ~ACE_Notification_Queue ()
int open ()
 Pre-allocate resources in the queue.
void reset ()
 Release all resources in the queue.
int purge_pending_notifications (ACE_Event_Handler *eh, ACE_Reactor_Mask mask)
 Remove all elements in the queue matching eh and mask.
int push_new_notification (ACE_Notification_Buffer const &buffer)
 Add a new notification to the queue.
int pop_next_notification (ACE_Notification_Buffer &current, bool &more_messages_queued, ACE_Notification_Buffer &next)
 Extract the next notification from the queue.

Private Types

typedef ACE_Intrusive_List< ACE_Notification_Queue_Node > Buffer_List

Private Member Functions

int allocate_more_buffers ()
 Allocate more memory for the queue.

Private Attributes

ACE_Unbounded_Queue< ACE_Notification_Queue_Node * > alloc_queue_
Buffer_List notify_queue_
 Keeps track of all pending notifications.
Buffer_List free_queue_
 Keeps track of all free buffers.
ACE_SYNCH_MUTEX notify_queue_lock_
 Synchronization for handling of queues.

Detailed Description

Implements a user-space queue to send Reactor notifications.

The ACE_Reactor uses a pipe to wake up the thread running the event loop from other threads. On some operating systems the capacity of this pipe is limited, which is a problem for some applications. A user-space notification queue overcomes this limitation: it keeps all the data in user space and uses as few resources on the pipe as possible.
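The idea of keeping pipe traffic low can be illustrated with a minimal, simplified model. This is a sketch under stated assumptions, not the ACE implementation: `Note`, `SketchQueue`, and `pipe_writes_for_three_pushes` are hypothetical names, and standard-library containers stand in for the ACE types. The point it shows is that only a push onto an empty queue asks the caller to write to the pipe.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Hypothetical stand-in for ACE_Notification_Buffer; not the ACE type.
struct Note { int id; };

// Simplified model of a user-space notification queue.
class SketchQueue {
public:
  // Returns 1 if the caller should write a wake-up to the pipe
  // (the queue was empty before this push), 0 otherwise.
  int push(Note const& n) {
    std::lock_guard<std::mutex> guard(lock_);
    bool const was_empty = pending_.empty();
    pending_.push_back(n);
    return was_empty ? 1 : 0;
  }

  // Returns 1 if a note was popped into `current`, 0 if the queue was empty.
  // `more` reports whether further notes are still queued.
  int pop(Note& current, bool& more) {
    std::lock_guard<std::mutex> guard(lock_);
    more = false;
    if (pending_.empty()) return 0;
    current = pending_.front();
    pending_.pop_front();
    more = !pending_.empty();
    return 1;
  }

private:
  std::mutex lock_;
  std::deque<Note> pending_;
};

// Counts how many pipe writes three back-to-back pushes would request.
int pipe_writes_for_three_pushes() {
  SketchQueue q;
  int writes = 0;
  for (int i = 0; i < 3; ++i) writes += q.push(Note{i});
  return writes;
}

// Self-check: only the first push (onto an empty queue) requests a write.
void demo_queue() {
  assert(pipe_writes_for_three_pushes() == 1);
}
```

In this model the payloads never travel through the pipe; the pipe only carries the wake-up.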

This code was refactored from Select_Reactor_Base.

Definition at line 80 of file Notification_Queue.h.


Member Typedef Documentation

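typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> ACE_Notification_Queue::Buffer_List [private]

Definition at line 136 of file Notification_Queue.h.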


Constructor & Destructor Documentation

ACE_Notification_Queue::ACE_Notification_Queue (  ) 

Definition at line 14 of file Notification_Queue.cpp.

  : ACE_Copy_Disabled()
  , alloc_queue_()
  , notify_queue_()
  , free_queue_()
{
}

ACE_Notification_Queue::~ACE_Notification_Queue (  ) 

Definition at line 23 of file Notification_Queue.cpp.

{
  reset();
}


Member Function Documentation

int ACE_Notification_Queue::allocate_more_buffers (  )  [private]

Allocate more memory for the queue.

Definition at line 77 of file Notification_Queue.cpp.

{
  ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");

  ACE_Notification_Queue_Node *temp = 0;

  ACE_NEW_RETURN (temp,
                  ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
                  -1);

  if (this->alloc_queue_.enqueue_head (temp) == -1)
    {
      delete [] temp;
      return -1;
    }

  for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
    {
      free_queue_.push_front(temp + i);
    }

  return 0;
}
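The allocation strategy above (allocate a whole array of nodes at once, remember the array for later deletion, and thread every element onto a free list) can be sketched independently of ACE. The names `Node` and `NodePool` below are hypothetical, standard-library types replace the ACE containers, and the chunk size is assumed to be greater than zero.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical node type with an intrusive link; not the ACE node class.
struct Node {
  int payload = 0;
  Node* next = nullptr;
};

class NodePool {
public:
  explicit NodePool(std::size_t chunk_size) : chunk_size_(chunk_size) {
    assert(chunk_size_ > 0);  // assumption of this sketch
  }

  // Take a node from the free list, allocating a new chunk only when empty.
  Node* acquire() {
    if (free_head_ == nullptr) grow();
    Node* n = free_head_;
    free_head_ = n->next;
    n->next = nullptr;
    return n;
  }

  // Put a node back on the free list; no memory is released here.
  void release(Node* n) {
    n->next = free_head_;
    free_head_ = n;
  }

  // Number of array allocations made so far.
  std::size_t chunks() const { return chunks_.size(); }

private:
  void grow() {
    // One allocation covers chunk_size_ nodes, amortizing the cost.
    chunks_.push_back(std::make_unique<Node[]>(chunk_size_));
    Node* block = chunks_.back().get();
    for (std::size_t i = 0; i < chunk_size_; ++i) release(block + i);
  }

  std::size_t chunk_size_;
  Node* free_head_ = nullptr;
  std::vector<std::unique_ptr<Node[]>> chunks_;  // owns every chunk
};
```

With a chunk size of 4, acquiring five nodes triggers two allocations; releasing and re-acquiring nodes afterwards triggers none.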

int ACE_Notification_Queue::open ( void   ) 

Pre-allocate resources in the queue.

Definition at line 29 of file Notification_Queue.cpp.

{
  ACE_TRACE ("ACE_Notification_Queue::open");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (!this->free_queue_.is_empty ())
    return 0;

  return allocate_more_buffers();
}

int ACE_Notification_Queue::pop_next_notification ( ACE_Notification_Buffer &  current,
bool &  more_messages_queued,
ACE_Notification_Buffer &  next 
)

Extract the next notification from the queue.

Returns:
-1 on failure, 1 if a message was popped, 0 otherwise

Definition at line 195 of file Notification_Queue.cpp.

{
  ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");

  more_messages_queued = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (notify_queue_.is_empty ())
    {
      return 0;
    }

  ACE_Notification_Queue_Node * node = notify_queue_.pop_front();

  current = node->get();
  free_queue_.push_front(node);

  if(!this->notify_queue_.is_empty())
    {
      more_messages_queued = true;
      next = notify_queue_.head()->get();
    }

  return 1;
}
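How a caller consumes `more_messages_queued` and `next` is not shown on this page. One plausible pattern, sketched here as an assumption, is to dispatch `current` and, when more messages are queued, re-arm the pipe so the event loop wakes up again. `Msg`, `pop_next`, and `handle_wakeup` are hypothetical names, a counter stands in for the pipe, and locking is omitted for brevity.

```cpp
#include <cassert>
#include <queue>
#include <vector>

// Hypothetical stand-in for ACE_Notification_Buffer.
struct Msg { int id; };

// Mirrors the documented return values: 1 if popped, 0 if empty.
int pop_next(std::queue<Msg>& q, Msg& current, bool& more, Msg& next) {
  more = false;
  if (q.empty()) return 0;
  current = q.front();
  q.pop();
  if (!q.empty()) {
    more = true;
    next = q.front();  // peek only; the message stays queued
  }
  return 1;
}

// Handles one wake-up: dispatches a single message and counts a pipe
// write whenever more messages remain. Returns 0 or 1 messages handled.
int handle_wakeup(std::queue<Msg>& q, std::vector<int>& dispatched,
                  int& pipe_writes) {
  Msg current{};
  Msg next{};
  bool more = false;
  if (pop_next(q, current, more, next) != 1) return 0;
  dispatched.push_back(current.id);
  if (more) ++pipe_writes;  // a real caller might write `next` to the pipe
  return 1;
}

// Self-check: three queued messages need two re-arming writes.
void demo_wakeups() {
  std::queue<Msg> q;
  for (int i = 0; i < 3; ++i) q.push(Msg{i});
  std::vector<int> dispatched;
  int pipe_writes = 0;
  while (handle_wakeup(q, dispatched, pipe_writes) == 1) {}
  assert(dispatched.size() == 3);
  assert(pipe_writes == 2);
}
```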

int ACE_Notification_Queue::purge_pending_notifications ( ACE_Event_Handler *  eh,
ACE_Reactor_Mask  mask 
)

Remove all elements in the queue matching eh and mask.

See the ACE_Reactor documentation for a more detailed description; this is only a helper function.

Returns:
-1 if the queue lock cannot be acquired, otherwise the number of notifications purged

Definition at line 102 of file Notification_Queue.cpp.

{
  ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (this->notify_queue_.is_empty ())
    return 0;

  int number_purged = 0;
  ACE_Notification_Queue_Node * node = notify_queue_.head();
  while(node != 0)
    {
      if (!node->matches_for_purging(eh))
        {
          // Easy case, skip to the next node
          node = node->next();
          continue;
        }

      if (!node->mask_disables_all_notifications(mask))
        {
          // ... another easy case, skip this node too, but clear the
          // mask first ...
          node->clear_mask(mask);
          node = node->next();
          continue;
        }

      // ... this is the more complicated case, we want to remove the
      // node from the notify_queue_ list.  First save the next node
      // on the list:
      ACE_Notification_Queue_Node * next = node->next();

      // ... then remove it ...
      notify_queue_.unsafe_remove(node);
      ++number_purged;

      // ... release resources ...
      ACE_Event_Handler *event_handler = node->get().eh_;
      event_handler->remove_reference ();

      // ... now this is a free node ...
      free_queue_.push_front(node);

      // ... go to the next node, if there is one ...
      node = next;
    }

  return number_purged;
}
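The traversal pattern used above (skip non-matching entries, clear mask bits on partial matches, and unlink fully disabled entries without losing the position in the list) can be sketched with `std::list`. The exact semantics of `matches_for_purging` and `mask_disables_all_notifications` are not shown on this page, so the sketch assumes a simplified rule: an entry is removed once clearing `mask` leaves it with no bits set. `Pending` and `purge` are hypothetical names.

```cpp
#include <cassert>
#include <list>

// Hypothetical pending entry: an integer handler id and a bit mask.
struct Pending {
  int handler;
  unsigned mask;
};

// Clears `mask` from every entry of `handler`; entries left with no bits
// are erased. Returns the number of entries erased.
int purge(std::list<Pending>& pending, int handler, unsigned mask) {
  int purged = 0;
  for (auto it = pending.begin(); it != pending.end(); ) {
    if (it->handler != handler) {
      ++it;  // easy case: different handler
      continue;
    }
    it->mask &= ~mask;
    if (it->mask != 0) {
      ++it;  // some notifications remain enabled; keep the entry
      continue;
    }
    // erase() returns the following element, so the position is preserved.
    it = pending.erase(it);
    ++purged;
  }
  return purged;
}

// Self-check on a three-entry list.
void demo_purge() {
  std::list<Pending> pending{{1, 3u}, {2, 1u}, {1, 1u}};
  assert(purge(pending, 1, 1u) == 1);
  assert(pending.size() == 2);
  assert(pending.front().mask == 2u);
}
```

`std::list::erase` plays the role of saving `node->next()` before calling `unsafe_remove`: both keep the iteration valid while an element is unlinked.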

int ACE_Notification_Queue::push_new_notification ( ACE_Notification_Buffer const &  buffer  ) 

Add a new notification to the queue.

Returns:
-1 on failure, 1 if a new message should be sent through the pipe and 0 otherwise.

Definition at line 157 of file Notification_Queue.cpp.

{
  ACE_TRACE ("ACE_Notification_Queue::push_new_notification");

  bool notification_required = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  // No pending notifications.
  if (this->notify_queue_.is_empty ())
    notification_required = true;

  if (free_queue_.is_empty())
    {
      if (allocate_more_buffers() == -1)
        {
          return -1;
        }
    }

  ACE_Notification_Queue_Node * node =
    free_queue_.pop_front();

  ACE_ASSERT (node != 0);
  node->set(buffer);

  notify_queue_.push_back(node);

  if (!notification_required)
    {
      return 0;
    }

  return 1;
}

void ACE_Notification_Queue::reset ( void   ) 

Release all resources in the queue.

Definition at line 42 of file Notification_Queue.cpp.

{
  ACE_TRACE ("ACE_Notification_Queue::reset");

  // Release all the event handlers still in the queue ...
  for (ACE_Notification_Queue_Node * node = notify_queue_.head();
       node != 0;
       node = node->next())
    {
      if (node->get().eh_ == 0)
        {
          continue;
        }
      (void) node->get().eh_->remove_reference();
    }

  // ... free up the dynamically allocated resources ...
  ACE_Notification_Queue_Node **b = 0;
  for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
       alloc_iter.next (b) != 0;
       alloc_iter.advance ())
    {
      delete [] *b;
      *b = 0;
    }

  // ... cleanup the list of allocated blocks ...
  this->alloc_queue_.reset ();

  // ... swap with empty lists to reset the contents ...
  Buffer_List().swap(notify_queue_);
  Buffer_List().swap(free_queue_);
}


Member Data Documentation

ACE_Unbounded_Queue<ACE_Notification_Queue_Node *> ACE_Notification_Queue::alloc_queue_ [private]

Keeps track of allocated arrays of type ACE_Notification_Queue_Node. The idea is to amortize allocation costs by allocating multiple nodes at a time.

Definition at line 134 of file Notification_Queue.h.

Buffer_List ACE_Notification_Queue::free_queue_ [private]

Keeps track of all free buffers.

Definition at line 142 of file Notification_Queue.h.

Buffer_List ACE_Notification_Queue::notify_queue_ [private]

Keeps track of all pending notifications.

Definition at line 139 of file Notification_Queue.h.

ACE_SYNCH_MUTEX ACE_Notification_Queue::notify_queue_lock_ [private]

Synchronization for handling of queues.

Definition at line 145 of file Notification_Queue.h.


The documentation for this class was generated from the following files:
Notification_Queue.h
Notification_Queue.cpp

Generated on Fri Nov 6 23:24:54 2009 for ACE by  doxygen 1.6.1