35 CosEventComm::PushSupplier_ptr pushSupplier)
38 if(CORBA::is_nil(pushSupplier))
45 throw CosEventChannelAdmin::AlreadyConnected();
51 CosEventComm::PushSupplier::_duplicate(pushSupplier)
53 _connections.insert( Connections_t::value_type(oidstr,newConnection) );
57 CORBA::Request_var req =pushSupplier->_request(
"_is_a");
58 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
59 req->set_return_type(CORBA::_tc_boolean);
74 DB(5,
"ProxyPushConsumer_i::disconnect_push_consumer()")
80 CORBA::Request_var req =
81 pos->second->_target->_request(
"disconnect_push_supplier");
82 pos->second->_remove_ref();
93 log.
os<<
"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<
'\n';
97 DB(5,
"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
104 #ifdef OMNIEVENTS_REAL_TIME_PUSH 112 _queue.push_back(
new CORBA::Any(event));
117 PortableServer::POA_ptr p,
118 list<CORBA::Any*>& q,
121 :
Servant(PortableServer::POA::_nil()),
130 using namespace PortableServer;
141 CORBA::PolicyList policies;
143 policies[0]=p->create_lifespan_policy(PERSISTENT);
144 policies[1]=p->create_id_assignment_policy(USER_ID);
145 policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
146 policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
147 policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
148 policies[5]=p->create_servant_retention_policy(NON_RETAIN);
149 policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
154 string poaName =string(
_channelName.in())+
".ProxyPushConsumer";
155 POAManager_var parentManager =p->the_POAManager();
156 _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
158 catch(POA::AdapterAlreadyExists&)
160 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - " 161 "POA::AdapterAlreadyExists")
163 catch(POA::InvalidPolicy& ex)
165 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - " 166 "POA::InvalidPolicy: "<<ex.index)
170 for(CORBA::ULong i=0; i<policies.length(); ++i)
171 policies[i]->destroy();
174 _poa->set_servant(
this);
180 DB(20,
"~ProxyPushConsumer_i()")
185 i->second->_remove_ref();
193 CosEventChannelAdmin::ProxyPushConsumer_ptr
196 return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
198 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
210 CORBA::Request_var req =
211 curr->second->_target->_request(
"disconnect_push_supplier");
212 curr->second->_remove_ref();
216 req->send_deferred();
225 for(map<string,PersistNode*>::const_iterator i=node.
_child.begin();
229 const char* oidstr =i->first.c_str();
230 string ior( i->second->attrString(
"IOR") );
231 bool isProxy( i->second->attrLong(
"proxy") );
235 using namespace CosEventComm;
236 using namespace CosEventChannelAdmin;
238 PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
243 DB(5,
"Reincarnated ProxyPushConsumer: "<<oidstr)
248 DB(15,
"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
251 ProxyPushSupplier_var proxySupp =
252 string_to_<ProxyPushSupplier>(ior.c_str());
253 PortableServer::ObjectId_var objectId =
254 PortableServer::string_to_ObjectId(oidstr);
255 CORBA::Object_var obj =
256 _poa->create_reference_with_id(
258 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
260 PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
261 proxySupp->connect_push_consumer(thisCons.in());
262 DB(7,
"Reconnected ProxyPushConsumer: "<<oidstr)
265 catch(CORBA::BAD_PARAM&) {
267 DB(5,
"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
269 catch(CosEventChannelAdmin::AlreadyConnected&){
271 DB(7,
"Remote ProxyPushSupplier already connected: "<<oidstr)
273 catch(CosEventChannelAdmin::TypeError&){
275 DB(2,
"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
277 catch(CORBA::OBJECT_NOT_EXIST&) {}
278 catch(CORBA::TRANSIENT& ) {}
279 catch(CORBA::COMM_FAILURE& ) {}
286 for(Connections_t::const_iterator i=
_connections.begin();
290 i->second->output(os);
300 using namespace PortableServer;
301 ObjectId_var oid =
Orb::inst()._POACurrent->get_object_id();
302 CORBA::String_var oidStr =ObjectId_to_string(oid.in());
303 return string(oidStr.in());
305 catch(PortableServer::Current::NoContext&)
309 catch(CORBA::BAD_PARAM&)
316 throw CORBA::NO_IMPLEMENT();
325 #if OMNIEVENTS__DEBUG_SERVANT 326 int ProxyPushConsumer_i::Connection::_objectCount =0;
330 const char* channelName,
331 const string& oidstr,
332 CosEventComm::PushSupplier_ptr pushSupplier,
337 _target(pushSupplier),
338 _targetIsProxy(isProxy)
340 #if OMNIEVENTS__DEBUG_SERVANT 342 DB(21,
"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
348 #if OMNIEVENTS__DEBUG_SERVANT 350 DB(20,
"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
352 DB(20,
"ProxyPushConsumer_i::Connection::~Connection()")
367 DB(15,
"ProxyPushConsumer is federated.");
372 DB(2,
"ProxyPushConsumer got unexpected callback.");
380 os<<
"/SupplierAdmin/ProxyPushConsumer/"<<
_oidstr;
382 if(!CORBA::is_nil(
_target.in()))
384 CORBA::String_var iorstr;
386 os<<
" IOR="<<iorstr.in();
bool _useLocalQueue
Switch between RT/chunked modes.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
Connection()
NO IMPLEMENTATION.
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access...
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
static bool exists()
Library code may create Event Service objects without the need for persistency.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void callback(CORBA::Request_ptr req)
Sets _targetIsProxy, if it is.
map< string, PersistNode * > _child
Interface for classes that wish to receive callbacks from deferred requests.
void output(ostream &os) const
Save this object's state to a stream.
PortableServer::POA_var _poa
list< CORBA::Any * > & _queue
const char * _channelName
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
ProxyPushConsumer_i(PortableServer::POA_ptr parentPoa, list< CORBA::Any *> &q, ConsumerAdmin_i &consumerAdmin)
ConsumerAdmin_i & _consumerAdmin
void push(const CORBA::Any &event)
Accepts events from any supplier, not just those stored in _connections.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
void disconnect_push_consumer()
We may not have a record of the supplier, so this method must accept calls from any supplier without ...
CosEventComm::PushSupplier_var _target
string currentObjectId() const
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
virtual ~ProxyPushConsumer_i()
bool _targetIsProxy
TRUE if _target is a ProxyPushSupplier.
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Connections_t _connections
Default servant for ProxyPushConsumer objects.
void output(ostream &os) const
Save this object's state to a stream.
CORBA::String_var _channelName
void connect_push_supplier(CosEventComm::PushSupplier_ptr pushSupplier)
If pushSupplier is provided, then it is stored in _connections.