TAO_Transport Class Reference

Generic definitions for the Transport class. More...

#include <Transport.h>

Inheritance diagram for TAO_Transport:
Inheritance graph
[legend]
Collaboration diagram for TAO_Transport:
Collaboration graph
[legend]

List of all members.

Classes

struct  Drain_Result

Public Types

enum  Drain_Result_Enum { DR_ERROR = -1, DR_OK = 0, DR_QUEUE_EMPTY = 1, DR_WOULDBLOCK = 2 }

Public Member Functions

 TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE)
 Default creator, requires the tag value be supplied.
virtual ~TAO_Transport (void)
 Destructor.
CORBA::ULong tag (void) const
 Return the protocol tag.
TAO_ORB_Coreorb_core (void) const
 Access the ORB that owns this connection.
TAO_Transport_Mux_Strategytms (void) const
 Get the TAO_Tranport_Mux_Strategy used by this object.
TAO_Wait_Strategywait_strategy (void) const
 Return the TAO_Wait_Strategy used by this object.
Drain_Result handle_output (TAO::Transport::Drain_Constraints const &c)
 Callback method to reactively drain the outgoing data queue.
int bidirectional_flag (void) const
 Get the bidirectional flag.
void bidirectional_flag (int flag)
 Set the bidirectional flag.
void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry)
 Set the Cache Map entry.
TAO::Transport_Cache_Manager::HASH_MAP_ENTRYcache_map_entry (void)
 Get the Cache Map entry.
size_t id (void) const
 Set and Get the identifier for this transport instance.
void id (size_t id)
TAO::Connection_Role opened_as (void) const
void opened_as (TAO::Connection_Role)
unsigned long purging_order (void) const
void purging_order (unsigned long value)
bool queue_is_empty (void)
 Check if there are messages pending in the queue.
bool register_if_necessary (void)
 Register with the reactor via the wait strategy.
void provide_handler (TAO::Connection_Handler_Set &handlers)
 Added event handler to the handlers set.
bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers)
virtual int register_handler (void)
 Register the handler with the reactor.
virtual ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, ACE_Time_Value const *timeout)=0
 Write the complete Message_Block chain to the connection.
virtual ssize_t recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0
 Read len bytes from into buf.
Control connection lifecycle

These methods are routed through the TMS object. The TMS strategies implement them correctly.



bool idle_after_send (void)
bool idle_after_reply (void)
virtual void close_connection (void)
 Call the implementation method after obtaining the lock.

Template methods

The Transport class uses the Template Method Pattern to implement the protocol specific functionality. Implementors of a pluggable protocol should override the following methods with the semantics documented below.



class TAO_Reactive_Flushing_Strategy
class TAO_Leader_Follower_Flushing_Strategy
class TAO_Thread_Per_Connection_Handler
CORBA::ULong const tag_
 IOP protocol tag.
TAO_ORB_Core *const orb_core_
 Global orbcore resource.
TAO::Transport_Cache_Manager::HASH_MAP_ENTRYcache_map_entry_
TAO_Transport_Mux_Strategytms_
TAO_Wait_Strategyws_
 Strategy for waiting for the reply after sending the request.
int bidirectional_flag_
TAO::Connection_Role opening_connection_role_
TAO_Queued_Messagehead_
 Implement the outgoing data queue.
TAO_Queued_Messagetail_
TAO_Incoming_Message_Queue incoming_message_queue_
 Queue of the consolidated, incoming messages..
TAO::Incoming_Message_Stack incoming_message_stack_
ACE_Time_Value current_deadline_
long flush_timer_id_
 The timer ID.
TAO_Transport_Timer transport_timer_
 The adapter used to receive timeout callbacks from the Reactor.
ACE_Lockhandler_lock_
size_t id_
 A unique identifier for the transport.
unsigned long purging_order_
 Used by the LRU, LFU and FIFO Connection Purging Strategies.
size_t recv_buffer_size_
 Size of the buffer received.
size_t sent_byte_count_
 Number of bytes sent.
bool is_connected_
TAO_GIOP_Message_Basemessaging_object_
 Our messaging object.
TAO_Codeset_Translator_Basechar_translator_
 Additional member values required to support codeset translation.
TAO_Codeset_Translator_Basewchar_translator_
CORBA::Boolean tcs_set_
bool first_request_
ACE_Message_Blockpartial_message_
 Holds the partial GIOP message (if there is one).
TAO::Transport::Stats * stats_
 Statistics.
bool flush_in_post_open_
 Indicate that flushing needs to be done in post_open().
TAO_SYNCH_MUTEX output_cdr_mutex_
 lock for synchronizing Transport OutputCDR access
void messaging_init (TAO_GIOP_Message_Version const &version)
virtual int tear_listen_point_list (TAO_InputCDR &cdr)
virtual bool post_connect_hook (void)
 Hooks that can be overridden in concrete transports.
ACE_Event_Handler::Reference_Count add_reference (void)
 Memory management routines.
ACE_Event_Handler::Reference_Count remove_reference (void)
TAO_GIOP_Message_Basemessaging_object (void)
virtual ACE_Event_Handlerevent_handler_i (void)=0
bool is_connected (void) const
 Is this transport really connected.
bool post_open (size_t id)
 Perform all the actions when this transport get opened.
void pre_close (void)
 do what needs to be done when closing the transport
TAO_Connection_Handlerconnection_handler (void)
 Get the connection handler for this transport.
TAO_OutputCDRout_stream (void)
 Accessor for the output CDR stream.
TAO_SYNCH_MUTEX & output_cdr_lock (void)
 Accessor for synchronizing Transport OutputCDR access.
void set_flush_in_post_open (void)
 Set the flush in post open flag.
bool can_be_purged (void)
 Can the transport be purged?
virtual void set_bidir_context_info (TAO_Operation_Details &opdetails)
int generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
virtual int generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg)
int recache_transport (TAO_Transport_Descriptor_Interface *desc)
 Recache ourselves in the cache.
virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0)
 Callback to read incoming data.
virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, TAO_Message_Semantics message_semantics, ACE_Time_Value *max_time_wait)=0
virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, TAO_Message_Semantics message_semantics=TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0
virtual int send_message_shared (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Sent the contents of message_block.
int format_queue_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time, TAO_Stub *stub)
int send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0)
int send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, TAO::Transport::Drain_Constraints const &dc)
 Send a message block chain, assuming the lock is held.
int purge_entry (void)
 Cache management.
int make_idle (void)
 Cache management.
int update_transport (void)
 Cache management.
int handle_timeout (const ACE_Time_Value &current_time, const void *act)
size_t recv_buffer_size (void) const
 Accessor to recv_buffer_size_.
size_t sent_byte_count (void) const
 Accessor to sent_byte_count_.
TAO_Codeset_Translator_Basechar_translator (void) const
 CodeSet Negotiation - Get the char codeset translator factory.
TAO_Codeset_Translator_Basewchar_translator (void) const
 CodeSet Negotiation - Get the wchar codeset translator factory.
void char_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the char codeset translator factory.
void wchar_translator (TAO_Codeset_Translator_Base *)
 CodeSet negotiation - Set the wchar codeset translator factory.
void assign_translators (TAO_InputCDR *, TAO_OutputCDR *)
void clear_translators (TAO_InputCDR *, TAO_OutputCDR *)
CORBA::Boolean is_tcs_set () const
 Return true if the tcs has been set.
void first_request_sent (bool flag=false)
 Set the state of the first_request_ to flag.
bool first_request () const
 Get the first request flag.
void send_connection_closed_notifications (void)
TAO::Transport::Stats * stats (void) const
 Transport statistics.
virtual TAO_Connection_Handlerconnection_handler_i (void)=0
int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int send_message_shared_i (TAO_Stub *stub, TAO_Message_Semantics message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int queue_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time, bool back=true)
ACE_Time_Value const * io_timeout (TAO::Transport::Drain_Constraints const &dc) const
 Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies.
TAO::Transport_Cache_Managertransport_cache_manager (void)
 Helper method that returns the Transport Cache Manager.
Drain_Result drain_queue (TAO::Transport::Drain_Constraints const &dc)
 Send some of the data in the queue.
Drain_Result drain_queue_i (TAO::Transport::Drain_Constraints const &dc)
 Implement drain_queue() assuming the lock is held.
bool queue_is_empty_i (void) const
 Check if there are messages pending in the queue.
Drain_Result drain_queue_helper (int &iovcnt, iovec iov[], TAO::Transport::Drain_Constraints const &dc)
 A helper routine used in drain_queue_i().
int schedule_output_i (void)
 Schedule handle_output() callbacks.
int cancel_output_i (void)
 Cancel handle_output() callbacks.
void cleanup_queue (size_t byte_count)
 Cleanup the queue.
void cleanup_queue_i ()
 Cleanup the complete queue.
int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
 Check if the buffering constraints have been reached.
int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_asynchronous_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time)
int flush_timer_pending (void) const
 Check if the flush timer is still pending.
void reset_flush_timer (void)
void report_invalid_event_handler (const char *caller)
 Print out error messages if the event handler is not valid.
int handle_input_missing_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time, TAO_Queued_Data *q_data)
int handle_input_parse_data (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time)
int handle_input_parse_extra_messages (ACE_Message_Block &message_block)
int consolidate_enqueue_message (TAO_Queued_Data *qd)
int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
int process_queue_head (TAO_Resume_Handle &rh)
int notify_reactor (void)
void send_connection_closed_notifications_i (void)
 Assume the lock is held.
void allocate_partial_message_block (void)
bool using_blocking_io_for_synch_messages () const
bool using_blocking_io_for_asynch_messages () const

Detailed Description

Generic definitions for the Transport class.

The transport object is created in the Service handler constructor and deleted in the Service Handler's destructor!!

The main responsability of a Transport object is to encapsulate a connection, and provide a transport independent way to send and receive data. Since TAO is heavily based on the Reactor for all if not all its I/O the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.

The outgoing data path:

