00001
00002
00003 #ifndef ACE_MESSAGE_QUEUE_T_CPP
00004 #define ACE_MESSAGE_QUEUE_T_CPP
00005
00006
00007
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/OS_NS_sys_time.h"
00011
00012 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00013 # pragma once
00014 #endif
00015
00016 #include "ace/Notification_Strategy.h"
00017 #include "ace/Truncate.h"
00018
00019 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00020 #include "ace/OS_NS_stdio.h"
00021 #include "ace/OS_NS_unistd.h"
00022 #include "ace/Monitor_Size.h"
00023 #endif
00024
00025 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00028 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00029 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00030 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_N)
00031
00032 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00033 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00034 {
00035 #if defined (ACE_HAS_DUMP)
00036 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00037
00038 this->queue_.dump ();
00039 #endif
00040 }
00041
00042 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00043 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00044 {
00045 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00046
00047 this->queue_.message_bytes (new_value);
00048 }
00049
00050 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00051 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00052 {
00053 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00054
00055 this->queue_.message_length (new_value);
00056 }
00057
00058 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00059 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00060 size_t lwm,
00061 ACE_Notification_Strategy *ns)
00062 {
00063 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00064
00065 if (this->queue_.open (hwm, lwm, ns) == -1)
00066 ACE_ERROR ((LM_ERROR,
00067 ACE_TEXT ("ACE_Message_Queue_Ex")));
00068 }
00069
00070 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00071 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00072 {
00073 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00074 }
00075
00076 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00077 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00078 size_t lwm,
00079 ACE_Notification_Strategy *ns)
00080 {
00081 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00082
00083 return this->queue_.open (hwm, lwm, ns);
00084 }
00085
00086
00087
00088 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00089 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00090 {
00091 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00092
00093 return this->queue_.close ();
00094 }
00095
00096 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00097 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00098 {
00099 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00100
00101 return this->queue_.flush ();
00102 }
00103
00104 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00105 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00106 {
00107 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00108
00109 return this->queue_.flush_i ();
00110 }
00111
00112
00113
00114 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00115 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00116 ACE_Time_Value *timeout)
00117 {
00118 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00119
00120 ACE_Message_Block *mb = 0;
00121
00122 int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00123
00124 if (cur_count != -1)
00125 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00126
00127 return cur_count;
00128 }
00129
00130 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00131 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00132 ACE_Time_Value *timeout)
00133 {
00134 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00135
00136 ACE_Message_Block *mb = 0;
00137
00138 ACE_NEW_RETURN (mb,
00139 ACE_Message_Block ((char *) new_item,
00140 sizeof (*new_item),
00141 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00142 -1);
00143
00144 int const result = this->queue_.enqueue_head (mb, timeout);
00145 if (result == -1)
00146
00147 mb->release ();
00148 return result;
00149 }
00150
00151
00152
00153
00154
00155 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00156 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00157 ACE_Time_Value *timeout)
00158 {
00159 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue");
00160
00161 return this->enqueue_prio (new_item, timeout);
00162 }
00163
00164 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00165 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00166 ACE_Time_Value *timeout,
00167 unsigned long priority)
00168 {
00169 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00170
00171 ACE_Message_Block *mb = 0;
00172
00173 ACE_NEW_RETURN (mb,
00174 ACE_Message_Block ((char *) new_item,
00175 sizeof (*new_item),
00176 priority),
00177 -1);
00178
00179 int const result = this->queue_.enqueue_prio (mb, timeout);
00180 if (result == -1)
00181
00182 mb->release ();
00183
00184 return result;
00185 }
00186
00187 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00188 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00189 ACE_Time_Value *timeout)
00190 {
00191 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00192
00193 ACE_Message_Block *mb = 0;
00194
00195 ACE_NEW_RETURN (mb,
00196 ACE_Message_Block ((char *) new_item,
00197 sizeof (*new_item),
00198 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00199 -1);
00200
00201 int const result = this->queue_.enqueue_deadline (mb, timeout);
00202 if (result == -1)
00203
00204 mb->release ();
00205
00206 return result;
00207 }
00208
00209
00210
00211
00212 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00213 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00214 ACE_Time_Value *timeout)
00215 {
00216 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00217
00218 ACE_Message_Block *mb = 0;
00219
00220 ACE_NEW_RETURN (mb,
00221 ACE_Message_Block ((char *) new_item,
00222 sizeof (*new_item),
00223 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00224 -1);
00225
00226 int const result = this->queue_.enqueue_tail (mb, timeout);
00227 if (result == -1)
00228
00229 mb->release ();
00230 return result;
00231 }
00232
00233
00234
00235
00236
00237 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00238 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00239 ACE_Time_Value *timeout)
00240 {
00241 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00242
00243 ACE_Message_Block *mb = 0;
00244
00245 int const cur_count = this->queue_.dequeue_head (mb, timeout);
00246
00247
00248 if (cur_count != -1)
00249 {
00250 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00251
00252 mb->release ();
00253 }
00254
00255 return cur_count;
00256 }
00257
00258
00259
00260
00261
00262 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00263 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00264 ACE_Time_Value *timeout)
00265 {
00266 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00267
00268 ACE_Message_Block *mb = 0;
00269
00270 int const cur_count = this->queue_.dequeue_prio (mb, timeout);
00271
00272
00273 if (cur_count != -1)
00274 {
00275 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00276
00277 mb->release ();
00278 }
00279
00280 return cur_count;
00281 }
00282
00283
00284
00285
00286
00287 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00288 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00289 ACE_Time_Value *timeout)
00290 {
00291 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00292
00293 ACE_Message_Block *mb = 0;
00294
00295 int const cur_count = this->queue_.dequeue_tail (mb, timeout);
00296
00297
00298 if (cur_count != -1)
00299 {
00300 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00301
00302 mb->release ();
00303 }
00304
00305 return cur_count;
00306 }
00307
00308
00309
00310
00311
00312 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00313 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00314 ACE_Time_Value *timeout)
00315 {
00316 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00317
00318 ACE_Message_Block *mb = 0;
00319
00320 int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
00321
00322
00323 if (cur_count != -1)
00324 {
00325 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00326
00327 mb->release ();
00328 }
00329
00330 return cur_count;
00331 }
00332
00333 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00334 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00335 {
00336 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00337
00338 return this->queue_.notify ();
00339 }
00340
00341 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00342 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00343 ACE_Message_Queue_Ex_Iterator (ACE_Message_Queue_Ex <ACE_MESSAGE_TYPE, ACE_SYNCH_USE> & queue)
00344 : iter_ (queue.queue_)
00345 {
00346
00347 }
00348
00349 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00350 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00351 next (ACE_MESSAGE_TYPE *&entry)
00352 {
00353 ACE_Message_Block * mb = 0;
00354 int retval = this->iter_.next (mb);
00355
00356 if (retval == 1)
00357 entry = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00358
00359 return retval;
00360 }
00361
00362 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00363 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::done (void) const
00364 {
00365 return this->iter_.done ();
00366 }
00367
00368 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00369 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::advance (void)
00370 {
00371 return this->iter_.advance ();
00372 }
00373
00374 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00375 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00376 {
00377 this->iter_.dump ();
00378 }
00379
00380 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_Iterator)
00381
00382 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00383 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00384 ACE_Message_Queue_Ex_Reverse_Iterator (ACE_Message_Queue_Ex <ACE_MESSAGE_TYPE, ACE_SYNCH_USE> & queue)
00385 : iter_ (queue.queue_)
00386 {
00387
00388 }
00389
00390 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00391 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00392 next (ACE_MESSAGE_TYPE *&entry)
00393 {
00394 ACE_Message_Block * mb = 0;
00395 int retval = this->iter_.next (mb);
00396
00397 if (retval == 1)
00398 entry = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00399
00400 return retval;
00401 }
00402
00403 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00404 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::done (void) const
00405 {
00406 return this->iter_.done ();
00407 }
00408
00409 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00410 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::advance (void)
00411 {
00412 return this->iter_.advance ();
00413 }
00414
00415 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00416 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00417 {
00418 this->iter_.dump ();
00419 }
00420
00421 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_Reverse_Iterator)
00422
00423 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00424 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N
00425 (size_t high_water_mark,
00426 size_t low_water_mark,
00427 ACE_Notification_Strategy *ns):
00428 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> (high_water_mark,
00429 low_water_mark,
00430 ns)
00431 {
00432 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N");
00433 }
00434
00435 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00436 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N (void)
00437 {
00438 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N");
00439 }
00440
00441 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00442 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head
00443 (ACE_MESSAGE_TYPE *new_item,
00444 ACE_Time_Value *timeout)
00445 {
00446 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00447
00448
00449
00450 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00451 if (0 == mb)
00452 {
00453 return -1;
00454 }
00455
00456 int result = this->queue_.enqueue_head (mb, timeout);
00457 if (-1 == result)
00458 {
00459
00460 mb->release ();
00461 }
00462 return result;
00463 }
00464
00465 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00466 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail
00467 (ACE_MESSAGE_TYPE *new_item,
00468 ACE_Time_Value *timeout)
00469 {
00470 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00471
00472
00473
00474 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00475 if (0 == mb)
00476 {
00477 return -1;
00478 }
00479
00480 int result = this->queue_.enqueue_tail (mb, timeout);
00481 if (-1 == result)
00482 {
00483
00484 mb->release ();
00485 }
00486 return result;
00487 }
00488
00489 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Message_Block *
00490 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i
00491 (ACE_MESSAGE_TYPE *new_item)
00492 {
00493 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i");
00494
00495
00496 ACE_Message_Block *mb_head = 0;
00497
00498 ACE_NEW_RETURN (mb_head,
00499 ACE_Message_Block ((char *) new_item,
00500 sizeof (*new_item),
00501 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00502 0);
00503
00504
00505 ACE_Message_Block *mb_tail = mb_head;
00506
00507
00508 for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
00509 {
00510 ACE_Message_Block *mb_temp = 0;
00511 ACE_NEW_NORETURN (mb_temp,
00512 ACE_Message_Block ((char *) pobj,
00513 sizeof (*pobj),
00514 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY));
00515 if (mb_temp == 0)
00516 {
00517 mb_head->release ();
00518 mb_head = 0;
00519 break;
00520 }
00521
00522 mb_tail->next (mb_temp);
00523 mb_tail = mb_temp;
00524 }
00525
00526 return mb_head;
00527 }
00528
00529 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
00530
00531 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00532 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item,
00533 ACE_Time_Value *timeout)
00534 {
00535 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue");
00536
00537 return this->dequeue_head (first_item, timeout);
00538 }
00539
00540 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy *
00541 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void)
00542 {
00543 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00544
00545 return this->queue_.notification_strategy ();
00546 }
00547
00548 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00549 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00550 {
00551 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00552
00553 this->queue_.notification_strategy (s);
00554 }
00555
00556
00557
00558 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00559 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void)
00560 {
00561 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty");
00562
00563 return this->queue_.is_empty ();
00564 }
00565
00566
00567
00568 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00569 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void)
00570 {
00571 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full");
00572
00573 return this->queue_.is_full ();
00574 }
00575
00576 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00577 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void)
00578 {
00579 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00580
00581 return this->queue_.high_water_mark ();
00582 }
00583
00584 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00585 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00586 {
00587 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00588
00589 this->queue_.high_water_mark (hwm);
00590 }
00591
00592 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00593 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void)
00594 {
00595 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00596
00597 return this->queue_.low_water_mark ();
00598 }
00599
00600 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00601 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00602 {
00603 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00604
00605 this->queue_.low_water_mark (lwm);
00606 }
00607
00608 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00609 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void)
00610 {
00611 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00612
00613 return this->queue_.message_bytes ();
00614 }
00615
00616 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00617 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void)
00618 {
00619 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00620
00621 return this->queue_.message_length ();
00622 }
00623
00624 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00625 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
00626 {
00627 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count");
00628
00629 return this->queue_.message_count ();
00630 }
00631
00632 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00633 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
00634 {
00635 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
00636
00637 return this->queue_.deactivate ();
00638 }
00639
00640 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00641 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
00642 {
00643 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
00644
00645 return this->queue_.activate ();
00646 }
00647
00648 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00649 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
00650 {
00651 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
00652
00653 return this->queue_.pulse ();
00654 }
00655
00656 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00657 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void)
00658 {
00659 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated");
00660
00661 return this->queue_.deactivated ();
00662 }
00663
00664 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00665 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void)
00666 {
00667 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state");
00668
00669 return this->queue_.state ();
00670 }
00671
00672 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00673 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void)
00674 {
00675 return this->queue_.lock ();
00676 }
00677
00678 template <ACE_SYNCH_DECL>
00679 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00680 : queue_ (q),
00681 curr_ (q.head_)
00682 {
00683 }
00684
00685 template <ACE_SYNCH_DECL> int
00686 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00687 {
00688 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00689
00690 if (this->curr_ != 0)
00691 {
00692 entry = this->curr_;
00693 return 1;
00694 }
00695
00696 return 0;
00697 }
00698
00699 template <ACE_SYNCH_DECL> int
00700 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00701 {
00702 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00703
00704 return this->curr_ == 0;
00705 }
00706
00707 template <ACE_SYNCH_DECL> int
00708 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00709 {
00710 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00711
00712 if (this->curr_)
00713 this->curr_ = this->curr_->next ();
00714 return this->curr_ != 0;
00715 }
00716
00717 template <ACE_SYNCH_DECL> void
00718 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00719 {
00720 #if defined (ACE_HAS_DUMP)
00721 #endif
00722 }
00723
00724 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00725
00726 template <ACE_SYNCH_DECL>
00727 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00728 : queue_ (q),
00729 curr_ (queue_.tail_)
00730 {
00731 }
00732
00733 template <ACE_SYNCH_DECL> int
00734 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00735 {
00736 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00737
00738 if (this->curr_ != 0)
00739 {
00740 entry = this->curr_;
00741 return 1;
00742 }
00743
00744 return 0;
00745 }
00746
00747 template <ACE_SYNCH_DECL> int
00748 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00749 {
00750 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00751
00752 return this->curr_ == 0;
00753 }
00754
00755 template <ACE_SYNCH_DECL> int
00756 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00757 {
00758 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00759
00760 if (this->curr_)
00761 this->curr_ = this->curr_->prev ();
00762 return this->curr_ != 0;
00763 }
00764
00765 template <ACE_SYNCH_DECL> void
00766 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00767 {
00768 #if defined (ACE_HAS_DUMP)
00769 #endif
00770 }
00771
00772 template <ACE_SYNCH_DECL> int
00773 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
00774 ACE_Time_Value *timeout)
00775 {
00776 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue");
00777 return this->dequeue_head (first_item, timeout);
00778 }
00779
00780 template <ACE_SYNCH_DECL> ACE_Notification_Strategy *
00781 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
00782 {
00783 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00784
00785 return this->notification_strategy_;
00786 }
00787
00788 template <ACE_SYNCH_DECL> void
00789 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00790 {
00791 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00792
00793 this->notification_strategy_ = s;
00794 }
00795
00796
00797
00798 template <ACE_SYNCH_DECL> bool
00799 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
00800 {
00801 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
00802 return this->tail_ == 0;
00803 }
00804
00805
00806
00807 template <ACE_SYNCH_DECL> bool
00808 ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
00809 {
00810 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
00811 return this->cur_bytes_ >= this->high_water_mark_;
00812 }
00813
00814
00815
00816 template <ACE_SYNCH_DECL> bool
00817 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
00818 {
00819 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
00820 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00821
00822 return this->is_empty_i ();
00823 }
00824
00825
00826
00827 template <ACE_SYNCH_DECL> bool
00828 ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
00829 {
00830 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
00831 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00832
00833 return this->is_full_i ();
00834 }
00835
00836 template <ACE_SYNCH_DECL> size_t
00837 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
00838 {
00839 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00840 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00841
00842 return this->high_water_mark_;
00843 }
00844
00845 template <ACE_SYNCH_DECL> void
00846 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00847 {
00848 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00849 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00850
00851 this->high_water_mark_ = hwm;
00852 }
00853
00854 template <ACE_SYNCH_DECL> size_t
00855 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
00856 {
00857 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00858 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00859
00860 return this->low_water_mark_;
00861 }
00862
00863 template <ACE_SYNCH_DECL> void
00864 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00865 {
00866 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00867 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00868
00869 this->low_water_mark_ = lwm;
00870 }
00871
00872 template <ACE_SYNCH_DECL> size_t
00873 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
00874 {
00875 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00876 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00877
00878 return this->cur_bytes_;
00879 }
00880
00881 template <ACE_SYNCH_DECL> size_t
00882 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void)
00883 {
00884 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00885 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00886
00887 return this->cur_length_;
00888 }
00889
00890 template <ACE_SYNCH_DECL> size_t
00891 ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
00892 {
00893 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
00894 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00895
00896 return this->cur_count_;
00897 }
00898
00899 template <ACE_SYNCH_DECL> int
00900 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
00901 {
00902 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
00903 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00904
00905 return this->deactivate_i (0);
00906 }
00907
00908 template <ACE_SYNCH_DECL> int
00909 ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
00910 {
00911 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
00912 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00913
00914 return this->activate_i ();
00915 }
00916
00917 template <ACE_SYNCH_DECL> int
00918 ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
00919 {
00920 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
00921 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00922
00923 return this->deactivate_i (1);
00924 }
00925
00926 template <ACE_SYNCH_DECL> int
00927 ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
00928 {
00929 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
00930
00931 return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
00932 }
00933
00934 template <ACE_SYNCH_DECL> int
00935 ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
00936 {
00937 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
00938
00939 return this->state_;
00940 }
00941
00942 template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00943 ACE_Message_Queue<ACE_SYNCH_USE>::lock (void)
00944 {
00945 return this->lock_;
00946 }
00947
00948 template <ACE_SYNCH_DECL> void
00949 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00950 {
00951 #if defined (ACE_HAS_DUMP)
00952 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00953 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00954 switch (this->state_)
00955 {
00956 case ACE_Message_Queue_Base::ACTIVATED:
00957 ACE_DEBUG ((LM_DEBUG,
00958 ACE_TEXT ("state = ACTIVATED\n")));
00959 break;
00960 case ACE_Message_Queue_Base::DEACTIVATED:
00961 ACE_DEBUG ((LM_DEBUG,
00962 ACE_TEXT ("state = DEACTIVATED\n")));
00963 break;
00964 case ACE_Message_Queue_Base::PULSED:
00965 ACE_DEBUG ((LM_DEBUG,
00966 ACE_TEXT ("state = PULSED\n")));
00967 break;
00968 }
00969 ACE_DEBUG ((LM_DEBUG,
00970 ACE_TEXT ("low_water_mark = %d\n")
00971 ACE_TEXT ("high_water_mark = %d\n")
00972 ACE_TEXT ("cur_bytes = %d\n")
00973 ACE_TEXT ("cur_length = %d\n")
00974 ACE_TEXT ("cur_count = %d\n")
00975 ACE_TEXT ("head_ = %u\n")
00976 ACE_TEXT ("tail_ = %u\n"),
00977 this->low_water_mark_,
00978 this->high_water_mark_,
00979 this->cur_bytes_,
00980 this->cur_length_,
00981 this->cur_count_,
00982 this->head_,
00983 this->tail_));
00984 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_full_cond:\n")));
00985 not_full_cond_.dump ();
00986 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_empty_cond:\n")));
00987 not_empty_cond_.dump ();
00988 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00989 #endif
00990 }
00991
00992 template <ACE_SYNCH_DECL> void
00993 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00994 {
00995 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00996 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00997
00998 this->cur_bytes_ = new_value;
00999 }
01000
01001 template <ACE_SYNCH_DECL> void
01002 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
01003 {
01004 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
01005 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
01006
01007 this->cur_length_ = new_value;
01008 }
01009
01010 template <ACE_SYNCH_DECL>
01011 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
01012 size_t lwm,
01013 ACE_Notification_Strategy *ns)
01014 : not_empty_cond_ (lock_)
01015 , not_full_cond_ (lock_)
01016 {
01017 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
01018
01019 if (this->open (hwm, lwm, ns) == -1)
01020 ACE_ERROR ((LM_ERROR,
01021 ACE_TEXT ("open")));
01022
01023 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01024 ACE_NEW (this->monitor_,
01025 ACE::Monitor_Control::Size_Monitor);
01026
01027
01028 char pid_buf[sizeof (int) + 1];
01029 ACE_OS::sprintf (pid_buf, "%d", ACE_OS::getpid ());
01030 pid_buf[sizeof (int)] = '\0';
01031
01032 const int addr_nibbles = 2 * sizeof (ptrdiff_t);
01033 char addr_buf[addr_nibbles + 1];
01034 ACE_OS::sprintf (addr_buf, "%p", this);
01035 addr_buf[addr_nibbles] = '\0';
01036
01037 ACE_CString name_str ("Message_Queue_");
01038 name_str += pid_buf;
01039 name_str += '_';
01040 name_str += addr_buf;
01041 this->monitor_->name (name_str.c_str ());
01042 this->monitor_->add_to_registry ();
01043 #endif
01044 }
01045
01046 template <ACE_SYNCH_DECL>
01047 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
01048 {
01049 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
01050 if (this->head_ != 0 && this->close () == -1)
01051 ACE_ERROR ((LM_ERROR,
01052 ACE_TEXT ("close")));
01053
01054 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01055 this->monitor_->remove_from_registry ();
01056 this->monitor_->remove_ref ();
01057 #endif
01058 }
01059
01060 template <ACE_SYNCH_DECL> int
01061 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
01062 {
01063 int number_flushed = 0;
01064
01065
01066
01067 for (this->tail_ = 0; this->head_ != 0; )
01068 {
01069 ++number_flushed;
01070
01071 size_t mb_bytes = 0;
01072 size_t mb_length = 0;
01073 this->head_->total_size_and_length (mb_bytes,
01074 mb_length);
01075
01076 this->cur_bytes_ -= mb_bytes;
01077 this->cur_length_ -= mb_length;
01078 --this->cur_count_;
01079
01080 ACE_Message_Block *temp = this->head_;
01081 this->head_ = this->head_->next ();
01082
01083
01084
01085 temp->release ();
01086 }
01087
01088 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01089
01090 if (number_flushed > 0)
01091 {
01092 this->monitor_->receive (this->cur_length_);
01093 }
01094 #endif
01095
01096 return number_flushed;
01097 }
01098
01099
01100
01101
01102
01103 template <ACE_SYNCH_DECL> int
01104 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
01105 size_t lwm,
01106 ACE_Notification_Strategy *ns)
01107 {
01108 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
01109 this->high_water_mark_ = hwm;
01110 this->low_water_mark_ = lwm;
01111 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01112 this->cur_bytes_ = 0;
01113 this->cur_length_ = 0;
01114 this->cur_count_ = 0;
01115 this->tail_ = 0;
01116 this->head_ = 0;
01117 this->notification_strategy_ = ns;
01118 return 0;
01119 }
01120
01121
01122
01123
01124 template <ACE_SYNCH_DECL> int
01125 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
01126 {
01127 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
01128 int const previous_state = this->state_;
01129
01130 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
01131 {
01132
01133 this->not_empty_cond_.broadcast ();
01134 this->not_full_cond_.broadcast ();
01135
01136 if (pulse)
01137 this->state_ = ACE_Message_Queue_Base::PULSED;
01138 else
01139 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
01140 }
01141
01142 return previous_state;
01143 }
01144
01145 template <ACE_SYNCH_DECL> int
01146 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
01147 {
01148 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
01149 int const previous_state = this->state_;
01150 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01151 return previous_state;
01152 }
01153
01154 template <ACE_SYNCH_DECL> int
01155 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
01156 {
01157 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
01158 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01159
01160
01161 return this->flush_i ();
01162 }
01163
01164
01165
01166 template <ACE_SYNCH_DECL> int
01167 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
01168 {
01169 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
01170 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01171
01172
01173
01174 this->deactivate_i ();
01175
01176
01177 return this->flush_i ();
01178 }
01179
01180 template <ACE_SYNCH_DECL> int
01181 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
01182 {
01183 if (this->not_full_cond_.signal () != 0)
01184 return -1;
01185 return 0;
01186 }
01187
01188 template <ACE_SYNCH_DECL> int
01189 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
01190 {
01191
01192 if (this->not_empty_cond_.signal () != 0)
01193 return -1;
01194 return 0;
01195 }
01196
01197
01198
01199
01200 template <ACE_SYNCH_DECL> int
01201 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
01202 {
01203 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
01204
01205 if (new_item == 0)
01206 return -1;
01207
01208
01209
01210
01211
01212
01213 ACE_Message_Block *seq_tail = new_item;
01214 ++this->cur_count_;
01215 new_item->total_size_and_length (this->cur_bytes_,
01216 this->cur_length_);
01217 while (seq_tail->next () != 0)
01218 {
01219 seq_tail->next ()->prev (seq_tail);
01220 seq_tail = seq_tail->next ();
01221 ++this->cur_count_;
01222 seq_tail->total_size_and_length (this->cur_bytes_,
01223 this->cur_length_);
01224 }
01225
01226
01227 if (this->tail_ == 0)
01228 {
01229 this->head_ = new_item;
01230 this->tail_ = seq_tail;
01231
01232 new_item->prev (0);
01233 }
01234
01235 else
01236 {
01237
01238 this->tail_->next (new_item);
01239 new_item->prev (this->tail_);
01240 this->tail_ = seq_tail;
01241 }
01242
01243 if (this->signal_dequeue_waiters () == -1)
01244 return -1;
01245 else
01246 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01247 }
01248
01249
01250
01251 template <ACE_SYNCH_DECL> int
01252 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
01253 {
01254 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
01255
01256 if (new_item == 0)
01257 return -1;
01258
01259
01260
01261
01262
01263
01264 ACE_Message_Block *seq_tail = new_item;
01265 ++this->cur_count_;
01266 new_item->total_size_and_length (this->cur_bytes_,
01267 this->cur_length_);
01268 while (seq_tail->next () != 0)
01269 {
01270 seq_tail->next ()->prev (seq_tail);
01271 seq_tail = seq_tail->next ();
01272 ++this->cur_count_;
01273 seq_tail->total_size_and_length (this->cur_bytes_,
01274 this->cur_length_);
01275 }
01276
01277 new_item->prev (0);
01278 seq_tail->next (this->head_);
01279
01280 if (this->head_ != 0)
01281 this->head_->prev (seq_tail);
01282 else
01283 this->tail_ = seq_tail;
01284
01285 this->head_ = new_item;
01286
01287 if (this->signal_dequeue_waiters () == -1)
01288 return -1;
01289 else
01290 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01291 }
01292
01293
01294
01295
01296 template <ACE_SYNCH_DECL> int
01297 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01298 {
01299 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01300
01301 if (new_item == 0)
01302 return -1;
01303
01304
01305
01306
01307
01308 new_item->next (0);
01309
01310 if (this->head_ == 0)
01311
01312
01313 return this->enqueue_head_i (new_item);
01314 else
01315 {
01316 ACE_Message_Block *temp = 0;
01317
01318
01319
01320
01321
01322 for (temp = this->tail_;
01323 temp != 0;
01324 temp = temp->prev ())
01325 if (temp->msg_priority () >= new_item->msg_priority ())
01326
01327
01328 break;
01329
01330 if (temp == 0)
01331
01332
01333
01334 return this->enqueue_head_i (new_item);
01335 else if (temp->next () == 0)
01336
01337
01338
01339 return this->enqueue_tail_i (new_item);
01340 else
01341 {
01342
01343
01344
01345
01346 new_item->prev (temp);
01347 new_item->next (temp->next ());
01348 temp->next ()->prev (new_item);
01349 temp->next (new_item);
01350 }
01351 }
01352
01353
01354 new_item->total_size_and_length (this->cur_bytes_,
01355 this->cur_length_);
01356 ++this->cur_count_;
01357
01358 if (this->signal_dequeue_waiters () == -1)
01359 return -1;
01360 else
01361 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01362 }
01363
01364
01365
01366
01367 template <ACE_SYNCH_DECL> int
01368 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
01369 {
01370 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01371 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
01372
01373 if (new_item == 0)
01374 return -1;
01375
01376
01377
01378
01379
01380 new_item->next (0);
01381
01382 if (this->head_ == 0)
01383
01384
01385 return this->enqueue_head_i (new_item);
01386 else
01387 {
01388 ACE_Message_Block *temp = 0;
01389
01390
01391
01392
01393
01394 for (temp = this->head_;
01395 temp != 0;
01396 temp = temp->next ())
01397 if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
01398
01399
01400 break;
01401
01402 if (temp == 0 || temp->next () == 0)
01403
01404
01405
01406 return this->enqueue_tail_i (new_item);
01407 else
01408 {
01409
01410
01411
01412
01413 new_item->prev (temp);
01414 new_item->next (temp->next ());
01415 temp->next ()->prev (new_item);
01416 temp->next (new_item);
01417 }
01418 }
01419
01420
01421 new_item->total_size_and_length (this->cur_bytes_,
01422 this->cur_length_);
01423 ++this->cur_count_;
01424
01425 if (this->signal_dequeue_waiters () == -1)
01426 return -1;
01427 else
01428 return this->cur_count_;
01429 #else
01430 return this->enqueue_tail_i (new_item);
01431 #endif
01432 }
01433
01434
01435
01436
01437
01438 template <ACE_SYNCH_DECL> int
01439 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01440 {
01441 if (this->head_ ==0)
01442 ACE_ERROR_RETURN ((LM_ERROR,
01443 ACE_TEXT ("Attempting to dequeue from empty queue")),
01444 -1);
01445 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01446 first_item = this->head_;
01447 this->head_ = this->head_->next ();
01448
01449 if (this->head_ == 0)
01450 this->tail_ = 0;
01451 else
01452
01453 this->head_->prev (0);
01454
01455 size_t mb_bytes = 0;
01456 size_t mb_length = 0;
01457 first_item->total_size_and_length (mb_bytes,
01458 mb_length);
01459
01460 this->cur_bytes_ -= mb_bytes;
01461 this->cur_length_ -= mb_length;
01462 --this->cur_count_;
01463
01464 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01465 this->head_ = this->tail_ = 0;
01466
01467
01468 first_item->prev (0);
01469 first_item->next (0);
01470
01471 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01472 this->monitor_->receive (this->cur_length_);
01473 #endif
01474
01475
01476
01477 if (this->cur_bytes_ <= this->low_water_mark_
01478 && this->signal_enqueue_waiters () == -1)
01479 return -1;
01480 else
01481 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01482 }
01483
01484
01485
01486
01487
01488
01489 template <ACE_SYNCH_DECL> int
01490 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
01491 {
01492 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
01493
01494 if (this->head_ == 0)
01495 return -1;
01496
01497
01498
01499 ACE_Message_Block *chosen = 0;
01500 u_long priority = ULONG_MAX;
01501
01502 for (ACE_Message_Block *temp = this->tail_;
01503 temp != 0;
01504 temp = temp->prev ())
01505 {
01506
01507
01508 if (temp->msg_priority () <= priority)
01509 {
01510 priority = temp->msg_priority ();
01511 chosen = temp;
01512 }
01513 }
01514
01515
01516
01517 if (chosen == 0)
01518 chosen = this->head_;
01519
01520
01521
01522 if (chosen->prev () == 0)
01523 this->head_ = chosen->next ();
01524 else
01525 chosen->prev ()->next (chosen->next ());
01526
01527 if (chosen->next () == 0)
01528 this->tail_ = chosen->prev ();
01529 else
01530 chosen->next ()->prev (chosen->prev ());
01531
01532
01533 dequeued = chosen;
01534
01535 size_t mb_bytes = 0;
01536 size_t mb_length = 0;
01537 dequeued->total_size_and_length (mb_bytes,
01538 mb_length);
01539
01540 this->cur_bytes_ -= mb_bytes;
01541 this->cur_length_ -= mb_length;
01542 --this->cur_count_;
01543
01544 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01545 this->head_ = this->tail_ = 0;
01546
01547
01548 dequeued->prev (0);
01549 dequeued->next (0);
01550
01551
01552
01553 if (this->cur_bytes_ <= this->low_water_mark_
01554 && this->signal_enqueue_waiters () == -1)
01555 return -1;
01556 else
01557 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01558 }
01559
01560
01561
01562
01563
01564 template <ACE_SYNCH_DECL> int
01565 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
01566 {
01567 if (this->head_ == 0)
01568 ACE_ERROR_RETURN ((LM_ERROR,
01569 ACE_TEXT ("Attempting to dequeue from empty queue")),
01570 -1);
01571 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
01572 dequeued = this->tail_;
01573 if (this->tail_->prev () == 0)
01574 {
01575 this->head_ = 0;
01576 this->tail_ = 0;
01577 }
01578 else
01579 {
01580 this->tail_->prev ()->next (0);
01581 this->tail_ = this->tail_->prev ();
01582 }
01583
01584 size_t mb_bytes = 0;
01585 size_t mb_length = 0;
01586 dequeued->total_size_and_length (mb_bytes,
01587 mb_length);
01588
01589 this->cur_bytes_ -= mb_bytes;
01590 this->cur_length_ -= mb_length;
01591 --this->cur_count_;
01592
01593 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01594 this->head_ = this->tail_ = 0;
01595
01596
01597 dequeued->prev (0);
01598 dequeued->next (0);
01599
01600
01601
01602 if (this->cur_bytes_ <= this->low_water_mark_
01603 && this->signal_enqueue_waiters () == -1)
01604 return -1;
01605 else
01606 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01607 }
01608
01609
01610
01611
01612
01613 template <ACE_SYNCH_DECL> int
01614 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01615 {
01616 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01617 if (this->head_ == 0)
01618 ACE_ERROR_RETURN ((LM_ERROR,
01619 ACE_TEXT ("Attempting to dequeue from empty queue")),
01620 -1);
01621 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01622
01623
01624 ACE_Message_Block* chosen = 0;
01625 ACE_Time_Value deadline = ACE_Time_Value::max_time;
01626 for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01627 if (temp->msg_deadline_time () < deadline)
01628 {
01629 deadline = temp->msg_deadline_time ();
01630 chosen = temp;
01631 }
01632
01633
01634
01635 if (chosen == 0)
01636 chosen = this->head_;
01637
01638
01639
01640 if (chosen->prev () == 0)
01641 this->head_ = chosen->next ();
01642 else
01643 chosen->prev ()->next (chosen->next ());
01644
01645 if (chosen->next () == 0)
01646 this->tail_ = chosen->prev ();
01647 else
01648 chosen->next ()->prev (chosen->prev ());
01649
01650
01651 dequeued = chosen;
01652
01653 size_t mb_bytes = 0;
01654 size_t mb_length = 0;
01655 dequeued->total_size_and_length (mb_bytes,
01656 mb_length);
01657
01658 this->cur_bytes_ -= mb_bytes;
01659 this->cur_length_ -= mb_length;
01660 --this->cur_count_;
01661
01662 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01663 this->head_ = this->tail_ = 0;
01664
01665
01666 dequeued->prev (0);
01667 dequeued->next (0);
01668
01669
01670
01671 if (this->cur_bytes_ <= this->low_water_mark_
01672 && this->signal_enqueue_waiters () == -1)
01673 return -1;
01674 else
01675 return this->cur_count_;
01676 #else
01677 return this->dequeue_head_i (dequeued);
01678 #endif
01679 }
01680
01681
01682
01683 template <ACE_SYNCH_DECL> int
01684 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01685 ACE_Time_Value *timeout)
01686 {
01687 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01688 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01689
01690 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01691 {
01692 errno = ESHUTDOWN;
01693 return -1;
01694 }
01695
01696
01697
01698 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01699 return -1;
01700
01701 first_item = this->head_;
01702 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01703 }
01704
01705 template <ACE_SYNCH_DECL> int
01706 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &,
01707 ACE_Time_Value *timeout)
01708 {
01709 int result = 0;
01710
01711
01712
01713 while (this->is_full_i ())
01714 {
01715 if (this->not_full_cond_.wait (timeout) == -1)
01716 {
01717 if (errno == ETIME)
01718 errno = EWOULDBLOCK;
01719 result = -1;
01720 break;
01721 }
01722 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01723 {
01724 errno = ESHUTDOWN;
01725 result = -1;
01726 break;
01727 }
01728 }
01729 return result;
01730 }
01731
01732 template <ACE_SYNCH_DECL> int
01733 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01734 (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01735 {
01736 int result = 0;
01737
01738
01739
01740 while (this->is_empty_i ())
01741 {
01742 if (this->not_empty_cond_.wait (timeout) == -1)
01743 {
01744 if (errno == ETIME)
01745 errno = EWOULDBLOCK;
01746 result = -1;
01747 break;
01748 }
01749 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01750 {
01751 errno = ESHUTDOWN;
01752 result = -1;
01753 break;
01754 }
01755 }
01756 return result;
01757 }
01758
01759
01760
01761
01762 template <ACE_SYNCH_DECL> int
01763 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01764 ACE_Time_Value *timeout)
01765 {
01766 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01767 int queue_count = 0;
01768 ACE_Notification_Strategy *notifier = 0;
01769 {
01770 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01771
01772 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01773 {
01774 errno = ESHUTDOWN;
01775 return -1;
01776 }
01777
01778 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01779 return -1;
01780
01781 queue_count = this->enqueue_head_i (new_item);
01782 if (queue_count == -1)
01783 return -1;
01784
01785 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01786 this->monitor_->receive (this->cur_length_);
01787 #endif
01788 notifier = this->notification_strategy_;
01789 }
01790
01791 if (0 != notifier)
01792 notifier->notify();
01793 return queue_count;
01794 }
01795
01796
01797
01798
01799
01800 template <ACE_SYNCH_DECL> int
01801 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01802 ACE_Time_Value *timeout)
01803 {
01804 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01805 int queue_count = 0;
01806 ACE_Notification_Strategy *notifier = 0;
01807 {
01808 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01809
01810 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01811 {
01812 errno = ESHUTDOWN;
01813 return -1;
01814 }
01815
01816 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01817 return -1;
01818
01819 queue_count = this->enqueue_i (new_item);
01820
01821 if (queue_count == -1)
01822 return -1;
01823
01824 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01825 this->monitor_->receive (this->cur_length_);
01826 #endif
01827 notifier = this->notification_strategy_;
01828 }
01829 if (0 != notifier)
01830 notifier->notify ();
01831 return queue_count;
01832 }
01833
01834
01835
01836
01837
01838 template <ACE_SYNCH_DECL> int
01839 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01840 ACE_Time_Value *timeout)
01841 {
01842 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01843 int queue_count = 0;
01844 ACE_Notification_Strategy *notifier = 0;
01845 {
01846 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01847
01848 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01849 {
01850 errno = ESHUTDOWN;
01851 return -1;
01852 }
01853
01854 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01855 return -1;
01856
01857 queue_count = this->enqueue_deadline_i (new_item);
01858
01859 if (queue_count == -1)
01860 return -1;
01861
01862 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01863 this->monitor_->receive (this->cur_length_);
01864 #endif
01865 notifier = this->notification_strategy_;
01866 }
01867 if (0 != notifier)
01868 notifier->notify ();
01869 return queue_count;
01870 }
01871
01872 template <ACE_SYNCH_DECL> int
01873 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01874 ACE_Time_Value *timeout)
01875 {
01876 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01877 return this->enqueue_prio (new_item, timeout);
01878 }
01879
01880
01881
01882
01883 template <ACE_SYNCH_DECL> int
01884 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01885 ACE_Time_Value *timeout)
01886 {
01887 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01888 int queue_count = 0;
01889 ACE_Notification_Strategy *notifier = 0;
01890 {
01891 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01892
01893 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01894 {
01895 errno = ESHUTDOWN;
01896 return -1;
01897 }
01898
01899 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01900 return -1;
01901
01902 queue_count = this->enqueue_tail_i (new_item);
01903
01904 if (queue_count == -1)
01905 return -1;
01906
01907 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01908 this->monitor_->receive (this->cur_length_);
01909 #endif
01910 notifier = this->notification_strategy_;
01911 }
01912 if (0 != notifier)
01913 notifier->notify ();
01914 return queue_count;
01915 }
01916
01917
01918
01919
01920
01921 template <ACE_SYNCH_DECL> int
01922 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01923 ACE_Time_Value *timeout)
01924 {
01925 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01926 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01927
01928 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01929 {
01930 errno = ESHUTDOWN;
01931 return -1;
01932 }
01933
01934 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01935 return -1;
01936
01937 return this->dequeue_head_i (first_item);
01938 }
01939
01940
01941
01942
01943
01944 template <ACE_SYNCH_DECL> int
01945 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01946 ACE_Time_Value *timeout)
01947 {
01948 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01949 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01950
01951 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01952 {
01953 errno = ESHUTDOWN;
01954 return -1;
01955 }
01956
01957 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01958 return -1;
01959
01960 return this->dequeue_prio_i (dequeued);
01961 }
01962
01963
01964
01965
01966
01967 template <ACE_SYNCH_DECL> int
01968 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01969 ACE_Time_Value *timeout)
01970 {
01971 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01972 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01973
01974 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01975 {
01976 errno = ESHUTDOWN;
01977 return -1;
01978 }
01979
01980 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01981 return -1;
01982
01983 return this->dequeue_tail_i (dequeued);
01984 }
01985
01986
01987
01988
01989
01990 template <ACE_SYNCH_DECL> int
01991 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01992 ACE_Time_Value *timeout)
01993 {
01994 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01995 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01996
01997 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01998 {
01999 errno = ESHUTDOWN;
02000 return -1;
02001 }
02002
02003 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
02004 return -1;
02005
02006 return this->dequeue_deadline_i (dequeued);
02007 }
02008
02009 template <ACE_SYNCH_DECL> int
02010 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
02011 {
02012 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
02013
02014
02015 if (this->notification_strategy_ == 0)
02016 return 0;
02017 else
02018 return this->notification_strategy_->notify ();
02019 }
02020
02021
02022 template <ACE_SYNCH_DECL>
02023 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
02024 size_t hwm,
02025 size_t lwm,
02026 ACE_Notification_Strategy *ns)
02027 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02028 pending_head_ (0),
02029 pending_tail_ (0),
02030 late_head_ (0),
02031 late_tail_ (0),
02032 beyond_late_head_ (0),
02033 beyond_late_tail_ (0),
02034 message_strategy_ (message_strategy)
02035 {
02036
02037
02038
02039 }
02040
02041
02042
02043 template <ACE_SYNCH_DECL>
02044 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
02045 {
02046 delete &this->message_strategy_;
02047 }
02048
02049 template <ACE_SYNCH_DECL> int
02050 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
02051 ACE_Message_Block *&list_tail,
02052 u_int status_flags)
02053 {
02054
02055 list_head = 0;
02056 list_tail = 0;
02057
02058
02059 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02060
02061
02062 int result = this->refresh_queue (current_time);
02063 if (result < 0)
02064 return result;
02065
02066 if (ACE_BIT_ENABLED (status_flags,
02067 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
02068 && this->pending_head_
02069 && this->pending_tail_)
02070 {
02071
02072 if (this->pending_head_->prev ())
02073 {
02074 this->tail_ = this->pending_head_->prev ();
02075 this->pending_head_->prev ()->next (0);
02076 }
02077 else
02078 {
02079
02080 this->head_ = 0;
02081 this->tail_ = 0;
02082 }
02083
02084
02085 list_head = this->pending_head_;
02086 list_tail = this->pending_tail_;
02087
02088
02089 this->pending_head_->prev (0);
02090 this->pending_head_ = 0;
02091 this->pending_tail_ = 0;
02092 }
02093
02094 if (ACE_BIT_ENABLED (status_flags,
02095 (u_int) ACE_Dynamic_Message_Strategy::LATE)
02096 && this->late_head_
02097 && this->late_tail_)
02098 {
02099
02100
02101 if (this->late_tail_->next ())
02102 this->late_tail_->next ()->prev (this->late_head_->prev ());
02103 else
02104 this->tail_ = this->late_head_->prev ();
02105
02106 if (this->late_head_->prev ())
02107 this->late_head_->prev ()->next (this->late_tail_->next ());
02108 else
02109 this->head_ = this->late_tail_->next ();
02110
02111
02112 this->late_head_->prev (list_tail);
02113 if (list_tail)
02114 list_tail->next (this->late_head_);
02115 else
02116 list_head = this->late_head_;
02117
02118 list_tail = this->late_tail_;
02119
02120 this->late_tail_->next (0);
02121 this->late_head_ = 0;
02122 this->late_tail_ = 0;
02123 }
02124
02125 if (ACE_BIT_ENABLED (status_flags,
02126 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
02127 && this->beyond_late_head_
02128 && this->beyond_late_tail_)
02129 {
02130
02131 if (this->beyond_late_tail_->next ())
02132 {
02133 this->head_ = this->beyond_late_tail_->next ();
02134 this->beyond_late_tail_->next ()->prev (0);
02135 }
02136 else
02137 {
02138
02139 this->head_ = 0;
02140 this->tail_ = 0;
02141 }
02142
02143
02144
02145 if (list_tail)
02146 {
02147 this->beyond_late_head_->prev (list_tail);
02148 list_tail->next (this->beyond_late_head_);
02149 }
02150 else
02151 list_head = this->beyond_late_head_;
02152
02153 list_tail = this->beyond_late_tail_;
02154
02155 this->beyond_late_tail_->next (0);
02156 this->beyond_late_head_ = 0;
02157 this->beyond_late_tail_ = 0;
02158 }
02159
02160
02161 ACE_Message_Block *temp1;
02162
02163 for (temp1 = list_head;
02164 temp1 != 0;
02165 temp1 = temp1->next ())
02166 {
02167 --this->cur_count_;
02168
02169 size_t mb_bytes = 0;
02170 size_t mb_length = 0;
02171 temp1->total_size_and_length (mb_bytes,
02172 mb_length);
02173
02174 this->cur_bytes_ -= mb_bytes;
02175 this->cur_length_ -= mb_length;
02176 }
02177
02178 return result;
02179 }
02180
02181
02182
02183
02184
02185
02186
02187
02188
02189 template <ACE_SYNCH_DECL> int
02190 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
02191 ACE_Time_Value *timeout)
02192 {
02193 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02194
02195 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02196
02197 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02198 {
02199 errno = ESHUTDOWN;
02200 return -1;
02201 }
02202
02203 int result;
02204
02205
02206 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02207
02208
02209 result = this->refresh_queue (current_time);
02210 if (result < 0)
02211 return result;
02212
02213
02214 result = this->wait_not_empty_cond (ace_mon, timeout);
02215 if (result == -1)
02216 return result;
02217
02218
02219
02220
02221 result = this->dequeue_head_i (first_item);
02222
02223 return result;
02224 }
02225
02226
02227
02228
02229 template <ACE_SYNCH_DECL> void
02230 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
02231 {
02232 #if defined (ACE_HAS_DUMP)
02233 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02234 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02235
02236 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class):\n")));
02237 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02238
02239 ACE_DEBUG ((LM_DEBUG,
02240 ACE_TEXT ("pending_head_ = %u\n")
02241 ACE_TEXT ("pending_tail_ = %u\n")
02242 ACE_TEXT ("late_head_ = %u\n")
02243 ACE_TEXT ("late_tail_ = %u\n")
02244 ACE_TEXT ("beyond_late_head_ = %u\n")
02245 ACE_TEXT ("beyond_late_tail_ = %u\n"),
02246 this->pending_head_,
02247 this->pending_tail_,
02248 this->late_head_,
02249 this->late_tail_,
02250 this->beyond_late_head_,
02251 this->beyond_late_tail_));
02252
02253 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ :\n")));
02254 message_strategy_.dump ();
02255
02256 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02257 #endif
02258 }
02259
02260
02261 template <ACE_SYNCH_DECL> int
02262 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
02263 {
02264 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02265
02266 if (new_item == 0)
02267 {
02268 return -1;
02269 }
02270
02271 int result = 0;
02272
02273
02274 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02275
02276
02277
02278 result = this->refresh_queue (current_time);
02279
02280 if (result < 0)
02281 {
02282 return result;
02283 }
02284
02285
02286 switch (message_strategy_.priority_status (*new_item,
02287 current_time))
02288 {
02289 case ACE_Dynamic_Message_Strategy::PENDING:
02290 if (this->pending_tail_ == 0)
02291 {
02292
02293
02294
02295 pending_head_ = new_item;
02296 pending_tail_ = pending_head_;
02297 return this->enqueue_tail_i (new_item);
02298 }
02299 else
02300 {
02301
02302
02303 result = sublist_enqueue_i (new_item,
02304 current_time,
02305 this->pending_head_,
02306 this->pending_tail_,
02307 ACE_Dynamic_Message_Strategy::PENDING);
02308 }
02309 break;
02310
02311 case ACE_Dynamic_Message_Strategy::LATE:
02312 if (this->late_tail_ == 0)
02313 {
02314 late_head_ = new_item;
02315 late_tail_ = late_head_;
02316
02317 if (this->pending_head_ == 0)
02318
02319
02320
02321 return this->enqueue_tail_i (new_item);
02322 else if (this->beyond_late_tail_ == 0)
02323
02324
02325 return this->enqueue_head_i (new_item);
02326 else
02327 {
02328
02329
02330
02331 this->beyond_late_tail_->next (new_item);
02332 new_item->prev (this->beyond_late_tail_);
02333 this->pending_head_->prev (new_item);
02334 new_item->next (this->pending_head_);
02335 }
02336 }
02337 else
02338 {
02339
02340
02341 result = sublist_enqueue_i (new_item,
02342 current_time,
02343 this->late_head_,
02344 this->late_tail_,
02345 ACE_Dynamic_Message_Strategy::LATE);
02346 }
02347 break;
02348
02349 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02350 if (this->beyond_late_tail_ == 0)
02351 {
02352
02353
02354
02355 beyond_late_head_ = new_item;
02356 beyond_late_tail_ = beyond_late_head_;
02357 return this->enqueue_head_i (new_item);
02358 }
02359 else
02360 {
02361
02362
02363
02364 if (this->beyond_late_tail_->next ())
02365 {
02366 this->beyond_late_tail_->next ()->prev (new_item);
02367 }
02368 else
02369 {
02370 this->tail_ = new_item;
02371 }
02372
02373 new_item->next (this->beyond_late_tail_->next ());
02374 this->beyond_late_tail_->next (new_item);
02375 new_item->prev (this->beyond_late_tail_);
02376 this->beyond_late_tail_ = new_item;
02377 }
02378
02379 break;
02380
02381
02382 default:
02383 result = -1;
02384 break;
02385 }
02386
02387 if (result < 0)
02388 {
02389 return result;
02390 }
02391
02392 size_t mb_bytes = 0;
02393 size_t mb_length = 0;
02394 new_item->total_size_and_length (mb_bytes,
02395 mb_length);
02396 this->cur_bytes_ += mb_bytes;
02397 this->cur_length_ += mb_length;
02398 ++this->cur_count_;
02399
02400 if (this->signal_dequeue_waiters () == -1)
02401 {
02402 return -1;
02403 }
02404 else
02405 {
02406 return ACE_Utils::truncate_cast<int> (this->cur_count_);
02407 }
02408 }
02409
02410
02411
02412
02413
02414
02415
02416 template <ACE_SYNCH_DECL> int
02417 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
02418 const ACE_Time_Value ¤t_time,
02419 ACE_Message_Block *&sublist_head,
02420 ACE_Message_Block *&sublist_tail,
02421 ACE_Dynamic_Message_Strategy::Priority_Status status)
02422 {
02423 int result = 0;
02424 ACE_Message_Block *current_item = 0;
02425
02426
02427
02428 for (current_item = sublist_tail;
02429 current_item;
02430 current_item = current_item->prev ())
02431 {
02432 if (message_strategy_.priority_status (*current_item, current_time) == status)
02433 {
02434 if (current_item->msg_priority () >= new_item->msg_priority ())
02435 break;
02436 }
02437 else
02438 {
02439 sublist_head = new_item;
02440 break;
02441 }
02442 }
02443
02444 if (current_item == 0)
02445 {
02446
02447
02448 new_item->prev (0);
02449 new_item->next (this->head_);
02450 if (this->head_ != 0)
02451 this->head_->prev (new_item);
02452 else
02453 {
02454 this->tail_ = new_item;
02455 sublist_tail = new_item;
02456 }
02457 this->head_ = new_item;
02458 sublist_head = new_item;
02459 }
02460 else
02461 {
02462
02463 new_item->next (current_item->next ());
02464 new_item->prev (current_item);
02465
02466 if (current_item->next ())
02467 current_item->next ()->prev (new_item);
02468 else
02469 this->tail_ = new_item;
02470
02471 current_item->next (new_item);
02472
02473
02474
02475 if (current_item == sublist_tail)
02476 sublist_tail = new_item;
02477 }
02478
02479 return result;
02480 }
02481
02482
02483
02484
02485 template <ACE_SYNCH_DECL> int
02486 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
02487 {
02488 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02489
02490 int result = 0;
02491 int last_in_subqueue = 0;
02492
02493
02494 if (this->pending_head_)
02495 {
02496 first_item = this->pending_head_;
02497
02498 if (0 == this->pending_head_->prev ())
02499 this->head_ = this->pending_head_->next ();
02500 else
02501 this->pending_head_->prev ()->next (this->pending_head_->next ());
02502
02503 if (0 == this->pending_head_->next ())
02504 {
02505 this->tail_ = this->pending_head_->prev ();
02506 this->pending_head_ = 0;
02507 this->pending_tail_ = 0;
02508 }
02509 else
02510 {
02511 this->pending_head_->next ()->prev (this->pending_head_->prev ());
02512 this->pending_head_ = this->pending_head_->next ();
02513 }
02514
02515 first_item->prev (0);
02516 first_item->next (0);
02517 }
02518
02519
02520 else if (this->late_head_)
02521 {
02522 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02523
02524 first_item = this->late_head_;
02525
02526 if (0 == this->late_head_->prev ())
02527 this->head_ = this->late_head_->next ();
02528 else
02529 this->late_head_->prev ()->next (this->late_head_->next ());
02530
02531 if (0 == this->late_head_->next ())
02532 this->tail_ = this->late_head_->prev ();
02533 else
02534 {
02535 this->late_head_->next ()->prev (this->late_head_->prev ());
02536 this->late_head_ = this->late_head_->next ();
02537 }
02538
02539 if (last_in_subqueue)
02540 {
02541 this->late_head_ = 0;
02542 this->late_tail_ = 0;
02543 }
02544
02545 first_item->prev (0);
02546 first_item->next (0);
02547 }
02548
02549 else if (this->beyond_late_head_)
02550 {
02551 last_in_subqueue =
02552 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02553
02554 first_item = this->beyond_late_head_;
02555 this->head_ = this->beyond_late_head_->next ();
02556
02557 if (0 == this->beyond_late_head_->next ())
02558 {
02559 this->tail_ = this->beyond_late_head_->prev ();
02560 }
02561 else
02562 {
02563 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02564 this->beyond_late_head_ = this->beyond_late_head_->next ();
02565 }
02566
02567 if (last_in_subqueue)
02568 {
02569 this->beyond_late_head_ = 0;
02570 this->beyond_late_tail_ = 0;
02571 }
02572
02573 first_item->prev (0);
02574 first_item->next (0);
02575 }
02576 else
02577 {
02578
02579 first_item = 0;
02580 result = -1;
02581 }
02582
02583 if (result < 0)
02584 {
02585 return result;
02586 }
02587
02588 size_t mb_bytes = 0;
02589 size_t mb_length = 0;
02590 first_item->total_size_and_length (mb_bytes,
02591 mb_length);
02592
02593 this->cur_bytes_ -= mb_bytes;
02594 this->cur_length_ -= mb_length;
02595 --this->cur_count_;
02596
02597
02598
02599 if (this->cur_bytes_ <= this->low_water_mark_
02600 && this->signal_enqueue_waiters () == -1)
02601 {
02602 return -1;
02603 }
02604 else
02605 {
02606 return ACE_Utils::truncate_cast<int> (this->cur_count_);
02607 }
02608 }
02609
02610
02611
02612
02613
02614
02615
02616 template <ACE_SYNCH_DECL> int
02617 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time)
02618 {
02619 int result;
02620
02621 result = refresh_pending_queue (current_time);
02622
02623 if (result != -1)
02624 result = refresh_late_queue (current_time);
02625
02626 return result;
02627 }
02628
02629
02630
02631
02632 template <ACE_SYNCH_DECL> int
02633 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time)
02634 {
02635 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02636
02637
02638 if (this->pending_head_)
02639 {
02640 current_status = message_strategy_.priority_status (*this->pending_head_,
02641 current_time);
02642 switch (current_status)
02643 {
02644 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02645
02646
02647 this->beyond_late_head_ = this->head_;
02648
02649
02650
02651 this->late_head_ = 0;
02652 this->late_tail_ = 0;
02653
02654
02655 do
02656 {
02657 this->pending_head_ = this->pending_head_->next ();
02658
02659 if (this->pending_head_)
02660 current_status = message_strategy_.priority_status (*this->pending_head_,
02661 current_time);
02662 else
02663 break;
02664
02665 }
02666 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02667
02668 if (this->pending_head_)
02669 {
02670
02671 this->beyond_late_tail_ = this->pending_head_->prev ();
02672
02673 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02674
02675 break;
02676 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02677 {
02678
02679 ACE_ERROR_RETURN ((LM_ERROR,
02680 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02681 (int) current_status),
02682 -1);
02683 }
02684
02685 }
02686 else
02687 {
02688
02689
02690 this->beyond_late_tail_ = this->tail_;
02691 this->pending_head_ = 0;
02692 this->pending_tail_ = 0;
02693 break;
02694 }
02695
02696 case ACE_Dynamic_Message_Strategy::LATE:
02697
02698
02699
02700 if (this->late_head_ == 0)
02701 this->late_head_ = this->pending_head_;
02702
02703
02704 do
02705 {
02706 this->pending_head_ = this->pending_head_->next ();
02707
02708 if (this->pending_head_)
02709 current_status = message_strategy_.priority_status (*this->pending_head_,
02710 current_time);
02711 else
02712 break;
02713
02714 }
02715 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02716
02717 if (this->pending_head_)
02718 {
02719 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02720
02721 ACE_ERROR_RETURN((LM_ERROR,
02722 ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02723 (int) current_status),
02724 -1);
02725
02726
02727 this->late_tail_ = this->pending_head_->prev ();
02728 }
02729 else
02730 {
02731
02732 this->late_tail_ = this->tail_;
02733 this->pending_head_ = 0;
02734 this->pending_tail_ = 0;
02735 }
02736
02737 break;
02738 case ACE_Dynamic_Message_Strategy::PENDING:
02739
02740 break;
02741 default:
02742
02743 ACE_ERROR_RETURN((LM_ERROR,
02744 ACE_TEXT ("Unknown message priority status [%d]"),
02745 (int) current_status),
02746 -1);
02747 }
02748 }
02749 return 0;
02750 }
02751
02752
02753
02754
02755 template <ACE_SYNCH_DECL> int
02756 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time)
02757 {
02758 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02759
02760 if (this->late_head_)
02761 {
02762 current_status = message_strategy_.priority_status (*this->late_head_,
02763 current_time);
02764 switch (current_status)
02765 {
02766 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02767
02768
02769
02770 this->beyond_late_head_ = this->head_;
02771
02772
02773 do
02774 {
02775 this->late_head_ = this->late_head_->next ();
02776
02777 if (this->late_head_)
02778 current_status = message_strategy_.priority_status (*this->late_head_,
02779 current_time);
02780 else
02781 break;
02782
02783 }
02784 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02785
02786 if (this->late_head_)
02787 {
02788
02789 this->beyond_late_tail_ = this->late_head_->prev ();
02790
02791 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02792 {
02793
02794 this->late_head_ = 0;
02795 this->late_tail_ = 0;
02796 }
02797 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02798
02799 ACE_ERROR_RETURN ((LM_ERROR,
02800 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02801 (int) current_status),
02802 -1);
02803 }
02804 else
02805 {
02806
02807 this->beyond_late_tail_ = this->tail_;
02808 this->late_head_ = 0;
02809 this->late_tail_ = 0;
02810 }
02811
02812 break;
02813
02814 case ACE_Dynamic_Message_Strategy::LATE:
02815
02816 break;
02817
02818 case ACE_Dynamic_Message_Strategy::PENDING:
02819
02820 ACE_ERROR_RETURN ((LM_ERROR,
02821 ACE_TEXT ("Unexpected message priority status ")
02822 ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02823 (int) current_status),
02824 -1);
02825 default:
02826
02827 ACE_ERROR_RETURN ((LM_ERROR,
02828 ACE_TEXT ("Unknown message priority status [%d]"),
02829 (int) current_status),
02830 -1);
02831 }
02832 }
02833
02834 return 0;
02835 }
02836
02837
02838
02839
02840 template <ACE_SYNCH_DECL> int
02841 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02842 ACE_Time_Value *timeout)
02843 {
02844 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02845 timeout);
02846 }
02847
02848
02849
02850
02851 template <ACE_SYNCH_DECL> int
02852 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02853 ACE_Time_Value *timeout)
02854 {
02855 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02856 return this->enqueue_prio (new_item, timeout);
02857 }
02858
02859
02860
02861
02862
02863
02864 template <ACE_SYNCH_DECL> int
02865 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02866 ACE_Time_Value *timeout)
02867 {
02868 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02869 return this->enqueue_prio (new_item, timeout);
02870 }
02871
02872
02873
02874
02875
02876
02877 template <ACE_SYNCH_DECL>
02878 ACE_Message_Queue<ACE_SYNCH_USE> *
02879 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02880 size_t lwm,
02881 ACE_Notification_Strategy *ns)
02882 {
02883 ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02884
02885 ACE_NEW_RETURN (tmp,
02886 ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02887 0);
02888 return tmp;
02889 }
02890
02891
02892
02893 template <ACE_SYNCH_DECL>
02894 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02895 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02896 size_t lwm,
02897 ACE_Notification_Strategy *ns,
02898 u_long static_bit_field_mask,
02899 u_long static_bit_field_shift,
02900 u_long dynamic_priority_max,
02901 u_long dynamic_priority_offset)
02902 {
02903 ACE_Deadline_Message_Strategy *adms = 0;
02904
02905 ACE_NEW_RETURN (adms,
02906 ACE_Deadline_Message_Strategy (static_bit_field_mask,
02907 static_bit_field_shift,
02908 dynamic_priority_max,
02909 dynamic_priority_offset),
02910 0);
02911
02912 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02913 ACE_NEW_RETURN (tmp,
02914 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02915 0);
02916 return tmp;
02917 }
02918
02919
02920
02921
02922 template <ACE_SYNCH_DECL>
02923 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02924 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02925 size_t lwm,
02926 ACE_Notification_Strategy *ns,
02927 u_long static_bit_field_mask,
02928 u_long static_bit_field_shift,
02929 u_long dynamic_priority_max,
02930 u_long dynamic_priority_offset)
02931 {
02932 ACE_Laxity_Message_Strategy *alms = 0;
02933
02934 ACE_NEW_RETURN (alms,
02935 ACE_Laxity_Message_Strategy (static_bit_field_mask,
02936 static_bit_field_shift,
02937 dynamic_priority_max,
02938 dynamic_priority_offset),
02939 0);
02940
02941 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02942 ACE_NEW_RETURN (tmp,
02943 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02944 0);
02945 return tmp;
02946 }
02947
02948
02949
02950
02951 #if defined (ACE_VXWORKS)
02952
02953
02954 template <ACE_SYNCH_DECL>
02955 ACE_Message_Queue_Vx *
02956 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
02957 size_t max_message_length,
02958 ACE_Notification_Strategy *ns)
02959 {
02960 ACE_Message_Queue_Vx *tmp = 0;
02961
02962 ACE_NEW_RETURN (tmp,
02963 ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
02964 0);
02965 return tmp;
02966 }
02967 #endif
02968
02969 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
02970
02971 template <ACE_SYNCH_DECL>
02972 ACE_Message_Queue_NT *
02973 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
02974 {
02975 ACE_Message_Queue_NT *tmp = 0;
02976
02977 ACE_NEW_RETURN (tmp,
02978 ACE_Message_Queue_NT (max_threads),
02979 0);
02980 return tmp;
02981 }
02982
02983 #endif
02984
02985 ACE_END_VERSIONED_NAMESPACE_DECL
02986
02987 #endif