Implements a user-space queue to send Reactor notifications.
#include <Notification_Queue.h>


Public Member Functions | |
| ACE_Notification_Queue () | |
| ~ACE_Notification_Queue () | |
| int | open () |
| Pre-allocate resources in the queue. | |
| void | reset () |
| Release all resources in the queue. | |
| int | purge_pending_notifications (ACE_Event_Handler *eh, ACE_Reactor_Mask mask) |
| Remove all elements in the queue matching eh and mask. | |
| int | push_new_notification (ACE_Notification_Buffer const &buffer) |
| Add a new notification to the queue. | |
| int | pop_next_notification (ACE_Notification_Buffer &current, bool &more_messages_queued, ACE_Notification_Buffer &next) |
| Extract the next notification from the queue. | |
Private Types | |
| typedef ACE_Intrusive_List < ACE_Notification_Queue_Node > | Buffer_List |
Private Member Functions | |
| int | allocate_more_buffers () |
| Allocate more memory for the queue. | |
Private Attributes | |
| ACE_Unbounded_Queue < ACE_Notification_Queue_Node * > | alloc_queue_ |
| Buffer_List | notify_queue_ |
| Keeps track of all pending notifications. | |
| Buffer_List | free_queue_ |
| Keeps track of all free buffers. | |
| ACE_SYNCH_MUTEX | notify_queue_lock_ |
| Synchronization for handling of queues. | |
Implements a user-space queue to send Reactor notifications.
The ACE_Reactor uses a pipe to wake up the thread running the event loop from other threads. On some operating systems the capacity of this pipe is limited, and for some applications that limit is a problem. A user-space notification queue overcomes the limitation: the queue uses as little of the pipe's capacity as possible and keeps all the notification data in user space.
This code was refactored from Select_Reactor_Base.
Definition at line 80 of file Notification_Queue.h.
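The idea described above can be illustrated with a small standalone model. This is only a sketch, not the ACE implementation: `MiniNotificationQueue` and `pipe_writes_for_burst` are hypothetical names, the payload is a plain `int` instead of an ACE_Notification_Buffer, and the assumption that the caller writes to the pipe only when `push` returns 1 is inferred from the return values in the listings on this page rather than stated by them.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for ACE_Notification_Queue; not the ACE API.
class MiniNotificationQueue {
public:
  // Returns 1 if the queue was empty before the push (assumed to mean the
  // caller should write one byte to the wake-up pipe), 0 otherwise.
  int push(int payload) {
    std::lock_guard<std::mutex> guard(lock_);
    const bool was_empty = pending_.empty();
    pending_.push_back(payload);
    return was_empty ? 1 : 0;
  }

  // Returns 0 if nothing was queued, 1 if `current` was filled in.
  // `more` reports whether further entries remain after this one.
  int pop(int &current, bool &more) {
    more = false;
    std::lock_guard<std::mutex> guard(lock_);
    if (pending_.empty()) return 0;
    current = pending_.front();
    pending_.pop_front();
    more = !pending_.empty();
    return 1;
  }

private:
  std::deque<int> pending_;  // all payload data stays in user space
  std::mutex lock_;
};

// Pushes `count` notifications without draining and reports how many of
// those pushes asked for a pipe write.
inline std::size_t pipe_writes_for_burst(std::size_t count) {
  MiniNotificationQueue q;
  std::size_t writes = 0;
  for (std::size_t i = 0; i < count; ++i)
    if (q.push(static_cast<int>(i)) == 1) ++writes;
  return writes;
}
```

In this model a burst of pushes requests a single pipe write, because only the push that finds the queue empty returns 1; the pipe carries a wake-up signal while the payloads wait in the queue.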
typedef ACE_Intrusive_List<ACE_Notification_Queue_Node> ACE_Notification_Queue::Buffer_List [private] |
Definition at line 136 of file Notification_Queue.h.
| ACE_Notification_Queue::ACE_Notification_Queue | ( | ) |
Definition at line 14 of file Notification_Queue.cpp.
  : ACE_Copy_Disabled()
  , alloc_queue_()
  , notify_queue_()
  , free_queue_()
{
}
| ACE_Notification_Queue::~ACE_Notification_Queue | ( | ) |
Definition at line 23 of file Notification_Queue.cpp.
{
  reset();
}
| int ACE_Notification_Queue::allocate_more_buffers | ( | ) | [private] |
Allocate more memory for the queue.
Definition at line 77 of file Notification_Queue.cpp.
{
  ACE_TRACE ("ACE_Notification_Queue::allocate_more_buffers");

  ACE_Notification_Queue_Node *temp = 0;

  ACE_NEW_RETURN (temp,
                  ACE_Notification_Queue_Node[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
                  -1);

  if (this->alloc_queue_.enqueue_head (temp) == -1)
    {
      delete [] temp;
      return -1;
    }

  for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
    {
      free_queue_.push_front(temp + i);
    }

  return 0;
}
| int ACE_Notification_Queue::open | ( | void | ) |
Pre-allocate resources in the queue.
Definition at line 29 of file Notification_Queue.cpp.
{
  ACE_TRACE ("ACE_Notification_Queue::open");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (!this->free_queue_.is_empty ())
    return 0;

  return allocate_more_buffers();
}
| int ACE_Notification_Queue::pop_next_notification | ( | ACE_Notification_Buffer & current, bool & more_messages_queued, ACE_Notification_Buffer & next | ) |
Extract the next notification from the queue.
Definition at line 195 of file Notification_Queue.cpp.
{
  ACE_TRACE ("ACE_Notification_Queue::pop_next_notification");

  more_messages_queued = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (notify_queue_.is_empty ())
    {
      return 0;
    }

  ACE_Notification_Queue_Node * node = notify_queue_.pop_front();

  current = node->get();
  free_queue_.push_front(node);

  if(!this->notify_queue_.is_empty())
    {
      more_messages_queued = true;
      next = notify_queue_.head()->get();
    }

  return 1;
}
| int ACE_Notification_Queue::purge_pending_notifications | ( | ACE_Event_Handler * eh, ACE_Reactor_Mask mask | ) |
Remove all elements in the queue matching eh and mask.
See the ACE_Reactor documentation for a more detailed description; this is only a helper function.
Definition at line 102 of file Notification_Queue.cpp.
{
  ACE_TRACE ("ACE_Notification_Queue::purge_pending_notifications");

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  if (this->notify_queue_.is_empty ())
    return 0;

  int number_purged = 0;
  ACE_Notification_Queue_Node * node = notify_queue_.head();
  while(node != 0)
    {
      if (!node->matches_for_purging(eh))
        {
          // Easy case, skip to the next node
          node = node->next();
          continue;
        }

      if (!node->mask_disables_all_notifications(mask))
        {
          // ... another easy case, skip this node too, but clear the
          // mask first ...
          node->clear_mask(mask);
          node = node->next();
          continue;
        }

      // ... this is the more complicated case, we want to remove the
      // node from the notify_queue_ list.  First save the next node
      // on the list:
      ACE_Notification_Queue_Node * next = node->next();

      // ... then remove it ...
      notify_queue_.unsafe_remove(node);
      ++number_purged;

      // ... release resources ...
      ACE_Event_Handler *event_handler = node->get().eh_;
      event_handler->remove_reference ();

      // ... now this is a free node ...
      free_queue_.push_front(node);

      // ... go to the next node, if there is one ...
      node = next;
    }

  return number_purged;
}
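The traversal in the listing above has three cases: skip entries for other handlers, narrow the mask of a matching entry, or unlink a matching entry and recycle it. A minimal standalone sketch of that pattern follows. Everything in it is hypothetical: `Pending`, `Mask`, `kAllEvents` and `purge` are illustration names, an `int` id stands in for the ACE_Event_Handler pointer, and the test `mask == kAllEvents` is only an assumed stand-in for `mask_disables_all_notifications`, whose real definition is not shown on this page. Reference counting is omitted.

```cpp
#include <cassert>
#include <iterator>
#include <list>

using Mask = unsigned;
const Mask kAllEvents = 0xFFu;  // hypothetical "all events" mask

struct Pending {
  int handler_id;  // stands in for ACE_Event_Handler*
  Mask mask;
};

// Walks `pending`, narrowing or removing entries that belong to
// `handler_id`. Removed entries are moved to `free_list` for reuse.
// Returns the number of entries removed.
inline int purge(std::list<Pending> &pending, std::list<Pending> &free_list,
                 int handler_id, Mask mask) {
  int purged = 0;
  auto it = pending.begin();
  while (it != pending.end()) {
    if (it->handler_id != handler_id) {
      ++it;  // easy case: entry belongs to another handler
      continue;
    }
    if (mask != kAllEvents) {
      it->mask &= ~mask;  // assumed partial purge: clear bits, keep entry
      ++it;
      continue;
    }
    // Save the successor before unlinking, as the listing above does.
    auto next = std::next(it);
    free_list.splice(free_list.begin(), pending, it);
    ++purged;
    it = next;
  }
  return purged;
}
```

Saving the successor first matters because the unlinked entry no longer belongs to the pending list, so advancing from it afterwards would walk the wrong list.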
| int ACE_Notification_Queue::push_new_notification | ( | ACE_Notification_Buffer const & | buffer | ) |
Add a new notification to the queue.
Definition at line 157 of file Notification_Queue.cpp.
{
  ACE_TRACE ("ACE_Notification_Queue::push_new_notification");

  bool notification_required = false;

  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);

  // No pending notifications.
  if (this->notify_queue_.is_empty ())
    notification_required = true;

  if (free_queue_.is_empty())
    {
      if (allocate_more_buffers() == -1)
        {
          return -1;
        }
    }

  ACE_Notification_Queue_Node * node =
    free_queue_.pop_front();

  ACE_ASSERT (node != 0);
  node->set(buffer);

  notify_queue_.push_back(node);

  if (!notification_required)
    {
      return 0;
    }

  return 1;
}
| void ACE_Notification_Queue::reset | ( | void | ) |
Release all resources in the queue.
Definition at line 42 of file Notification_Queue.cpp.
{
  ACE_TRACE ("ACE_Notification_Queue::reset");

  // Release all the event handlers still in the queue ...
  for (ACE_Notification_Queue_Node * node = notify_queue_.head();
       node != 0;
       node = node->next())
    {
      if (node->get().eh_ == 0)
        {
          continue;
        }
      (void) node->get().eh_->remove_reference();
    }

  // ... free up the dynamically allocated resources ...
  ACE_Notification_Queue_Node **b = 0;
  for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Queue_Node *> alloc_iter (this->alloc_queue_);
       alloc_iter.next (b) != 0;
       alloc_iter.advance ())
    {
      delete [] *b;
      *b = 0;
    }

  // ... cleanup the list of allocated blocks ...
  this->alloc_queue_.reset ();

  // ... swap with empty lists to reset the contents ...
  Buffer_List().swap(notify_queue_);
  Buffer_List().swap(free_queue_);
}
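ACE_Unbounded_Queue<ACE_Notification_Queue_Node*> ACE_Notification_Queue::alloc_queue_ [private] |
Keeps track of allocated arrays of type ACE_Notification_Queue_Node, each of which wraps an ACE_Notification_Buffer. The idea is to amortize allocation costs by allocating multiple nodes at a time.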
Definition at line 134 of file Notification_Queue.h.
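The amortization idea can be sketched with standard containers. This is a simplified model under stated assumptions, not the ACE code: `Node` and `BlockPool` are hypothetical names, `std::vector` replaces both ACE_Unbounded_Queue and the intrusive free list, and the block size is a constructor argument instead of ACE_REACTOR_NOTIFICATION_ARRAY_SIZE.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical node type; the real ACE_Notification_Queue_Node is intrusive.
struct Node {
  int payload = 0;
};

class BlockPool {
public:
  // A block size of zero is bumped to one so that grow() always adds nodes.
  explicit BlockPool(std::size_t block_size)
      : block_size_(block_size > 0 ? block_size : 1) {}

  // Takes a node from the free list, allocating a whole block if it is empty.
  Node *acquire() {
    if (free_.empty()) grow();
    Node *n = free_.back();
    free_.pop_back();
    return n;
  }

  // Returns a node to the free list for reuse; no memory is released here.
  void release(Node *n) {
    assert(n != nullptr);
    free_.push_back(n);
  }

  std::size_t blocks_allocated() const { return blocks_.size(); }
  std::size_t free_count() const { return free_.size(); }

private:
  // One heap allocation yields block_size_ nodes.
  void grow() {
    blocks_.emplace_back(new Node[block_size_]);
    Node *base = blocks_.back().get();
    for (std::size_t i = 0; i < block_size_; ++i) free_.push_back(base + i);
  }

  std::size_t block_size_;
  std::vector<std::unique_ptr<Node[]>> blocks_;  // owns each array
  std::vector<Node *> free_;                     // non-owning free list
};
```

With a block size of 4, five calls to `acquire()` trigger only two heap allocations, and nodes passed to `release()` are handed out again before any new block is requested.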
Buffer_List ACE_Notification_Queue::free_queue_ [private] |
Keeps track of all free buffers.
Definition at line 142 of file Notification_Queue.h.
Buffer_List ACE_Notification_Queue::notify_queue_ [private] |
Keeps track of all pending notifications.
Definition at line 139 of file Notification_Queue.h.
ACE_SYNCH_MUTEX ACE_Notification_Queue::notify_queue_lock_ [private] |
Synchronization for handling of queues.
Definition at line 145 of file Notification_Queue.h.