One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order, the transport object will put out the message using a single system call and return control to the application. However, for oneways and AMI requests it may be more efficient (or required if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.

Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.

Out of order messages:

TAO provides explicit policies to send 'urgent' messages. Such messages may put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.

Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.

Waiting threads:

One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggest that simply using a ACE_Message_Queue would not be enough: there is a significant amount of ancillary information, to keep on each message that the Message_Block class does not provide room for.

Blocking I/O is still attractive for some applications. First, my eliminating the Reactor overhead performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.

Timeouts:

Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.

Conclusions:

The outgoing data path consist in several components:

The Transport object provides a single method to send request messages (send_request_message ()).

The incoming data path:

One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that needs to be given due consideration. They are

Parsing messages (GIOP) & processing the message:

The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB are done by the messaging protocol.

Design forces and Challenges

To keep things as efficient as possible for medium sized requests, it would be good to minimise data copying and locking along the incoming path ie. from the time of reading the data from the handle to the application. We achieve this by creating a buffer on stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following (a) Data is bigger than the buffer that we have on stack (b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte. (c) Single read gives multiple messages

We solve the problems as follows

(a) First do a read with the buffer on stack. Query the underlying messaging object whether the message has any incomplete portion. If so, data will be copied into new buffer being able to hold full message and is queued; succeeding events will read data from socket and write directly into this buffer. Otherwise, if if the message in local buffer is complete, we free the handle and then send the message to the higher layers of the ORB for processing.

(b) If buffer with incomplete message has been enqueued, while trying to do the above, the reactor will call us back when the handle becomes read ready. The read-operation will copy data directly into the enqueued buffer. If the message has bee read completely the message is sent to the higher layers of the ORB for processing.

(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.

Sending Replies

We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We were doing this till TAO-1.2.3. We run in to problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing) and the stack starts growing leading to crashes.

Solution to the nesting problem

The solution that we (plan to) adopt is pretty straight forward. The thread sending replies will not block to send the replies but queue the replies and return to the Reactor. (Note the careful usages of the terms "blocking in the Reactor" as opposed to "return back to the Reactor".

See Also:

https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD

Definition at line 321 of file Transport.h.


Member Enumeration Documentation

Enumerator:
DR_ERROR 
DR_OK 
DR_QUEUE_EMPTY 
DR_WOULDBLOCK 

Definition at line 367 of file Transport.h.

00368     {
00369       DR_ERROR = -1,
00370       DR_OK = 0,
00371       DR_QUEUE_EMPTY = 1, // used internally, not returned from drain_queue()
00372       DR_WOULDBLOCK = 2
00373     };


Constructor & Destructor Documentation

TAO_Transport::TAO_Transport ( CORBA::ULong  tag,
TAO_ORB_Core orb_core,
size_t  input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE 
)

Default creator, requires the tag value be supplied.

TAO_Transport::~TAO_Transport ( void   )  [virtual]

Destructor.

Definition at line 202 of file Transport.cpp.

00203 {
00204   delete this->messaging_object_;
00205 
00206   delete this->ws_;
00207 
00208   delete this->tms_;
00209 
00210   delete this->handler_lock_;
00211 
00212   if (!this->is_connected_)
00213     {
00214       // When we have a not connected transport we could have buffered
00215       // messages on this transport which we have to cleanup now.
00216       this->cleanup_queue_i();
00217     }
00218 
00219   // Release the partial message block, however we may
00220   // have never allocated one.
00221   ACE_Message_Block::release (this->partial_message_);
00222 
00223   // By the time the destructor is reached here all the connection stuff
00224   // *must* have been cleaned up.
00225 
00226   // The following assert is needed for the test "Bug_2494_Regression".
00227   // See the bugzilla bug #2494 for details.
00228   ACE_ASSERT (this->queue_is_empty_i ());
00229   ACE_ASSERT (this->cache_map_entry_ == 0);
00230 
00231 #if TAO_HAS_TRANSPORT_CURRENT == 1
00232   delete this->stats_;
00233 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00234 
00235   /*
00236    * Hook to add code that cleans up components
00237    * belong to the concrete protocol implementation.
00238    * Further additions to this Transport class will
00239    * need to add code *before* this hook.
00240    */
00241   //@@ TAO_TRANSPORT_SPL_DESTRUCTOR_ADD_HOOK
00242 }


Member Function Documentation

ACE_Event_Handler::Reference_Count TAO_Transport::add_reference ( void   ) 

Memory management routines.

Definition at line 2646 of file Transport.cpp.

02647 {
02648   return this->event_handler_i ()->add_reference ();
02649 }

void TAO_Transport::allocate_partial_message_block ( void   )  [private]

Allocate a partial message block and store it in our partial_message_ data member.

Definition at line 2769 of file Transport.cpp.

02770 {
02771   if (this->partial_message_ == 0)
02772     {
02773       // This value must be at least large enough to hold a GIOP message
02774       // header plus a GIOP fragment header
02775       size_t const partial_message_size =
02776         this->messaging_object ()->header_length ();
02777        // + this->messaging_object ()->fragment_header_length ();
02778        // deprecated, conflicts with not-single_read_opt.
02779 
02780       ACE_NEW (this->partial_message_,
02781                ACE_Message_Block (partial_message_size));
02782     }
02783 }

void TAO_Transport::assign_translators ( TAO_InputCDR inp,
TAO_OutputCDR outp 
)

Use the Transport's codeset factories to set the translator for input and output CDRs.

Definition at line 2616 of file Transport.cpp.

02617 {
02618   if (this->char_translator_)
02619     {
02620       this->char_translator_->assign (inp);
02621       this->char_translator_->assign (outp);
02622     }
02623   if (this->wchar_translator_)
02624     {
02625       this->wchar_translator_->assign (inp);
02626       this->wchar_translator_->assign (outp);
02627     }
02628 }

void TAO_Transport::bidirectional_flag ( int  flag  ) 

Set the bidirectional flag.

Definition at line 45 of file Transport.inl.

00046 {
00047   this->bidirectional_flag_ = flag;
00048 }

int TAO_Transport::bidirectional_flag ( void   )  const

Get the bidirectional flag.

Definition at line 39 of file Transport.inl.

00040 {
00041   return this->bidirectional_flag_;
00042 }

TAO::Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry ( void   ) 

Get the Cache Map entry.

Definition at line 63 of file Transport.inl.

00064 {
00065   return this->cache_map_entry_;
00066 }

void TAO_Transport::cache_map_entry ( TAO::Transport_Cache_Manager::HASH_MAP_ENTRY entry  ) 

Set the Cache Map entry.

Definition at line 69 of file Transport.inl.

00071 {
00072   // Sync with TAO_Transport::purge_entry()
00073   ACE_GUARD (ACE_Lock, ace_mon, *this->handler_lock_);
00074   this->cache_map_entry_ = entry;
00075 }

bool TAO_Transport::can_be_purged ( void   ) 

Can the transport be purged?

Definition at line 495 of file Transport.cpp.

00496 {
00497   return !this->tms_->has_request ();
00498 }

int TAO_Transport::cancel_output_i ( void   )  [private]

Cancel handle_output() callbacks.

Definition at line 858 of file Transport.cpp.

00859 {
00860   ACE_Event_Handler * const eh = this->event_handler_i ();
00861   ACE_Reactor *const reactor = eh->reactor ();
00862 
00863   if (TAO_debug_level > 3)
00864     {
00865       ACE_DEBUG ((LM_DEBUG,
00866          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00867          this->id ()));
00868     }
00869 
00870   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00871 }

void TAO_Transport::char_translator ( TAO_Codeset_Translator_Base tf  ) 

CodeSet negotiation - Set the char codeset translator factory.

Definition at line 151 of file Transport.inl.

00152 {
00153   this->char_translator_ = tf;
00154   this->tcs_set_ = 1;
00155 }

TAO_Codeset_Translator_Base * TAO_Transport::char_translator ( void   )  const

CodeSet Negotiation - Get the char codeset translator factory.

Definition at line 139 of file Transport.inl.

00140 {
00141   return this->char_translator_;
00142 }

int TAO_Transport::check_buffering_constraints_i ( TAO_Stub stub,
bool &  must_flush 
) [private]

Check if the buffering constraints have been reached.

Definition at line 1223 of file Transport.cpp.

01224 {
01225   // First let's compute the size of the queue:
01226   size_t msg_count = 0;
01227   size_t total_bytes = 0;
01228 
01229   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01230     {
01231       ++msg_count;
01232       total_bytes += i->message_length ();
01233     }
01234 
01235   bool set_timer = false;
01236   ACE_Time_Value new_deadline;
01237 
01238   TAO::Transport_Queueing_Strategy *queue_strategy =
01239     stub->transport_queueing_strategy ();
01240 
01241   bool constraints_reached = true;
01242 
01243   if (queue_strategy)
01244     {
01245       constraints_reached =
01246         queue_strategy->buffering_constraints_reached (stub,
01247                                                        msg_count,
01248                                                        total_bytes,
01249                                                        must_flush,
01250                                                        this->current_deadline_,
01251                                                        set_timer,
01252                                                        new_deadline);
01253     }
01254   else
01255     {
01256       must_flush = false;
01257     }
01258 
01259   // ... set the new timer, also cancel any previous timers ...
01260   if (set_timer)
01261     {
01262       ACE_Event_Handler *eh = this->event_handler_i ();
01263       ACE_Reactor * const reactor = eh->reactor ();
01264       this->current_deadline_ = new_deadline;
01265       ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01266 
01267       if (this->flush_timer_pending ())
01268         {
01269           reactor->cancel_timer (this->flush_timer_id_);
01270         }
01271 
01272       this->flush_timer_id_ =
01273         reactor->schedule_timer (&this->transport_timer_,
01274                                  &this->current_deadline_,
01275                                  delay);
01276     }
01277 
01278   return constraints_reached;
01279 }

void TAO_Transport::cleanup_queue ( size_t  byte_count  )  [private]

Cleanup the queue.

Exactly byte_count bytes have been sent, the queue must be cleaned up as potentially several messages have been completely sent out. It leaves on head_ the next message to send out.

Definition at line 1176 of file Transport.cpp.

01177 {
01178   while (!this->queue_is_empty_i () && byte_count > 0)
01179     {
01180       TAO_Queued_Message *i = this->head_;
01181 
01182       if (TAO_debug_level > 4)
01183         {
01184           ACE_DEBUG ((LM_DEBUG,
01185              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01186              ACE_TEXT ("byte_count = %d\n"),
01187              this->id (), byte_count));
01188         }
01189 
01190       // Update the state of the first message
01191       i->bytes_transferred (byte_count);
01192 
01193       if (TAO_debug_level > 4)
01194         {
01195           ACE_DEBUG ((LM_DEBUG,
01196              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01197              ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01198              this->id (), byte_count, i->all_data_sent (),
01199              i->message_length ()));
01200         }
01201 
01202       // ... if all the data was sent the message must be removed from
01203       // the queue...
01204       if (i->all_data_sent ())
01205         {
01206           i->remove_from_list (this->head_, this->tail_);
01207           i->destroy ();
01208         }
01209       else if (byte_count == 0)
01210         {
01211           // If we have sent out a full message block, but we are not
01212           // finished with this message, we need to do something with the
01213           // message block chain held by our output stream.  If we don't,
01214           // another thread can attempt to service this transport and end
01215           // up resetting the output stream which will release the
01216           // message that we haven't finished sending.
01217           i->copy_if_necessary (this->out_stream ().begin ());
01218         }
01219     }
01220 }

void TAO_Transport::cleanup_queue_i (  )  [private]

Cleanup the complete queue.

Definition at line 1133 of file Transport.cpp.

01134 {
01135   if (TAO_debug_level > 4)
01136     {
01137       ACE_DEBUG ((LM_DEBUG,
01138          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01139          ACE_TEXT ("cleaning up complete queue\n"),
01140          this->id ()));
01141     }
01142 
01143   size_t byte_count = 0;
01144   int msg_count = 0;
01145 
01146   // Cleanup all messages
01147   while (!this->queue_is_empty_i ())
01148     {
01149       TAO_Queued_Message *i = this->head_;
01150 
01151       if (TAO_debug_level > 4)
01152         {
01153           byte_count += i->message_length();
01154           ++msg_count;
01155         }
01156        // @@ This is a good point to insert a flag to indicate that a
01157        //    CloseConnection message was successfully received.
01158       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01159                         this->orb_core_->leader_follower ());
01160 
01161       i->remove_from_list (this->head_, this->tail_);
01162 
01163       i->destroy ();
01164     }
01165 
01166   if (TAO_debug_level > 4)
01167     {
01168       ACE_DEBUG ((LM_DEBUG,
01169                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01170                   ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01171                   this->id (), msg_count, byte_count));
01172     }
01173 }

void TAO_Transport::clear_translators ( TAO_InputCDR inp,
TAO_OutputCDR outp 
)

It is necessary to clear the codeset translator when a CDR stream is used for more than one GIOP message. This is required since the header must not be translated, whereas the body must be.

Definition at line 2631 of file Transport.cpp.

02632 {
02633   if (inp)
02634     {
02635       inp->char_translator (0);
02636       inp->wchar_translator (0);
02637     }
02638   if (outp)
02639     {
02640       outp->char_translator (0);
02641       outp->wchar_translator (0);
02642     }
02643 }

void TAO_Transport::close_connection ( void   )  [virtual]

Call the implementation method after obtaining the lock.

Definition at line 359 of file Transport.cpp.

00360 {
00361   this->connection_handler_i ()->close_connection ();
00362 }

TAO_Connection_Handler * TAO_Transport::connection_handler ( void   ) 

Get the connection handler for this transport.

Definition at line 191 of file Transport.inl.

00192 {
00193   return this->connection_handler_i();
00194 }

virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i ( void   )  [protected, pure virtual]

These classes need privileged access to:

int TAO_Transport::consolidate_enqueue_message ( TAO_Queued_Data qd  )  [private]
Returns:
-1 error, otherwise 0

Definition at line 1790 of file Transport.cpp.

01791 {
01792   // consolidate message on top of stack, only for fragmented messages
01793 
01794   // paranoid check
01795   if (q_data->missing_data () != 0)
01796     {
01797        return -1;
01798     }
01799 
01800   if (q_data->more_fragments () ||
01801       q_data->msg_type () == GIOP::Fragment)
01802     {
01803       TAO_Queued_Data *new_q_data = 0;
01804 
01805       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01806         {
01807         case -1: // error
01808           return -1;
01809 
01810         case 0:  // returning consolidated message in new_q_data
01811           if (!new_q_data)
01812             {
01813               if (TAO_debug_level > 0)
01814                 {
01815                   ACE_ERROR ((LM_ERROR,
01816                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01817                      ACE_TEXT ("error, consolidated message is NULL\n"),
01818                      this->id ()));
01819                 }
01820               return -1;
01821             }
01822 
01823           if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01824             {
01825               TAO_Queued_Data::release (new_q_data);
01826               return -1;
01827             }
01828           break;
01829 
01830         case 1:  // fragment has been stored in messaging_oject()
01831           break;
01832         }
01833     }
01834   else
01835     {
01836       if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01837         {
01838           TAO_Queued_Data::release (q_data);
01839           return -1;
01840         }
01841     }
01842 
01843   return 0; // success
01844 }

int TAO_Transport::consolidate_process_message ( TAO_Queued_Data qd,
TAO_Resume_Handle rh 
) [private]
Returns:
-1 error, otherwise 0

Definition at line 1703 of file Transport.cpp.

01705 {
01706   // paranoid check
01707   if (q_data->missing_data () != 0)
01708     {
01709       if (TAO_debug_level > 0)
01710         {
01711            ACE_ERROR ((LM_ERROR,
01712               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01713               ACE_TEXT ("missing data\n"),
01714               this->id ()));
01715         }
01716        return -1;
01717     }
01718 
01719   if (q_data->more_fragments () ||
01720       q_data->msg_type () == GIOP::Fragment)
01721     {
01722       // consolidate message on top of stack, only for fragmented messages
01723       TAO_Queued_Data *new_q_data = 0;
01724 
01725       switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01726         {
01727         case -1: // error
01728           return -1;
01729 
01730         case 0:  // returning consolidated message in q_data
01731           if (!new_q_data)
01732             {
01733               if (TAO_debug_level > 0)
01734                 {
01735                   ACE_ERROR ((LM_ERROR,
01736                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01737                      ACE_TEXT ("error, consolidated message is NULL\n"),
01738                      this->id ()));
01739                 }
01740               return -1;
01741             }
01742 
01743 
01744           if (this->process_parsed_messages (new_q_data, rh) == -1)
01745             {
01746               TAO_Queued_Data::release (new_q_data);
01747 
01748               if (TAO_debug_level > 0)
01749                 {
01750                   ACE_ERROR ((LM_ERROR,
01751                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01752                      ACE_TEXT ("error processing consolidated message\n"),
01753                      this->id ()));
01754                 }
01755               return -1;
01756             }
01757 
01758           TAO_Queued_Data::release (new_q_data);
01759 
01760           break;
01761 
01762         case 1:  // fragment has been stored in messaging_oject()
01763           break;
01764         }
01765     }
01766   else
01767     {
01768       if (this->process_parsed_messages (q_data, rh) == -1)
01769         {
01770           TAO_Queued_Data::release (q_data);
01771 
01772           if (TAO_debug_level > 0)
01773             {
01774               ACE_ERROR ((LM_ERROR,
01775                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01776                  ACE_TEXT ("error processing message\n"),
01777                  this->id ()));
01778             }
01779           return -1;
01780         }
01781 
01782       TAO_Queued_Data::release (q_data);
01783 
01784     }
01785 
01786   return 0;
01787 }

TAO_Transport::Drain_Result TAO_Transport::drain_queue ( TAO::Transport::Drain_Constraints const &  dc  )  [private]

Send some of the data in the queue.

As the outgoing data is drained this method is invoked to send as much of the current message as possible.

Definition at line 915 of file Transport.cpp.

00916 {
00917   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, DR_ERROR);
00918   Drain_Result const retval = this->drain_queue_i (dc);
00919 
00920   if (retval == DR_QUEUE_EMPTY)
00921     {
00922       // ... there is no current message or it was completely
00923       // sent, cancel output...
00924       TAO_Flushing_Strategy *flushing_strategy =
00925         this->orb_core ()->flushing_strategy ();
00926 
00927       flushing_strategy->cancel_output (this);
00928 
00929       return DR_OK;
00930     }
00931 
00932   return retval;
00933 }

TAO_Transport::Drain_Result TAO_Transport::drain_queue_helper ( int &  iovcnt,
iovec  iov[],
TAO::Transport::Drain_Constraints const &  dc 
) [private]

A helper routine used in drain_queue_i().

Definition at line 936 of file Transport.cpp.

00938 {
00939   // As a side-effect, this decrements the timeout() pointed-to value by
00940   // the time used in this function.  That might be important as there are
00941   // potentially long running system calls invoked from here.
00942   ACE_Countdown_Time countdown(dc.timeout());
00943 
00944   size_t byte_count = 0;
00945 
00946   // ... send the message ...
00947   ssize_t retval = -1;
00948 
00949 #if TAO_HAS_SENDFILE == 1
00950   if (this->mmap_allocator_)
00951     retval = this->sendfile (this->mmap_allocator_,
00952                              iov,
00953                              iovcnt,
00954                              byte_count,
00955                              dc);
00956   else
00957 #endif  /* TAO_HAS_SENDFILE==1 */
00958     retval = this->send (iov, iovcnt, byte_count,
00959                          this->io_timeout (dc));
00960 
00961   if (TAO_debug_level == 5)
00962     {
00963       dump_iov (iov, iovcnt, this->id (),
00964                 byte_count, ACE_TEXT("drain_queue_helper"));
00965     }
00966 
00967   if (retval == 0)
00968     {
00969       if (TAO_debug_level > 4)
00970         {
00971           ACE_DEBUG ((LM_DEBUG,
00972              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00973              ACE_TEXT ("send() returns 0\n"),
00974              this->id ()));
00975         }
00976       return DR_ERROR;
00977     }
00978   else if (retval == -1)
00979     {
00980       if (TAO_debug_level > 4)
00981         {
00982           ACE_DEBUG ((LM_DEBUG,
00983              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00984              ACE_TEXT ("error during send() (errno: %d) - %m\n"),
00985              this->id (), ACE_ERRNO_GET));
00986         }
00987 
00988       if (errno == EWOULDBLOCK || errno == EAGAIN)
00989         {
00990           return DR_WOULDBLOCK;
00991         }
00992 
00993       return DR_ERROR;
00994     }
00995 
00996   // ... now we need to update the queue, removing elements
00997   // that have been sent, and updating the last element if it
00998   // was only partially sent ...
00999   this->cleanup_queue (byte_count);
01000   iovcnt = 0;
01001 
01002   // ... start over, how do we guarantee progress?  Because if
01003   // no bytes are sent send() can only return 0 or -1
01004 
01005   // Total no. of bytes sent for a send call
01006   this->sent_byte_count_ += byte_count;
01007 
01008   if (TAO_debug_level > 4)
01009     {
01010       ACE_DEBUG ((LM_DEBUG,
01011          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
01012          ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
01013          this->id(), byte_count, this->queue_is_empty_i ()));
01014     }
01015 
01016   return DR_QUEUE_EMPTY;
01017   // drain_queue_i will check if the queue is actually empty
01018 }

TAO_Transport::Drain_Result TAO_Transport::drain_queue_i ( TAO::Transport::Drain_Constraints const &  dc  )  [private]

Implement drain_queue() assuming the lock is held.

Definition at line 1021 of file Transport.cpp.

01022 {
01023   // This is the vector used to send data, it must be declared outside
01024   // the loop because after the loop there may still be data to be
01025   // sent
01026   int iovcnt = 0;
01027 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01028   iovec iov[ACE_IOV_MAX] = { { 0 , 0 } };
01029 #else
01030   iovec iov[ACE_IOV_MAX];
01031 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01032 
01033   // We loop over all the elements in the queue ...
01034   TAO_Queued_Message *i = this->head_;
01035 
01036   // Reset the value so that the counting is done for each new send
01037   // call.
01038   this->sent_byte_count_ = 0;
01039 
01040   // Avoid calling this expensive function each time through the loop. Instead
01041   // we'll assume that the time is unlikely to change much during the loop.
01042   // If we are forced to send in the loop then we'll recompute the time.
01043   ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
01044 
01045   while (i != 0)
01046     {
01047       if (i->is_expired (now))
01048         {
01049           if (TAO_debug_level > 3)
01050           {
01051             ACE_DEBUG ((LM_DEBUG,
01052               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01053               ACE_TEXT ("Discarding expired queued message.\n"),
01054               this->id ()));
01055           }
01056           TAO_Queued_Message *next = i->next ();
01057           i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
01058                             this->orb_core_->leader_follower ());
01059           i->remove_from_list (this->head_, this->tail_);
01060           i->destroy ();
01061           i = next;
01062           continue;
01063         }
01064       // ... each element fills the iovector ...
01065       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
01066 
01067       // ... the vector is full, no choice but to send some data out.
01068       // We need to loop because a single message can span multiple
01069       // IOV_MAX elements ...
01070       if (iovcnt == ACE_IOV_MAX)
01071         {
01072           Drain_Result const retval =
01073             this->drain_queue_helper (iovcnt, iov, dc);
01074 
01075           if (TAO_debug_level > 4)
01076             {
01077               ACE_DEBUG ((LM_DEBUG,
01078                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01079                  ACE_TEXT ("helper retval = %d\n"),
01080                  this->id (), static_cast<int> (retval.dre_)));
01081             }
01082 
01083           if (retval != DR_QUEUE_EMPTY)
01084             {
01085               return retval;
01086             }
01087 
01088           now = ACE_High_Res_Timer::gettimeofday_hr ();
01089 
01090           i = this->head_;
01091           continue;
01092         }
01093       // ... notice that this line is only reached if there is still
01094       // room in the iovector ...
01095       i = i->next ();
01096     }
01097 
01098   if (iovcnt != 0)
01099     {
01100       Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc);
01101 
01102       if (TAO_debug_level > 4)
01103         {
01104           ACE_DEBUG ((LM_DEBUG,
01105               ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01106               ACE_TEXT ("helper retval = %d\n"),
01107               this->id (), static_cast<int> (retval.dre_)));
01108         }
01109 
01110       if (retval != DR_QUEUE_EMPTY)
01111         {
01112           return retval;
01113         }
01114     }
01115 
01116   if (this->queue_is_empty_i ())
01117     {
01118       if (this->flush_timer_pending ())
01119         {
01120           ACE_Event_Handler *eh = this->event_handler_i ();
01121           ACE_Reactor * const reactor = eh->reactor ();
01122           reactor->cancel_timer (this->flush_timer_id_);
01123           this->reset_flush_timer ();
01124         }
01125 
01126       return DR_QUEUE_EMPTY;
01127     }
01128 
01129   return DR_OK;
01130 }

