TAO's COS Event Service

To poll the values of stocks constantly just to check if they have changed is not an efficient or scalable solution. We want to be informed when the price changes so we can take appropriate action. We could devise our own call back mechanism, but this kind of task is easier to achieve using the CORBA Event Service.

Defining the Event Type

We need to define an IDL struct that will carry our event data. Of course we want to include the stock price, its symbol and maybe its full name in the event:

  struct Event {
    double price;
    string symbol;
    string full_name;
  };

We also extend the Stock interface so we can modify the value:

  interface Modify_Stock : Stock {
    void set_price (in double new_price);
  };

Getting the Price Changes

Connecting as a consumer

Connecting as a consumer is a similar process, but we will use the more traditional inheritance based approach instead of TIE. First let us define the consumer object:

class Stock_Consumer : public POA_CosEventComm::PushConsumer {
public:
  Stock_Consumer ();

  void push (const CORBA::Any& data);
  void disconnect_push_consumer void);

  // details omitted
};

The disconnect_push_consumer() method is invoked by the Events Service when it is disconnecting, for example, because it was shut down before the Consumer got a chance to disconnect itself. The push() method is invoked by the Events Service whenever some event is sent by a supplier. Let's take a look at this method. First we need to extract the event data from the any:

void
Stock_Consumer::push (const CORBA::Any& data)
{
  const Quoter::Event *event = 0;
  if ((data >>= event) == 0)
    return; // Invalid event

Notice that the extraction can fail: anys can store all IDL data types, and only at extraction time are the types checked. Also notice that we use a pointer to the event; the CORBA rules are that variable sized structures, i.e., structures that contain elements of variable size, such as strings, are extracted by reference. We do not need to manage this memory, the ORB will destroy it for us. Now we can print out the new stock price:

  std::cout << "The new price for one stock in \""
            << event->full_name.in ()
            << "\" (" << event->symbol.in ()
            << ") is " << event->price << std::endl;
}

Going back to our example, when the event channel disconnects we receive a callback, too. At that point we want to forget about the original connection:

void
Stock_Consumer::disconnect_push_consumer void)
{
  this->supplier_proxy_ = CosEventChannelAdmin::ProxyPushSupplier::_nil ();
}

But why do we need to have a connection to the event channel in the first place? All we want is to receive events. The connection to the event channel will let you disconnect gracefully, so the event channel does not have to maintain resources for old consumers. For example, we could implement a method such as:

void
Stock_Consumer::disconnect ()
{
  // Do not receive any more events...
  this->supplier_proxy_->disconnect_push_supplier ();
}

How to connect to the event channel

Connecting to the event channel is a 3 step process. First we obtain a factory used by all the consumers that want to connect. Next we obtain a supplier proxy, so we can report when we do not want any more events. Finally we connect to the proxy to start receiving events.

We will assume that we are using the naming service or something similar to obtain a reference to the event service:

    CORBA::Object_var tmp = naming_context->resolve (name);
    CosEventChannelAdmin::EventChannel_var event_channel =
      CosEventChannelAdmin::EventChannel::_narrow (tmp);

Now we use the event channel to obtain the factory used for consumer connections:

    CosEventChannelAdmin::ConsumerAdmin_var consumer_admin =
      event_channel->for_consumers ();

And use the factory to obtain a proxy:

void
Stock_Consumer::connect (CosEventChannelAdmin::ConsumerAdmin_ptr consumer_admin)
{
    this->supplier_proxy_ =
      consumer_admin->obtain_push_supplier ();

And finally we connect:

    CosEventComm::PushConsumer_var myself = this->_this ();
    this->supplier_proxy_->connect_push_consumer (myself.in ());
}

Notifying the Price Changes

We will now examine how the suppliers generate events. Let us look at an implementation of the Modify_Stock interface:

class Quoter_Modify_Stock_i : public POA_Quoter::Modify_Stock {
public:
  Quoter_Modify_Stock_i (const char *symbol,
                         const char *full_name,
                         CORBA::Double price);

