22 #ifndef __M2ETIS_PUBSUB_TREE_H__
23 #define __M2ETIS_PUBSUB_TREE_H__
35 #include "boost/bind.hpp"
37 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
39 #pragma warning(disable : 4127)
52 template<
class ChannelType,
class NetworkType,
class EventType>
59 typename NetworkType::Key self_;
60 typename NetworkType::Key _rendezvous;
61 typename NetworkType::Key _root;
66 uint64_t resubscribeID_;
72 Tree(uint16_t tree_name,
const typename NetworkType::Key &
self,
const typename NetworkType::Key & rendezvous,
const typename NetworkType::Key & root,
PubSubSystemEnvironment * pssi,
int cI) :
ChannelType::RoutingStrategy(tree_name, pssi, root),
ChannelType::OrderStrategy(pssi, false),
ChannelType::DeliverStrategy(pssi, self),
ChannelType::ValidityStrategy(pssi), factory_(message::MessageFactory<
ChannelType, NetworkType>()), topic_(tree_name), deliver_f(), controller_(pssi->_factory->createNetworkController(NetworkType())), self_(self), _rendezvous(rendezvous), _root(root), pssi_(pssi), subscribed_(false), buffer_(), channelID_(cI), resubscribeID_(UINT64_MAX) {
73 ChannelType::RoutingStrategy::setSelf(self_);
74 ChannelType::FilterStrategy::setSelf(self_);
75 ChannelType::OrderStrategy::setRoot(self_ == _rendezvous);
77 ChannelType::RoutingStrategy::setUnsubscriptionListener(boost::bind(&Tree::processRoutingStrategyUnsubscribeNotification,
this, _1));
79 ChannelType::RoutingStrategy::configureSendCallback(boost::bind(&Tree::sendRoutingControlMessage,
this, _1, _2, _3));
81 ChannelType::DeliverStrategy::configureSendCallback(boost::bind(&Tree::sendDeliverControlMessage,
this, _1, _2, _3));
83 ChannelType::OrderStrategy::configureSendCallback(boost::bind(&Tree::sendOrderControlMessage,
this, _1, _2, _3));
84 ChannelType::OrderStrategy::setHn(self_);
85 registerMessageTypes();
86 registerNetworkCallbacks();
90 if (resubscribeID_ != UINT64_MAX) {
92 resubscribeID_ = UINT64_MAX;
94 deregisterNetworkCallbacks();
98 inline typename NetworkType::Key
getSelf()
const {
102 inline typename NetworkType::Key
getRoot()
const {
119 typename IMessage::Ptr topublish = boost::static_pointer_cast<IMessage>(msg);
121 ChannelType::RoutingStrategy::configureRoutingInfo(tmp, topublish->routingInfo, topublish->receiver);
122 ChannelType::ValidityStrategy::configureValidityInfo(topublish->validityInfo);
123 topublish->type = tmp | topic_;
124 sendMessages(topublish);
133 if (resubscribeID_ != UINT64_MAX) {
135 resubscribeID_ = UINT64_MAX;
142 unsubscribe_impl(predicate);
150 typename IMessage::Ptr todeliver = boost::static_pointer_cast<IMessage>(msg);
153 if (ChannelType::DeliverStrategy::processOtherControlPayload(todeliver->deliverInfo, todeliver->sender)) {
154 if (ChannelType::OrderStrategy::processControlPayload(todeliver->orderInfo, todeliver->sender)) {
156 sendMessages(todeliver);
161 if (ChannelType::DeliverStrategy::processControlPayload(todeliver->deliverInfo, todeliver->sender)) {
163 sendMessages(todeliver);
172 if (ChannelType::DeliverStrategy::processOtherControlPayload(todeliver->deliverInfo, todeliver->sender)) {
173 ChannelType::RoutingStrategy::processControlPayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
174 todeliver->type = tmp | topic_;
175 disseminateMessage(todeliver);
181 if (ChannelType::DeliverStrategy::processSubscribePayload(todeliver->deliverInfo, todeliver->sender)) {
182 if (ChannelType::RoutingStrategy::processSubscribePayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp)) {
183 ChannelType::OrderStrategy::processSubscribePayload(todeliver->orderInfo, todeliver->sender);
185 ChannelType::FilterStrategy::processSubscribePayload(todeliver->sender, todeliver->filterInfo);
187 todeliver->type = tmp | topic_;
189 disseminateMessage(todeliver);
194 if (ChannelType::DeliverStrategy::processUnsubscribePayload(todeliver->deliverInfo, todeliver->sender)) {
195 ChannelType::FilterStrategy::processUnsubscribePayload(todeliver->sender, todeliver->filterInfo);
196 ChannelType::RoutingStrategy::processUnsubscribePayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
197 todeliver->type = tmp | topic_;
198 disseminateMessage(todeliver);
203 ChannelType::RoutingStrategy::processPublishPayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
204 ChannelType::ValidityStrategy::processPublishPayload(todeliver->validityInfo);
205 todeliver->type = tmp | topic_;
206 if (ChannelType::DeliverStrategy::processPublishPayload(todeliver->deliverInfo, todeliver->sender)) {
207 ChannelType::OrderStrategy::processPublishPayload(todeliver->orderInfo, todeliver->sender);
209 if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
210 sendMessages(todeliver);
213 if (ChannelType::RoutingStrategy::selfSubscribed()) {
214 if (todeliver->sender == todeliver->receiver) {
215 if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
219 if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
220 uint64_t a = buffer_.
insert(boost::bind(&BasicDeliverCallbackInterface<EventType>::deliverCallback, deliver_f, todeliver));
221 ChannelType::OrderStrategy::receive(a, todeliver->orderInfo, todeliver->sender);
229 ChannelType::RoutingStrategy::processNotifyPayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
230 ChannelType::ValidityStrategy::processNotifyPayload(todeliver->validityInfo);
231 todeliver->type = tmp | topic_;
232 if (ChannelType::DeliverStrategy::processNotifyPayload(todeliver->deliverInfo, todeliver->sender)) {
233 ChannelType::OrderStrategy::processNotifyPayload(todeliver->orderInfo, todeliver->sender);
235 if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
236 sendMessages(todeliver);
238 for (
typename NetworkType::Key to : ChannelType::RoutingStrategy::getTargetNodes(tmp, todeliver->routingInfo, todeliver->receiver)) {
239 ChannelType::OrderStrategy::notifyRemovedMessage(todeliver->orderInfo, to);
243 if (ChannelType::RoutingStrategy::selfSubscribed()) {
244 if (ChannelType::FilterStrategy::match(todeliver->payload)) {
245 if (todeliver->sender == todeliver->receiver) {
246 if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
250 if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
251 uint64_t a = buffer_.
insert(boost::bind(&BasicDeliverCallbackInterface<EventType>::deliverCallback, deliver_f, todeliver));
252 ChannelType::OrderStrategy::receive(a, todeliver->orderInfo, todeliver->sender);
267 void registerMessageTypes()
const {
276 void registerNetworkCallbacks() {
277 typename net::NetworkController<NetworkType>::net_deliver_func deliv = boost::bind(&Tree::deliver,
this, _1);
278 typename net::NetworkController<NetworkType>::net_forward_func forw = boost::bind(&Tree::forward,
this, _1);
280 if (ChannelType::RoutingStrategy::register_forward_subscribe) {
283 if (ChannelType::RoutingStrategy::register_deliver_subscribe) {
286 if (ChannelType::RoutingStrategy::register_forward_unsubscribe) {
289 if (ChannelType::RoutingStrategy::register_deliver_unsubscribe) {
298 void deregisterNetworkCallbacks() {
303 if (ChannelType::RoutingStrategy::register_forward_subscribe) {
306 if (ChannelType::RoutingStrategy::register_deliver_subscribe) {
309 if (ChannelType::RoutingStrategy::register_forward_unsubscribe) {
312 if (ChannelType::RoutingStrategy::register_deliver_unsubscribe) {
328 uint64_t a = buffer_.
insert(boost::bind(&Tree::reallySendMsg,
this, msg));
330 ChannelType::OrderStrategy::configureOrderInfo(a, action, msg->orderInfo, msg->receiver);
339 std::vector<typename NetworkType::Key> v = ChannelType::RoutingStrategy::getTargetNodes(action, msg->routingInfo, msg->receiver);
342 msg->type = action | topic_;
344 for (
typename NetworkType::Key to : v) {
348 if (action ==
message::NOTIFY && !ChannelType::FilterStrategy::match(to, msg->filterInfo, msg->payload)) {
349 ChannelType::OrderStrategy::notifyRemovedMessage(msg->orderInfo, to);
353 ChannelType::OrderStrategy::notifyRemovedMessage(msg->orderInfo, to);
356 typename IMessage::Ptr msg2 = boost::make_shared<IMessage>(*msg);
357 uint64_t a = buffer_.
insert(boost::bind(&Tree::reallySendMsg,
this, msg2));
358 ChannelType::OrderStrategy::configureOrderInfo(a, action, msg2->orderInfo, to);
364 uint64_t a = buffer_.
insert(boost::bind(&net::NetworkController<NetworkType>::send, controller_, boost::static_pointer_cast<message::NetworkMessage<NetworkType>>(msg)));
365 ChannelType::DeliverStrategy::configureDeliverInfo(a, action, msg->deliverInfo, msg->receiver, msg->ctrlType_);
373 msg->receiver = _root;
377 ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
378 msg->type = tmp | topic_;
381 msg->receiver = receiver;
384 assert(
false &&
"ControlTarget unknown");
390 SimulationEventType v;
392 v._simChannel = channelID_;
393 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
395 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
399 msg->deliverInfo = boost::static_pointer_cast<
typename ChannelType::DeliverStrategy::DeliverInfoType>(p);
401 sendControlMessage(msg, receiver, target);
406 SimulationEventType v;
408 v._simChannel = channelID_;
409 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
411 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
415 msg->orderInfo = boost::static_pointer_cast<
typename ChannelType::OrderStrategy::OrderInfoType>(p);
417 sendControlMessage(msg, receiver, target);
420 void sendRoutingControlMessage(
typename message::RoutingInfo<NetworkType>::Ptr p,
const typename NetworkType::Key & receiver,
ControlTarget target) {
422 SimulationEventType v;
424 v._simChannel = channelID_;
425 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
427 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
431 msg->routingInfo = boost::static_pointer_cast<
typename ChannelType::RoutingStrategy::RoutingInfoType>(p);
433 sendControlMessage(msg, receiver, target);
436 void subscribe_impl(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
437 deliver_f = &callback;
440 SimulationEventType v;
442 v._simChannel = channelID_;
443 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
445 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
448 ChannelType::RoutingStrategy::selfSubscribed(
true);
450 ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
451 msg->type = tmp | topic_;
453 ChannelType::FilterStrategy::getSubscribePayload(predicate, subscribed_, msg->filterInfo);
458 bool subs(BasicDeliverCallbackInterface<EventType> * callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
460 subscribe_impl(*callback, predicate);
467 void subscribe_impl(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate, Int2Type<true>) {
468 subscribe_impl(callback, predicate);
469 if (resubscribeID_ != UINT64_MAX) {
472 resubscribeID_ = pssi_->
scheduler_.
runRepeated(ChannelType::RoutingStrategy::periodic_, boost::bind(&Tree::subs,
this, &callback, predicate), 5);
476 void subscribe_impl(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate, Int2Type<false>) {
477 subscribe_impl(callback, predicate);
481 void unsubscribe_impl() {
485 SimulationEventType v;
487 v._simChannel = channelID_;
488 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
490 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
494 ChannelType::RoutingStrategy::selfSubscribed(
false);
496 ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
497 msg->type = tmp | topic_;
498 ChannelType::FilterStrategy::getUnsubscribePayload(msg->filterInfo);
503 void unsubscribe_impl(boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
505 SimulationEventType v;
507 v._simChannel = channelID_;
508 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
510 typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
513 typename ChannelType::FilterStrategy::FilterUnsubscribeInformation unsubscribeAction = ChannelType::FilterStrategy::getUnsubscribePayload(msg->filterInfo, predicate);
515 if (unsubscribeAction == ChannelType::FilterStrategy::CANCELUNSUBSCRIBE) {
521 if (unsubscribeAction == ChannelType::FilterStrategy::UNSUBSCRIBEFROMTREE) {
526 if (unsubscribeAction == ChannelType::FilterStrategy::FORWARDUNSUBSCRIBE) {
531 ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
532 msg->type = tmp | topic_;
537 void unsubscribe_impl(Int2Type<true>) {
542 void unsubscribe_impl(Int2Type<false>) {
548 FIPtr forward(
typename message::NetworkMessage<NetworkType>::Ptr msg) {
549 typename IMessage::Ptr toforward = boost::static_pointer_cast<IMessage>(msg);
554 M2ETIS_THROW_FAILURE(
"Tree",
"Tree got message for forwarding with incorrect channel topic", 1);
558 bool changes =
false;
563 if (ChannelType::RoutingStrategy::processSubscribePayload(toforward->routingInfo, toforward->sender, toforward->receiver, tmp)) {
564 std::string filterinfo = ChannelType::FilterStrategy::processSubscribePayload(toforward->sender, toforward->filterInfo);
577 ChannelType::FilterStrategy::processUnsubscribePayload(toforward->sender, toforward->filterInfo);
578 ChannelType::RoutingStrategy::processUnsubscribePayload(toforward->routingInfo, toforward->sender, toforward->receiver, tmp);
587 toforward->type = tmp | topic_;
589 return boost::make_shared<ForwardInfo>(stop, changes);
596 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
boost::shared_ptr< OrderInfo > Ptr
boost::shared_ptr< M2Message< EventType > > Ptr
virtual void deliverCallback(const typename m2etis::message::M2Message< EventType >::Ptr m)=0
deliver a callback
void register_deliver(message::MessageType nr, net_deliver_func f)
NetworkType::Key getSelf() const
boost::shared_ptr< const ForwardInfo > FIPtr
boost::shared_ptr< DeliverInfo > Ptr
static const uint32_t MSG_TYPE_MASK
uint16_t getTopic() const
Tree(uint16_t tree_name, const typename NetworkType::Key &self, const typename NetworkType::Key &rendezvous, const typename NetworkType::Key &root, PubSubSystemEnvironment *pssi, int cI)
Constructor.
void deregister_deliver(message::MessageType nr)
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors
unsigned short getChannel() const
NetworkType::Key getSelf() const
void deliver(uint64_t id, msgProcess proc)
ValidityT ValidityStrategy
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
void register_forward(message::MessageType nr, net_forward_func f)
Scheduler< util::RealTimeClock > scheduler_
PersistenceT PersistenceStrategy
void deregister_forward(message::MessageType nr)
void registerMessageType(message::MessageType type, const bool ack=true)
boost::shared_ptr< NetworkMessage > Ptr
uint64_t insert(const boost::function< void(void)> &func)
void subscribe(BasicDeliverCallbackInterface< EventType > &callback, boost::shared_ptr< filter::FilterExp< EventType > > predicate)
SecurityT SecurityStrategy
void publish(const typename message::M2Message< EventType >::Ptr msg)
Message Factory to create messages.
boost::shared_ptr< InternalMessage > Ptr
NetworkType::Key getRoot() const
void unsubscribe(boost::shared_ptr< filter::FilterExp< EventType > > predicate)
static const uint32_t ACTION_TYPE_MASK