virtual ACE_Event_Handler* TAO_Transport::event_handler_i ( void   )  [pure virtual]

Return the event handler used to receive notifications from the Reactor. Normally a concrete TAO_Transport object has-a ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fullfilled by an instance of ACE_Svc_Handler.

Todo:
Since we only use a limited functionality of ACE_Svc_Handler we could probably implement a generic adapter class (TAO_Transport_Event_Handler or something), this will reduce footprint and simplify the process of implementing a pluggable protocol.
Todo:
This method has to be renamed to event_handler()
bool TAO_Transport::first_request ( void   )  const

Get the first request flag.

Definition at line 178 of file Transport.inl.

00179 {
00180   return this->first_request_;
00181 }

void TAO_Transport::first_request_sent ( bool  flag = false  ) 

Set the state of the first_request_ to flag.

Definition at line 172 of file Transport.inl.

00173 {
00174   this->first_request_ = flag;
00175 }

int TAO_Transport::flush_timer_pending ( void   )  const [private]

Check if the flush timer is still pending.

Definition at line 116 of file Transport.inl.

00117 {
00118   return this->flush_timer_id_ != -1;
00119 }

int TAO_Transport::format_queue_message ( TAO_OutputCDR stream,
ACE_Time_Value max_wait_time,
TAO_Stub stub 
)

Format and queue a message for stream

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 557 of file Transport.cpp.

00560 {
00561   if (this->messaging_object ()->format_message (stream, stub) != 0)
00562     return -1;
00563 
00564   return this->queue_message_i (stream.begin (), max_wait_time);
00565 }

int TAO_Transport::generate_locate_request ( TAO_Target_Specification spec,
TAO_Operation_Details opdetails,
TAO_OutputCDR output 
)

This is a request for the transport object to write a LocateRequest header before it is sent out.

Definition at line 419 of file Transport.cpp.

00423 {
00424   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00425                                                                  spec,
00426                                                                  output) == -1)
00427     {
00428       if (TAO_debug_level > 0)
00429         {
00430           ACE_ERROR ((LM_ERROR,
00431                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00432                       ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00433                       this->id ()));
00434         }
00435 
00436       return -1;
00437     }
00438 
00439   return 0;
00440 }

int TAO_Transport::generate_request_header ( TAO_Operation_Details opd,
TAO_Target_Specification spec,
TAO_OutputCDR msg 
) [virtual]

This is a request for the transport object to write a request header before it sends out the request

Definition at line 443 of file Transport.cpp.

00447 {
00448   if (this->messaging_object ()->generate_request_header (opdetails,
00449                                                           spec,
00450                                                           output) == -1)
00451     {
00452       if (TAO_debug_level > 0)
00453         {
00454           ACE_ERROR ((LM_ERROR,
00455                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ")
00456                       ACE_TEXT ("error while marshalling the Request header\n"),
00457                       this->id()));
00458         }
00459 
00460       return -1;
00461     }
00462 
00463   return 0;
00464 }

int TAO_Transport::handle_input ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time = 0 
) [virtual]

Callback to read incoming data.

The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.

Todo:
the method name is confusing! Calling it handle_input() would probably make things easier to understand and follow!

Once a complete message is read the Transport class delegates on the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).

Parameters:
max_wait_time In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified.

Definition at line 1629 of file Transport.cpp.

01631 {
01632   if (TAO_debug_level > 3)
01633     {
01634       ACE_DEBUG ((LM_DEBUG,
01635          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01636          this->id ()));
01637     }
01638 
01639   // First try to process messages of the head of the incoming queue.
01640   int const retval = this->process_queue_head (rh);
01641 
01642   if (retval <= 0)
01643     {
01644       if (retval == -1)
01645         {
01646           if (TAO_debug_level > 2)
01647             {
01648               ACE_ERROR ((LM_ERROR,
01649                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01650                  ACE_TEXT ("error while parsing the head of the queue\n"),
01651                  this->id()));
01652 
01653             }
01654           return -1;
01655         }
01656       else
01657         {
01658           // retval == 0
01659 
01660           // Processed a message in queue successfully. This
01661           // thread must return to thread-pool now.
01662           return 0;
01663         }
01664     }
01665 
01666   TAO_Queued_Data *q_data = 0;
01667 
01668   if (this->incoming_message_stack_.top (q_data) != -1
01669       && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01670     {
01671       /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete  */
01672       if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01673         {
01674           if (TAO_debug_level > 0)
01675             {
01676               ACE_ERROR ((LM_ERROR,
01677                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01678                  ACE_TEXT ("error consolidating incoming message\n"),
01679                  this->id ()));
01680             }
01681           return -1;
01682         }
01683     }
01684   else
01685     {
01686       if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01687         {
01688           if (TAO_debug_level > 0)
01689             {
01690               ACE_ERROR ((LM_ERROR,
01691                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01692                  ACE_TEXT ("error parsing incoming message\n"),
01693                  this->id ()));
01694             }
01695           return -1;
01696         }
01697     }
01698 
01699   return 0;
01700 }

int TAO_Transport::handle_input_missing_data ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time,
TAO_Queued_Data q_data 
) [private]

Is invoked by handle_input operation. It consolidate message on top of incoming_message_stack. The amount of missing data is known and recv operation copies data directly into message buffer, as much as a single recv-invocation provides.

Definition at line 1847 of file Transport.cpp.

01850 {
01851   // paranoid check
01852   if (q_data == 0)
01853     {
01854       return -1;
01855     }
01856 
01857   if (TAO_debug_level > 3)
01858     {
01859       ACE_DEBUG ((LM_DEBUG,
01860          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01861          ACE_TEXT ("enter (missing data == %d)\n"),
01862          this->id (), q_data->missing_data ()));
01863     }
01864 
01865   size_t const recv_size = q_data->missing_data ();
01866 
01867   if (q_data->msg_block ()->space() < recv_size)
01868     {
01869       // make sure the message_block has enough space
01870       size_t const message_size = recv_size + q_data->msg_block ()->length();
01871 
01872       if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01873         {
01874           return -1;
01875         }
01876     }
01877 
01878   // Saving the size of the received buffer in case any one needs to
01879   // get the size of the message thats received in the
01880   // context. Obviously the value will be changed for each recv call
01881   // and the user is supposed to invoke the accessor only in the
01882   // invocation context to get meaningful information.
01883   this->recv_buffer_size_ = recv_size;
01884 
01885   // Read the message into the existing message block on heap
01886   ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01887                                 recv_size,
01888                                 max_wait_time);
01889 
01890   if (n <= 0)
01891     {
01892       return n;
01893     }
01894 
01895   if (TAO_debug_level > 3)
01896     {
01897       ACE_DEBUG ((LM_DEBUG,
01898          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01899          ACE_TEXT ("read bytes %d\n"),
01900          this->id (), n));
01901     }
01902 
01903   q_data->msg_block ()->wr_ptr(n);
01904   q_data->missing_data (q_data->missing_data () - n);
01905 
01906   if (q_data->missing_data () == 0)
01907     {
01908       // paranoid check
01909       if (this->incoming_message_stack_.pop (q_data) == -1)
01910         {
01911           return -1;
01912         }
01913 
01914       if (this->consolidate_process_message (q_data, rh) == -1)
01915         {
01916           return -1;
01917         }
01918     }
01919 
01920   return 0;
01921 }

