We have already explored how to use TAO's COS Event Service to receive updated stock prices, but what if we are not interested in all the stocks? One approach is to use multiple event channels, each one carrying different traffic. For example, each event channel could carry only a subset of the stocks. In this section we will explore another solution: using TAO's real-time Event Service to perform the filtering for us. TAO's real-time Event Service can do many other things, such as preserving priority end-to-end, using multicast to conserve network resources, generating timeout and interval events, and collaborating with TAO's Scheduling Service to analyze the schedulability of your system.
For this example we will use the same data structures that we used in the previous example, i.e., the events will be identical. TAO's RT Event Service can be configured to carry your events in a type-safe manner, or you can use custom marshaling to send non-IDL structures in the event, but it is easier to use it like the COS Event Service first.
Connecting as a consumer is very similar. Some of the base classes and signatures change, but the idea is basically the same. First let us define the consumer object:
class Stock_Consumer : public POA_RtecEventComm::PushConsumer {
public:
  Stock_Consumer ();

  void push (const RtecEventComm::EventSet& data);
  void disconnect_push_consumer (void);

  // details omitted
};
Notice that we receive an event set instead of a single event. The event channel can use this feature to queue multiple events and push them in a single operation. First we need to extract the event data from the any:
void
Stock_Consumer::push (const RtecEventComm::EventSet &data)
{
  for (CORBA::ULong i = 0; i != data.length (); ++i) {
    // The event set is const, so bind a const reference to each element.
    const RtecEventComm::Event &e = data[i];

    const Quoter::Event *event;
    if ((e.data.any_value >>= event) == 0)
      continue; // Invalid event
Notice that the events have more structure: they have a clearly separated header and data, and the data has more than just an any. The header is used to provide filtering, and the event data field can be configured at compile time to carry whatever IDL structures you want. Now we can print out the new stock price:
    std::cout << "The new price for one stock in \""
              << event->full_name.in ()
              << "\" (" << event->symbol.in ()
              << ") is " << event->price << std::endl;
  }
}
We also need to implement the disconnect callback:
void
Stock_Consumer::disconnect_push_consumer (void)
{
  // The proxy comes from the RT event channel, so use its nil reference.
  this->supplier_proxy_ = RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
}
As with the COS Event Channel we can voluntarily disconnect, too:
void
Stock_Consumer::disconnect ()
{
  // Do not receive any more events...
  this->supplier_proxy_->disconnect_push_supplier ();
}
Connecting to the RT event channel is very similar to connecting to the regular event channel. The only difference is that we must specify the events that we want to receive. This is described using a fairly complex IDL structure, but TAO provides a helper class to generate it. We will assume that we are using the naming service or something similar to obtain a reference to the event service:
CORBA::Object_var tmp = naming_context->resolve (name);
RtecEventChannelAdmin::EventChannel_var event_channel =
  RtecEventChannelAdmin::EventChannel::_narrow (tmp);
Now we use the event channel to obtain the factory used for consumer connections:
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = event_channel->for_consumers ();
And use the factory to obtain a proxy:
void
Stock_Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin)
{
  this->supplier_proxy_ = consumer_admin->obtain_push_supplier ();
Now we list the events that we want to receive. We use a simple algorithm to assign an event type to each stock symbol:
  CORBA::ULong rhat_event_type =
    (int('R') << 24) | (int('H') << 16) | (int('A') << 8) | int('T');
  CORBA::ULong aaaa_event_type =
    (int('A') << 24) | (int('A') << 16) | (int('A') << 8) | int('A');
Now we create the subscription:
  ACE_ConsumerQOS_Factory subscriptions;
  subscriptions.insert_type (rhat_event_type, 0);
  subscriptions.insert_type (aaaa_event_type, 0);
And connect to the proxy:
  RtecEventComm::PushConsumer_var myself = this->_this ();
  this->supplier_proxy_->connect_push_consumer (
      myself.in (),
      subscriptions.get_ConsumerQOS ());
}
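The symbol-to-event-type rule used above can be factored into a small function. The following is only a sketch in plain C++: `symbol_to_event_type` is a hypothetical helper, not part of TAO, and it assumes that symbols shorter than four characters should be padded with zero bytes instead of being read past their terminator. Only the first four characters of a longer symbol are used.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical helper (not part of TAO): packs the first four characters of
// a stock symbol into a 32-bit value, most significant byte first, as in
// (c0 << 24) | (c1 << 16) | (c2 << 8) | c3.
// Assumption: shorter symbols are padded with zero bytes.
inline std::uint32_t symbol_to_event_type(const char *symbol)
{
    const std::size_t n = std::strlen(symbol);
    std::uint32_t type = 0;
    for (std::size_t i = 0; i < 4; ++i) {
        const unsigned char c =
            i < n ? static_cast<unsigned char>(symbol[i]) : 0;
        type = (type << 8) | c;
    }
    return type;
}

// Usage example: the two event types computed by hand in the text.
inline void example_symbol_packing()
{
    assert(symbol_to_event_type("RHAT") == 0x52484154u);
    assert(symbol_to_event_type("AAAA") == 0x41414141u);
}
```

The result can be stored in a `CORBA::ULong`, which is also a 32-bit unsigned integer. Using one function on both sides keeps the consumer's subscriptions and the supplier's event headers in agreement.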
As with the COS Event Channel example we will make our implementation of the Modify_Stock interface generate events whenever the price changes:
class Quoter_Stock_i : public POA_Quoter::Modify_Stock {
public:
  Quoter_Stock_i (const char *symbol,
                  const char *full_name,
                  CORBA::Double price);

  void set_price (CORBA::Double new_price);

  void disconnect_push_supplier (void);

private:
  Quoter::Event data_;

  RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_;

  POA_RtecEventComm::PushSupplier_tie < Quoter_Stock_i > supplier_personality_;
};
The implementation of the set_price() method is very similar. First we store the new price:
void
Quoter_Stock_i::set_price (CORBA::Double new_price)
{
  this->data_.price = new_price;
Next we prepare the event. This time we must create a sequence, but we just have one element in it:
  RtecEventComm::EventSet event (1);
  event.length (1);
We set the event type based on the stock symbol:
  RtecEventComm::Event &e = event[0];
  const char *symbol = this->data_.symbol;
  e.header.type =
    ((int(symbol[0]) << 24)
     | (int(symbol[1]) << 16)
     | (int(symbol[2]) << 8)
     | int(symbol[3]));
  e.header.source = 1;
The event source is not used in this example, but it must be non-zero. Now we can set the data:
e.data.any_value <<= this->data_;
and send the event to the event channel:
this->consumer_proxy_->push (event); }
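When debugging, it can help to turn an event type from a header back into a readable symbol. The sketch below is plain C++ and assumes the type was built with the four-character packing shown in this section. `event_type_to_symbol` is a hypothetical helper, not part of TAO.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper (not part of TAO): unpacks a 32-bit event type built
// as (c0 << 24) | (c1 << 16) | (c2 << 8) | c3 back into its characters.
// Zero bytes are skipped, so a type with fewer than four non-zero bytes
// decodes to a shorter string.
inline std::string event_type_to_symbol(std::uint32_t type)
{
    std::string symbol;
    for (int shift = 24; shift >= 0; shift -= 8) {
        const char c = static_cast<char>((type >> shift) & 0xFFu);
        if (c != '\0')
            symbol.push_back(c);
    }
    return symbol;
}

// Usage example: decode the type that corresponds to the symbol "RHAT".
inline void example_symbol_decoding()
{
    assert(event_type_to_symbol(0x52484154u) == "RHAT");
}
```

A consumer could call such a helper on `e.header.type` when logging events, without having to extract the payload from the any first.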
As in the COS Event Channel case we need a supplier personality to connect to it. We gain access to the Event Service, for example using the naming service:
CORBA::Object_var tmp = naming_context->resolve (name);
RtecEventChannelAdmin::EventChannel_var event_channel =
  RtecEventChannelAdmin::EventChannel::_narrow (tmp);
Now we use the event channel to obtain the factory used for supplier connections:
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = event_channel->for_suppliers ();
And use the factory to obtain a proxy:
this->consumer_proxy_ = supplier_admin->obtain_push_consumer ();
We build our publications so the event channel can match consumers and suppliers based on their common events:
  const char *symbol = this->data_.symbol;
  CORBA::ULong type =
    ((int(symbol[0]) << 24)
     | (int(symbol[1]) << 16)
     | (int(symbol[2]) << 8)
     | int(symbol[3]));
  CORBA::ULong source = 1;
  ACE_SupplierQOS_Factory publications;
  publications.insert_type (type, source, 0, 1);
Finally we connect to the consumer proxy:
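  RtecEventComm::PushSupplier_var supplier =
    this->supplier_personality_._this ();
  // Pass the publications along so the channel knows what we will send.
  this->consumer_proxy_->connect_push_supplier (
      supplier.in (),
      publications.get_SupplierQOS ());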
The implementation of the disconnect callback is as before:
void
Quoter_Stock_i::disconnect_push_supplier (void)
{
  // Forget about the consumer; it is not there anymore
  this->consumer_proxy_ =
    RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
}
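To make the effect of type-based filtering concrete, here is a toy model in plain C++. It is only an illustration of the idea, not how TAO implements its event channel: `ToyEvent`, `ToyConsumer`, and `ToyChannel` are hypothetical stand-ins, and the prices are arbitrary values. Each consumer holds a set of event types, and the channel delivers an event only to the consumers whose set contains the event's type.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Toy stand-ins for illustration only; these are not TAO types.
struct ToyEvent {
    std::uint32_t type;   // plays the role of header.type
    double price;         // plays the role of the event payload
};

// A consumer with a set of subscribed event types and a log of deliveries.
struct ToyConsumer {
    std::set<std::uint32_t> subscribed_types;
    std::vector<ToyEvent> received;
};

// Delivers each pushed event to every connected consumer that subscribed
// to the event's type, and to nobody else.
class ToyChannel {
public:
    void connect(ToyConsumer *consumer) { consumers_.push_back(consumer); }

    void push(const std::vector<ToyEvent> &events)
    {
        for (const ToyEvent &e : events)
            for (ToyConsumer *c : consumers_)
                if (c->subscribed_types.count(e.type) != 0)
                    c->received.push_back(e);
    }

private:
    std::vector<ToyConsumer *> consumers_;
};

// Usage example: a client subscribed to "AAAA" and "CCCC" sees only the
// "CCCC" event from a batch that also contains an "MSFT" event.
inline void example_filtering()
{
    ToyConsumer client;
    client.subscribed_types.insert(0x41414141u);   // "AAAA"
    client.subscribed_types.insert(0x43434343u);   // "CCCC"

    ToyChannel channel;
    channel.connect(&client);

    std::vector<ToyEvent> batch;
    batch.push_back(ToyEvent{0x4D534654u, 91.5});  // "MSFT": not subscribed
    batch.push_back(ToyEvent{0x43434343u, 12.0});  // "CCCC": subscribed
    channel.push(batch);

    assert(client.received.size() == 1);
    assert(client.received[0].type == 0x43434343u);
}
```

The same reasoning applies to the exercises that follow: a consumer only sees events whose type appears in its subscription, no matter which supplier sent them.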
Implement a consumer that receives the price update events.
The header file is already provided, along with a sample client.cpp and the other support files: Quoter.idl, Makefile, Stock_i.h, Stock_i.cpp, Stock_Factory_i.h, Stock_Factory_i.cpp, and server.cpp.
Compare your solution with Stock_Consumer.cpp.
To test your changes you need to run four programs. First, start TAO's Naming Service:
$ $TAO_ROOT/orbsvcs/Naming_Service/tao_cosnaming
Then run TAO's Real-time Event Service:
$ $TAO_ROOT/orbsvcs/Event_Service/tao_rtevent
Now you can run your client:
$ client AAAA CCCC
and finally the server:
$ server MSFT BBBB CCCC < stock_list.txt
Here is the stock_list.txt file.
Run the same configuration as above, but this time run multiple clients and servers:
$ client AAAA MSFT
$ client PPPP
$ server AAAA < stock_list1.txt
$ server QQQQ < stock_list2.txt
Do the clients receive all the events from both servers?
Here are the stock_list1.txt and stock_list2.txt files.