Generic definitions for the Transport class. More...
#include <Transport.h>


Classes | |
| struct | Drain_Result |
Public Types | |
| enum | Drain_Result_Enum { DR_ERROR = -1, DR_OK = 0, DR_QUEUE_EMPTY = 1, DR_WOULDBLOCK = 2 } |
Public Member Functions | |
| TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE) | |
| Default creator, requires the tag value be supplied. | |
| virtual | ~TAO_Transport (void) |
| Destructor. | |
| CORBA::ULong | tag (void) const |
| Return the protocol tag. | |
| TAO_ORB_Core * | orb_core (void) const |
| Access the ORB that owns this connection. | |
| TAO_Transport_Mux_Strategy * | tms (void) const |
| Get the TAO_Tranport_Mux_Strategy used by this object. | |
| TAO_Wait_Strategy * | wait_strategy (void) const |
| Return the TAO_Wait_Strategy used by this object. | |
| Drain_Result | handle_output (TAO::Transport::Drain_Constraints const &c) |
| Callback method to reactively drain the outgoing data queue. | |
| int | bidirectional_flag (void) const |
| Get the bidirectional flag. | |
| void | bidirectional_flag (int flag) |
| Set the bidirectional flag. | |
| void | cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry) |
| Set the Cache Map entry. | |
| TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry (void) |
| Get the Cache Map entry. | |
| size_t | id (void) const |
| Set and Get the identifier for this transport instance. | |
| void | id (size_t id) |
| TAO::Connection_Role | opened_as (void) const |
| void | opened_as (TAO::Connection_Role) |
| unsigned long | purging_order (void) const |
| void | purging_order (unsigned long value) |
| bool | queue_is_empty (void) |
| Check if there are messages pending in the queue. | |
| bool | register_if_necessary (void) |
| Register with the reactor via the wait strategy. | |
| void | provide_handler (TAO::Connection_Handler_Set &handlers) |
| Added event handler to the handlers set. | |
| bool | provide_blockable_handler (TAO::Connection_Handler_Set &handlers) |
| virtual int | register_handler (void) |
| Register the handler with the reactor. | |
| virtual ssize_t | send (iovec *iov, int iovcnt, size_t &bytes_transferred, ACE_Time_Value const *timeout)=0 |
| Write the complete Message_Block chain to the connection. | |
| virtual ssize_t | recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0 |
| Read len bytes from into buf. | |
Control connection lifecycle | |
| bool | idle_after_send (void) |
| bool | idle_after_reply (void) |
| virtual void | close_connection (void) |
| Call the implementation method after obtaining the lock. | |
Template methods | |
The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below. | |
| class | TAO_Reactive_Flushing_Strategy |
| class | TAO_Leader_Follower_Flushing_Strategy |
| class | TAO_Thread_Per_Connection_Handler |
| CORBA::ULong const | tag_ |
| IOP protocol tag. | |
| TAO_ORB_Core *const | orb_core_ |
| Global orbcore resource. | |
| TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | cache_map_entry_ |
| TAO_Transport_Mux_Strategy * | tms_ |
| TAO_Wait_Strategy * | ws_ |
| Strategy for waiting for the reply after sending the request. | |
| int | bidirectional_flag_ |
| TAO::Connection_Role | opening_connection_role_ |
| TAO_Queued_Message * | head_ |
| Implement the outgoing data queue. | |
| TAO_Queued_Message * | tail_ |
| TAO_Incoming_Message_Queue | incoming_message_queue_ |
| Queue of the consolidated, incoming messages.. | |
| TAO::Incoming_Message_Stack | incoming_message_stack_ |
| ACE_Time_Value | current_deadline_ |
| long | flush_timer_id_ |
| The timer ID. | |
| TAO_Transport_Timer | transport_timer_ |
| The adapter used to receive timeout callbacks from the Reactor. | |
| ACE_Lock * | handler_lock_ |
| size_t | id_ |
| A unique identifier for the transport. | |
| unsigned long | purging_order_ |
| Used by the LRU, LFU and FIFO Connection Purging Strategies. | |
| size_t | recv_buffer_size_ |
| Size of the buffer received. | |
| size_t | sent_byte_count_ |
| Number of bytes sent. | |
| bool | is_connected_ |
| TAO_GIOP_Message_Base * | messaging_object_ |
| Our messaging object. | |
| TAO_Codeset_Translator_Base * | char_translator_ |
| Additional member values required to support codeset translation. | |
| TAO_Codeset_Translator_Base * | wchar_translator_ |
| CORBA::Boolean | tcs_set_ |
| bool | first_request_ |
| ACE_Message_Block * | partial_message_ |
| Holds the partial GIOP message (if there is one). | |
| TAO::Transport::Stats * | stats_ |
| Statistics. | |
| bool | flush_in_post_open_ |
| Indicate that flushing needs to be done in post_open(). | |
| TAO_SYNCH_MUTEX | output_cdr_mutex_ |
| lock for synchronizing Transport OutputCDR access | |
| void | messaging_init (TAO_GIOP_Message_Version const &version) |
| virtual int | tear_listen_point_list (TAO_InputCDR &cdr) |
| virtual bool | post_connect_hook (void) |
| Hooks that can be overridden in concrete transports. | |
| ACE_Event_Handler::Reference_Count | add_reference (void) |
| Memory management routines. | |
| ACE_Event_Handler::Reference_Count | remove_reference (void) |
| TAO_GIOP_Message_Base * | messaging_object (void) |
| virtual ACE_Event_Handler * | event_handler_i (void)=0 |
| bool | is_connected (void) const |
| Is this transport really connected. | |
| bool | post_open (size_t id) |
| Perform all the actions when this transport get opened. | |
| void | pre_close (void) |
| do what needs to be done when closing the transport | |
| TAO_Connection_Handler * | connection_handler (void) |
| Get the connection handler for this transport. | |
| TAO_OutputCDR & | out_stream (void) |
| Accessor for the output CDR stream. | |
| TAO_SYNCH_MUTEX & | output_cdr_lock (void) |
| Accessor for synchronizing Transport OutputCDR access. | |
| void | set_flush_in_post_open (void) |
| Set the flush in post open flag. | |
| bool | can_be_purged (void) |
| Can the transport be purged? | |
| virtual void | set_bidir_context_info (TAO_Operation_Details &opdetails) |
| int | generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output) |
| virtual int | generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg) |
| int | recache_transport (TAO_Transport_Descriptor_Interface *desc) |
| Recache ourselves in the cache. | |
| virtual int | handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0) |
| Callback to read incoming data. | |
| virtual int | send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, TAO_Message_Semantics message_semantics, ACE_Time_Value *max_time_wait)=0 |
| virtual int | send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, TAO_Message_Semantics message_semantics=TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0 |
| virtual int | send_message_shared (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| Sent the contents of message_block. | |
| int | format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time, TAO_Stub *stub) |
| int | send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0) |
| int | send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, TAO::Transport::Drain_Constraints const &dc) |
| Send a message block chain, assuming the lock is held. | |
| int | purge_entry (void) |
| Cache management. | |
| int | make_idle (void) |
| Cache management. | |
| int | update_transport (void) |
| Cache management. | |
| int | handle_timeout (const ACE_Time_Value ¤t_time, const void *act) |
| size_t | recv_buffer_size (void) const |
| Accessor to recv_buffer_size_. | |
| size_t | sent_byte_count (void) const |
| Accessor to sent_byte_count_. | |
| TAO_Codeset_Translator_Base * | char_translator (void) const |
| CodeSet Negotiation - Get the char codeset translator factory. | |
| TAO_Codeset_Translator_Base * | wchar_translator (void) const |
| CodeSet Negotiation - Get the wchar codeset translator factory. | |
| void | char_translator (TAO_Codeset_Translator_Base *) |
| CodeSet negotiation - Set the char codeset translator factory. | |
| void | wchar_translator (TAO_Codeset_Translator_Base *) |
| CodeSet negotiation - Set the wchar codeset translator factory. | |
| void | assign_translators (TAO_InputCDR *, TAO_OutputCDR *) |
| void | clear_translators (TAO_InputCDR *, TAO_OutputCDR *) |
| CORBA::Boolean | is_tcs_set () const |
| Return true if the tcs has been set. | |
| void | first_request_sent (bool flag=false) |
| Set the state of the first_request_ to flag. | |
| bool | first_request () const |
| Get the first request flag. | |
| void | send_connection_closed_notifications (void) |
| TAO::Transport::Stats * | stats (void) const |
| Transport statistics. | |
| virtual TAO_Connection_Handler * | connection_handler_i (void)=0 |
| int | process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
| int | send_message_shared_i (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time, bool back=true) |
| ACE_Time_Value const * | io_timeout (TAO::Transport::Drain_Constraints const &dc) const |
| Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies. | |
| TAO::Transport_Cache_Manager & | transport_cache_manager (void) |
| Helper method that returns the Transport Cache Manager. | |
| Drain_Result | drain_queue (TAO::Transport::Drain_Constraints const &dc) |
| Send some of the data in the queue. | |
| Drain_Result | drain_queue_i (TAO::Transport::Drain_Constraints const &dc) |
| Implement drain_queue() assuming the lock is held. | |
| bool | queue_is_empty_i (void) const |
| Check if there are messages pending in the queue. | |
| Drain_Result | drain_queue_helper (int &iovcnt, iovec iov[], TAO::Transport::Drain_Constraints const &dc) |
| A helper routine used in drain_queue_i(). | |
| int | schedule_output_i (void) |
| Schedule handle_output() callbacks. | |
| int | cancel_output_i (void) |
| Cancel handle_output() callbacks. | |
| void | cleanup_queue (size_t byte_count) |
| Cleanup the queue. | |
| void | cleanup_queue_i () |
| Cleanup the complete queue. | |
| int | check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush) |
| Check if the buffering constraints have been reached. | |
| int | send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) |
| int | send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time) |
| int | flush_timer_pending (void) const |
| Check if the flush timer is still pending. | |
| void | reset_flush_timer (void) |
| void | report_invalid_event_handler (const char *caller) |
| Print out error messages if the event handler is not valid. | |
| int | handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data) |
| int | handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) |
| int | handle_input_parse_extra_messages (ACE_Message_Block &message_block) |
| int | consolidate_enqueue_message (TAO_Queued_Data *qd) |
| int | consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) |
| int | process_queue_head (TAO_Resume_Handle &rh) |
| int | notify_reactor (void) |
| void | send_connection_closed_notifications_i (void) |
| Assume the lock is held. | |
| void | allocate_partial_message_block (void) |
| bool | using_blocking_io_for_synch_messages () const |
| bool | using_blocking_io_for_asynch_messages () const |
Generic definitions for the Transport class.
The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!
The main responsability of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for all if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.
One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.
Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.
TAO provides explicit policies to send 'urgent' messages. Such messages may put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.
Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggest that simply using a ACE_Message_Queue would not be enough: there is a significant amount of ancillary information, to keep on each message that the Message_Block class does not provide room for.
Blocking I/O is still attractive for some applications. First, my eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.
Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
The outgoing data path consist in several components:
The Transport object provides a single method to send request messages (send_request_message ()).
One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that needs to be given due consideration. They are
The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.
To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path ie. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages
We solve the problems as follows
(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into new buffer being able to hold full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.
(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has bee read completely the message is sent to the higher layers of the ORB for processing.
(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We run in to problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.
The solution that we (plan to) adopt is pretty straight forward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".
See Also:
Definition at line 321 of file Transport.h.
Definition at line 367 of file Transport.h.
00368 { 00369 DR_ERROR = -1, 00370 DR_OK = 0, 00371 DR_QUEUE_EMPTY = 1, // used internally, not returned from drain_queue() 00372 DR_WOULDBLOCK = 2 00373 };
| TAO_Transport::TAO_Transport | ( | CORBA::ULong | tag, | |
| TAO_ORB_Core * | orb_core, | |||
| size_t | input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE | |||
| ) |
Default creator, requires the tag value be supplied.
| TAO_Transport::~TAO_Transport | ( | void | ) | [virtual] |
Destructor.
Definition at line 202 of file Transport.cpp.
00203 { 00204 delete this->messaging_object_; 00205 00206 delete this->ws_; 00207 00208 delete this->tms_; 00209 00210 delete this->handler_lock_; 00211 00212 if (!this->is_connected_) 00213 { 00214 // When we have a not connected transport we could have buffered 00215 // messages on this transport which we have to cleanup now. 00216 this->cleanup_queue_i(); 00217 } 00218 00219 // Release the partial message block, however we may 00220 // have never allocated one. 00221 ACE_Message_Block::release (this->partial_message_); 00222 00223 // By the time the destructor is reached here all the connection stuff 00224 // *must* have been cleaned up. 00225 00226 // The following assert is needed for the test "Bug_2494_Regression". 00227 // See the bugzilla bug #2494 for details. 00228 ACE_ASSERT (this->queue_is_empty_i ()); 00229 ACE_ASSERT (this->cache_map_entry_ == 0); 00230 00231 #if TAO_HAS_TRANSPORT_CURRENT == 1 00232 delete this->stats_; 00233 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 00234 00235 /* 00236 * Hook to add code that cleans up components 00237 * belong to the concrete protocol implementation. 00238 * Further additions to this Transport class will 00239 * need to add code *before* this hook. 00240 */ 00241 //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK 00242 }
| ACE_Event_Handler::Reference_Count TAO_Transport::add_reference | ( | void | ) |
Memory management routines.
Definition at line 2646 of file Transport.cpp.
02647 { 02648 return this->event_handler_i ()->add_reference (); 02649 }
| void TAO_Transport::allocate_partial_message_block | ( | void | ) | [private] |
Allocate a partial message block and store it in our partial_message_ data member.
Definition at line 2769 of file Transport.cpp.
02770 { 02771 if (this->partial_message_ == 0) 02772 { 02773 // This value must be at least large enough to hold a GIOP message 02774 // header plus a GIOP fragment header 02775 size_t const partial_message_size = 02776 this->messaging_object ()->header_length (); 02777 // + this->messaging_object ()->fragment_header_length (); 02778 // deprecated, conflicts with not-single_read_opt. 02779 02780 ACE_NEW (this->partial_message_, 02781 ACE_Message_Block (partial_message_size)); 02782 } 02783 }
| void TAO_Transport::assign_translators | ( | TAO_InputCDR * | inp, | |
| TAO_OutputCDR * | outp | |||
| ) |
Use the Transport's codeset factories to set the translator for input and output CDRs.
Definition at line 2616 of file Transport.cpp.
02617 { 02618 if (this->char_translator_) 02619 { 02620 this->char_translator_->assign (inp); 02621 this->char_translator_->assign (outp); 02622 } 02623 if (this->wchar_translator_) 02624 { 02625 this->wchar_translator_->assign (inp); 02626 this->wchar_translator_->assign (outp); 02627 } 02628 }
| void TAO_Transport::bidirectional_flag | ( | int | flag | ) |
Set the bidirectional flag.
Definition at line 45 of file Transport.inl.
00046 { 00047 this->bidirectional_flag_ = flag; 00048 }
| int TAO_Transport::bidirectional_flag | ( | void | ) | const |
Get the bidirectional flag.
Definition at line 39 of file Transport.inl.
00040 { 00041 return this->bidirectional_flag_; 00042 }
| TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry | ( | void | ) |
Get the Cache Map entry.
Definition at line 63 of file Transport.inl.
00064 { 00065 return this->cache_map_entry_; 00066 }
| void TAO_Transport::cache_map_entry | ( | TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * | entry | ) |
Set the Cache Map entry.
Definition at line 69 of file Transport.inl.
00071 { 00072 // Sync with TAO_Transport::purge_entry() 00073 ACE_GUARD (ACE_Lock, ace_mon, *this->handler_lock_); 00074 this->cache_map_entry_ = entry; 00075 }
| bool TAO_Transport::can_be_purged | ( | void | ) |
Can the transport be purged?
Definition at line 495 of file Transport.cpp.
00496 { 00497 return !this->tms_->has_request (); 00498 }
| int TAO_Transport::cancel_output_i | ( | void | ) | [private] |
Cancel handle_output() callbacks.
Definition at line 858 of file Transport.cpp.
00859 { 00860 ACE_Event_Handler * const eh = this->event_handler_i (); 00861 ACE_Reactor *const reactor = eh->reactor (); 00862 00863 if (TAO_debug_level > 3) 00864 { 00865 ACE_DEBUG ((LM_DEBUG, 00866 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"), 00867 this->id ())); 00868 } 00869 00870 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00871 }
| void TAO_Transport::char_translator | ( | TAO_Codeset_Translator_Base * | tf | ) |
CodeSet negotiation - Set the char codeset translator factory.
Definition at line 151 of file Transport.inl.
00152 { 00153 this->char_translator_ = tf; 00154 this->tcs_set_ = 1; 00155 }
| TAO_Codeset_Translator_Base * TAO_Transport::char_translator | ( | void | ) | const |
CodeSet Negotiation - Get the char codeset translator factory.
Definition at line 139 of file Transport.inl.
00140 { 00141 return this->char_translator_; 00142 }
| int TAO_Transport::check_buffering_constraints_i | ( | TAO_Stub * | stub, | |
| bool & | must_flush | |||
| ) | [private] |
Check if the buffering constraints have been reached.
Definition at line 1223 of file Transport.cpp.
01224 { 01225 // First let's compute the size of the queue: 01226 size_t msg_count = 0; 01227 size_t total_bytes = 0; 01228 01229 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) 01230 { 01231 ++msg_count; 01232 total_bytes += i->message_length (); 01233 } 01234 01235 bool set_timer = false; 01236 ACE_Time_Value new_deadline; 01237 01238 TAO::Transport_Queueing_Strategy *queue_strategy = 01239 stub->transport_queueing_strategy (); 01240 01241 bool constraints_reached = true; 01242 01243 if (queue_strategy) 01244 { 01245 constraints_reached = 01246 queue_strategy->buffering_constraints_reached (stub, 01247 msg_count, 01248 total_bytes, 01249 must_flush, 01250 this->current_deadline_, 01251 set_timer, 01252 new_deadline); 01253 } 01254 else 01255 { 01256 must_flush = false; 01257 } 01258 01259 // ... set the new timer, also cancel any previous timers ... 01260 if (set_timer) 01261 { 01262 ACE_Event_Handler *eh = this->event_handler_i (); 01263 ACE_Reactor * const reactor = eh->reactor (); 01264 this->current_deadline_ = new_deadline; 01265 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday (); 01266 01267 if (this->flush_timer_pending ()) 01268 { 01269 reactor->cancel_timer (this->flush_timer_id_); 01270 } 01271 01272 this->flush_timer_id_ = 01273 reactor->schedule_timer (&this->transport_timer_, 01274 &this->current_deadline_, 01275 delay); 01276 } 01277 01278 return constraints_reached; 01279 }
| void TAO_Transport::cleanup_queue | ( | size_t | byte_count | ) | [private] |
Cleanup the queue.
Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.
Definition at line 1176 of file Transport.cpp.
01177 { 01178 while (!this->queue_is_empty_i () && byte_count > 0) 01179 { 01180 TAO_Queued_Message *i = this->head_; 01181 01182 if (TAO_debug_level > 4) 01183 { 01184 ACE_DEBUG ((LM_DEBUG, 01185 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01186 ACE_TEXT ("byte_count = %d\n"), 01187 this->id (), byte_count)); 01188 } 01189 01190 // Update the state of the first message 01191 i->bytes_transferred (byte_count); 01192 01193 if (TAO_debug_level > 4) 01194 { 01195 ACE_DEBUG ((LM_DEBUG, 01196 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ") 01197 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"), 01198 this->id (), byte_count, i->all_data_sent (), 01199 i->message_length ())); 01200 } 01201 01202 // ... if all the data was sent the message must be removed from 01203 // the queue... 01204 if (i->all_data_sent ()) 01205 { 01206 i->remove_from_list (this->head_, this->tail_); 01207 i->destroy (); 01208 } 01209 else if (byte_count == 0) 01210 { 01211 // If we have sent out a full message block, but we are not 01212 // finished with this message, we need to do something with the 01213 // message block chain held by our output stream. If we don't, 01214 // another thread can attempt to service this transport and end 01215 // up resetting the output stream which will release the 01216 // message that we haven't finished sending. 01217 i->copy_if_necessary (this->out_stream ().begin ()); 01218 } 01219 } 01220 }
| void TAO_Transport::cleanup_queue_i | ( | ) | [private] |
Cleanup the complete queue.
Definition at line 1133 of file Transport.cpp.
01134 { 01135 if (TAO_debug_level > 4) 01136 { 01137 ACE_DEBUG ((LM_DEBUG, 01138 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01139 ACE_TEXT ("cleaning up complete queue\n"), 01140 this->id ())); 01141 } 01142 01143 size_t byte_count = 0; 01144 int msg_count = 0; 01145 01146 // Cleanup all messages 01147 while (!this->queue_is_empty_i ()) 01148 { 01149 TAO_Queued_Message *i = this->head_; 01150 01151 if (TAO_debug_level > 4) 01152 { 01153 byte_count += i->message_length(); 01154 ++msg_count; 01155 } 01156 // @@ This is a good point to insert a flag to indicate that a 01157 // CloseConnection message was successfully received. 01158 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, 01159 this->orb_core_->leader_follower ()); 01160 01161 i->remove_from_list (this->head_, this->tail_); 01162 01163 i->destroy (); 01164 } 01165 01166 if (TAO_debug_level > 4) 01167 { 01168 ACE_DEBUG ((LM_DEBUG, 01169 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") 01170 ACE_TEXT ("discarded %d messages, %u bytes.\n"), 01171 this->id (), msg_count, byte_count)); 01172 } 01173 }
| void TAO_Transport::clear_translators | ( | TAO_InputCDR * | inp, | |
| TAO_OutputCDR * | outp | |||
| ) |
It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.
Definition at line 2631 of file Transport.cpp.
02632 { 02633 if (inp) 02634 { 02635 inp->char_translator (0); 02636 inp->wchar_translator (0); 02637 } 02638 if (outp) 02639 { 02640 outp->char_translator (0); 02641 outp->wchar_translator (0); 02642 } 02643 }
| void TAO_Transport::close_connection | ( | void | ) | [virtual] |
Call the implementation method after obtaining the lock.
Definition at line 359 of file Transport.cpp.
00360 { 00361 this->connection_handler_i ()->close_connection (); 00362 }
| TAO_Connection_Handler * TAO_Transport::connection_handler | ( | void | ) |
Get the connection handler for this transport.
Definition at line 191 of file Transport.inl.
00192 { 00193 return this->connection_handler_i(); 00194 }
| virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i | ( | void | ) | [protected, pure virtual] |
These classes need privileged access to:
| int TAO_Transport::consolidate_enqueue_message | ( | TAO_Queued_Data * | qd | ) | [private] |
Definition at line 1790 of file Transport.cpp.
01791 { 01792 // consolidate message on top of stack, only for fragmented messages 01793 01794 // paranoid check 01795 if (q_data->missing_data () != 0) 01796 { 01797 return -1; 01798 } 01799 01800 if (q_data->more_fragments () || 01801 q_data->msg_type () == GIOP::Fragment) 01802 { 01803 TAO_Queued_Data *new_q_data = 0; 01804 01805 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01806 { 01807 case -1: // error 01808 return -1; 01809 01810 case 0: // returning consolidated message in new_q_data 01811 if (!new_q_data) 01812 { 01813 if (TAO_debug_level > 0) 01814 { 01815 ACE_ERROR ((LM_ERROR, 01816 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ") 01817 ACE_TEXT ("error, consolidated message is NULL\n"), 01818 this->id ())); 01819 } 01820 return -1; 01821 } 01822 01823 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0) 01824 { 01825 TAO_Queued_Data::release (new_q_data); 01826 return -1; 01827 } 01828 break; 01829 01830 case 1: // fragment has been stored in messaging_oject() 01831 break; 01832 } 01833 } 01834 else 01835 { 01836 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0) 01837 { 01838 TAO_Queued_Data::release (q_data); 01839 return -1; 01840 } 01841 } 01842 01843 return 0; // success 01844 }
| int TAO_Transport::consolidate_process_message | ( | TAO_Queued_Data * | qd, | |
| TAO_Resume_Handle & | rh | |||
| ) | [private] |
Definition at line 1703 of file Transport.cpp.
01705 { 01706 // paranoid check 01707 if (q_data->missing_data () != 0) 01708 { 01709 if (TAO_debug_level > 0) 01710 { 01711 ACE_ERROR ((LM_ERROR, 01712 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01713 ACE_TEXT ("missing data\n"), 01714 this->id ())); 01715 } 01716 return -1; 01717 } 01718 01719 if (q_data->more_fragments () || 01720 q_data->msg_type () == GIOP::Fragment) 01721 { 01722 // consolidate message on top of stack, only for fragmented messages 01723 TAO_Queued_Data *new_q_data = 0; 01724 01725 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data)) 01726 { 01727 case -1: // error 01728 return -1; 01729 01730 case 0: // returning consolidated message in q_data 01731 if (!new_q_data) 01732 { 01733 if (TAO_debug_level > 0) 01734 { 01735 ACE_ERROR ((LM_ERROR, 01736 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01737 ACE_TEXT ("error, consolidated message is NULL\n"), 01738 this->id ())); 01739 } 01740 return -1; 01741 } 01742 01743 01744 if (this->process_parsed_messages (new_q_data, rh) == -1) 01745 { 01746 TAO_Queued_Data::release (new_q_data); 01747 01748 if (TAO_debug_level > 0) 01749 { 01750 ACE_ERROR ((LM_ERROR, 01751 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01752 ACE_TEXT ("error processing consolidated message\n"), 01753 this->id ())); 01754 } 01755 return -1; 01756 } 01757 01758 TAO_Queued_Data::release (new_q_data); 01759 01760 break; 01761 01762 case 1: // fragment has been stored in messaging_oject() 01763 break; 01764 } 01765 } 01766 else 01767 { 01768 if (this->process_parsed_messages (q_data, rh) == -1) 01769 { 01770 TAO_Queued_Data::release (q_data); 01771 01772 if (TAO_debug_level > 0) 01773 { 01774 ACE_ERROR ((LM_ERROR, 01775 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ") 01776 ACE_TEXT ("error processing message\n"), 01777 this->id ())); 01778 } 01779 return -1; 01780 } 01781 01782 TAO_Queued_Data::release (q_data); 01783 01784 } 01785 01786 return 0; 01787 }
| TAO_Transport::Drain_Result TAO_Transport::drain_queue | ( | TAO::Transport::Drain_Constraints const & | dc | ) | [private] |
Send some of the data in the queue.
As the outgoing data is drained this method is invoked to send as much of the current message as possible.
Definition at line 915 of file Transport.cpp.
00916 { 00917 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, DR_ERROR); 00918 Drain_Result const retval = this->drain_queue_i (dc); 00919 00920 if (retval == DR_QUEUE_EMPTY) 00921 { 00922 // ... there is no current message or it was completely 00923 // sent, cancel output... 00924 TAO_Flushing_Strategy *flushing_strategy = 00925 this->orb_core ()->flushing_strategy (); 00926 00927 flushing_strategy->cancel_output (this); 00928 00929 return DR_OK; 00930 } 00931 00932 return retval; 00933 }
| TAO_Transport::Drain_Result TAO_Transport::drain_queue_helper | ( | int & | iovcnt, | |
| iovec | iov[], | |||
| TAO::Transport::Drain_Constraints const & | dc | |||
| ) | [private] |
A helper routine used in drain_queue_i().
Definition at line 936 of file Transport.cpp.
00938 { 00939 // As a side-effect, this decrements the timeout() pointed-to value by 00940 // the time used in this function. That might be important as there are 00941 // potentially long running system calls invoked from here. 00942 ACE_Countdown_Time countdown(dc.timeout()); 00943 00944 size_t byte_count = 0; 00945 00946 // ... send the message ... 00947 ssize_t retval = -1; 00948 00949 #if TAO_HAS_SENDFILE == 1 00950 if (this->mmap_allocator_) 00951 retval = this->sendfile (this->mmap_allocator_, 00952 iov, 00953 iovcnt, 00954 byte_count, 00955 dc); 00956 else 00957 #endif /* TAO_HAS_SENDFILE==1 */ 00958 retval = this->send (iov, iovcnt, byte_count, 00959 this->io_timeout (dc)); 00960 00961 if (TAO_debug_level == 5) 00962 { 00963 dump_iov (iov, iovcnt, this->id (), 00964 byte_count, ACE_TEXT("drain_queue_helper")); 00965 } 00966 00967 if (retval == 0) 00968 { 00969 if (TAO_debug_level > 4) 00970 { 00971 ACE_DEBUG ((LM_DEBUG, 00972 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00973 ACE_TEXT ("send() returns 0\n"), 00974 this->id ())); 00975 } 00976 return DR_ERROR; 00977 } 00978 else if (retval == -1) 00979 { 00980 if (TAO_debug_level > 4) 00981 { 00982 ACE_DEBUG ((LM_DEBUG, 00983 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 00984 ACE_TEXT ("error during send() (errno: %d) - %m\n"), 00985 this->id (), ACE_ERRNO_GET)); 00986 } 00987 00988 if (errno == EWOULDBLOCK || errno == EAGAIN) 00989 { 00990 return DR_WOULDBLOCK; 00991 } 00992 00993 return DR_ERROR; 00994 } 00995 00996 // ... now we need to update the queue, removing elements 00997 // that have been sent, and updating the last element if it 00998 // was only partially sent ... 00999 this->cleanup_queue (byte_count); 01000 iovcnt = 0; 01001 01002 // ... start over, how do we guarantee progress? Because if 01003 // no bytes are sent send() can only return 0 or -1 01004 01005 // Total no. of bytes sent for a send call 01006 this->sent_byte_count_ += byte_count; 01007 01008 if (TAO_debug_level > 4) 01009 { 01010 ACE_DEBUG ((LM_DEBUG, 01011 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ") 01012 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"), 01013 this->id(), byte_count, this->queue_is_empty_i ())); 01014 } 01015 01016 return DR_QUEUE_EMPTY; 01017 // drain_queue_i will check if the queue is actually empty 01018 }
| TAO_Transport::Drain_Result TAO_Transport::drain_queue_i | ( | TAO::Transport::Drain_Constraints const & | dc | ) | [private] |
Implement drain_queue() assuming the lock is held.
Definition at line 1021 of file Transport.cpp.
01022 { 01023 // This is the vector used to send data, it must be declared outside 01024 // the loop because after the loop there may still be data to be 01025 // sent 01026 int iovcnt = 0; 01027 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 01028 iovec iov[ACE_IOV_MAX] = { { 0 , 0 } }; 01029 #else 01030 iovec iov[ACE_IOV_MAX]; 01031 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 01032 01033 // We loop over all the elements in the queue ... 01034 TAO_Queued_Message *i = this->head_; 01035 01036 // Reset the value so that the counting is done for each new send 01037 // call. 01038 this->sent_byte_count_ = 0; 01039 01040 // Avoid calling this expensive function each time through the loop. Instead 01041 // we'll assume that the time is unlikely to change much during the loop. 01042 // If we are forced to send in the loop then we'll recompute the time. 01043 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); 01044 01045 while (i != 0) 01046 { 01047 if (i->is_expired (now)) 01048 { 01049 if (TAO_debug_level > 3) 01050 { 01051 ACE_DEBUG ((LM_DEBUG, 01052 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01053 ACE_TEXT ("Discarding expired queued message.\n"), 01054 this->id ())); 01055 } 01056 TAO_Queued_Message *next = i->next (); 01057 i->state_changed (TAO_LF_Event::LFS_TIMEOUT, 01058 this->orb_core_->leader_follower ()); 01059 i->remove_from_list (this->head_, this->tail_); 01060 i->destroy (); 01061 i = next; 01062 continue; 01063 } 01064 // ... each element fills the iovector ... 01065 i->fill_iov (ACE_IOV_MAX, iovcnt, iov); 01066 01067 // ... the vector is full, no choice but to send some data out. 01068 // We need to loop because a single message can span multiple 01069 // IOV_MAX elements ... 01070 if (iovcnt == ACE_IOV_MAX) 01071 { 01072 Drain_Result const retval = 01073 this->drain_queue_helper (iovcnt, iov, dc); 01074 01075 if (TAO_debug_level > 4) 01076 { 01077 ACE_DEBUG ((LM_DEBUG, 01078 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01079 ACE_TEXT ("helper retval = %d\n"), 01080 this->id (), static_cast<int> (retval.dre_))); 01081 } 01082 01083 if (retval != DR_QUEUE_EMPTY) 01084 { 01085 return retval; 01086 } 01087 01088 now = ACE_High_Res_Timer::gettimeofday_hr (); 01089 01090 i = this->head_; 01091 continue; 01092 } 01093 // ... notice that this line is only reached if there is still 01094 // room in the iovector ... 01095 i = i->next (); 01096 } 01097 01098 if (iovcnt != 0) 01099 { 01100 Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc); 01101 01102 if (TAO_debug_level > 4) 01103 { 01104 ACE_DEBUG ((LM_DEBUG, 01105 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ") 01106 ACE_TEXT ("helper retval = %d\n"), 01107 this->id (), static_cast<int> (retval.dre_))); 01108 } 01109 01110 if (retval != DR_QUEUE_EMPTY) 01111 { 01112 return retval; 01113 } 01114 } 01115 01116 if (this->queue_is_empty_i ()) 01117 { 01118 if (this->flush_timer_pending ()) 01119 { 01120 ACE_Event_Handler *eh = this->event_handler_i (); 01121 ACE_Reactor * const reactor = eh->reactor (); 01122 reactor->cancel_timer (this->flush_timer_id_); 01123 this->reset_flush_timer (); 01124 } 01125 01126 return DR_QUEUE_EMPTY; 01127 } 01128 01129 return DR_OK; 01130 }
| virtual ACE_Event_Handler* TAO_Transport::event_handler_i | ( | void | ) | [pure virtual] |
Return the event handler used to receive notifications from the Reactor. Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fullfilled by an instance of ACE_Svc_Handler.
| bool TAO_Transport::first_request | ( | void | ) | const |
Get the first request flag.
Definition at line 178 of file Transport.inl.
00179 { 00180 return this->first_request_; 00181 }
| void TAO_Transport::first_request_sent | ( | bool | flag = false |
) |
Set the state of the first_request_ to flag.
Definition at line 172 of file Transport.inl.
00173 { 00174 this->first_request_ = flag; 00175 }
| int TAO_Transport::flush_timer_pending | ( | void | ) | const [private] |
Check if the flush timer is still pending.
Definition at line 116 of file Transport.inl.
00117 { 00118 return this->flush_timer_id_ != -1; 00119 }
| int TAO_Transport::format_queue_message | ( | TAO_OutputCDR & | stream, | |
| ACE_Time_Value * | max_wait_time, | |||
| TAO_Stub * | stub | |||
| ) |
Format and queue a message for stream
| max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 557 of file Transport.cpp.
00560 { 00561 if (this->messaging_object ()->format_message (stream, stub) != 0) 00562 return -1; 00563 00564 return this->queue_message_i (stream.begin (), max_wait_time); 00565 }
| int TAO_Transport::generate_locate_request | ( | TAO_Target_Specification & | spec, | |
| TAO_Operation_Details & | opdetails, | |||
| TAO_OutputCDR & | output | |||
| ) |
This is a request for the transport object to write a LocateRequest header before it is sent out.
Definition at line 419 of file Transport.cpp.
00423 { 00424 if (this->messaging_object ()->generate_locate_request_header (opdetails, 00425 spec, 00426 output) == -1) 00427 { 00428 if (TAO_debug_level > 0) 00429 { 00430 ACE_ERROR ((LM_ERROR, 00431 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ") 00432 ACE_TEXT ("error while marshalling the LocateRequest header\n"), 00433 this->id ())); 00434 } 00435 00436 return -1; 00437 } 00438 00439 return 0; 00440 }
| int TAO_Transport::generate_request_header | ( | TAO_Operation_Details & | opd, | |
| TAO_Target_Specification & | spec, | |||
| TAO_OutputCDR & | msg | |||
| ) | [virtual] |
This is a request for the transport object to write a request header before it sends out the request
Definition at line 443 of file Transport.cpp.
00447 { 00448 if (this->messaging_object ()->generate_request_header (opdetails, 00449 spec, 00450 output) == -1) 00451 { 00452 if (TAO_debug_level > 0) 00453 { 00454 ACE_ERROR ((LM_ERROR, 00455 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ") 00456 ACE_TEXT ("error while marshalling the Request header\n"), 00457 this->id())); 00458 } 00459 00460 return -1; 00461 } 00462 00463 return 0; 00464 }
| int TAO_Transport::handle_input | ( | TAO_Resume_Handle & | rh, | |
| ACE_Time_Value * | max_wait_time = 0 | |||
| ) | [virtual] |
Callback to read incoming data.
The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.
Once a complete message is read the Transport class delegates on the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).
| max_wait_time | In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified. |
Definition at line 1629 of file Transport.cpp.
01631 { 01632 if (TAO_debug_level > 3) 01633 { 01634 ACE_DEBUG ((LM_DEBUG, 01635 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"), 01636 this->id ())); 01637 } 01638 01639 // First try to process messages of the head of the incoming queue. 01640 int const retval = this->process_queue_head (rh); 01641 01642 if (retval <= 0) 01643 { 01644 if (retval == -1) 01645 { 01646 if (TAO_debug_level > 2) 01647 { 01648 ACE_ERROR ((LM_ERROR, 01649 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01650 ACE_TEXT ("error while parsing the head of the queue\n"), 01651 this->id())); 01652 01653 } 01654 return -1; 01655 } 01656 else 01657 { 01658 // retval == 0 01659 01660 // Processed a message in queue successfully. This 01661 // thread must return to thread-pool now. 01662 return 0; 01663 } 01664 } 01665 01666 TAO_Queued_Data *q_data = 0; 01667 01668 if (this->incoming_message_stack_.top (q_data) != -1 01669 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED) 01670 { 01671 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */ 01672 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1) 01673 { 01674 if (TAO_debug_level > 0) 01675 { 01676 ACE_ERROR ((LM_ERROR, 01677 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01678 ACE_TEXT ("error consolidating incoming message\n"), 01679 this->id ())); 01680 } 01681 return -1; 01682 } 01683 } 01684 else 01685 { 01686 if (this->handle_input_parse_data (rh, max_wait_time) == -1) 01687 { 01688 if (TAO_debug_level > 0) 01689 { 01690 ACE_ERROR ((LM_ERROR, 01691 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ") 01692 ACE_TEXT ("error parsing incoming message\n"), 01693 this->id ())); 01694 } 01695 return -1; 01696 } 01697 } 01698 01699 return 0; 01700 }
| int TAO_Transport::handle_input_missing_data | ( | TAO_Resume_Handle & | rh, | |
| ACE_Time_Value * | max_wait_time, | |||
| TAO_Queued_Data * | q_data | |||
| ) | [private] |
Is invoked by handle_input operation. It consolidate message on top of incoming_message_stack. The amount of missing data is known and recv operation copies data directly into message buffer, as much as a single recv-invocation provides.
Definition at line 1847 of file Transport.cpp.
01850 { 01851 // paranoid check 01852 if (q_data == 0) 01853 { 01854 return -1; 01855 } 01856 01857 if (TAO_debug_level > 3) 01858 { 01859 ACE_DEBUG ((LM_DEBUG, 01860 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01861 ACE_TEXT ("enter (missing data == %d)\n"), 01862 this->id (), q_data->missing_data ())); 01863 } 01864 01865 size_t const recv_size = q_data->missing_data (); 01866 01867 if (q_data->msg_block ()->space() < recv_size) 01868 { 01869 // make sure the message_block has enough space 01870 size_t const message_size = recv_size + q_data->msg_block ()->length(); 01871 01872 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1) 01873 { 01874 return -1; 01875 } 01876 } 01877 01878 // Saving the size of the received buffer in case any one needs to 01879 // get the size of the message thats received in the 01880 // context. Obviously the value will be changed for each recv call 01881 // and the user is supposed to invoke the accessor only in the 01882 // invocation context to get meaningful information. 01883 this->recv_buffer_size_ = recv_size; 01884 01885 // Read the message into the existing message block on heap 01886 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(), 01887 recv_size, 01888 max_wait_time); 01889 01890 if (n <= 0) 01891 { 01892 return n; 01893 } 01894 01895 if (TAO_debug_level > 3) 01896 { 01897 ACE_DEBUG ((LM_DEBUG, 01898 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ") 01899 ACE_TEXT ("read bytes %d\n"), 01900 this->id (), n)); 01901 } 01902 01903 q_data->msg_block ()->wr_ptr(n); 01904 q_data->missing_data (q_data->missing_data () - n); 01905 01906 if (q_data->missing_data () == 0) 01907 { 01908 // paranoid check 01909 if (this->incoming_message_stack_.pop (q_data) == -1) 01910 { 01911 return -1; 01912 } 01913 01914 if (this->consolidate_process_message (q_data, rh) == -1) 01915 { 01916 return -1; 01917 } 01918 } 01919 01920 return 0; 01921 }
| int TAO_Transport::handle_input_parse_data | ( | TAO_Resume_Handle & | rh, | |
| ACE_Time_Value * | max_wait_time | |||
| ) | [private] |
Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides.
Definition at line 1968 of file Transport.cpp.
01970 { 01971 if (TAO_debug_level > 3) 01972 { 01973 ACE_DEBUG ((LM_DEBUG, 01974 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 01975 ACE_TEXT ("enter\n"), 01976 this->id ())); 01977 } 01978 01979 // The buffer on the stack which will be used to hold the input 01980 // messages, ACE_CDR::MAX_ALIGNMENT compensates the 01981 // memory-alignment. This improves performance with SUN-Java-ORB-1.4 01982 // and higher that sends fragmented requests of size 1024 bytes. 01983 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT]; 01984 01985 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 01986 (void) ACE_OS::memset (buf, 01987 '\0', 01988 sizeof buf); 01989 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 01990 01991 // Create a data block 01992 ACE_Data_Block db (sizeof (buf), 01993 ACE_Message_Block::MB_DATA, 01994 buf, 01995 this->orb_core_->input_cdr_buffer_allocator (), 01996 this->orb_core_->locking_strategy (), 01997 ACE_Message_Block::DONT_DELETE, 01998 this->orb_core_->input_cdr_dblock_allocator ()); 01999 02000 // Create a message block 02001 ACE_Message_Block message_block (&db, 02002 ACE_Message_Block::DONT_DELETE, 02003 this->orb_core_->input_cdr_msgblock_allocator ()); 02004 02005 // Align the message block 02006 ACE_CDR::mb_align (&message_block); 02007 02008 size_t recv_size = 0; // Note: unsigned integer 02009 02010 // Pointer to newly parsed message 02011 TAO_Queued_Data *q_data = 0; 02012 02013 // Optimizing access of constants 02014 size_t const header_length = this->messaging_object ()->header_length (); 02015 02016 // Paranoid check 02017 if (header_length > message_block.space ()) 02018 { 02019 return -1; 02020 } 02021 02022 if (this->orb_core_->orb_params ()->single_read_optimization ()) 02023 { 02024 recv_size = message_block.space (); 02025 } 02026 else 02027 { 02028 // Single read optimization has been de-activated. That means 02029 // that we need to read from transport the GIOP header first 02030 // before the payload. This codes first checks the incoming 02031 // stack for partial messages which needs to be 02032 // consolidated. Otherwise we are in new cycle, reading complete 02033 // GIOP header of new incoming message. 02034 if (this->incoming_message_stack_.top (q_data) != -1 02035 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 02036 { 02037 // There is a partial message on incoming_message_stack_ 02038 // whose length is unknown so far. We need to consolidate 02039 // the GIOP header to get to know the payload size, 02040 recv_size = header_length - q_data->msg_block ()->length (); 02041 } 02042 else 02043 { 02044 // Read amount of data forming GIOP header of new incoming 02045 // message. 02046 recv_size = header_length; 02047 } 02048 // POST: 0 <= recv_size <= header_length 02049 } 02050 // POST: 0 <= recv_size <= message_block->space () 02051 02052 // If we have a partial message, copy it into our message block and 02053 // clear out the partial message. 02054 if (this->partial_message_ != 0 && this->partial_message_->length () > 0) 02055 { 02056 // (*) Copy back the partial message into current read-buffer, 02057 // verify that the read-strategy of "recv_size" bytes is not 02058 // exceeded. The latter check guarantees that recv_size does not 02059 // roll-over and keeps in range 02060 // 0<=recv_size<=message_block->space() 02061 if (this->partial_message_->length () <= recv_size && 02062 message_block.copy (this->partial_message_->rd_ptr (), 02063 this->partial_message_->length ()) == 0) 02064 { 02065 02066 recv_size -= this->partial_message_->length (); 02067 this->partial_message_->reset (); 02068 } 02069 else 02070 { 02071 return -1; 02072 } 02073 } 02074 // POST: 0 <= recv_size <= buffer_space 02075 02076 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0 02077 { 02078 // This event would cause endless looping, trying frequently to 02079 // read zero bytes from stream. This might happen, if TAOs 02080 // protocol implementation is not correct and tries to read data 02081 // beyond header without "single_read_optimazation" being 02082 // activated. 02083 if (TAO_debug_level > 0) 02084 { 02085 ACE_ERROR ((LM_ERROR, 02086 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02087 ACE_TEXT ("Error - endless loop detection, closing connection"), 02088 this->id ())); 02089 } 02090 return -1; 02091 } 02092 02093 // Saving the size of the received buffer in case any one needs to 02094 // get the size of the message thats received in the 02095 // context. Obviously the value will be changed for each recv call 02096 // and the user is supposed to invoke the accessor only in the 02097 // invocation context to get meaningful information. 02098 this->recv_buffer_size_ = recv_size; 02099 02100 // Read the message into the message block that we have created on 02101 // the stack. 02102 ssize_t const n = this->recv (message_block.wr_ptr (), 02103 recv_size, 02104 max_wait_time); 02105 02106 // If there is an error return to the reactor.. 02107 if (n <= 0) 02108 { 02109 return n; 02110 } 02111 02112 if (TAO_debug_level > 3) 02113 { 02114 ACE_DEBUG ((LM_DEBUG, 02115 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02116 ACE_TEXT ("read %d bytes\n"), 02117 this->id (), n)); 02118 } 02119 02120 // Set the write pointer in the stack buffer 02121 message_block.wr_ptr (n); 02122 02123 // 02124 // STACK PROCESSING OR MESSAGE CONSOLIDATION 02125 // 02126 02127 // PRE: data in buffer is aligned && message_block.length() > 0 02128 02129 if (this->incoming_message_stack_.top (q_data) != -1 02130 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED) 02131 { 02132 // 02133 // MESSAGE CONSOLIDATION 02134 // 02135 02136 // Partial message on incoming_message_stack_ needs to be 02137 // consolidated. The message header could not be parsed so far 02138 // and therefor the message size is unknown yet. Consolidating 02139 // the message destroys the memory alignment of succeeding 02140 // messages sharing the buffer, for that reason consolidation 02141 // and stack based processing are mutial exclusive. 02142 if (this->messaging_object ()->consolidate_node (q_data, 02143 message_block) == -1) 02144 { 02145 if (TAO_debug_level > 0) 02146 { 02147 ACE_ERROR ((LM_ERROR, 02148 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02149 ACE_TEXT ("error consolidating message from input buffer\n"), 02150 this->id () )); 02151 } 02152 return -1; 02153 } 02154 02155 // Complete message are to be enqueued and later processed 02156 if (q_data->missing_data () == 0) 02157 { 02158 if (this->incoming_message_stack_.pop (q_data) == -1) 02159 { 02160 return -1; 02161 } 02162 02163 if (this->consolidate_enqueue_message (q_data) == -1) 02164 { 02165 return -1; 02166 } 02167 } 02168 02169 if (message_block.length () > 0 02170 && this->handle_input_parse_extra_messages (message_block) == -1) 02171 { 02172 return -1; 02173 } 02174 02175 // In any case try to process the enqueued messages 02176 if (this->process_queue_head (rh) == -1) 02177 { 02178 return -1; 02179 } 02180 } 02181 else 02182 { 02183 // 02184 // STACK PROCESSING (critical path) 02185 // 02186 02187 // Process the first message in buffer on stack 02188 02189 // (PRE: first message resides in aligned memory) Make a node of 02190 // the message-block.. 02191 02192 TAO_Queued_Data qd (&message_block, 02193 this->orb_core_->transport_message_buffer_allocator ()); 02194 02195 size_t mesg_length = 0; 02196 02197 if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1 02198 || (qd.missing_data () == 0 02199 && mesg_length > message_block.length ()) ) 02200 { 02201 // extracting message failed 02202 return -1; 02203 } 02204 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length() 02205 // This prevents seeking rd_ptr behind the wr_ptr 02206 02207 if (qd.missing_data () != 0 || 02208 qd.more_fragments () || 02209 qd.msg_type () == GIOP::Fragment) 02210 { 02211 if (qd.missing_data () == 0) 02212 { 02213 // Dealing with a fragment 02214 TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); 02215 02216 if (nqd == 0) 02217 { 02218 return -1; 02219 } 02220 02221 // mark the end of message in new buffer 02222 char* end_mark = nqd->msg_block ()->rd_ptr () 02223 + mesg_length; 02224 nqd->msg_block ()->wr_ptr (end_mark); 02225 02226 // move the read pointer forward in old buffer 02227 message_block.rd_ptr (mesg_length); 02228 02229 // enqueue the message 02230 if (this->consolidate_enqueue_message (nqd) == -1) 02231 { 02232 return -1; 02233 } 02234 02235 if (message_block.length () > 0 02236 && this->handle_input_parse_extra_messages (message_block) == -1) 02237 { 02238 return -1; 02239 } 02240 02241 // In any case try to process the enqueued messages 02242 if (this->process_queue_head (rh) == -1) 02243 { 02244 return -1; 02245 } 02246 } 02247 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED) 02248 { 02249 // Incomplete message, must be the last one in buffer 02250 02251 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED && 02252 qd.missing_data () > message_block.space ()) 02253 { 02254 // Re-Allocate correct size on heap 02255 if (ACE_CDR::grow (qd.msg_block (), 02256 message_block.length () 02257 + qd.missing_data ()) == -1) 02258 { 02259 return -1; 02260 } 02261 } 02262 02263 TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); 02264 02265 if (nqd == 0) 02266 { 02267 return -1; 02268 } 02269 02270 // move read-pointer to end of buffer 02271 message_block.rd_ptr (message_block.length()); 02272 02273 this->incoming_message_stack_.push (nqd); 02274 } 02275 } 02276 else 02277 { 02278 // 02279 // critical path 02280 // 02281 02282 // We cant process the message on stack right now. First we 02283 // have got to parse extra messages from message_block, 02284 // putting them into queue. When this is done we can return 02285 // to process this message, and notifying other threads to 02286 // process the messages in queue. 02287 char * end_marker = message_block.rd_ptr () 02288 + mesg_length; 02289 02290 if (message_block.length () > mesg_length) 02291 { 02292 // There are more message in data stream to be parsed. 02293 // Safe the rd_ptr to restore later. 02294 char *rd_ptr_stack_mesg = message_block.rd_ptr (); 02295 02296 // Skip parsed message, jump to next message in buffer 02297 // PRE: mesg_length <= message_block.length () 02298 message_block.rd_ptr (mesg_length); 02299 02300 // Extract remaining messages and enqueue them for later 02301 // heap processing 02302 if (this->handle_input_parse_extra_messages (message_block) == -1) 02303 { 02304 return -1; 02305 } 02306 02307 // correct the wr_ptr using the end_marker to point to the 02308 // end of the first message else the code after this will 02309 // see the full stream with all the messages 02310 message_block.wr_ptr (end_marker); 02311 02312 // Restore rd_ptr 02313 message_block.rd_ptr (rd_ptr_stack_mesg); 02314 } 02315 02316 // The following if-else has been copied from 02317 // process_queue_head(). While process_queue_head() 02318 // processes message on heap, here we will process a message 02319 // on stack. 02320 02321 // Now that we have one message on stack to be processed, 02322 // check whether we have one more message in the queue... 02323 if (this->incoming_message_queue_.queue_length () > 0) 02324 { 02325 if (TAO_debug_level > 0) 02326 { 02327 ACE_DEBUG ((LM_DEBUG, 02328 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ") 02329 ACE_TEXT ("notify reactor\n"), 02330 this->id ())); 02331 } 02332 02333 int const retval = this->notify_reactor (); 02334 02335 if (retval == 1) 02336 { 02337 // Let the class know that it doesn't need to resume the 02338 // handle.. 02339 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02340 } 02341 else if (retval < 0) 02342 return -1; 02343 } 02344 else 02345 { 02346 // As there are no further messages in queue just resume 02347 // the handle. Set the flag incase someone had reset the flag.. 02348 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02349 } 02350 02351 // PRE: incoming_message_queue is empty 02352 if (this->process_parsed_messages (&qd, rh) == -1) 02353 { 02354 return -1; 02355 } 02356 // move the rd_ptr tp position of end_marker 02357 message_block.rd_ptr (end_marker); 02358 } 02359 } 02360 02361 // Now that all cases have been processed, there might be kept some data 02362 // in buffer that needs to be safed for next "handle_input" invocations. 02363 if (message_block.length () > 0) 02364 { 02365 if (this->partial_message_ == 0) 02366 { 02367 this->allocate_partial_message_block (); 02368 } 02369 02370 if (this->partial_message_ != 0 && 02371 this->partial_message_->copy (message_block.rd_ptr (), 02372 message_block.length ()) == 0) 02373 { 02374 message_block.rd_ptr (message_block.length ()); 02375 } 02376 else 02377 { 02378 return -1; 02379 } 02380 } 02381 02382 return 0; 02383 }
| int TAO_Transport::handle_input_parse_extra_messages | ( | ACE_Message_Block & | message_block | ) | [private] |
Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.
Definition at line 1925 of file Transport.cpp.
01927 { 01928 // store buffer status of last extraction: -1 parse error, 0 01929 // incomplete message header in buffer, 1 complete messages header 01930 // parsed 01931 int buf_status = 0; 01932 01933 TAO_Queued_Data *q_data = 0; // init 01934 01935 // parse buffer until all messages have been extracted, consolidate 01936 // and enqueue complete messages, if the last message being parsed 01937 // has missin data, it is stays on top of incoming_message_stack. 01938 while (message_block.length () > 0 && 01939 (buf_status = this->messaging_object ()->extract_next_message 01940 (message_block, q_data)) != -1 && 01941 q_data != 0) // paranoid check 01942 { 01943 if (q_data->missing_data () == 0) 01944 { 01945 if (this->consolidate_enqueue_message (q_data) == -1) 01946 { 01947 return -1; 01948 } 01949 } 01950 else // incomplete message read, probably the last message in buffer 01951 { 01952 // can not fail 01953 this->incoming_message_stack_.push (q_data); 01954 } 01955 01956 q_data = 0; // reset 01957 } // while 01958 01959 if (buf_status == -1) 01960 { 01961 return -1; 01962 } 01963 01964 return 0; 01965 }
| TAO_Transport::Drain_Result TAO_Transport::handle_output | ( | TAO::Transport::Drain_Constraints const & | c | ) |
Callback method to reactively drain the outgoing data queue.
Definition at line 525 of file Transport.cpp.
00526 { 00527 if (TAO_debug_level > 3) 00528 { 00529 ACE_DEBUG ((LM_DEBUG, 00530 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output") 00531 ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"), 00532 this->id (), 00533 dc.block_on_io(), 00534 dc.timeout() ? dc.timeout()->sec() : -1, 00535 dc.timeout() ? dc.timeout()->usec() : -1 )); 00536 } 00537 00538 // The flushing strategy (potentially via the Reactor) wants to send 00539 // more data, first check if there is a current message that needs 00540 // more sending... 00541 Drain_Result const retval = this->drain_queue (dc); 00542 00543 if (TAO_debug_level > 3) 00544 { 00545 ACE_DEBUG ((LM_DEBUG, 00546 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ") 00547 ACE_TEXT ("drain_queue returns %d/%d\n"), 00548 this->id (), 00549 static_cast<int> (retval.dre_), ACE_ERRNO_GET)); 00550 } 00551 00552 // Any errors are returned directly to the Reactor 00553 return retval; 00554 }
| int TAO_Transport::handle_timeout | ( | const ACE_Time_Value & | current_time, | |
| const void * | act | |||
| ) |
The timeout callback, invoked when any of the timers related to this transport expire.
| current_time | The current time as reported from the Reactor | |
| act | The Asynchronous Completion Token. Currently it is interpreted as follows:
|
This is the only legal ACT in the current configuration....
Definition at line 874 of file Transport.cpp.
00876 { 00877 if (TAO_debug_level > 6) 00878 { 00879 ACE_DEBUG ((LM_DEBUG, 00880 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ") 00881 ACE_TEXT ("timer expired\n"), 00882 this->id ())); 00883 } 00884 00885 /// This is the only legal ACT in the current configuration.... 00886 if (act != &this->current_deadline_) 00887 { 00888 return -1; 00889 } 00890 00891 if (this->flush_timer_pending ()) 00892 { 00893 // The timer is always a oneshot timer, so mark is as not 00894 // pending. 00895 this->reset_flush_timer (); 00896 00897 TAO_Flushing_Strategy *flushing_strategy = 00898 this->orb_core ()->flushing_strategy (); 00899 int const result = flushing_strategy->schedule_output (this); 00900 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00901 { 00902 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00903 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00904 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00905 if (flushing_strategy->flush_transport (this, 0) == -1) { 00906 return -1; 00907 } 00908 } 00909 } 00910 00911 return 0; 00912 }
| void TAO_Transport::id | ( | size_t | id | ) |
Definition at line 100 of file Transport.inl.
| size_t TAO_Transport::id | ( | void | ) | const |
Set and Get the identifier for this transport instance.
If not set, this will return an integer representation of the this pointer for the instance on which it's called.
Definition at line 94 of file Transport.inl.
00095 { 00096 return this->id_; 00097 }
| bool TAO_Transport::idle_after_reply | ( | void | ) |
Request is sent and the reply is received. Idle the transport now.
Definition at line 273 of file Transport.cpp.
00274 { 00275 return this->tms ()->idle_after_reply (); 00276 }
| bool TAO_Transport::idle_after_send | ( | void | ) |
Request has been just sent, but the reply is not received. Idle the transport now.
Definition at line 267 of file Transport.cpp.
00268 { 00269 return this->tms ()->idle_after_send (); 00270 }
| ACE_Time_Value const * TAO_Transport::io_timeout | ( | TAO::Transport::Drain_Constraints const & | dc | ) | const [protected] |
Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies.
This function was introduced as part of the fixes for bug 3647.
Definition at line 2791 of file Transport.cpp.
02793 { 02794 if (dc.block_on_io()) 02795 { 02796 return dc.timeout(); 02797 } 02798 if (this->wait_strategy()->can_process_upcalls()) 02799 { 02800 return 0; 02801 } 02802 return dc.timeout(); 02803 }
| bool TAO_Transport::is_connected | ( | void | ) | const |
Is this transport really connected.
Definition at line 184 of file Transport.inl.
00185 { 00186 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); 00187 return this->is_connected_; 00188 }
| CORBA::Boolean TAO_Transport::is_tcs_set | ( | void | ) | const |
Return true if the tcs has been set.
CodeSet negotiation.
Definition at line 166 of file Transport.inl.
00167 { 00168 return tcs_set_; 00169 }
| int TAO_Transport::make_idle | ( | void | ) |
Cache management.
Definition at line 501 of file Transport.cpp.
00502 { 00503 if (TAO_debug_level > 3) 00504 { 00505 ACE_DEBUG ((LM_DEBUG, 00506 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"), 00507 this->id ())); 00508 } 00509 00510 return this->transport_cache_manager ().make_idle (this->cache_map_entry_); 00511 }
| void TAO_Transport::messaging_init | ( | TAO_GIOP_Message_Version const & | version | ) |
Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.
Definition at line 2670 of file Transport.cpp.
02671 { 02672 this->messaging_object ()->init (version.major, version.minor); 02673 }
| TAO_GIOP_Message_Base * TAO_Transport::messaging_object | ( | void | ) |
Return the messaging object that is used to format the data that needs to be sent.
Definition at line 129 of file Transport.inl.
00130 { 00131 return this->messaging_object_; 00132 }
| int TAO_Transport::notify_reactor | ( | void | ) | [private] |
These classes need privileged access to:
Definition at line 2573 of file Transport.cpp.
02574 { 02575 if (!this->ws_->is_registered ()) 02576 { 02577 return 0; 02578 } 02579 02580 ACE_Event_Handler *eh = this->event_handler_i (); 02581 02582 // Get the reactor associated with the event handler 02583 ACE_Reactor *reactor = this->orb_core ()->reactor (); 02584 02585 if (TAO_debug_level > 0) 02586 { 02587 ACE_DEBUG ((LM_DEBUG, 02588 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02589 ACE_TEXT ("notify to Reactor\n"), 02590 this->id ())); 02591 } 02592 02593 // Send a notification to the reactor... 02594 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK); 02595 02596 if (retval < 0 && TAO_debug_level > 2) 02597 { 02598 // @todo: need to think about what is the action that 02599 // we can take when we get here. 02600 ACE_ERROR ((LM_ERROR, 02601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ") 02602 ACE_TEXT ("notify to the reactor failed..\n"), 02603 this->id ())); 02604 } 02605 02606 return 1; 02607 }
| void TAO_Transport::opened_as | ( | TAO::Connection_Role | role | ) |
Definition at line 57 of file Transport.inl.
00058 { 00059 this->opening_connection_role_ = role; 00060 }
| TAO::Connection_Role TAO_Transport::opened_as | ( | void | ) | const |
Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
Definition at line 51 of file Transport.inl.
00052 { 00053 return this->opening_connection_role_; 00054 }
| TAO_ORB_Core * TAO_Transport::orb_core | ( | void | ) | const |
Access the ORB that owns this connection.
Definition at line 20 of file Transport.inl.
00021 { 00022 return this->orb_core_; 00023 }
| TAO_OutputCDR & TAO_Transport::out_stream | ( | void | ) |
Accessor for the output CDR stream.
Definition at line 2658 of file Transport.cpp.
02659 { 02660 return this->messaging_object ()->out_stream (); 02661 }
| TAO_SYNCH_MUTEX & TAO_Transport::output_cdr_lock | ( | void | ) |
Accessor for synchronizing Transport OutputCDR access.
Definition at line 2664 of file Transport.cpp.
02665 { 02666 return this->output_cdr_mutex_; 02667 }
| bool TAO_Transport::post_connect_hook | ( | void | ) | [virtual] |
Hooks that can be overridden in concrete transports.
These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)
Definition at line 325 of file Transport.cpp.
| bool TAO_Transport::post_open | ( | size_t | id | ) |
Perform all the actions when this transport get opened.
Definition at line 2694 of file Transport.cpp.
02695 { 02696 if (TAO_debug_level > 9) 02697 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ") 02698 ACE_TEXT ("tport id changed from %d to %d\n"), this->id_, id)); 02699 this->id_ = id; 02700 02701 // When we have data in our outgoing queue schedule ourselves 02702 // for output 02703 if (!this->queue_is_empty_i ()) 02704 { 02705 // If the wait strategy wants us to be registered with the reactor 02706 // then we do so. If registeration is required and it succeeds, 02707 // #REFCOUNT# becomes two. 02708 if (this->wait_strategy ()->register_handler () == 0) 02709 { 02710 if (this->flush_in_post_open_) 02711 { 02712 TAO_Flushing_Strategy *flushing_strategy = 02713 this->orb_core ()->flushing_strategy (); 02714 02715 if (flushing_strategy == 0) 02716 throw CORBA::INTERNAL (); 02717 02718 this->flush_in_post_open_ = false; 02719 (void) flushing_strategy->schedule_output (this); 02720 } 02721 } 02722 else 02723 { 02724 // Registration failures. 02725 02726 // Purge from the connection cache, if we are not in the cache, this 02727 // just does nothing. 02728 (void) this->purge_entry (); 02729 02730 // Close the handler. 02731 (void) this->close_connection (); 02732 02733 if (TAO_debug_level > 0) 02734 { 02735 ACE_ERROR ((LM_ERROR, 02736 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ") 02737 ACE_TEXT ("could not register the transport ") 02738 ACE_TEXT ("in the reactor.\n"), 02739 this->id ())); 02740 } 02741 02742 return false; 02743 } 02744 } 02745 02746 { 02747 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false); 02748 this->is_connected_ = true; 02749 } 02750 02751 if (TAO_debug_level > 9 && !this->cache_map_entry_) 02752 { 02753 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open") 02754 ACE_TEXT (", cache_map_entry_ is 0\n"), this->id_)); 02755 } 02756 02757 this->transport_cache_manager ().mark_connected (this->cache_map_entry_, 02758 true); 02759 02760 // update transport cache to make this entry available 02761 this->transport_cache_manager ().set_entry_state ( 02762 this->cache_map_entry_, 02763 TAO::ENTRY_IDLE_AND_PURGABLE); 02764 02765 return true; 02766 }
| void TAO_Transport::pre_close | ( | void | ) |
do what needs to be done when closing the transport
Definition at line 2676 of file Transport.cpp.
02677 { 02678 // @TODO: something needs to be done with is_connected_. Checking it is 02679 // guarded by a mutex, but setting it is not. Until the need for mutexed 02680 // protection is required, the transport cache is holding its own copy 02681 // of the is_connected_ flag, so that during cache lookups the cache 02682 // manager doesn't need to be burdened by the lock in is_connected(). 02683 this->is_connected_ = false; 02684 this->transport_cache_manager ().mark_connected (this->cache_map_entry_, 02685 false); 02686 this->purge_entry (); 02687 { 02688 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 02689 this->cleanup_queue_i (); 02690 } 02691 }
| int TAO_Transport::process_parsed_messages | ( | TAO_Queued_Data * | qd, | |
| TAO_Resume_Handle & | rh | |||
| ) | [protected] |
Process the message by sending it to the higher layers of the ORB.
Definition at line 2387 of file Transport.cpp.
02389 { 02390 if (TAO_debug_level > 7) 02391 { 02392 ACE_DEBUG ((LM_DEBUG, 02393 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02394 ACE_TEXT ("entering (missing data == %d)\n"), 02395 this->id(), qd->missing_data ())); 02396 } 02397 02398 #if TAO_HAS_TRANSPORT_CURRENT == 1 02399 // Update stats, if any 02400 if (this->stats_ != 0) 02401 this->stats_->messages_received (qd->msg_block ()->length ()); 02402 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 02403 02404 switch (qd->msg_type ()) 02405 { 02406 case GIOP::CloseConnection: 02407 { 02408 if (TAO_debug_level > 0) 02409 { 02410 ACE_DEBUG ((LM_DEBUG, 02411 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02412 ACE_TEXT ("received CloseConnection message - %m\n"), 02413 this->id())); 02414 } 02415 02416 // Return a "-1" so that the next stage can take care of 02417 // closing connection and the necessary memory management. 02418 return -1; 02419 } 02420 break; 02421 case GIOP::Request: 02422 case GIOP::LocateRequest: 02423 { 02424 // Let us resume the handle before we go ahead to process the 02425 // request. This will open up the handle for other threads. 02426 rh.resume_handle (); 02427 02428 if (this->messaging_object ()->process_request_message (this, qd) == -1) 02429 { 02430 // Return a "-1" so that the next stage can take care of 02431 // closing connection and the necessary memory management. 02432 return -1; 02433 } 02434 } 02435 break; 02436 case GIOP::Reply: 02437 case GIOP::LocateReply: 02438 { 02439 rh.resume_handle (); 02440 02441 TAO_Pluggable_Reply_Params params (this); 02442 02443 if (this->messaging_object ()->process_reply_message (params, qd) == -1) 02444 { 02445 if (TAO_debug_level > 0) 02446 { 02447 ACE_ERROR ((LM_ERROR, 02448 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02449 ACE_TEXT ("error in process_reply_message - %m\n"), 02450 this->id ())); 02451 } 02452 02453 return -1; 02454 } 02455 02456 } 02457 break; 02458 case GIOP::CancelRequest: 02459 { 02460 // The associated request might be incomplete residing 02461 // fragmented in messaging object. We must make sure the 02462 // resources allocated by fragments are released. 02463 if (this->messaging_object ()->discard_fragmented_message (qd) == -1) 02464 { 02465 if (TAO_debug_level > 0) 02466 { 02467 ACE_ERROR ((LM_ERROR, 02468 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02469 ACE_TEXT ("error processing CancelRequest\n"), 02470 this->id ())); 02471 } 02472 } 02473 02474 // We are not able to cancel requests being processed already; 02475 // this is declared as optional feature by CORBA, and TAO does 02476 // not support this currently. 02477 02478 // Just continue processing, CancelRequest does not mean to cut 02479 // off the connection. 02480 } 02481 break; 02482 case GIOP::MessageError: 02483 { 02484 if (TAO_debug_level > 0) 02485 { 02486 ACE_ERROR ((LM_ERROR, 02487 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ") 02488 ACE_TEXT ("received MessageError, closing connection\n"), 02489 this->id ())); 02490 } 02491 return -1; 02492 } 02493 break; 02494 case GIOP::Fragment: 02495 { 02496 // Nothing to be done. 02497 } 02498 break; 02499 } 02500 02501 // If not, just return back.. 02502 return 0; 02503 }
| int TAO_Transport::process_queue_head | ( | TAO_Resume_Handle & | rh | ) | [private] |
These classes need privileged access to:
Definition at line 2506 of file Transport.cpp.
02507 { 02508 if (TAO_debug_level > 3) 02509 { 02510 ACE_DEBUG ((LM_DEBUG, 02511 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"), 02512 this->id (), this->incoming_message_queue_.queue_length () )); 02513 } 02514 02515 // See if message in queue ... 02516 if (this->incoming_message_queue_.queue_length () > 0) 02517 { 02518 // Get the message on the head of the queue.. 02519 TAO_Queued_Data *qd = 02520 this->incoming_message_queue_.dequeue_head (); 02521 02522 if (TAO_debug_level > 3) 02523 { 02524 ACE_DEBUG ((LM_DEBUG, 02525 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02526 ACE_TEXT ("the size of the queue is [%d]\n"), 02527 this->id (), 02528 this->incoming_message_queue_.queue_length())); 02529 } 02530 // Now that we have pulled out out one message out of the queue, 02531 // check whether we have one more message in the queue... 02532 if (this->incoming_message_queue_.queue_length () > 0) 02533 { 02534 if (TAO_debug_level > 0) 02535 { 02536 ACE_DEBUG ((LM_DEBUG, 02537 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ") 02538 ACE_TEXT ("notify reactor\n"), 02539 this->id ())); 02540 } 02541 02542 int const retval = this->notify_reactor (); 02543 02544 if (retval == 1) 02545 { 02546 // Let the class know that it doesn't need to resume the 02547 // handle.. 02548 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); 02549 } 02550 else if (retval < 0) 02551 return -1; 02552 } 02553 else 02554 { 02555 // As we are ready to process the last message just resume 02556 // the handle. Set the flag incase someone had reset the flag.. 02557 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); 02558 } 02559 02560 // Process the message... 02561 int const retval = this->process_parsed_messages (qd, rh); 02562 02563 // Delete the Queued_Data.. 02564 TAO_Queued_Data::release (qd); 02565 02566 return retval; 02567 } 02568 02569 return 1; 02570 }
| bool TAO_Transport::provide_blockable_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Add event handlers corresponding to transports that have RW wait strategy to the handlers set. Called by the cache when the ORB is shuting down.
| handlers | The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on. |
Definition at line 253 of file Transport.cpp.
00254 { 00255 if (this->ws_->non_blocking () || 00256 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE) 00257 return false; 00258 00259 (void) this->add_reference (); 00260 00261 h.insert (this->connection_handler_i ()); 00262 00263 return true; 00264 }
| void TAO_Transport::provide_handler | ( | TAO::Connection_Handler_Set & | handlers | ) |
Added event handler to the handlers set.
Called by the cache when the cache is closing.
| handlers | The TAO_Connection_Handler_Set into which the transport should place its handler |
Definition at line 245 of file Transport.cpp.
00246 { 00247 (void) this->add_reference (); 00248 00249 handlers.insert (this->connection_handler_i ()); 00250 }
| int TAO_Transport::purge_entry | ( | void | ) |
Cache management.
Definition at line 479 of file Transport.cpp.
00480 { 00481 // We must store our entry in a temporary and zero out the data member. 00482 // If there is only one reference count on us, we will end up causing 00483 // our own destruction. And we can not be holding a cache map entry if 00484 // that happens. 00485 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* entry = 0; 00486 { 00487 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00488 entry = this->cache_map_entry_; 00489 this->cache_map_entry_ = 0; 00490 } 00491 return this->transport_cache_manager ().purge_entry (entry); 00492 }
| void TAO_Transport::purging_order | ( | unsigned long | value | ) |
Definition at line 84 of file Transport.inl.
00085 { 00086 // This should only be called by the Transport Cache Manager when 00087 // it is holding it's lock. 00088 // The transport should still be here since the cache manager still 00089 // has a reference to it. 00090 this->purging_order_ = value; 00091 }
| unsigned long TAO_Transport::purging_order | ( | void | ) | const |
Get and Set the purging order. The purging strategy uses the set version to set the purging order.
Definition at line 78 of file Transport.inl.
00079 { 00080 return this->purging_order_; 00081 }
| bool TAO_Transport::queue_is_empty | ( | void | ) |
Check if there are messages pending in the queue.
Definition at line 106 of file Transport.inl.
00107 { 00108 ACE_GUARD_RETURN (ACE_Lock, 00109 ace_mon, 00110 *this->handler_lock_, 00111 false); 00112 return this->queue_is_empty_i (); 00113 }
| bool TAO_Transport::queue_is_empty_i | ( | void | ) | const [private] |
Check if there are messages pending in the queue.
This version assumes that the lock is already held. Use with care!
Definition at line 8 of file Transport.inl.
00009 { 00010 return (this->head_ == 0); 00011 }
| int TAO_Transport::queue_message_i | ( | const ACE_Message_Block * | message_block, | |
| ACE_Time_Value * | max_wait_time, | |||
| bool | back = true | |||
| ) | [protected] |
Queue a message for message_block
| max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. | |
| back | If true, the message will be pushed to the back of the queue. If false, the message will be pushed to the front of the queue. |
Definition at line 1601 of file Transport.cpp.
01603 { 01604 TAO_Queued_Message *queued_message = 0; 01605 ACE_NEW_RETURN (queued_message, 01606 TAO_Asynch_Queued_Message (message_block, 01607 this->orb_core_, 01608 max_wait_time, 01609 0, 01610 true), 01611 -1); 01612 if (back) { 01613 queued_message->push_back (this->head_, this->tail_); 01614 } 01615 else { 01616 queued_message->push_front (this->head_, this->tail_); 01617 } 01618 01619 return 0; 01620 }
| int TAO_Transport::recache_transport | ( | TAO_Transport_Descriptor_Interface * | desc | ) |
Recache ourselves in the cache.
Ideally the following should be inline.
purge_entry has a return value, use it
Definition at line 469 of file Transport.cpp.
00470 { 00471 // First purge our entry 00472 this->purge_entry (); 00473 00474 // Then add ourselves to the cache 00475 return this->transport_cache_manager ().cache_transport (desc, this); 00476 }
| virtual ssize_t TAO_Transport::recv | ( | char * | buffer, | |
| size_t | len, | |||
| const ACE_Time_Value * | timeout = 0 | |||
| ) | [pure virtual] |
Read len bytes from into buf.
This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.
| buffer | ORB allocated buffer where the data should be @ The ACE_Time_Value *s is just a place holder for now. It is not clear this this is the best place to specify this. The actual timeout values will be kept in the Policies. |
| size_t TAO_Transport::recv_buffer_size | ( | void | ) | const |
Accessor to recv_buffer_size_.
Definition at line 197 of file Transport.inl.
00198 { 00199 return this->recv_buffer_size_; 00200 }
| int TAO_Transport::register_handler | ( | void | ) | [virtual] |
Register the handler with the reactor.
Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.
Definition at line 365 of file Transport.cpp.
00366 { 00367 if (TAO_debug_level > 4) 00368 { 00369 ACE_DEBUG ((LM_DEBUG, 00370 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"), 00371 this->id ())); 00372 } 00373 00374 ACE_Reactor * const r = this->orb_core_->reactor (); 00375 00376 // @@note: This should be okay since the register handler call will 00377 // not make a nested call into the transport. 00378 ACE_GUARD_RETURN (ACE_Lock, 00379 ace_mon, 00380 *this->handler_lock_, 00381 false); 00382 00383 if (r == this->event_handler_i ()->reactor ()) 00384 { 00385 return 0; 00386 } 00387 00388 // Set the flag in the Connection Handler and in the Wait Strategy 00389 // @@Maybe we should set these flags after registering with the 00390 // reactor. What if the registration fails??? 00391 this->ws_->is_registered (true); 00392 00393 // Register the handler with the reactor 00394 return r->register_handler (this->event_handler_i (), 00395 ACE_Event_Handler::READ_MASK); 00396 }
| bool TAO_Transport::register_if_necessary | ( | void | ) |
Register with the reactor via the wait strategy.
Definition at line 331 of file Transport.cpp.
00332 { 00333 if (this->is_connected_ && 00334 this->wait_strategy ()->register_handler () == -1) 00335 { 00336 // Registration failures. 00337 if (TAO_debug_level > 0) 00338 { 00339 ACE_ERROR ((LM_ERROR, 00340 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ") 00341 ACE_TEXT ("could not register the transport ") 00342 ACE_TEXT ("in the reactor.\n"), 00343 this->id ())); 00344 } 00345 00346 // Purge from the connection cache, if we are not in the cache, this 00347 // just does nothing. 00348 (void) this->purge_entry (); 00349 00350 // Close the handler. 00351 (void) this->close_connection (); 00352 00353 return false; 00354 } 00355 return true; 00356 }
| ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference | ( | void | ) |
These classes need privileged access to:
Definition at line 2652 of file Transport.cpp.
02653 { 02654 return this->event_handler_i ()->remove_reference (); 02655 }
| void TAO_Transport::report_invalid_event_handler | ( | const char * | caller | ) | [private] |
Print out error messages if the event handler is not valid.
Definition at line 1282 of file Transport.cpp.
| void TAO_Transport::reset_flush_timer | ( | void | ) | [private] |
The flush timer expired or was explicitly cancelled, mark it as not pending
Definition at line 122 of file Transport.inl.
00123 { 00124 this->flush_timer_id_ = -1; 00125 this->current_deadline_ = ACE_Time_Value::zero; 00126 }
| int TAO_Transport::schedule_output_i | ( | void | ) | [private] |
Schedule handle_output() callbacks.
Definition at line 804 of file Transport.cpp.
00805 { 00806 ACE_Event_Handler * const eh = this->event_handler_i (); 00807 ACE_Reactor * const reactor = eh->reactor (); 00808 00809 if (reactor == 0) 00810 { 00811 if (TAO_debug_level > 1) 00812 { 00813 ACE_ERROR ((LM_ERROR, 00814 ACE_TEXT ("TAO (%P|%t) - ") 00815 ACE_TEXT ("Transport[%d]::schedule_output_i, ") 00816 ACE_TEXT ("no reactor,") 00817 ACE_TEXT ("returning -1\n"), 00818 this->id ())); 00819 } 00820 return -1; 00821 } 00822 00823 // Check to see if our event handler is still registered with the 00824 // reactor. It's possible for another thread to have run close_connection() 00825 // since we last used the event handler. 00826 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ()); 00827 if (found) 00828 { 00829 found->remove_reference (); 00830 00831 if (found != eh) 00832 { 00833 if (TAO_debug_level > 3) 00834 { 00835 ACE_ERROR ((LM_ERROR, 00836 ACE_TEXT ("TAO (%P|%t) - ") 00837 ACE_TEXT ("Transport[%d]::schedule_output_i ") 00838 ACE_TEXT ("event handler not found in reactor,") 00839 ACE_TEXT ("returning -1\n"), 00840 this->id ())); 00841 } 00842 00843 return -1; 00844 } 00845 } 00846 00847 if (TAO_debug_level > 3) 00848 { 00849 ACE_DEBUG ((LM_DEBUG, 00850 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"), 00851 this->id ())); 00852 } 00853 00854 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); 00855 }
| virtual ssize_t TAO_Transport::send | ( | iovec * | iov, | |
| int | iovcnt, | |||
| size_t & | bytes_transferred, | |||
| ACE_Time_Value const * | timeout | |||
| ) | [pure virtual] |
Write the complete Message_Block chain to the connection.
This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.
Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class. Using the code factored out into ACE.
Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.
| iov | contains the data that must be sent. | |
| timeout | is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application. | |
| bytes_transferred | should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem. |
This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.
| int TAO_Transport::send_asynchronous_message_i | ( | TAO_Stub * | stub, | |
| const ACE_Message_Block * | message_block, | |||
| ACE_Time_Value * | max_wait_time | |||
| ) | [private] |
Send an asynchronous message, i.e. do not block until the message is on the wire
Definition at line 1350 of file Transport.cpp.
01353 { 01354 // Let's figure out if the message should be queued without trying 01355 // to send first: 01356 bool try_sending_first = true; 01357 01358 bool const queue_empty = this->queue_is_empty_i (); 01359 01360 TAO::Transport_Queueing_Strategy *queue_strategy = 01361 stub->transport_queueing_strategy (); 01362 01363 if (!queue_empty) 01364 { 01365 try_sending_first = false; 01366 } 01367 else if (queue_strategy) 01368 { 01369 if (queue_strategy->must_queue (queue_empty)) 01370 { 01371 try_sending_first = false; 01372 } 01373 } 01374 01375 bool partially_sent = false; 01376 bool timeout_encountered = false; 01377 01378 TAO::Transport::Drain_Constraints dc( 01379 max_wait_time, this->using_blocking_io_for_asynch_messages()); 01380 01381 if (try_sending_first) 01382 { 01383 ssize_t n = 0; 01384 size_t byte_count = 0; 01385 // ... in this case we must try to send the message first ... 01386 01387 size_t const total_length = message_block->total_length (); 01388 01389 if (TAO_debug_level > 6) 01390 { 01391 ACE_DEBUG ((LM_DEBUG, 01392 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01393 ACE_TEXT ("trying to send the message (ml = %d)\n"), 01394 this->id (), total_length)); 01395 } 01396 01397 // @@ I don't think we want to hold the mutex here, however if 01398 // we release it we need to recheck the status of the transport 01399 // after we return... once I understand the final form for this 01400 // code I will re-visit this decision 01401 n = this->send_message_block_chain_i (message_block, 01402 byte_count, 01403 dc); 01404 01405 if (n == -1) 01406 { 01407 // ... if this is just an EWOULDBLOCK we must schedule the 01408 // message for later, if it is ETIME we still have to send 01409 // the complete message, because cutting off the message at 01410 // this point will destroy the synchronization with the 01411 // server ... 01412 if (errno != EWOULDBLOCK && errno != ETIME) 01413 { 01414 if (TAO_debug_level > 0) 01415 { 01416 ACE_ERROR ((LM_ERROR, 01417 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01418 ACE_TEXT ("fatal error in ") 01419 ACE_TEXT ("send_message_block_chain_i - %m\n"), 01420 this->id ())); 01421 } 01422 return -1; 01423 } 01424 } 01425 01426 // ... let's figure out if the complete message was sent ... 01427 if (total_length == byte_count) 01428 { 01429 // Done, just return. Notice that there are no allocations 01430 // or copies up to this point (though some fancy calling 01431 // back and forth). 01432 // This is the common case for the critical path, it should 01433 // be fast. 01434 return 0; 01435 } 01436 01437 if (byte_count > 0) 01438 { 01439 partially_sent = true; 01440 } 01441 01442 // If it was partially sent, then push to front of queue and don't flush 01443 if (errno == ETIME) 01444 { 01445 timeout_encountered = true; 01446 if (byte_count == 0) 01447 { 01448 //This request has timed out and none of it was sent to the transport 01449 //We can't return -1 here, since that would end up closing the tranpsort 01450 if (TAO_debug_level > 2) 01451 { 01452 ACE_DEBUG ((LM_DEBUG, 01453 ACE_TEXT ("TAO (%P|%t) - ") 01454 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") 01455 ACE_TEXT ("timeout encountered before any bytes sent\n"), 01456 this->id ())); 01457 } 01458 throw ::CORBA::TIMEOUT ( 01459 CORBA::SystemException::_tao_minor_code ( 01460 TAO_TIMEOUT_SEND_MINOR_CODE, 01461 ETIME), 01462 CORBA::COMPLETED_NO); 01463 } 01464 } 01465 01466 if (TAO_debug_level > 6) 01467 { 01468 ACE_DEBUG ((LM_DEBUG, 01469 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01470 ACE_TEXT ("partial send %d / %d bytes\n"), 01471 this->id (), byte_count, total_length)); 01472 } 01473 01474 // ... part of the data was sent, need to figure out what piece 01475 // of the message block chain must be queued ... 01476 while (message_block != 0 && message_block->length () == 0) 01477 { 01478 message_block = message_block->cont (); 01479 } 01480 01481 // ... at least some portion of the message block chain should 01482 // remain ... 01483 } 01484 01485 // ... either the message must be queued or we need to queue it 01486 // because it was not completely sent out ... 01487 01488 ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time); 01489 if (this->queue_message_i (message_block, wait_time, !partially_sent) 01490 == -1) 01491 { 01492 if (TAO_debug_level > 0) 01493 { 01494 ACE_DEBUG ((LM_DEBUG, 01495 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01496 ACE_TEXT ("send_asynchronous_message_i, ") 01497 ACE_TEXT ("cannot queue message for - %m\n"), 01498 this->id ())); 01499 } 01500 return -1; 01501 } 01502 01503 if (TAO_debug_level > 6) 01504 { 01505 ACE_DEBUG ((LM_DEBUG, 01506 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") 01507 ACE_TEXT ("message is queued\n"), 01508 this->id ())); 01509 } 01510 01511 if (timeout_encountered && partially_sent) 01512 { 01513 //Must close down the transport here since we can't guarantee the 01514 //integrity of the GIOP stream (the next send may try to write to 01515 //the socket before looking at the queue). 01516 if (TAO_debug_level > 0) 01517 { 01518 ACE_DEBUG ((LM_DEBUG, 01519 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01520 ACE_TEXT ("send_asynchronous_message_i, ") 01521 ACE_TEXT ("timeout after partial send, closing.\n"), 01522 this->id ())); 01523 } 01524 return -1; 01525 } 01526 else if (!timeout_encountered) 01527 { 01528 // We can't flush if we have already encountered a timeout 01529 // ... if the queue is full we need to activate the output on the 01530 // queue ... 01531 bool must_flush = false; 01532 const bool constraints_reached = 01533 this->check_buffering_constraints_i (stub, 01534 must_flush); 01535 01536 // ... but we also want to activate it if the message was partially 01537 // sent.... Plus, when we use the blocking flushing strategy the 01538 // queue is flushed as a side-effect of 'schedule_output()' 01539 01540 TAO_Flushing_Strategy *flushing_strategy = 01541 this->orb_core ()->flushing_strategy (); 01542 01543 if (constraints_reached || try_sending_first) 01544 { 01545 int const result = flushing_strategy->schedule_output (this); 01546 if (result == TAO_Flushing_Strategy::MUST_FLUSH) 01547 { 01548 must_flush = true; 01549 } 01550 } 01551 01552 if (must_flush) 01553 { 01554 if (TAO_debug_level > 0) 01555 { 01556 ACE_DEBUG ((LM_DEBUG, 01557 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 01558 ACE_TEXT ("send_asynchronous_message_i, ") 01559 ACE_TEXT ("flushing transport.\n"), 01560 this->id ())); 01561 } 01562 01563 size_t sent_byte = sent_byte_count_; 01564 int ret = 0; 01565 { 01566 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 01567 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 01568 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 01569 ret = flushing_strategy->flush_transport (this, max_wait_time); 01570 } 01571 01572 if (ret == -1) 01573 { 01574 if (errno == ETIME) 01575 { 01576 if (sent_byte == sent_byte_count_) // if nothing was actually flushed 01577 { 01578 //This request has timed out and none of it was sent to the transport 01579 //We can't return -1 here, since that would end up closing the tranpsort 01580 if (TAO_debug_level > 2) 01581 { 01582 ACE_DEBUG ((LM_DEBUG, 01583 ACE_TEXT ("TAO (%P|%t) - ") 01584 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ") 01585 ACE_TEXT ("2 timeout encountered before any bytes sent\n"), 01586 this->id ())); 01587 } 01588 throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code 01589 (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME), 01590 CORBA::COMPLETED_NO); 01591 } 01592 } 01593 return -1; 01594 } 01595 } 01596 } 01597 return 0; 01598 }
| void TAO_Transport::send_connection_closed_notifications | ( | void | ) |
Notify all the components inside a Transport when the underlying connection is closed.
Definition at line 1294 of file Transport.cpp.
01295 { 01296 { 01297 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); 01298 01299 this->send_connection_closed_notifications_i (); 01300 } 01301 01302 this->tms ()->connection_closed (); 01303 }
| void TAO_Transport::send_connection_closed_notifications_i | ( | void | ) | [private] |
Assume the lock is held.
Definition at line 1306 of file Transport.cpp.
01307 { 01308 this->cleanup_queue_i (); 01309 }
| virtual int TAO_Transport::send_message | ( | TAO_OutputCDR & | stream, | |
| TAO_Stub * | stub = 0, |
|||
| TAO_Message_Semantics | message_semantics = TAO_TWOWAY_REQUEST, |
|||
| ACE_Time_Value * | max_time_wait = 0 | |||
| ) | [pure virtual] |
This method formats the stream and then sends the message on the transport. Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.
| int TAO_Transport::send_message_block_chain | ( | const ACE_Message_Block * | message_block, | |
| size_t & | bytes_transferred, | |||
| ACE_Time_Value * | max_wait_time = 0 | |||
| ) |
This is a very specialized interface to send a simple chain of messages through the Transport. The only place we use this interface is in GIOP_Message_Base.cpp, to send error messages (i.e., an indication that we received a malformed GIOP message,) and to close the connection.
Definition at line 568 of file Transport.cpp.
00571 { 00572 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00573 00574 TAO::Transport::Drain_Constraints dc( 00575 max_wait_time, true); 00576 00577 return this->send_message_block_chain_i (mb, 00578 bytes_transferred, 00579 dc); 00580 }
| int TAO_Transport::send_message_block_chain_i | ( | const ACE_Message_Block * | message_block, | |
| size_t & | bytes_transferred, | |||
| TAO::Transport::Drain_Constraints const & | dc | |||
| ) |
Send a message block chain, assuming the lock is held.
Definition at line 583 of file Transport.cpp.
00586 { 00587 size_t const total_length = mb->total_length (); 00588 00589 // We are going to block, so there is no need to clone 00590 // the message block. 00591 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00592 00593 synch_message.push_back (this->head_, this->tail_); 00594 00595 Drain_Result const n = this->drain_queue_i (dc); 00596 00597 if (n == DR_ERROR) 00598 { 00599 synch_message.remove_from_list (this->head_, this->tail_); 00600 return -1; // Error while sending... 00601 } 00602 else if (n == DR_QUEUE_EMPTY) 00603 { 00604 bytes_transferred = total_length; 00605 return 1; // Empty queue, message was sent.. 00606 } 00607 00608 // Remove the temporary message from the queue... 00609 synch_message.remove_from_list (this->head_, this->tail_); 00610 00611 bytes_transferred = total_length - synch_message.message_length (); 00612 00613 return 0; 00614 }
| int TAO_Transport::send_message_shared | ( | TAO_Stub * | stub, | |
| TAO_Message_Semantics | message_semantics, | |||
| const ACE_Message_Block * | message_block, | |||
| ACE_Time_Value * | max_wait_time | |||
| ) | [virtual] |
Sent the contents of message_block.
| stub | The object reference used for this operation, useful to obtain the current policies. | |
| message_semantics | If this is set to TAO_TWO_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation could return. | |
| message_block | The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field. | |
| max_wait_time | The maximum time that the operation can block, used in the implementation of timeouts. |
Definition at line 295 of file Transport.cpp.
00299 { 00300 int result = 0; 00301 00302 { 00303 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); 00304 00305 result = 00306 this->send_message_shared_i (stub, message_semantics, 00307 message_block, max_wait_time); 00308 } 00309 00310 if (result == -1) 00311 { 00312 // The connection needs to be closed here. 00313 // In the case of a partially written message this is the only way to cleanup 00314 // the physical connection as well as the Transport. An EOF on the remote end 00315 // will cancel the partially received message. 00316 this->close_connection (); 00317 } 00318 00319 return result; 00320 }
| int TAO_Transport::send_message_shared_i | ( | TAO_Stub * | stub, | |
| TAO_Message_Semantics | message_semantics, | |||
| const ACE_Message_Block * | message_block, | |||
| ACE_Time_Value * | max_wait_time | |||
| ) | [protected] |
Implement send_message_shared() assuming the handler_lock_ is held.
Definition at line 1312 of file Transport.cpp.
01316 { 01317 int ret = 0; 01318 01319 #if TAO_HAS_TRANSPORT_CURRENT == 1 01320 size_t const message_length = message_block->length (); 01321 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01322 01323 switch (message_semantics) 01324 { 01325 case TAO_TWOWAY_REQUEST: 01326 ret = this->send_synchronous_message_i (message_block, max_wait_time); 01327 break; 01328 01329 case TAO_REPLY: 01330 ret = this->send_reply_message_i (message_block, max_wait_time); 01331 break; 01332 01333 case TAO_ONEWAY_REQUEST: 01334 ret = this->send_asynchronous_message_i (stub, 01335 message_block, 01336 max_wait_time); 01337 break; 01338 } 01339 01340 #if TAO_HAS_TRANSPORT_CURRENT == 1 01341 // "Count" the message, only if no error was encountered. 01342 if (ret != -1 && this->stats_ != 0) 01343 this->stats_->messages_sent (message_length); 01344 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ 01345 01346 return ret; 01347 }
| int TAO_Transport::send_reply_message_i | ( | const ACE_Message_Block * | message_block, | |
| ACE_Time_Value * | max_wait_time | |||
| ) | [private] |
Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue.
Definition at line 713 of file Transport.cpp.
00715 { 00716 // Dont clone now.. We could be sent in one shot! 00717 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00718 00719 synch_message.push_back (this->head_, this->tail_); 00720 00721 int const n = 00722 this->send_synch_message_helper_i (synch_message, max_wait_time); 00723 00724 // What about partially sent messages. 00725 if (n == -1 || n == 1) 00726 { 00727 return n; 00728 } 00729 00730 if (TAO_debug_level > 3) 00731 { 00732 ACE_DEBUG ((LM_DEBUG, 00733 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ") 00734 ACE_TEXT ("preparing to add to queue before leaving\n"), 00735 this->id ())); 00736 } 00737 00738 // Till this point we shouldn't have any copying and that is the 00739 // point anyway. Now, remove the node from the list 00740 synch_message.remove_from_list (this->head_, this->tail_); 00741 00742 // Clone the node that we have. 00743 TAO_Queued_Message *msg = 00744 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); 00745 00746 // Stick it in the queue 00747 msg->push_back (this->head_, this->tail_); 00748 00749 TAO_Flushing_Strategy *flushing_strategy = 00750 this->orb_core ()->flushing_strategy (); 00751 00752 int const result = flushing_strategy->schedule_output (this); 00753 00754 if (result == -1) 00755 { 00756 if (TAO_debug_level > 5) 00757 { 00758 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_" 00759 "message_i, dequeuing msg due to schedule_output " 00760 "failure\n", this->id ())); 00761 } 00762 msg->remove_from_list (this->head_, this->tail_); 00763 msg->destroy (); 00764 } 00765 else if (result == TAO_Flushing_Strategy::MUST_FLUSH) 00766 { 00767 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00768 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00769 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00770 (void) flushing_strategy->flush_transport (this, 0); 00771 } 00772 00773 return 1; 00774 }
| virtual int TAO_Transport::send_request | ( | TAO_Stub * | stub, | |
| TAO_ORB_Core * | orb_core, | |||
| TAO_OutputCDR & | stream, | |||
| TAO_Message_Semantics | message_semantics, | |||
| ACE_Time_Value * | max_time_wait | |||
| ) | [pure virtual] |
Prepare the waiting and demuxing strategy to receive a reply for a new request. Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the request
but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply
The following method encapsulates this idiom.
| int TAO_Transport::send_synch_message_helper_i | ( | TAO_Synch_Queued_Message & | s, | |
| ACE_Time_Value * | max_wait_time | |||
| ) | [private] |
A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.
Definition at line 777 of file Transport.cpp.
00779 { 00780 TAO::Transport::Drain_Constraints dc( 00781 max_wait_time, this->using_blocking_io_for_synch_messages()); 00782 00783 Drain_Result const n = this->drain_queue_i (dc); 00784 00785 if (n == DR_ERROR) 00786 { 00787 synch_message.remove_from_list (this->head_, this->tail_); 00788 return -1; // Error while sending... 00789 } 00790 else if (n == DR_QUEUE_EMPTY) 00791 { 00792 return 1; // Empty queue, message was sent.. 00793 } 00794 00795 if (synch_message.all_data_sent ()) 00796 { 00797 return 1; 00798 } 00799 00800 return 0; 00801 }
| int TAO_Transport::send_synchronous_message_i | ( | const ACE_Message_Block * | message_block, | |
| ACE_Time_Value * | max_wait_time | |||
| ) | [private] |
Send a synchronous message, i.e. block until the message is on the wire
Definition at line 617 of file Transport.cpp.
00619 { 00620 // We are going to block, so there is no need to clone 00621 // the message block. 00622 size_t const total_length = mb->total_length (); 00623 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); 00624 00625 synch_message.push_back (this->head_, this->tail_); 00626 00627 int const result = this->send_synch_message_helper_i (synch_message, 00628 max_wait_time); 00629 if (result == -1 && errno == ETIME) 00630 { 00631 if (total_length == synch_message.message_length ()) //none was sent 00632 { 00633 if (TAO_debug_level > 2) 00634 { 00635 ACE_DEBUG ((LM_DEBUG, 00636 ACE_TEXT ("TAO (%P|%t) - ") 00637 ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ") 00638 ACE_TEXT ("timeout encountered before any bytes sent\n"), 00639 this->id ())); 00640 } 00641 throw ::CORBA::TIMEOUT ( 00642 CORBA::SystemException::_tao_minor_code ( 00643 TAO_TIMEOUT_SEND_MINOR_CODE, 00644 ETIME), 00645 CORBA::COMPLETED_NO); 00646 } 00647 else 00648 { 00649 return -1; 00650 } 00651 } 00652 else if(result == -1 || result == 1) 00653 { 00654 return result; 00655 } 00656 00657 TAO_Flushing_Strategy *flushing_strategy = 00658 this->orb_core ()->flushing_strategy (); 00659 if (flushing_strategy->schedule_output (this) == -1) 00660 { 00661 synch_message.remove_from_list (this->head_, this->tail_); 00662 if (TAO_debug_level > 0) 00663 { 00664 ACE_ERROR ((LM_ERROR, 00665 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") 00666 ACE_TEXT ("send_synchronous_message_i, ") 00667 ACE_TEXT ("error while scheduling flush - %m\n"), 00668 this->id ())); 00669 } 00670 return -1; 00671 } 00672 00673 // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, 00674 // because we're always going to flush anyway. 00675 00676 // Release the mutex, other threads may modify the queue as we 00677 // block for a long time writing out data. 00678 int flush_result; 00679 { 00680 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; 00681 TAO_REVERSE_LOCK reverse (*this->handler_lock_); 00682 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); 00683 00684 flush_result = flushing_strategy->flush_message (this, 00685 &synch_message, 00686 max_wait_time); 00687 } 00688 00689 if (flush_result == -1) 00690 { 00691 synch_message.remove_from_list (this->head_, this->tail_); 00692 00693 // We don't need to do anything special for the timeout case. 00694 // The connection is going to get closed and the Transport destroyed. 00695 // The only thing to do maybe is to empty the queue. 00696 00697 if (TAO_debug_level > 0) 00698 { 00699 ACE_ERROR ((LM_ERROR, 00700 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") 00701 ACE_TEXT ("error while sending message - %m\n"), 00702 this->id ())); 00703 } 00704 00705 return -1; 00706 } 00707 00708 return 1; 00709 }
| size_t TAO_Transport::sent_byte_count | ( | void | ) | const |
Accessor to sent_byte_count_.
Definition at line 203 of file Transport.inl.
00204 { 00205 return this->sent_byte_count_; 00206 }
| void TAO_Transport::set_bidir_context_info | ( | TAO_Operation_Details & | opdetails | ) | [virtual] |
These classes need privileged access to:
Definition at line 2786 of file Transport.cpp.
| void TAO_Transport::set_flush_in_post_open | ( | void | ) |
Set the flush in post open flag.
Definition at line 209 of file Transport.inl.
00210 { 00211 this->flush_in_post_open_ = true; 00212 }
| TAO::Transport::Stats * TAO_Transport::stats | ( | void | ) | const |
Transport statistics.
Definition at line 217 of file Transport.inl.
00218 { 00219 return this->stats_; 00220 }
| CORBA::ULong TAO_Transport::tag | ( | void | ) | const |
Return the protocol tag.
The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.
Definition at line 14 of file Transport.inl.
00015 { 00016 return this->tag_; 00017 }
| int TAO_Transport::tear_listen_point_list | ( | TAO_InputCDR & | cdr | ) | [virtual] |
Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints
Definition at line 289 of file Transport.cpp.
00290 { 00291 ACE_NOTSUP_RETURN (-1); 00292 }
| TAO_Transport_Mux_Strategy * TAO_Transport::tms | ( | void | ) | const |
Get the TAO_Tranport_Mux_Strategy used by this object.
The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Enough is to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.
Definition at line 26 of file Transport.inl.
00027 { 00028 return tms_; 00029 }
| TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager | ( | void | ) | [private] |
Helper method that returns the Transport Cache Manager.
Definition at line 2610 of file Transport.cpp.
02611 { 02612 return this->orb_core_->lane_resources ().transport_cache (); 02613 }
| int TAO_Transport::update_transport | ( | void | ) |
Cache management.
Definition at line 514 of file Transport.cpp.
00515 { 00516 return this->transport_cache_manager ().update_entry (this->cache_map_entry_); 00517 }
| bool TAO_Transport::using_blocking_io_for_asynch_messages | ( | ) | const [private] |
Return true if blocking I/O should be used for sending asynchronous (AMI calls, non-blocking oneways, responses to operations, etc.) messages. This is determined based on the current flushing strategy.
Definition at line 2816 of file Transport.cpp.
| bool TAO_Transport::using_blocking_io_for_synch_messages | ( | ) | const [private] |
Return true if blocking I/O should be used for sending synchronous (two-way, reliable oneways, etc.) messages. This is determined based on the current flushing and waiting strategies.
Definition at line 2806 of file Transport.cpp.
02807 { 02808 if (this->wait_strategy()->can_process_upcalls()) 02809 { 02810 return false; 02811 } 02812 return true; 02813 }
| TAO_Wait_Strategy * TAO_Transport::wait_strategy | ( | void | ) | const |
Return the TAO_Wait_Strategy used by this object.
The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Enough is to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.
Definition at line 33 of file Transport.inl.
00034 { 00035 return this->ws_; 00036 }
| void TAO_Transport::wchar_translator | ( | TAO_Codeset_Translator_Base * | tf | ) |
CodeSet negotiation - Set the wchar codeset translator factory.
Definition at line 158 of file Transport.inl.
00159 { 00160 this->wchar_translator_ = tf; 00161 this->tcs_set_ = 1; 00162 }
| TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator | ( | void | ) | const |
CodeSet Negotiation - Get the wchar codeset translator factory.
Definition at line 145 of file Transport.inl.
00146 { 00147 return this->wchar_translator_; 00148 }
friend class TAO_Leader_Follower_Flushing_Strategy [friend] |
These classes need privileged access to:
Definition at line 948 of file Transport.h.
friend class TAO_Reactive_Flushing_Strategy [friend] |
These classes need privileged access to:
Definition at line 947 of file Transport.h.
friend class TAO_Thread_Per_Connection_Handler [friend] |
Needs priveleged access to event_handler_i ()
Definition at line 952 of file Transport.h.
int TAO_Transport::bidirectional_flag_ [protected] |
Use to check if bidirectional info has been synchronized with the peer. Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We dont want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *dont* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a things from happening.
The value of this flag will be 0 if the client sends info and 1 if the server receives the info.
Definition at line 1113 of file Transport.h.
Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.
Definition at line 1085 of file Transport.h.
Additional member values required to support codeset translation.
@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapulation ie. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. if I have to be more cynical we can move this to the connection_handler and it may more sense with the DSCP stuff around there. Do you agree?
Definition at line 1191 of file Transport.h.
ACE_Time_Value TAO_Transport::current_deadline_ [protected] |
The queue will start draining no later than <queeing_deadline_> if* the deadline is
Definition at line 1130 of file Transport.h.
bool TAO_Transport::first_request_ [private] |
First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.
Definition at line 1203 of file Transport.h.
bool TAO_Transport::flush_in_post_open_ [private] |
Indicate that flushing needs to be done in post_open().
Definition at line 1223 of file Transport.h.
long TAO_Transport::flush_timer_id_ [protected] |
The timer ID.
Definition at line 1133 of file Transport.h.
ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected] |
Lock that insures that activities that *might* use handler-related resources (such as a connection handler) get serialized. This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.
Definition at line 1147 of file Transport.h.
TAO_Queued_Message* TAO_Transport::head_ [protected] |
Implement the outgoing data queue.
Definition at line 1118 of file Transport.h.
size_t TAO_Transport::id_ [protected] |
A unique identifier for the transport.
This never *never* changes over the lifespan, so we don't have to worry about locking it.
HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.
Definition at line 1157 of file Transport.h.
Queue of the consolidated, incoming messages..
Definition at line 1122 of file Transport.h.
TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected] |
Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"
Definition at line 1126 of file Transport.h.
bool TAO_Transport::is_connected_ [protected] |
Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready
Definition at line 1171 of file Transport.h.
Our messaging object.
Definition at line 1176 of file Transport.h.
These classes need privileged access to:
Definition at line 1115 of file Transport.h.
TAO_ORB_Core* const TAO_Transport::orb_core_ [protected] |
Global orbcore resource.
Definition at line 1081 of file Transport.h.
TAO_SYNCH_MUTEX TAO_Transport::output_cdr_mutex_ [mutable, private] |
lock for synchronizing Transport OutputCDR access
Definition at line 1226 of file Transport.h.
ACE_Message_Block* TAO_Transport::partial_message_ [private] |
Holds the partial GIOP message (if there is one).
Definition at line 1206 of file Transport.h.
unsigned long TAO_Transport::purging_order_ [protected] |
Used by the LRU, LFU and FIFO Connection Purging Strategies.
Definition at line 1160 of file Transport.h.
size_t TAO_Transport::recv_buffer_size_ [protected] |
Size of the buffer received.
Definition at line 1163 of file Transport.h.
size_t TAO_Transport::sent_byte_count_ [protected] |
Number of bytes sent.
Definition at line 1166 of file Transport.h.
TAO::Transport::Stats* TAO_Transport::stats_ [private] |
Statistics.
Definition at line 1219 of file Transport.h.
CORBA::ULong const TAO_Transport::tag_ [protected] |
IOP protocol tag.
Definition at line 1078 of file Transport.h.
TAO_Queued_Message* TAO_Transport::tail_ [protected] |
These classes need privileged access to:
Definition at line 1119 of file Transport.h.
CORBA::Boolean TAO_Transport::tcs_set_ [private] |
The tcs_set_ flag indicates that negotiation has occured and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.
Definition at line 1197 of file Transport.h.
TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected] |
Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.
Definition at line 1089 of file Transport.h.
TAO_Transport_Timer TAO_Transport::transport_timer_ [protected] |
The adapter used to receive timeout callbacks from the Reactor.
Definition at line 1136 of file Transport.h.
These classes need privileged access to:
Definition at line 1192 of file Transport.h.
TAO_Wait_Strategy* TAO_Transport::ws_ [protected] |
Strategy for waiting for the reply after sending the request.
Definition at line 1092 of file Transport.h.
1.6.1