int TAO_Transport::handle_input_parse_data ( TAO_Resume_Handle rh,
ACE_Time_Value max_wait_time 
) [private]

Is invoked by handle_input operation. It parses new messages from input stream or consolidates messages whose header has been partially read, the message size being unknown so far. It parses as much data as a single recv-invocation provides.

Definition at line 1968 of file Transport.cpp.

01970 {
01971   if (TAO_debug_level > 3)
01972     {
01973       ACE_DEBUG ((LM_DEBUG,
01974          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01975          ACE_TEXT ("enter\n"),
01976          this->id ()));
01977     }
01978 
01979   // The buffer on the stack which will be used to hold the input
01980   // messages, ACE_CDR::MAX_ALIGNMENT compensates the
01981   // memory-alignment. This improves performance with SUN-Java-ORB-1.4
01982   // and higher that sends fragmented requests of size 1024 bytes.
01983   char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01984 
01985 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01986   (void) ACE_OS::memset (buf,
01987                          '\0',
01988                          sizeof buf);
01989 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
01990 
01991   // Create a data block
01992   ACE_Data_Block db (sizeof (buf),
01993                      ACE_Message_Block::MB_DATA,
01994                      buf,
01995                      this->orb_core_->input_cdr_buffer_allocator (),
01996                      this->orb_core_->locking_strategy (),
01997                      ACE_Message_Block::DONT_DELETE,
01998                      this->orb_core_->input_cdr_dblock_allocator ());
01999 
02000   // Create a message block
02001   ACE_Message_Block message_block (&db,
02002                                    ACE_Message_Block::DONT_DELETE,
02003                                    this->orb_core_->input_cdr_msgblock_allocator ());
02004 
02005   // Align the message block
02006   ACE_CDR::mb_align (&message_block);
02007 
02008   size_t recv_size = 0; // Note: unsigned integer
02009 
02010   // Pointer to newly parsed message
02011   TAO_Queued_Data *q_data = 0;
02012 
02013   // Optimizing access of constants
02014   size_t const header_length = this->messaging_object ()->header_length ();
02015 
02016   // Paranoid check
02017   if (header_length > message_block.space ())
02018     {
02019       return -1;
02020     }
02021 
02022   if (this->orb_core_->orb_params ()->single_read_optimization ())
02023     {
02024       recv_size = message_block.space ();
02025     }
02026   else
02027     {
02028       // Single read optimization has been de-activated. That means
02029       // that we need to read from transport the GIOP header first
02030       // before the payload. This codes first checks the incoming
02031       // stack for partial messages which needs to be
02032       // consolidated. Otherwise we are in new cycle, reading complete
02033       // GIOP header of new incoming message.
02034       if (this->incoming_message_stack_.top (q_data) != -1
02035            && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02036         {
02037           // There is a partial message on incoming_message_stack_
02038           // whose length is unknown so far. We need to consolidate
02039           // the GIOP header to get to know the payload size,
02040           recv_size = header_length - q_data->msg_block ()->length ();
02041         }
02042       else
02043         {
02044           // Read amount of data forming GIOP header of new incoming
02045           // message.
02046           recv_size = header_length;
02047         }
02048       // POST: 0 <= recv_size <= header_length
02049     }
02050   // POST: 0 <= recv_size <= message_block->space ()
02051 
02052   // If we have a partial message, copy it into our message block and
02053   // clear out the partial message.
02054   if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
02055     {
02056       // (*) Copy back the partial message into current read-buffer,
02057       // verify that the read-strategy of "recv_size" bytes is not
02058       // exceeded. The latter check guarantees that recv_size does not
02059       // roll-over and keeps in range
02060       // 0<=recv_size<=message_block->space()
02061       if (this->partial_message_->length () <= recv_size &&
02062           message_block.copy (this->partial_message_->rd_ptr (),
02063                               this->partial_message_->length ()) == 0)
02064         {
02065 
02066           recv_size -= this->partial_message_->length ();
02067           this->partial_message_->reset ();
02068         }
02069       else
02070         {
02071           return -1;
02072         }
02073     }
02074   // POST: 0 <= recv_size <= buffer_space
02075 
02076   if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
02077     {
02078       // This event would cause endless looping, trying frequently to
02079       // read zero bytes from stream.  This might happen, if TAOs
02080       // protocol implementation is not correct and tries to read data
02081       // beyond header without "single_read_optimazation" being
02082       // activated.
02083       if (TAO_debug_level > 0)
02084         {
02085           ACE_ERROR ((LM_ERROR,
02086              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02087              ACE_TEXT ("Error - endless loop detection, closing connection"),
02088              this->id ()));
02089         }
02090       return -1;
02091     }
02092 
02093   // Saving the size of the received buffer in case any one needs to
02094   // get the size of the message thats received in the
02095   // context. Obviously the value will be changed for each recv call
02096   // and the user is supposed to invoke the accessor only in the
02097   // invocation context to get meaningful information.
02098   this->recv_buffer_size_ = recv_size;
02099 
02100   // Read the message into the message block that we have created on
02101   // the stack.
02102   ssize_t const n = this->recv (message_block.wr_ptr (),
02103                                 recv_size,
02104                                 max_wait_time);
02105 
02106   // If there is an error return to the reactor..
02107   if (n <= 0)
02108     {
02109       return n;
02110     }
02111 
02112   if (TAO_debug_level > 3)
02113     {
02114       ACE_DEBUG ((LM_DEBUG,
02115          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02116          ACE_TEXT ("read %d bytes\n"),
02117          this->id (), n));
02118     }
02119 
02120   // Set the write pointer in the stack buffer
02121   message_block.wr_ptr (n);
02122 
02123   //
02124   // STACK PROCESSING OR MESSAGE CONSOLIDATION
02125   //
02126 
02127   // PRE: data in buffer is aligned && message_block.length() > 0
02128 
02129   if (this->incoming_message_stack_.top (q_data) != -1
02130       && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02131     {
02132       //
02133       // MESSAGE CONSOLIDATION
02134       //
02135 
02136       // Partial message on incoming_message_stack_ needs to be
02137       // consolidated.  The message header could not be parsed so far
02138       // and therefor the message size is unknown yet. Consolidating
02139       // the message destroys the memory alignment of succeeding
02140       // messages sharing the buffer, for that reason consolidation
02141       // and stack based processing are mutial exclusive.
02142       if (this->messaging_object ()->consolidate_node (q_data,
02143                                                        message_block) == -1)
02144         {
02145            if (TAO_debug_level > 0)
02146             {
02147                 ACE_ERROR ((LM_ERROR,
02148                    ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02149                    ACE_TEXT ("error consolidating message from input buffer\n"),
02150                    this->id () ));
02151              }
02152            return -1;
02153         }
02154 
02155       // Complete message are to be enqueued and later processed
02156       if (q_data->missing_data () == 0)
02157         {
02158           if (this->incoming_message_stack_.pop (q_data) == -1)
02159             {
02160               return -1;
02161             }
02162 
02163           if (this->consolidate_enqueue_message (q_data) == -1)
02164             {
02165               return -1;
02166             }
02167         }
02168 
02169       if (message_block.length () > 0
02170           && this->handle_input_parse_extra_messages (message_block) == -1)
02171         {
02172           return -1;
02173         }
02174 
02175       // In any case try to process the enqueued messages
02176       if (this->process_queue_head (rh) == -1)
02177         {
02178           return -1;
02179         }
02180     }
02181   else
02182     {
02183       //
02184       // STACK PROCESSING (critical path)
02185       //
02186 
02187       // Process the first message in buffer on stack
02188 
02189       // (PRE: first message resides in aligned memory) Make a node of
02190       // the message-block..
02191 
02192       TAO_Queued_Data qd (&message_block,
02193                           this->orb_core_->transport_message_buffer_allocator ());
02194 
02195       size_t mesg_length  = 0;
02196 
02197       if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1
02198           || (qd.missing_data () == 0
02199               && mesg_length > message_block.length ()) )
02200         {
02201           // extracting message failed
02202           return -1;
02203         }
02204       // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
02205       // This prevents seeking rd_ptr behind the wr_ptr
02206 
02207       if (qd.missing_data () != 0 ||
02208           qd.more_fragments () ||
02209           qd.msg_type () == GIOP::Fragment)
02210         {
02211           if (qd.missing_data () == 0)
02212             {
02213               // Dealing with a fragment
02214               TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
02215 
02216               if (nqd == 0)
02217                 {
02218                   return -1;
02219                 }
02220 
02221               // mark the end of message in new buffer
02222               char* end_mark = nqd->msg_block ()->rd_ptr ()
02223                              + mesg_length;
02224               nqd->msg_block ()->wr_ptr (end_mark);
02225 
02226               // move the read pointer forward in old buffer
02227               message_block.rd_ptr (mesg_length);
02228 
02229               // enqueue the message
02230               if (this->consolidate_enqueue_message (nqd) == -1)
02231                 {
02232                   return -1;
02233                 }
02234 
02235               if (message_block.length () > 0
02236                   && this->handle_input_parse_extra_messages (message_block) == -1)
02237                 {
02238                   return -1;
02239                 }
02240 
02241               // In any case try to process the enqueued messages
02242               if (this->process_queue_head (rh) == -1)
02243                 {
02244                   return -1;
02245                 }
02246             }
02247           else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02248             {
02249               // Incomplete message, must be the last one in buffer
02250 
02251               if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02252                   qd.missing_data () > message_block.space ())
02253                 {
02254                   // Re-Allocate correct size on heap
02255                   if (ACE_CDR::grow (qd.msg_block (),
02256                                      message_block.length ()
02257                                      + qd.missing_data ()) == -1)
02258                     {
02259                       return -1;
02260                     }
02261                 }
02262 
02263               TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
02264 
02265               if (nqd == 0)
02266                 {
02267                   return -1;
02268                 }
02269 
02270               // move read-pointer to end of buffer
02271               message_block.rd_ptr (message_block.length());
02272 
02273               this->incoming_message_stack_.push (nqd);
02274             }
02275         }
02276       else
02277         {
02278           //
02279           // critical path
02280           //
02281 
02282           // We cant process the message on stack right now. First we
02283           // have got to parse extra messages from message_block,
02284           // putting them into queue.  When this is done we can return
02285           // to process this message, and notifying other threads to
02286           // process the messages in queue.
02287           char * end_marker = message_block.rd_ptr ()
02288                             + mesg_length;
02289 
02290           if (message_block.length () > mesg_length)
02291             {
02292               // There are more message in data stream to be parsed.
02293               // Safe the rd_ptr to restore later.
02294               char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02295 
02296               // Skip parsed message, jump to next message in buffer
02297               // PRE: mesg_length <= message_block.length ()
02298               message_block.rd_ptr (mesg_length);
02299 
02300               // Extract remaining messages and enqueue them for later
02301               // heap processing
02302               if (this->handle_input_parse_extra_messages (message_block) == -1)
02303                 {
02304                   return -1;
02305                 }
02306 
02307               // correct the wr_ptr using the end_marker to point to the
02308               // end of the first message else the code after this will
02309               // see the full stream with all the messages
02310               message_block.wr_ptr (end_marker);
02311 
02312               // Restore rd_ptr
02313               message_block.rd_ptr (rd_ptr_stack_mesg);
02314             }
02315 
02316           // The following if-else has been copied from
02317           // process_queue_head().  While process_queue_head()
02318           // processes message on heap, here we will process a message
02319           // on stack.
02320 
02321           // Now that we have one message on stack to be processed,
02322           // check whether we have one more message in the queue...
02323           if (this->incoming_message_queue_.queue_length () > 0)
02324             {
02325               if (TAO_debug_level > 0)
02326                 {
02327                   ACE_DEBUG ((LM_DEBUG,
02328                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02329                      ACE_TEXT ("notify reactor\n"),
02330                      this->id ()));
02331                 }
02332 
02333               int const retval = this->notify_reactor ();
02334 
02335               if (retval == 1)
02336                 {
02337                   // Let the class know that it doesn't need to resume  the
02338                   // handle..
02339                   rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02340                 }
02341               else if (retval < 0)
02342                 return -1;
02343             }
02344           else
02345             {
02346               // As there are no further messages in queue just resume
02347               // the handle. Set the flag incase someone had reset the flag..
02348               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02349             }
02350 
02351           // PRE: incoming_message_queue is empty
02352           if (this->process_parsed_messages (&qd, rh) == -1)
02353             {
02354               return -1;
02355             }
02356           // move the rd_ptr tp position of end_marker
02357           message_block.rd_ptr (end_marker);
02358         }
02359     }
02360 
02361   // Now that all cases have been processed, there might be kept some data
02362   // in buffer that needs to be safed for next "handle_input" invocations.
02363    if (message_block.length () > 0)
02364      {
02365        if (this->partial_message_ == 0)
02366          {
02367            this->allocate_partial_message_block ();
02368          }
02369 
02370        if (this->partial_message_ != 0 &&
02371            this->partial_message_->copy (message_block.rd_ptr (),
02372                                          message_block.length ()) == 0)
02373          {
02374            message_block.rd_ptr (message_block.length ());
02375          }
02376        else
02377          {
02378            return -1;
02379          }
02380      }
02381 
02382    return 0;
02383 }

int TAO_Transport::handle_input_parse_extra_messages ( ACE_Message_Block message_block  )  [private]

Is invoked by handle_input_parse_data. Parses all messages remaining in message_block.

Definition at line 1925 of file Transport.cpp.