  void set_price (CORBA::Double new_price);

private:
  Quoter::Event data_;

  CosEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_;
};

Notice how we use the IDL structure to maintain the data. This is just to make the code a little shorter. The consumer_proxy_ object is just like the supplier_proxy_ discussed above, except that we also use it to send the events. The start of the set_price() method will look like this:

void
Quoter_Stock_i::set_price (CORBA::Double new_price)
{
   this->data_.price = new_price;

Next we prepare the event. The COS Events Service uses a CORBA any to send all the data, like this:

   CORBA::Any event;
   event <<= this->data_;

Finally we send the event to the consumer:

  this->consumer_proxy_->push (event);
}

Connecting to the Event Service as a Supplier

Sending the event was easy. Connecting to the Event Channel as a supplier is very similar to the connection as a consumer. We will need a CosEventComm::PushSupplier object. This is a good application of the TIE objects:

class Quoter_Stock_i : public POA_Quoter::Modify_Stock {
public:
  // some details removed...

  void disconnect_push_supplier (void);

private:
  POA_CosEventComm::PushSupplier_tie < Quoter_Stock_i > supplier_personality_;
};

The PushSupplier_tie is a template generated by the IDL compiler. It implements the CosEventComm::PushSupplier interface, but it actually just forwards all the calls to its single template argument. For example, in this case the disconnect_push_supplier call is implemented as:

template void
POA_CosEventComm::PushSupplier_tie < T >::disconnect_push_supplier ()
{
  this->ptr_->disconnect_push_supplier ();
}

The ptr_ field is actually a pointer to the template argument, so we don't have to implement a separate class just to receive a disconnect callback, we can use the same Modify_Stock_i class to do it.

Going back to the connection code, first we gain access to the Event Service, for example using the naming service:

    CORBA::Object_var tmp = naming_context->resolve (name);
    CosEventChannelAdmin::EventChannel_var event_channel =
      CosEventChannelAdmin::EventChannel::_narrow (tmp);

Now we use the event channel to obtain the factory used for supplier connections:

    CosEventChannelAdmin::SupplierAdmin_var supplier_admin =
      event_channel->for_suppliers ();

And use the factory to obtain a proxy:

    this->consumer_proxy_ =
      supplier_admin->obtain_push_consumer ();

And finally we use our supplier personality to connect to the consumer proxy:

    CosEventComm::PushSupplier_var supplier =
      this->supplier_personality_._this ();
    this->consumer_proxy_->connect_push_supplier (supplier);

Finally we implement the disconnect callback:

void
Quoter_Stock_i::disconnect_push_supplier (void)
{
  // Forget about the consumer.  It is not there anymore
  this->consumer_proxy_ =
    CosEventChannelAdmin::ProxyPushConsumer::_nil ();
}

Exercise 1

Implement a consumer that receives the price update events.

The header file is already provided, along with a sample client.cpp. And other support files Quoter.idl, Makefile, Stock_i.h, Stock_i.cpp, Stock_Factory_i.h, Stock_Factory_i.cpp, and server.cpp.

Solution

Compare your solution with Stock_Consumer.cpp.

Testing

To test your changes you need to run four programs, first TAO's Naming Service:

$ $TAO_ROOT/orbsvcs/Naming_Service/tao_cosnaming -m 1

The CORBA Event Service

$ $TAO_ROOT/orbsvcs/CosEvent_Service/tao_cosevent

Now you can run your client:

$ client

and finally the server:

$ server AAAA MSFT RHAT < stock_list.txt

Here is the stock_list.txt file.

Exercise 2

Run the same configuration as above, but this time run multiple clients and servers:

$ client
$ client
$ server AAAA BBBB < stock_list1.txt
$ server ZZZZ YYYY < stock_list2.txt

Do the clients receive all the events from both servers? What if you don't want to receive all the events, for example, because you are only interested in certain stocks?

Here are the stock_list1.txt and stock_list2.txt files.


Carlos O'Ryan
Last modified: Sun Apr 1 13:59:59 PDT 2001