01927 {
01928   // store buffer status of last extraction: -1 parse error, 0
01929   // incomplete message header in buffer, 1 complete messages header
01930   // parsed
01931   int buf_status = 0;
01932 
01933   TAO_Queued_Data *q_data = 0;     // init
01934 
01935   // parse buffer until all messages have been extracted, consolidate
01936   // and enqueue complete messages, if the last message being parsed
01937   // has missin data, it is stays on top of incoming_message_stack.
01938   while (message_block.length () > 0 &&
01939          (buf_status = this->messaging_object ()->extract_next_message
01940           (message_block, q_data)) != -1 &&
01941          q_data != 0) // paranoid check
01942     {
01943       if (q_data->missing_data () == 0)
01944         {
01945           if (this->consolidate_enqueue_message (q_data) == -1)
01946             {
01947               return -1;
01948             }
01949         }
01950       else  // incomplete message read, probably the last message in buffer
01951         {
01952           // can not fail
01953           this->incoming_message_stack_.push (q_data);
01954         }
01955 
01956       q_data = 0; // reset
01957     } // while
01958 
01959   if (buf_status == -1)
01960     {
01961       return -1;
01962     }
01963 
01964   return 0;
01965 }

TAO_Transport::Drain_Result TAO_Transport::handle_output ( TAO::Transport::Drain_Constraints const &  c  ) 

Callback method to reactively drain the outgoing data queue.

Definition at line 525 of file Transport.cpp.

00526 {
00527   if (TAO_debug_level > 3)
00528     {
00529       ACE_DEBUG ((LM_DEBUG,
00530                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output")
00531                   ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"),
00532                   this->id (),
00533               dc.block_on_io(),
00534               dc.timeout() ? dc.timeout()->sec() : -1,
00535               dc.timeout() ? dc.timeout()->usec() : -1 ));
00536     }
00537 
00538   // The flushing strategy (potentially via the Reactor) wants to send
00539   // more data, first check if there is a current message that needs
00540   // more sending...
00541   Drain_Result const retval = this->drain_queue (dc);
00542 
00543   if (TAO_debug_level > 3)
00544     {
00545       ACE_DEBUG ((LM_DEBUG,
00546                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00547                   ACE_TEXT ("drain_queue returns %d/%d\n"),
00548                   this->id (),
00549                   static_cast<int> (retval.dre_), ACE_ERRNO_GET));
00550     }
00551 
00552   // Any errors are returned directly to the Reactor
00553   return retval;
00554 }

int TAO_Transport::handle_timeout ( const ACE_Time_Value current_time,
const void *  act 
)

The timeout callback, invoked when any of the timers related to this transport expire.

Parameters:
current_time The current time as reported from the Reactor
act The Asynchronous Completion Token. Currently it is interpreted as follows:

  • If the ACT is the address of this->current_deadline_ the queueing timeout has expired and the queue should start flushing.
Returns:
Returns 0 if there are no problems, -1 if there is an error
Todo:
In the future this function could be used to expire messages (oneways) that have been sitting for too long on the queue.

This is the only legal ACT in the current configuration....

Definition at line 874 of file Transport.cpp.

00876 {
00877   if (TAO_debug_level > 6)
00878     {
00879       ACE_DEBUG ((LM_DEBUG,
00880          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ")
00881          ACE_TEXT ("timer expired\n"),
00882          this->id ()));
00883     }
00884 
00885   /// This is the only legal ACT in the current configuration....
00886   if (act != &this->current_deadline_)
00887     {
00888       return -1;
00889     }
00890 
00891   if (this->flush_timer_pending ())
00892     {
00893       // The timer is always a oneshot timer, so mark is as not
00894       // pending.
00895       this->reset_flush_timer ();
00896 
00897       TAO_Flushing_Strategy *flushing_strategy =
00898         this->orb_core ()->flushing_strategy ();
00899       int const result = flushing_strategy->schedule_output (this);
00900       if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00901         {
00902           typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00903           TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00904           ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00905           if (flushing_strategy->flush_transport (this, 0) == -1) {
00906             return -1;
00907           }
00908         }
00909     }
00910 
00911   return 0;
00912 }

void TAO_Transport::id ( size_t  id  ) 

Definition at line 100 of file Transport.inl.

00101 {
00102   this->id_ = id;
00103 }

size_t TAO_Transport::id ( void   )  const

Set and Get the identifier for this transport instance.

If not set, this will return an integer representation of the this pointer for the instance on which it's called.

Definition at line 94 of file Transport.inl.

00095 {
00096   return this->id_;
00097 }

bool TAO_Transport::idle_after_reply ( void   ) 

Request is sent and the reply is received. Idle the transport now.

Definition at line 273 of file Transport.cpp.

00274 {
00275   return this->tms ()->idle_after_reply ();
00276 }

bool TAO_Transport::idle_after_send ( void   ) 

Request has been just sent, but the reply is not received. Idle the transport now.

Definition at line 267 of file Transport.cpp.

00268 {
00269   return this->tms ()->idle_after_send ();
00270 }

ACE_Time_Value const * TAO_Transport::io_timeout ( TAO::Transport::Drain_Constraints const &  dc  )  const [protected]

Re-factor computation of I/O timeouts based on operation timeouts. Depending on the wait strategy, we need to timeout I/O operations or not. For example, if we are using a non-blocking strategy, we want to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK settings on the underlying sockets. However, for blocking strategies we want to pass the operation timeouts, to respect the application level policies.

This function was introduced as part of the fixes for bug 3647.

Definition at line 2791 of file Transport.cpp.

02793 {
02794   if (dc.block_on_io())
02795   {
02796     return dc.timeout();
02797   }
02798   if (this->wait_strategy()->can_process_upcalls())
02799   {
02800     return 0;
02801   }
02802   return dc.timeout();
02803 }

bool TAO_Transport::is_connected ( void   )  const

Is this transport really connected.

Definition at line 184 of file Transport.inl.

00185 {
00186   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false);
00187   return this->is_connected_;
00188 }

CORBA::Boolean TAO_Transport::is_tcs_set ( void   )  const

Return true if the tcs has been set.

CodeSet negotiation.

Definition at line 166 of file Transport.inl.

00167 {
00168   return tcs_set_;
00169 }

int TAO_Transport::make_idle ( void   ) 

Cache management.

Definition at line 501 of file Transport.cpp.

00502 {
00503   if (TAO_debug_level > 3)
00504     {
00505       ACE_DEBUG ((LM_DEBUG,
00506                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00507                   this->id ()));
00508     }
00509 
00510   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00511 }

void TAO_Transport::messaging_init ( TAO_GIOP_Message_Version const &  version  ) 

Initialising the messaging object. This would be used by the connector side. On the acceptor side the connection handler would take care of the messaging objects.

Definition at line 2670 of file Transport.cpp.

02671 {
02672   this->messaging_object ()->init (version.major, version.minor);
02673 }

TAO_GIOP_Message_Base * TAO_Transport::messaging_object ( void   ) 

Return the messaging object that is used to format the data that needs to be sent.

Definition at line 129 of file Transport.inl.

00130 {
00131   return this->messaging_object_;
00132 }

int TAO_Transport::notify_reactor ( void   )  [private]

These classes need privileged access to:

Definition at line 2573 of file Transport.cpp.

02574 {
02575   if (!this->ws_->is_registered ())
02576     {
02577       return 0;
02578     }
02579 
02580   ACE_Event_Handler *eh = this->event_handler_i ();
02581 
02582   // Get the reactor associated with the event handler
02583   ACE_Reactor *reactor = this->orb_core ()->reactor ();
02584 
02585   if (TAO_debug_level > 0)
02586     {
02587       ACE_DEBUG ((LM_DEBUG,
02588          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02589          ACE_TEXT ("notify to Reactor\n"),
02590          this->id ()));
02591     }
02592 
02593   // Send a notification to the reactor...
02594   int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02595 
02596   if (retval < 0 && TAO_debug_level > 2)
02597     {
02598       // @todo: need to think about what is the action that
02599       // we can take when we get here.
02600       ACE_ERROR ((LM_ERROR,
02601          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02602          ACE_TEXT ("notify to the reactor failed..\n"),
02603          this->id ()));
02604     }
02605 
02606   return 1;
02607 }

void TAO_Transport::opened_as ( TAO::Connection_Role  role  ) 

Definition at line 57 of file Transport.inl.

00058 {
00059   this->opening_connection_role_ = role;
00060 }

TAO::Connection_Role TAO_Transport::opened_as ( void   )  const

Methods dealing with the role of the connection, e.g., CLIENT or SERVER. See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.

Definition at line 51 of file Transport.inl.

00052 {
00053   return this->opening_connection_role_;
00054 }

TAO_ORB_Core * TAO_Transport::orb_core ( void   )  const

Access the ORB that owns this connection.

Definition at line 20 of file Transport.inl.

00021 {
00022   return this->orb_core_;
00023 }

TAO_OutputCDR & TAO_Transport::out_stream ( void   ) 

Accessor for the output CDR stream.

Definition at line 2658 of file Transport.cpp.

02659 {
02660   return this->messaging_object ()->out_stream ();
02661 }

TAO_SYNCH_MUTEX & TAO_Transport::output_cdr_lock ( void   ) 

Accessor for synchronizing Transport OutputCDR access.

Definition at line 2664 of file Transport.cpp.

02665 {
02666   return this->output_cdr_mutex_;
02667 }

bool TAO_Transport::post_connect_hook ( void   )  [virtual]

Hooks that can be overridden in concrete transports.

These hooks are invoked just after connection establishment (or after a connection is fetched from cache). The return value signifies whether the invoker should proceed with post connection establishment activities. Protocols like SSLIOP need this to verify whether connections already established have valid certificates. There are no pre_connect_hooks () since the transport doesn't exist before a connection establishment. :-)

Note:
The methods are not made const with a reason.

Definition at line 325 of file Transport.cpp.

00326 {
00327   return true;
00328 }

bool TAO_Transport::post_open ( size_t  id  ) 

Perform all the actions when this transport get opened.

Definition at line 2694 of file Transport.cpp.

02695 {
02696   if (TAO_debug_level > 9)
02697     ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ")
02698                 ACE_TEXT ("tport id changed from %d to %d\n"), this->id_, id));
02699   this->id_ = id;
02700 
02701   // When we have data in our outgoing queue schedule ourselves
02702   // for output
02703   if (!this->queue_is_empty_i ())
02704     {
02705       // If the wait strategy wants us to be registered with the reactor
02706       // then we do so. If registeration is required and it succeeds,
02707       // #REFCOUNT# becomes two.
02708       if (this->wait_strategy ()->register_handler () == 0)
02709         {
02710           if (this->flush_in_post_open_)
02711             {
02712               TAO_Flushing_Strategy *flushing_strategy =
02713                 this->orb_core ()->flushing_strategy ();
02714 
02715               if (flushing_strategy == 0)
02716                 throw CORBA::INTERNAL ();
02717 
02718               this->flush_in_post_open_ = false;
02719               (void) flushing_strategy->schedule_output (this);
02720             }
02721         }
02722       else
02723         {
02724           // Registration failures.
02725 
02726           // Purge from the connection cache, if we are not in the cache, this
02727           // just does nothing.
02728           (void) this->purge_entry ();
02729 
02730           // Close the handler.
02731           (void) this->close_connection ();
02732 
02733           if (TAO_debug_level > 0)
02734             {
02735               ACE_ERROR ((LM_ERROR,
02736                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ")
02737                      ACE_TEXT ("could not register the transport ")
02738                      ACE_TEXT ("in the reactor.\n"),
02739                      this->id ()));
02740             }
02741 
02742           return false;
02743         }
02744     }
02745 
02746   {
02747     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false);
02748     this->is_connected_ = true;
02749   }
02750 
02751   if (TAO_debug_level > 9 && !this->cache_map_entry_)
02752     {
02753       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open")
02754                             ACE_TEXT (", cache_map_entry_ is 0\n"), this->id_));
02755     }
02756 
02757   this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
02758                                                    true);
02759 
02760   // update transport cache to make this entry available
02761   this->transport_cache_manager ().set_entry_state (
02762     this->cache_map_entry_,
02763     TAO::ENTRY_IDLE_AND_PURGABLE);
02764 
02765   return true;
02766 }

void TAO_Transport::pre_close ( void   ) 

do what needs to be done when closing the transport

Definition at line 2676 of file Transport.cpp.

02677 {
02678   // @TODO: something needs to be done with is_connected_. Checking it is
02679   // guarded by a mutex, but setting it is not. Until the need for mutexed
02680   // protection is required, the transport cache is holding its own copy
02681   // of the is_connected_ flag, so that during cache lookups the cache
02682   // manager doesn't need to be burdened by the lock in is_connected().
02683   this->is_connected_ = false;
02684   this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
02685                                                    false);
02686   this->purge_entry ();
02687   {
02688     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02689     this->cleanup_queue_i ();
02690   }
02691 }

int TAO_Transport::process_parsed_messages ( TAO_Queued_Data qd,
TAO_Resume_Handle rh 
) [protected]

Process the message by sending it to the higher layers of the ORB.

Definition at line 2387 of file Transport.cpp.

02389 {
02390   if (TAO_debug_level > 7)
02391     {
02392       ACE_DEBUG ((LM_DEBUG,
02393          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02394          ACE_TEXT ("entering (missing data == %d)\n"),
02395          this->id(), qd->missing_data ()));
02396     }
02397 
02398 #if TAO_HAS_TRANSPORT_CURRENT == 1
02399   // Update stats, if any
02400   if (this->stats_ != 0)
02401     this->stats_->messages_received (qd->msg_block ()->length ());
02402 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
02403 
02404   switch (qd->msg_type ())
02405   {
02406     case GIOP::CloseConnection:
02407     {
02408       if (TAO_debug_level > 0)
02409         {
02410           ACE_DEBUG ((LM_DEBUG,
02411              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02412              ACE_TEXT ("received CloseConnection message - %m\n"),
02413              this->id()));
02414         }
02415 
02416       // Return a "-1" so that the next stage can take care of
02417       // closing connection and the necessary memory management.
02418       return -1;
02419     }
02420     break;
02421     case GIOP::Request:
02422     case GIOP::LocateRequest:
02423     {
02424       // Let us resume the handle before we go ahead to process the
02425       // request. This will open up the handle for other threads.
02426       rh.resume_handle ();
02427 
02428       if (this->messaging_object ()->process_request_message (this, qd) == -1)
02429         {
02430           // Return a "-1" so that the next stage can take care of
02431           // closing connection and the necessary memory management.
02432           return -1;
02433         }
02434     }
02435     break;
02436     case GIOP::Reply:
02437     case GIOP::LocateReply:
02438     {
02439       rh.resume_handle ();
02440 
02441       TAO_Pluggable_Reply_Params params (this);
02442 
02443       if (this->messaging_object ()->process_reply_message (params, qd) == -1)
02444         {
02445           if (TAO_debug_level > 0)
02446             {
02447               ACE_ERROR ((LM_ERROR,
02448                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02449                  ACE_TEXT ("error in process_reply_message - %m\n"),
02450                  this->id ()));
02451             }
02452 
02453           return -1;
02454         }
02455 
02456     }
02457     break;
02458     case GIOP::CancelRequest:
02459     {
02460       // The associated request might be incomplete residing
02461       // fragmented in messaging object. We must make sure the
02462       // resources allocated by fragments are released.
02463       if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02464         {
02465           if (TAO_debug_level > 0)
02466             {
02467               ACE_ERROR ((LM_ERROR,
02468                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02469                  ACE_TEXT ("error processing CancelRequest\n"),
02470                  this->id ()));
02471             }
02472         }
02473 
02474       // We are not able to cancel requests being processed already;
02475       // this is declared as optional feature by CORBA, and TAO does
02476       // not support this currently.
02477 
02478       // Just continue processing, CancelRequest does not mean to cut
02479       // off the connection.
02480     }
02481     break;
02482     case GIOP::MessageError:
02483     {
02484       if (TAO_debug_level > 0)
02485         {
02486           ACE_ERROR ((LM_ERROR,
02487              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02488              ACE_TEXT ("received MessageError, closing connection\n"),
02489              this->id ()));
02490         }
02491       return -1;
02492     }
02493     break;
02494     case GIOP::Fragment:
02495     {
02496       // Nothing to be done.
02497     }
02498     break;
02499   }
02500 
02501   // If not, just return back..
02502   return 0;
02503 }

int TAO_Transport::process_queue_head ( TAO_Resume_Handle rh  )  [private]

These classes need privileged access to:

Definition at line 2506 of file Transport.cpp.

02507 {
02508   if (TAO_debug_level > 3)
02509     {
02510       ACE_DEBUG ((LM_DEBUG,
02511          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02512          this->id (), this->incoming_message_queue_.queue_length () ));
02513     }
02514 
02515   // See if  message in queue ...
02516   if (this->incoming_message_queue_.queue_length () > 0)
02517     {
02518       // Get the message on the head of the queue..
02519       TAO_Queued_Data *qd =
02520         this->incoming_message_queue_.dequeue_head ();
02521 
02522       if (TAO_debug_level > 3)
02523         {
02524           ACE_DEBUG ((LM_DEBUG,
02525              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02526              ACE_TEXT ("the size of the queue is [%d]\n"),
02527              this->id (),
02528              this->incoming_message_queue_.queue_length()));
02529         }
02530       // Now that we have pulled out out one message out of the queue,
02531       // check whether we have one more message in the queue...
02532       if (this->incoming_message_queue_.queue_length () > 0)
02533         {
02534           if (TAO_debug_level > 0)
02535             {
02536               ACE_DEBUG ((LM_DEBUG,
02537                  ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02538                  ACE_TEXT ("notify reactor\n"),
02539                  this->id ()));
02540             }
02541 
02542           int const retval = this->notify_reactor ();
02543 
02544           if (retval == 1)
02545             {
02546               // Let the class know that it doesn't need to resume  the
02547               // handle..
02548               rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02549             }
02550           else if (retval < 0)
02551             return -1;
02552         }
02553       else
02554         {
02555           // As we are ready to process the last message just resume
02556           // the handle. Set the flag incase someone had reset the flag..
02557           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02558         }
02559 
02560       // Process the message...
02561       int const retval = this->process_parsed_messages (qd, rh);
02562 
02563       // Delete the Queued_Data..
02564       TAO_Queued_Data::release (qd);
02565 
02566       return retval;
02567     }
02568 
02569   return 1;
02570 }

bool TAO_Transport::provide_blockable_handler ( TAO::Connection_Handler_Set handlers  ) 

Add event handlers corresponding to transports that have RW wait strategy to the handlers set. Called by the cache when the ORB is shuting down.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler if the transport has RW strategy on.
Returns:
true indicates a handler was added to the handler set. false indocates that the transport did not have a blockable handler that could be added.

Definition at line 253 of file Transport.cpp.

00254 {
00255   if (this->ws_->non_blocking () ||
00256       this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00257     return false;
00258 
00259   (void) this->add_reference ();
00260 
00261   h.insert (this->connection_handler_i ());
00262 
00263   return true;
00264 }

void TAO_Transport::provide_handler ( TAO::Connection_Handler_Set handlers  ) 

Added event handler to the handlers set.

Called by the cache when the cache is closing.

Parameters:
handlers The TAO_Connection_Handler_Set into which the transport should place its handler

Definition at line 245 of file Transport.cpp.

00246 {
00247   (void) this->add_reference ();
00248 
00249   handlers.insert (this->connection_handler_i ());
00250 }

int TAO_Transport::purge_entry ( void   ) 

Cache management.

Definition at line 479 of file Transport.cpp.

00480 {
00481   // We must store our entry in a temporary and zero out the data member.
00482   // If there is only one reference count on us, we will end up causing
00483   // our own destruction.  And we can not be holding a cache map entry if
00484   // that happens.
00485   TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* entry = 0;
00486   {
00487     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00488     entry = this->cache_map_entry_;
00489     this->cache_map_entry_ = 0;
00490   }
00491   return this->transport_cache_manager ().purge_entry (entry);
00492 }

void TAO_Transport::purging_order ( unsigned long  value  ) 

Definition at line 84 of file Transport.inl.

00085 {
00086   // This should only be called by the Transport Cache Manager when
00087   // it is holding it's lock.
00088   // The transport should still be here since the cache manager still
00089   // has a reference to it.
00090   this->purging_order_ = value;
00091 }

unsigned long TAO_Transport::purging_order ( void   )  const

Get and Set the purging order. The purging strategy uses the set version to set the purging order.

Definition at line 78 of file Transport.inl.

00079 {
00080   return this->purging_order_;
00081 }

bool TAO_Transport::queue_is_empty ( void   ) 

Check if there are messages pending in the queue.

Returns:
true if the queue is empty

Definition at line 106 of file Transport.inl.

00107 {
00108   ACE_GUARD_RETURN (ACE_Lock,
00109                     ace_mon,
00110                     *this->handler_lock_,
00111                     false);
00112   return this->queue_is_empty_i ();
00113 }

bool TAO_Transport::queue_is_empty_i ( void   )  const [private]

Check if there are messages pending in the queue.

This version assumes that the lock is already held. Use with care!

Returns:
true if the queue is empty

Definition at line 8 of file Transport.inl.

00009 {
00010   return (this->head_ == 0);
00011 }

int TAO_Transport::queue_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time,
bool  back = true 
) [protected]

Queue a message for message_block

Parameters:
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.
back If true, the message will be pushed to the back of the queue. If false, the message will be pushed to the front of the queue.

Definition at line 1601 of file Transport.cpp.

01603 {
01604   TAO_Queued_Message *queued_message = 0;
01605   ACE_NEW_RETURN (queued_message,
01606                   TAO_Asynch_Queued_Message (message_block,
01607                                              this->orb_core_,
01608                                              max_wait_time,
01609                                              0,
01610                                              true),
01611                   -1);
01612   if (back) {
01613     queued_message->push_back (this->head_, this->tail_);
01614   }
01615   else {
01616     queued_message->push_front (this->head_, this->tail_);
01617   }
01618 
01619   return 0;
01620 }

int TAO_Transport::recache_transport ( TAO_Transport_Descriptor_Interface desc  ) 

Recache ourselves in the cache.

Todo:

Ideally the following should be inline.

purge_entry has a return value, use it

Definition at line 469 of file Transport.cpp.

00470 {
00471   // First purge our entry
00472   this->purge_entry ();
00473 
00474   // Then add ourselves to the cache
00475   return this->transport_cache_manager ().cache_transport (desc, this);
00476 }

virtual ssize_t TAO_Transport::recv ( char *  buffer,
size_t  len,
const ACE_Time_Value timeout = 0 
) [pure virtual]

Read len bytes from into buf.

This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.

Parameters:
buffer ORB allocated buffer where the data should be @ The ACE_Time_Value *s is just a place holder for now. It is not clear this this is the best place to specify this. The actual timeout values will be kept in the Policies.
size_t TAO_Transport::recv_buffer_size ( void   )  const

Accessor to recv_buffer_size_.

Definition at line 197 of file Transport.inl.

00198 {
00199   return this->recv_buffer_size_;
00200 }

int TAO_Transport::register_handler ( void   )  [virtual]

Register the handler with the reactor.

Register the handler with the reactor. This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.

Todo:
I think this method is pretty much useless, the connections are *always* registered with the Reactor, except in thread-per-connection mode. In that case putting the connection in the Reactor would produce unpredictable results anyway.

Definition at line 365 of file Transport.cpp.

00366 {
00367   if (TAO_debug_level > 4)
00368     {
00369       ACE_DEBUG ((LM_DEBUG,
00370                   ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00371                   this->id ()));
00372     }
00373 
00374   ACE_Reactor * const r = this->orb_core_->reactor ();
00375 
00376   // @@note: This should be okay since the register handler call will
00377   // not make a nested call into the transport.
00378   ACE_GUARD_RETURN (ACE_Lock,
00379                     ace_mon,
00380                     *this->handler_lock_,
00381                     false);
00382 
00383   if (r == this->event_handler_i ()->reactor ())
00384     {
00385       return 0;
00386     }
00387 
00388   // Set the flag in the Connection Handler and in the Wait Strategy
00389   // @@Maybe we should set these flags after registering with the
00390   // reactor. What if the  registration fails???
00391   this->ws_->is_registered (true);
00392 
00393   // Register the handler with the reactor
00394   return r->register_handler (this->event_handler_i (),
00395                               ACE_Event_Handler::READ_MASK);
00396 }

bool TAO_Transport::register_if_necessary ( void   ) 

Register with the reactor via the wait strategy.

Definition at line 331 of file Transport.cpp.

00332 {
00333   if (this->is_connected_ &&
00334       this->wait_strategy ()->register_handler () == -1)
00335     {
00336       // Registration failures.
00337       if (TAO_debug_level > 0)
00338         {
00339           ACE_ERROR ((LM_ERROR,
00340                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ")
00341                       ACE_TEXT ("could not register the transport ")
00342                       ACE_TEXT ("in the reactor.\n"),
00343                       this->id ()));
00344         }
00345 
00346       // Purge from the connection cache, if we are not in the cache, this
00347       // just does nothing.
00348       (void) this->purge_entry ();
00349 
00350       // Close the handler.
00351       (void) this->close_connection ();
00352 
00353       return false;
00354     }
00355   return true;
00356 }

ACE_Event_Handler::Reference_Count TAO_Transport::remove_reference ( void   ) 

These classes need privileged access to:

Definition at line 2652 of file Transport.cpp.

02653 {
02654   return this->event_handler_i ()->remove_reference ();
02655 }

void TAO_Transport::report_invalid_event_handler ( const char *  caller  )  [private]

Print out error messages if the event handler is not valid.

Definition at line 1282 of file Transport.cpp.

01283 {
01284   if (TAO_debug_level > 0)
01285     {
01286       ACE_DEBUG ((LM_DEBUG,
01287          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01288          ACE_TEXT ("(%C) no longer associated with handler [tag=%d]\n"),
01289          this->id (), caller, this->tag_));
01290     }
01291 }

void TAO_Transport::reset_flush_timer ( void   )  [private]

The flush timer expired or was explicitly cancelled, mark it as not pending

Definition at line 122 of file Transport.inl.

00123 {
00124   this->flush_timer_id_ = -1;
00125   this->current_deadline_ = ACE_Time_Value::zero;
00126 }

int TAO_Transport::schedule_output_i ( void   )  [private]

Schedule handle_output() callbacks.

Definition at line 804 of file Transport.cpp.

00805 {
00806   ACE_Event_Handler * const eh = this->event_handler_i ();
00807   ACE_Reactor * const reactor = eh->reactor ();
00808 
00809   if (reactor == 0)
00810     {
00811       if (TAO_debug_level > 1)
00812         {
00813           ACE_ERROR ((LM_ERROR,
00814                       ACE_TEXT ("TAO (%P|%t) - ")
00815                       ACE_TEXT ("Transport[%d]::schedule_output_i, ")
00816                       ACE_TEXT ("no reactor,")
00817                       ACE_TEXT ("returning -1\n"),
00818                       this->id ()));
00819         }
00820       return -1;
00821     }
00822 
00823   // Check to see if our event handler is still registered with the
00824   // reactor.  It's possible for another thread to have run close_connection()
00825   // since we last used the event handler.
00826   ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00827   if (found)
00828     {
00829       found->remove_reference ();
00830 
00831       if (found != eh)
00832         {
00833           if (TAO_debug_level > 3)
00834             {
00835               ACE_ERROR ((LM_ERROR,
00836                           ACE_TEXT ("TAO (%P|%t) - ")
00837                           ACE_TEXT ("Transport[%d]::schedule_output_i ")
00838                           ACE_TEXT ("event handler not found in reactor,")
00839                           ACE_TEXT ("returning -1\n"),
00840                           this->id ()));
00841             }
00842 
00843           return -1;
00844         }
00845     }
00846 
00847   if (TAO_debug_level > 3)
00848     {
00849       ACE_DEBUG ((LM_DEBUG,
00850          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00851          this->id ()));
00852     }
00853 
00854   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00855 }

virtual ssize_t TAO_Transport::send ( iovec *  iov,
int  iovcnt,
size_t &  bytes_transferred,
ACE_Time_Value const *  timeout 
) [pure virtual]

Write the complete Message_Block chain to the connection.

This method serializes on handler_lock_, guaranteeing that only thread can execute it on the same instance concurrently.

Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class. Using the code factored out into ACE.

Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.

Parameters:
iov contains the data that must be sent.
timeout is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application.
bytes_transferred should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem.

This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.

int TAO_Transport::send_asynchronous_message_i ( TAO_Stub stub,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send an asynchronous message, i.e. do not block until the message is on the wire

Definition at line 1350 of file Transport.cpp.

01353 {
01354   // Let's figure out if the message should be queued without trying
01355   // to send first:
01356   bool try_sending_first = true;
01357 
01358   bool const queue_empty = this->queue_is_empty_i ();
01359 
01360   TAO::Transport_Queueing_Strategy *queue_strategy =
01361     stub->transport_queueing_strategy ();
01362 
01363   if (!queue_empty)
01364     {
01365       try_sending_first = false;
01366     }
01367   else if (queue_strategy)
01368     {
01369       if (queue_strategy->must_queue (queue_empty))
01370         {
01371           try_sending_first = false;
01372         }
01373     }
01374 
01375   bool partially_sent = false;
01376   bool timeout_encountered = false;
01377 
01378   TAO::Transport::Drain_Constraints dc(
01379       max_wait_time, this->using_blocking_io_for_asynch_messages());
01380 
01381   if (try_sending_first)
01382     {
01383       ssize_t n = 0;
01384       size_t byte_count = 0;
01385       // ... in this case we must try to send the message first ...
01386 
01387       size_t const total_length = message_block->total_length ();
01388 
01389       if (TAO_debug_level > 6)
01390         {
01391           ACE_DEBUG ((LM_DEBUG,
01392              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01393              ACE_TEXT ("trying to send the message (ml = %d)\n"),
01394              this->id (), total_length));
01395         }
01396 
01397       // @@ I don't think we want to hold the mutex here, however if
01398       // we release it we need to recheck the status of the transport
01399       // after we return... once I understand the final form for this
01400       // code I will re-visit this decision
01401       n = this->send_message_block_chain_i (message_block,
01402                                             byte_count,
01403                                             dc);
01404 
01405       if (n == -1)
01406         {
01407           // ... if this is just an EWOULDBLOCK we must schedule the
01408           // message for later, if it is ETIME we still have to send
01409           // the complete message, because cutting off the message at
01410           // this point will destroy the synchronization with the
01411           // server ...
01412           if (errno != EWOULDBLOCK && errno != ETIME)
01413             {
01414               if (TAO_debug_level > 0)
01415                 {
01416                   ACE_ERROR ((LM_ERROR,
01417                      ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01418                      ACE_TEXT ("fatal error in ")
01419                      ACE_TEXT ("send_message_block_chain_i - %m\n"),
01420                      this->id ()));
01421                 }
01422               return -1;
01423             }
01424         }
01425 
01426       // ... let's figure out if the complete message was sent ...
01427       if (total_length == byte_count)
01428         {
01429           // Done, just return.  Notice that there are no allocations
01430           // or copies up to this point (though some fancy calling
01431           // back and forth).
01432           // This is the common case for the critical path, it should
01433           // be fast.
01434           return 0;
01435         }
01436 
01437       if (byte_count > 0)
01438       {
01439         partially_sent = true;
01440       }
01441 
01442       // If it was partially sent, then push to front of queue and don't flush
01443       if (errno == ETIME)
01444       {
01445         timeout_encountered = true;
01446         if (byte_count == 0)
01447         {
01448           //This request has timed out and none of it was sent to the transport
01449           //We can't return -1 here, since that would end up closing the tranpsort
01450           if (TAO_debug_level > 2)
01451             {
01452               ACE_DEBUG ((LM_DEBUG,
01453                           ACE_TEXT ("TAO (%P|%t) - ")
01454                           ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01455                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
01456                           this->id ()));
01457             }
01458           throw ::CORBA::TIMEOUT (
01459             CORBA::SystemException::_tao_minor_code (
01460               TAO_TIMEOUT_SEND_MINOR_CODE,
01461               ETIME),
01462             CORBA::COMPLETED_NO);
01463         }
01464       }
01465 
01466       if (TAO_debug_level > 6)
01467         {
01468           ACE_DEBUG ((LM_DEBUG,
01469              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01470              ACE_TEXT ("partial send %d / %d bytes\n"),
01471              this->id (), byte_count, total_length));
01472         }
01473 
01474       // ... part of the data was sent, need to figure out what piece
01475       // of the message block chain must be queued ...
01476       while (message_block != 0 && message_block->length () == 0)
01477         {
01478           message_block = message_block->cont ();
01479         }
01480 
01481       // ... at least some portion of the message block chain should
01482       // remain ...
01483     }
01484 
01485   // ... either the message must be queued or we need to queue it
01486   // because it was not completely sent out ...
01487 
01488   ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time);
01489   if (this->queue_message_i (message_block, wait_time, !partially_sent)
01490       == -1)
01491     {
01492       if (TAO_debug_level > 0)
01493         {
01494           ACE_DEBUG ((LM_DEBUG,
01495                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01496                       ACE_TEXT ("send_asynchronous_message_i, ")
01497                       ACE_TEXT ("cannot queue message for  - %m\n"),
01498                       this->id ()));
01499         }
01500       return -1;
01501     }
01502 
01503   if (TAO_debug_level > 6)
01504     {
01505       ACE_DEBUG ((LM_DEBUG,
01506          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01507          ACE_TEXT ("message is queued\n"),
01508          this->id ()));
01509     }
01510 
01511   if (timeout_encountered && partially_sent)
01512     {
01513       //Must close down the transport here since we can't guarantee the
01514       //integrity of the GIOP stream (the next send may try to write to
01515       //the socket before looking at the queue).
01516       if (TAO_debug_level > 0)
01517         {
01518           ACE_DEBUG ((LM_DEBUG,
01519                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01520                       ACE_TEXT ("send_asynchronous_message_i, ")
01521                       ACE_TEXT ("timeout after partial send, closing.\n"),
01522                       this->id ()));
01523         }
01524       return -1;
01525     }
01526   else if (!timeout_encountered)
01527     {
01528       // We can't flush if we have already encountered a timeout
01529       // ... if the queue is full we need to activate the output on the
01530       // queue ...
01531       bool must_flush = false;
01532       const bool constraints_reached =
01533         this->check_buffering_constraints_i (stub,
01534                                              must_flush);
01535 
01536       // ... but we also want to activate it if the message was partially
01537       // sent.... Plus, when we use the blocking flushing strategy the
01538       // queue is flushed as a side-effect of 'schedule_output()'
01539 
01540       TAO_Flushing_Strategy *flushing_strategy =
01541         this->orb_core ()->flushing_strategy ();
01542 
01543       if (constraints_reached || try_sending_first)
01544         {
01545           int const result = flushing_strategy->schedule_output (this);
01546           if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01547             {
01548               must_flush = true;
01549             }
01550         }
01551 
01552       if (must_flush)
01553         {
01554           if (TAO_debug_level > 0)
01555             {
01556               ACE_DEBUG ((LM_DEBUG,
01557                           ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01558                           ACE_TEXT ("send_asynchronous_message_i, ")
01559                           ACE_TEXT ("flushing transport.\n"),
01560                           this->id ()));
01561             }
01562 
01563           size_t sent_byte = sent_byte_count_;
01564           int ret = 0;
01565           {
01566             typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01567             TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01568             ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01569             ret = flushing_strategy->flush_transport (this, max_wait_time);
01570           }
01571 
01572           if (ret == -1)
01573             {
01574               if (errno == ETIME)
01575                 {
01576                   if (sent_byte == sent_byte_count_) // if nothing was actually flushed
01577                     {
01578                       //This request has timed out and none of it was sent to the transport
01579                       //We can't return -1 here, since that would end up closing the tranpsort
01580                       if (TAO_debug_level > 2)
01581                         {
01582                           ACE_DEBUG ((LM_DEBUG,
01583                                       ACE_TEXT ("TAO (%P|%t) - ")
01584                                       ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01585                                       ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
01586                                       this->id ()));
01587                         }
01588                       throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
01589                                               (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME),
01590                                               CORBA::COMPLETED_NO);
01591                     }
01592                 }
01593               return -1;
01594             }
01595         }
01596     }
01597   return 0;
01598 }

void TAO_Transport::send_connection_closed_notifications ( void   ) 

Notify all the components inside a Transport when the underlying connection is closed.

Definition at line 1294 of file Transport.cpp.

01295 {
01296   {
01297     ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01298 
01299     this->send_connection_closed_notifications_i ();
01300   }
01301 
01302   this->tms ()->connection_closed ();
01303 }

void TAO_Transport::send_connection_closed_notifications_i ( void   )  [private]

Assume the lock is held.

Definition at line 1306 of file Transport.cpp.

01307 {
01308   this->cleanup_queue_i ();
01309 }

virtual int TAO_Transport::send_message ( TAO_OutputCDR stream,
TAO_Stub stub = 0,
TAO_Message_Semantics  message_semantics = TAO_TWOWAY_REQUEST,
ACE_Time_Value max_time_wait = 0 
) [pure virtual]

This method formats the stream and then sends the message on the transport. Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.

int TAO_Transport::send_message_block_chain ( const ACE_Message_Block message_block,
size_t &  bytes_transferred,
ACE_Time_Value max_wait_time = 0 
)

This is a very specialized interface to send a simple chain of messages through the Transport. The only place we use this interface is in GIOP_Message_Base.cpp, to send error messages (i.e., an indication that we received a malformed GIOP message,) and to close the connection.

Definition at line 568 of file Transport.cpp.

00571 {
00572   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00573 
00574   TAO::Transport::Drain_Constraints dc(
00575       max_wait_time, true);
00576 
00577   return this->send_message_block_chain_i (mb,
00578                                            bytes_transferred,
00579                                            dc);
00580 }

int TAO_Transport::send_message_block_chain_i ( const ACE_Message_Block message_block,
size_t &  bytes_transferred,
TAO::Transport::Drain_Constraints const &  dc 
)

Send a message block chain, assuming the lock is held.

Definition at line 583 of file Transport.cpp.

00586 {
00587   size_t const total_length = mb->total_length ();
00588 
00589   // We are going to block, so there is no need to clone
00590   // the message block.
00591   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00592 
00593   synch_message.push_back (this->head_, this->tail_);
00594 
00595   Drain_Result const n = this->drain_queue_i (dc);
00596 
00597   if (n == DR_ERROR)
00598     {
00599       synch_message.remove_from_list (this->head_, this->tail_);
00600       return -1; // Error while sending...
00601     }
00602   else if (n == DR_QUEUE_EMPTY)
00603     {
00604       bytes_transferred = total_length;
00605       return 1;  // Empty queue, message was sent..
00606     }
00607 
00608   // Remove the temporary message from the queue...
00609   synch_message.remove_from_list (this->head_, this->tail_);
00610 
00611   bytes_transferred = total_length - synch_message.message_length ();
00612 
00613   return 0;
00614 }

int TAO_Transport::send_message_shared ( TAO_Stub stub,
TAO_Message_Semantics  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [virtual]

Sent the contents of message_block.

Parameters:
stub The object reference used for this operation, useful to obtain the current policies.
message_semantics If this is set to TAO_TWO_REQUEST this method will block until the operation is completely written on the wire. If it is set to other values this operation could return.
message_block The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field.
max_wait_time The maximum time that the operation can block, used in the implementation of timeouts.

Definition at line 295 of file Transport.cpp.

00299 {
00300   int result = 0;
00301 
00302   {
00303     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00304 
00305     result =
00306       this->send_message_shared_i (stub, message_semantics,
00307                                    message_block, max_wait_time);
00308   }
00309 
00310   if (result == -1)
00311     {
00312       // The connection needs to be closed here.
00313       // In the case of a partially written message this is the only way to cleanup
00314       //  the physical connection as well as the Transport. An EOF on the remote end
00315       //  will cancel the partially received message.
00316       this->close_connection ();
00317     }
00318 
00319   return result;
00320 }

int TAO_Transport::send_message_shared_i ( TAO_Stub stub,
TAO_Message_Semantics  message_semantics,
const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [protected]

Implement send_message_shared() assuming the handler_lock_ is held.

Definition at line 1312 of file Transport.cpp.

01316 {
01317   int ret = 0;
01318 
01319 #if TAO_HAS_TRANSPORT_CURRENT == 1
01320   size_t const message_length = message_block->length ();
01321 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01322 
01323   switch (message_semantics)
01324     {
01325       case TAO_TWOWAY_REQUEST:
01326         ret = this->send_synchronous_message_i (message_block, max_wait_time);
01327         break;
01328 
01329       case TAO_REPLY:
01330         ret = this->send_reply_message_i (message_block, max_wait_time);
01331         break;
01332 
01333       case TAO_ONEWAY_REQUEST:
01334         ret = this->send_asynchronous_message_i (stub,
01335                                                  message_block,
01336                                                  max_wait_time);
01337         break;
01338     }
01339 
01340 #if TAO_HAS_TRANSPORT_CURRENT == 1
01341   // "Count" the message, only if no error was encountered.
01342   if (ret != -1 && this->stats_ != 0)
01343     this->stats_->messages_sent (message_length);
01344 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
01345 
01346   return ret;
01347 }

int TAO_Transport::send_reply_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue.

Definition at line 713 of file Transport.cpp.

00715 {
00716   // Dont clone now.. We could be sent in one shot!
00717   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00718 
00719   synch_message.push_back (this->head_, this->tail_);
00720 
00721   int const n =
00722     this->send_synch_message_helper_i (synch_message, max_wait_time);
00723 
00724   // What about partially sent messages.
00725   if (n == -1 || n == 1)
00726     {
00727       return n;
00728     }
00729 
00730   if (TAO_debug_level > 3)
00731     {
00732       ACE_DEBUG ((LM_DEBUG,
00733          ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00734          ACE_TEXT ("preparing to add to queue before leaving\n"),
00735          this->id ()));
00736     }
00737 
00738   // Till this point we shouldn't have any copying and that is the
00739   // point anyway. Now, remove the node from the list
00740   synch_message.remove_from_list (this->head_, this->tail_);
00741 
00742   // Clone the node that we have.
00743   TAO_Queued_Message *msg =
00744     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00745 
00746   // Stick it in the queue
00747   msg->push_back (this->head_, this->tail_);
00748 
00749   TAO_Flushing_Strategy *flushing_strategy =
00750     this->orb_core ()->flushing_strategy ();
00751 
00752   int const result = flushing_strategy->schedule_output (this);
00753 
00754   if (result == -1)
00755     {
00756       if (TAO_debug_level > 5)
00757         {
00758           ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00759                       "message_i, dequeuing msg due to schedule_output "
00760                       "failure\n", this->id ()));
00761         }
00762       msg->remove_from_list (this->head_, this->tail_);
00763       msg->destroy ();
00764     }
00765   else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00766     {
00767       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00768       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00769       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00770       (void) flushing_strategy->flush_transport (this, 0);
00771     }
00772 
00773   return 1;
00774 }

virtual int TAO_Transport::send_request ( TAO_Stub stub,
TAO_ORB_Core orb_core,
TAO_OutputCDR stream,
TAO_Message_Semantics  message_semantics,
ACE_Time_Value max_time_wait 
) [pure virtual]

Prepare the waiting and demuxing strategy to receive a reply for a new request. Preparing the ORB to receive the reply only once the request is completely sent opens the system to some subtle race conditions: suppose the ORB is running in a multi-threaded configuration, thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) setup the ORB to receive the reply 3) wait for the request

but in this case thread B may receive the reply between step (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) setup the ORB to receive the reply 2) send the request 3) wait for the reply

The following method encapsulates this idiom.

Todo:
This is generic code, it should be factored out into the Transport class.
int TAO_Transport::send_synch_message_helper_i ( TAO_Synch_Queued_Message s,
ACE_Time_Value max_wait_time 
) [private]

A helper method used by send_synchronous_message_i() and send_reply_message_i(). Reusable code that could be used by both the methods.

Definition at line 777 of file Transport.cpp.

00779 {
00780   TAO::Transport::Drain_Constraints dc(
00781       max_wait_time, this->using_blocking_io_for_synch_messages());
00782 
00783   Drain_Result const n = this->drain_queue_i (dc);
00784 
00785   if (n == DR_ERROR)
00786     {
00787       synch_message.remove_from_list (this->head_, this->tail_);
00788       return -1; // Error while sending...
00789     }
00790   else if (n == DR_QUEUE_EMPTY)
00791     {
00792       return 1;  // Empty queue, message was sent..
00793     }
00794 
00795   if (synch_message.all_data_sent ())
00796     {
00797       return 1;
00798     }
00799 
00800   return 0;
00801 }

int TAO_Transport::send_synchronous_message_i ( const ACE_Message_Block message_block,
ACE_Time_Value max_wait_time 
) [private]

Send a synchronous message, i.e. block until the message is on the wire

Definition at line 617 of file Transport.cpp.

00619 {
00620   // We are going to block, so there is no need to clone
00621   // the message block.
00622   size_t const total_length = mb->total_length ();
00623   TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00624 
00625   synch_message.push_back (this->head_, this->tail_);
00626 
00627   int const result = this->send_synch_message_helper_i (synch_message,
00628                                                         max_wait_time);
00629   if (result == -1 && errno == ETIME)
00630     {
00631       if (total_length == synch_message.message_length ()) //none was sent
00632         {
00633           if (TAO_debug_level > 2)
00634             {
00635               ACE_DEBUG ((LM_DEBUG,
00636                           ACE_TEXT ("TAO (%P|%t) - ")
00637                           ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
00638                           ACE_TEXT ("timeout encountered before any bytes sent\n"),
00639                           this->id ()));
00640             }
00641           throw ::CORBA::TIMEOUT (
00642             CORBA::SystemException::_tao_minor_code (
00643               TAO_TIMEOUT_SEND_MINOR_CODE,
00644               ETIME),
00645             CORBA::COMPLETED_NO);
00646         }
00647       else
00648         {
00649           return -1;
00650         }
00651     }
00652   else if(result == -1 || result == 1)
00653     {
00654       return result;
00655     }
00656 
00657   TAO_Flushing_Strategy *flushing_strategy =
00658     this->orb_core ()->flushing_strategy ();
00659   if (flushing_strategy->schedule_output (this) == -1)
00660     {
00661       synch_message.remove_from_list (this->head_, this->tail_);
00662       if (TAO_debug_level > 0)
00663         {
00664           ACE_ERROR ((LM_ERROR,
00665                       ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00666                       ACE_TEXT ("send_synchronous_message_i, ")
00667                       ACE_TEXT ("error while scheduling flush - %m\n"),
00668                       this->id ()));
00669         }
00670       return -1;
00671     }
00672 
00673   // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
00674   // because we're always going to flush anyway.
00675 
00676   // Release the mutex, other threads may modify the queue as we
00677   // block for a long time writing out data.
00678   int flush_result;
00679   {
00680     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00681     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00682     ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00683 
00684     flush_result = flushing_strategy->flush_message (this,
00685                                                      &synch_message,
00686                                                      max_wait_time);
00687   }
00688 
00689   if (flush_result == -1)
00690     {
00691       synch_message.remove_from_list (this->head_, this->tail_);
00692 
00693       // We don't need to do anything special for the timeout case.
00694       // The connection is going to get closed and the Transport destroyed.
00695       // The only thing to do maybe is to empty the queue.
00696 
00697       if (TAO_debug_level > 0)
00698         {
00699           ACE_ERROR ((LM_ERROR,
00700              ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00701              ACE_TEXT ("error while sending message - %m\n"),
00702              this->id ()));
00703         }
00704 
00705       return -1;
00706     }
00707 
00708   return 1;
00709 }

size_t TAO_Transport::sent_byte_count ( void   )  const

Accessor to sent_byte_count_.

Definition at line 203 of file Transport.inl.

00204 {
00205   return this->sent_byte_count_;
00206 }

void TAO_Transport::set_bidir_context_info ( TAO_Operation_Details opdetails  )  [virtual]

These classes need privileged access to:

Definition at line 2786 of file Transport.cpp.

02787 {
02788 }

void TAO_Transport::set_flush_in_post_open ( void   ) 

Set the flush in post open flag.

Definition at line 209 of file Transport.inl.

00210 {
00211   this->flush_in_post_open_ = true;
00212 }

TAO::Transport::Stats * TAO_Transport::stats ( void   )  const

Transport statistics.

Definition at line 217 of file Transport.inl.

00218 {
00219   return this->stats_;
00220 }

CORBA::ULong TAO_Transport::tag ( void   )  const

Return the protocol tag.

The OMG assigns unique tags (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG, check the documents in corbafwd.h for more details.

Definition at line 14 of file Transport.inl.

00015 {
00016   return this->tag_;
00017 }

int TAO_Transport::tear_listen_point_list ( TAO_InputCDR cdr  )  [virtual]

Extracts the list of listen points from the cdr stream. The list would have the protocol specific details of the ListenPoints

Definition at line 289 of file Transport.cpp.

00290 {
00291   ACE_NOTSUP_RETURN (-1);
00292 }

TAO_Transport_Mux_Strategy * TAO_Transport::tms ( void   )  const

Get the TAO_Tranport_Mux_Strategy used by this object.

The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Enough is to say that the class is used to control how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibilities of priority inversions.

Definition at line 26 of file Transport.inl.

00027 {
00028   return tms_;
00029 }

TAO::Transport_Cache_Manager & TAO_Transport::transport_cache_manager ( void   )  [private]

Helper method that returns the Transport Cache Manager.

Definition at line 2610 of file Transport.cpp.

02611 {
02612   return this->orb_core_->lane_resources ().transport_cache ();
02613 }

int TAO_Transport::update_transport ( void   ) 

Cache management.

Definition at line 514 of file Transport.cpp.

00515 {
00516   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00517 }

bool TAO_Transport::using_blocking_io_for_asynch_messages (  )  const [private]

Return true if blocking I/O should be used for sending asynchronous (AMI calls, non-blocking oneways, responses to operations, etc.) messages. This is determined based on the current flushing strategy.

Definition at line 2816 of file Transport.cpp.

02817 {
02818   return false;
02819 }

bool TAO_Transport::using_blocking_io_for_synch_messages (  )  const [private]

Return true if blocking I/O should be used for sending synchronous (two-way, reliable oneways, etc.) messages. This is determined based on the current flushing and waiting strategies.

Definition at line 2806 of file Transport.cpp.

02807 {
02808   if (this->wait_strategy()->can_process_upcalls())
02809   {
02810     return false;
02811   }
02812   return true;
02813 }

TAO_Wait_Strategy * TAO_Transport::wait_strategy ( void   )  const

Return the TAO_Wait_Strategy used by this object.

The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Enough is to say that the ORB can wait for a reply blocking on read(), using the Reactor to wait for multiple events concurrently or using the Leader/Followers protocol.

Definition at line 33 of file Transport.inl.

00034 {
00035   return this->ws_;
00036 }

void TAO_Transport::wchar_translator ( TAO_Codeset_Translator_Base tf  ) 

CodeSet negotiation - Set the wchar codeset translator factory.

Definition at line 158 of file Transport.inl.

00159 {
00160   this->wchar_translator_ = tf;
00161   this->tcs_set_ = 1;
00162 }

TAO_Codeset_Translator_Base * TAO_Transport::wchar_translator ( void   )  const

CodeSet Negotiation - Get the wchar codeset translator factory.

Definition at line 145 of file Transport.inl.

00146 {
00147   return this->wchar_translator_;
00148 }


Friends And Related Function Documentation

These classes need privileged access to:

Definition at line 948 of file Transport.h.

friend class TAO_Reactive_Flushing_Strategy [friend]

These classes need privileged access to:

Definition at line 947 of file Transport.h.

friend class TAO_Thread_Per_Connection_Handler [friend]

Needs priveleged access to event_handler_i ()

Definition at line 952 of file Transport.h.


Member Data Documentation

Use to check if bidirectional info has been synchronized with the peer. Have we sent any info on bidirectional information or have we received any info regarding making the connection served by this transport bidirectional. The flag is used as follows: + We dont want to send the bidirectional context info more than once on the connection. Why? Waste of marshalling and demarshalling time on the client. + On the server side -- once a client that has established the connection asks the server to use the connection both ways, we *dont* want the server to pack service info to the client. That is not allowed. We need a flag to prevent such a things from happening.

The value of this flag will be 0 if the client sends info and 1 if the server receives the info.

Definition at line 1113 of file Transport.h.

Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.

Definition at line 1085 of file Transport.h.

Additional member values required to support codeset translation.

@Phil, I think it would be nice if we could think of a way to do the following. We have been trying to use the transport for marking about translator factories and such! IMHO this is a wrong encapulation ie. trying to populate the transport object with these details. We should probably have a class something like TAO_Message_Property or TAO_Message_Translator or whatever (I am sure you get the idea) and encapsulate all these details. Coupling these seems odd. if I have to be more cynical we can move this to the connection_handler and it may more sense with the DSCP stuff around there. Do you agree?

Definition at line 1191 of file Transport.h.

The queue will start draining no later than <queeing_deadline_> if* the deadline is

Definition at line 1130 of file Transport.h.

First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection.

Definition at line 1203 of file Transport.h.

Indicate that flushing needs to be done in post_open().

Definition at line 1223 of file Transport.h.

The timer ID.

Definition at line 1133 of file Transport.h.

ACE_Lock* TAO_Transport::handler_lock_ [mutable, protected]

Lock that insures that activities that *might* use handler-related resources (such as a connection handler) get serialized. This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock(). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.

Definition at line 1147 of file Transport.h.

Implement the outgoing data queue.

Definition at line 1118 of file Transport.h.

size_t TAO_Transport::id_ [protected]

A unique identifier for the transport.

This never *never* changes over the lifespan, so we don't have to worry about locking it.

HINT: Protocol-specific transports that use connection handler might choose to set this to the handle for their connection.

Definition at line 1157 of file Transport.h.

Queue of the consolidated, incoming messages..

Definition at line 1122 of file Transport.h.

TAO::Incoming_Message_Stack TAO_Transport::incoming_message_stack_ [protected]

Stack of incoming fragments, consolidated messages are going to be enqueued in "incoming_message_queue_"

Definition at line 1126 of file Transport.h.

bool TAO_Transport::is_connected_ [protected]

Is this transport really connected or not. In case of oneways with SYNC_NONE Policy we don't wait until the connection is ready and we buffer the requests in this transport until the connection is ready

Definition at line 1171 of file Transport.h.

Our messaging object.

Definition at line 1176 of file Transport.h.

These classes need privileged access to:

Definition at line 1115 of file Transport.h.

Global orbcore resource.

Definition at line 1081 of file Transport.h.

TAO_SYNCH_MUTEX TAO_Transport::output_cdr_mutex_ [mutable, private]

lock for synchronizing Transport OutputCDR access

Definition at line 1226 of file Transport.h.

Holds the partial GIOP message (if there is one).

Definition at line 1206 of file Transport.h.

unsigned long TAO_Transport::purging_order_ [protected]

Used by the LRU, LFU and FIFO Connection Purging Strategies.

Definition at line 1160 of file Transport.h.

Size of the buffer received.

Definition at line 1163 of file Transport.h.

size_t TAO_Transport::sent_byte_count_ [protected]

Number of bytes sent.

Definition at line 1166 of file Transport.h.

TAO::Transport::Stats* TAO_Transport::stats_ [private]

Statistics.

Definition at line 1219 of file Transport.h.

CORBA::ULong const TAO_Transport::tag_ [protected]

IOP protocol tag.

Definition at line 1078 of file Transport.h.

These classes need privileged access to:

Definition at line 1119 of file Transport.h.

The tcs_set_ flag indicates that negotiation has occured and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.

Definition at line 1197 of file Transport.h.

Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.

Definition at line 1089 of file Transport.h.

The adapter used to receive timeout callbacks from the Reactor.

Definition at line 1136 of file Transport.h.

These classes need privileged access to:

Definition at line 1192 of file Transport.h.

Strategy for waiting for the reply after sending the request.

Definition at line 1092 of file Transport.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on Sun Nov 22 23:27:19 2009 for TAO by  doxygen 1.6.1