m2etis  0.4
Tree.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_TREE_H__
23 #define __M2ETIS_PUBSUB_TREE_H__
24 
25 #include "m2etis/util/Logger.h"
26 
34 
35 #include "boost/bind.hpp"
36 
37 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
38  #pragma warning(push)
39  #pragma warning(disable : 4127)
40 #endif
41 
42 namespace m2etis {
43 namespace pubsub {
44 
// NOTE(review): the declaration this template header belongs to (listing lines 53-55) is not
// visible in this listing; only the template parameters and part of the data members are shown.
52  template<class ChannelType, class NetworkType, class EventType>
// Topic id of this tree; OR-ed into every message type this class registers or sends.
56  uint16_t topic_;
// Key of this node as passed to the constructor.
59  typename NetworkType::Key self_;
// Key of the rendezvous node; the constructor compares it with self_ to set the order strategy's root flag.
60  typename NetworkType::Key _rendezvous;
// Key of the root node; returned by getRoot() and used as receiver for ControlTarget::ROOT.
61  typename NetworkType::Key _root;
// Set to true by the subscribe_impl overloads taking Int2Type; cleared by the unsubscribe paths.
63  bool subscribed_;
// Buffer of deferred actions: work is added with insert(), and MessageBuffer::deliver is
// registered as callback of the deliver and order strategies.
64  mutable MessageBuffer buffer_;
// Channel id passed to the constructor; returned by getChannel().
65  int channelID_;
// Id of the scheduled periodic resubscription job; UINT64_MAX means no job is scheduled.
66  uint64_t resubscribeID_;
67 
68  public:
// Constructs the tree for one topic.
// The initializer list constructs the routing, order, deliver and validity strategy bases and
// copies the arguments into the members. The body then passes this node's key to the
// strategies, binds the strategies' callbacks to this object and to buffer_, and finally
// registers the tree's message types and network callbacks with the controller.
//   tree_name   topic id; stored in topic_ and passed to the routing strategy
//   self        key of this node; stored in self_ and passed to the deliver strategy
//   rendezvous  key of the rendezvous node; stored in _rendezvous
//   root        key of the root node; stored in _root and passed to the routing strategy
//   pssi        environment; its _factory creates the network controller, the pointer is kept in pssi_
//   cI          channel id; stored in channelID_
72  Tree(uint16_t tree_name, const typename NetworkType::Key & self, const typename NetworkType::Key & rendezvous, const typename NetworkType::Key & root, PubSubSystemEnvironment * pssi, int cI) : ChannelType::RoutingStrategy(tree_name, pssi, root), ChannelType::OrderStrategy(pssi, false), ChannelType::DeliverStrategy(pssi, self), ChannelType::ValidityStrategy(pssi), factory_(message::MessageFactory<ChannelType, NetworkType>()), topic_(tree_name), deliver_f(), controller_(pssi->_factory->createNetworkController(NetworkType())), self_(self), _rendezvous(rendezvous), _root(root), pssi_(pssi), subscribed_(false), buffer_(), channelID_(cI), resubscribeID_(UINT64_MAX) {
73  ChannelType::RoutingStrategy::setSelf(self_);
74  ChannelType::FilterStrategy::setSelf(self_);
75  ChannelType::OrderStrategy::setRoot(self_ == _rendezvous); // if this node is the RP, then set true flag to make this node sequencer
76 
// NOTE(review): processRoutingStrategyUnsubscribeNotification is not defined in the visible part of this listing.
77  ChannelType::RoutingStrategy::setUnsubscriptionListener(boost::bind(&Tree::processRoutingStrategyUnsubscribeNotification, this, _1));
78 
// Each strategy sends its control messages through the matching send*ControlMessage member;
// the deliver and order strategies additionally get MessageBuffer::deliver on buffer_ as callback.
79  ChannelType::RoutingStrategy::configureSendCallback(boost::bind(&Tree::sendRoutingControlMessage, this, _1, _2, _3));
80  ChannelType::DeliverStrategy::configureCallback(boost::bind(&MessageBuffer::deliver, &buffer_, _1, _2));
81  ChannelType::DeliverStrategy::configureSendCallback(boost::bind(&Tree::sendDeliverControlMessage, this, _1, _2, _3));
82  ChannelType::OrderStrategy::configureCallback(boost::bind(&MessageBuffer::deliver, &buffer_, _1, _2));
83  ChannelType::OrderStrategy::configureSendCallback(boost::bind(&Tree::sendOrderControlMessage, this, _1, _2, _3));
84  ChannelType::OrderStrategy::setHn(self_);
85  registerMessageTypes();
86  registerNetworkCallbacks();
87  }
88 
// Tears the tree down: stops a still-scheduled resubscription job, removes the callbacks
// registered with the network controller and clears the deliver callback pointer.
89  virtual ~Tree() {
90  if (resubscribeID_ != UINT64_MAX) { // UINT64_MAX is the "no job scheduled" marker
91  pssi_->scheduler_.stop(resubscribeID_);
92  resubscribeID_ = UINT64_MAX;
93  }
94  deregisterNetworkCallbacks();
95  deliver_f = nullptr;
96  }
97 
// Returns this node's key as reported by the network controller (not the stored self_ member).
98  inline typename NetworkType::Key getSelf() const {
99  return controller_->getSelf();
100  }
101 
// Returns a copy of the root key that was passed to the constructor.
102  inline typename NetworkType::Key getRoot() const {
103  return _root;
104  }
105 
// Returns the channel id given to the constructor. The member is stored as int, so the
// value is converted to the unsigned short return type here.
106  inline unsigned short getChannel() const {
107  return static_cast<unsigned short>(channelID_);
108  }
109 
// Returns the topic id this tree was constructed with.
110  inline uint16_t getTopic() const {
111  return topic_;
112  }
113 
// Subscribes to this tree with the given deliver callback and filter predicate.
// The Int2Type tag built from RoutingStrategy::periodicSubscribtion selects, at compile time,
// the subscribe_impl overload with or without periodic resubscription.
114  void subscribe(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
115  subscribe_impl(callback, predicate, Int2Type<ChannelType::RoutingStrategy::periodicSubscribtion>());
116  }
117 
// Publishes msg on this tree: the routing and validity strategies fill in their parts of
// the message, the type is set to (action | topic_) and the message goes to sendMessages.
// NOTE(review): `tmp` is not declared in the visible lines; listing line 120 is elided here.
118  void publish(const typename message::M2Message<EventType>::Ptr msg) {
119  typename IMessage::Ptr topublish = boost::static_pointer_cast<IMessage>(msg);
121  ChannelType::RoutingStrategy::configureRoutingInfo(tmp, topublish->routingInfo, topublish->receiver);
122  ChannelType::ValidityStrategy::configureValidityInfo(topublish->validityInfo);
123  topublish->type = tmp | topic_;
124  sendMessages(topublish);
125  }
126 
// Unsubscribes from the tree: clears subscribed_ if it was set and stops a scheduled
// resubscription job.
// NOTE(review): listing line 129 (inside the first if) is elided; presumably it calls one of
// the unsubscribe_impl overloads - confirm against the real header.
127  void unsubscribe() {
128  if (subscribed_) {
130  subscribed_ = false;
131  }
132 
133  if (resubscribeID_ != UINT64_MAX) { // UINT64_MAX is the "no job scheduled" marker
134  pssi_->scheduler_.stop(resubscribeID_);
135  resubscribeID_ = UINT64_MAX;
136  }
137  }
138 
139  // for filter strategies: deregistering a single filter.
// Does nothing unless subscribed_ is set. Otherwise delegates to unsubscribe_impl(predicate)
// and then clears subscribed_.
// NOTE(review): subscribed_ is cleared regardless of which action unsubscribe_impl(predicate)
// takes, i.e. also when only one filter was removed - confirm this is intended.
140  void unsubscribe(boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
141  if (subscribed_) {
142  unsubscribe_impl(predicate);
143  subscribed_ = false;
144  }
145  }
146 
147  private:
// Deliver callback registered with the network controller (see registerNetworkCallbacks).
// Messages whose ctrlType_ is ORDER or DELIVER are handled first and never reach the switch.
// All other messages are dispatched on the action type obtained by masking msg->type with
// ACTION_TYPE_MASK; each case lets the strategies process their payload and then forwards
// and/or delivers the message. An unknown action type raises M2ETIS_THROW_FAILURE.
148  void deliver(typename message::NetworkMessage<NetworkType>::Ptr msg) {
149  // Security::decrypt(message);
150  typename IMessage::Ptr todeliver = boost::static_pointer_cast<IMessage>(msg);
151 
// Control messages created by the order / deliver strategies.
152  if (todeliver->ctrlType_ == message::ControlType::ORDER) {
153  if (ChannelType::DeliverStrategy::processOtherControlPayload(todeliver->deliverInfo, todeliver->sender)) {
154  if (ChannelType::OrderStrategy::processControlPayload(todeliver->orderInfo, todeliver->sender)) {
155  // deliver further down the tree
156  sendMessages(todeliver);
157  }
158  }
159  return;
160  } else if (todeliver->ctrlType_ == message::ControlType::DELIVER) {
161  if (ChannelType::DeliverStrategy::processControlPayload(todeliver->deliverInfo, todeliver->sender)) {
162  // deliver further down the tree
163  sendMessages(todeliver);
164  }
165  return;
166  }
167 
// tmp is passed to the routing strategy's process* calls and written back into the message
// type (tmp | topic_) afterwards.
168  message::ActionType tmp = static_cast<message::ActionType>(todeliver->type & message::ACTION_TYPE_MASK);
169 
170  switch(tmp) {
171  case message::CONTROL: {
172  if (ChannelType::DeliverStrategy::processOtherControlPayload(todeliver->deliverInfo, todeliver->sender)) {
173  ChannelType::RoutingStrategy::processControlPayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
174  todeliver->type = tmp | topic_;
175  disseminateMessage(todeliver);
176  }
177  break;
178  }
179  case message::SUBSCRIBE: {
180  // give Routing and Filter the opportunity to update their internals
181  if (ChannelType::DeliverStrategy::processSubscribePayload(todeliver->deliverInfo, todeliver->sender)) {
182  if (ChannelType::RoutingStrategy::processSubscribePayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp)) {
183  ChannelType::OrderStrategy::processSubscribePayload(todeliver->orderInfo, todeliver->sender);
184 
185  ChannelType::FilterStrategy::processSubscribePayload(todeliver->sender, todeliver->filterInfo);
186  }
187  todeliver->type = tmp | topic_;
188  // message type was changed to control here perhaps need for logic change in routing strategies
189  disseminateMessage(todeliver);
190  }
191  break;
192  }
193  case message::UNSUBSCRIBE: {
194  if (ChannelType::DeliverStrategy::processUnsubscribePayload(todeliver->deliverInfo, todeliver->sender)) {
195  ChannelType::FilterStrategy::processUnsubscribePayload(todeliver->sender, todeliver->filterInfo);
196  ChannelType::RoutingStrategy::processUnsubscribePayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
197  todeliver->type = tmp | topic_;
198  disseminateMessage(todeliver);
199  }
200  break;
201  }
202  case message::PUBLISH: {
203  ChannelType::RoutingStrategy::processPublishPayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
204  ChannelType::ValidityStrategy::processPublishPayload(todeliver->validityInfo);
205  todeliver->type = tmp | topic_;
206  if (ChannelType::DeliverStrategy::processPublishPayload(todeliver->deliverInfo, todeliver->sender)) {
207  ChannelType::OrderStrategy::processPublishPayload(todeliver->orderInfo, todeliver->sender);
// Forward unless routing asked to stop; only still-valid messages are sent on.
208  if (todeliver->routingInfo->action != message::RoutingInfo<NetworkType>::RoutingType::STOP) {
209  if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
210  sendMessages(todeliver);
211  }
212  }
// Local delivery: a message with sender == receiver is handed to the callback directly,
// any other message is queued in buffer_ and its id is passed to the order strategy.
213  if (ChannelType::RoutingStrategy::selfSubscribed()) {
214  if (todeliver->sender == todeliver->receiver) {
215  if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
216  deliver_f->deliverCallback(todeliver);
217  }
218  } else {
219  if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
220  uint64_t a = buffer_.insert(boost::bind(&BasicDeliverCallbackInterface<EventType>::deliverCallback, deliver_f, todeliver));
221  ChannelType::OrderStrategy::receive(a, todeliver->orderInfo, todeliver->sender);
222  }
223  }
224  }
225  }
226  break;
227  }
228  case message::NOTIFY: {
229  ChannelType::RoutingStrategy::processNotifyPayload(todeliver->routingInfo, todeliver->sender, todeliver->receiver, tmp);
230  ChannelType::ValidityStrategy::processNotifyPayload(todeliver->validityInfo);
231  todeliver->type = tmp | topic_;
232  if (ChannelType::DeliverStrategy::processNotifyPayload(todeliver->deliverInfo, todeliver->sender)) {
233  ChannelType::OrderStrategy::processNotifyPayload(todeliver->orderInfo, todeliver->sender);
234  if (todeliver->routingInfo->action != message::RoutingInfo<NetworkType>::RoutingType::STOP) {
235  if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
236  sendMessages(todeliver);
237  } else {
// Message is no longer valid: report it as removed to the order strategy for every target node.
238  for (typename NetworkType::Key to : ChannelType::RoutingStrategy::getTargetNodes(tmp, todeliver->routingInfo, todeliver->receiver)) {
239  ChannelType::OrderStrategy::notifyRemovedMessage(todeliver->orderInfo, to);
240  }
241  }
242  }
// Local delivery as in the PUBLISH case, but only if the payload matches the filter strategy.
243  if (ChannelType::RoutingStrategy::selfSubscribed()) {
244  if (ChannelType::FilterStrategy::match(todeliver->payload)) {
245  if (todeliver->sender == todeliver->receiver) {
246  if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
247  deliver_f->deliverCallback(todeliver);
248  }
249  } else {
250  if (ChannelType::ValidityStrategy::isValid(todeliver->validityInfo)) {
251  uint64_t a = buffer_.insert(boost::bind(&BasicDeliverCallbackInterface<EventType>::deliverCallback, deliver_f, todeliver));
252  ChannelType::OrderStrategy::receive(a, todeliver->orderInfo, todeliver->sender);
253  }
254  }
255  }
256  }
257  }
258  break;
259  }
260  default: {
261  M2ETIS_THROW_FAILURE("Tree", "deliver called with unknown action type", 1);
262  break;
263  }
264  }
265  }
266 
// Registers the five message types of this tree with the network controller.
// Each type is the action type OR-ed with topic_.
267  void registerMessageTypes() const {
268  // Register each message type for the tree (calculating from type and treeID)
269  controller_->registerMessageType(message::SUBSCRIBE | topic_);
270  controller_->registerMessageType(message::UNSUBSCRIBE | topic_);
271  controller_->registerMessageType(message::PUBLISH | topic_);
272  controller_->registerMessageType(message::NOTIFY | topic_);
273  controller_->registerMessageType(message::CONTROL | topic_);
274  }
275 
// Registers deliver() and forward() as callbacks with the network controller.
// For SUBSCRIBE and UNSUBSCRIBE the routing strategy's register_* flags decide which of the
// two callbacks are installed; PUBLISH, NOTIFY and CONTROL always get the deliver callback.
276  void registerNetworkCallbacks() {
277  typename net::NetworkController<NetworkType>::net_deliver_func deliv = boost::bind(&Tree::deliver, this, _1);
278  typename net::NetworkController<NetworkType>::net_forward_func forw = boost::bind(&Tree::forward, this, _1);
279 
280  if (ChannelType::RoutingStrategy::register_forward_subscribe) {
281  controller_->register_forward(message::SUBSCRIBE | topic_, forw);
282  }
283  if (ChannelType::RoutingStrategy::register_deliver_subscribe) {
284  controller_->register_deliver(message::SUBSCRIBE | topic_, deliv);
285  }
286  if (ChannelType::RoutingStrategy::register_forward_unsubscribe) {
287  controller_->register_forward(message::UNSUBSCRIBE | topic_, forw);
288  }
289  if (ChannelType::RoutingStrategy::register_deliver_unsubscribe) {
290  controller_->register_deliver(message::UNSUBSCRIBE | topic_, deliv);
291  }
292 
293  controller_->register_deliver(message::PUBLISH | topic_, deliv);
294  controller_->register_deliver(message::NOTIFY | topic_, deliv);
295  controller_->register_deliver(message::CONTROL | topic_, deliv);
296  }
297 
// Removes the callbacks installed by registerNetworkCallbacks, using the same routing
// strategy flags for SUBSCRIBE / UNSUBSCRIBE. Called from the destructor.
298  void deregisterNetworkCallbacks() {
299  /* look at http://www.boost.org/doc/libs/1_43_0/doc/html/function/tutorial.html#id866854
300  * for more information about binding a member method
301  * However, I'm using boost::bind here! (It's less clumsy.)
302  */
303  if (ChannelType::RoutingStrategy::register_forward_subscribe) {
304  controller_->deregister_forward(message::SUBSCRIBE | topic_);
305  }
306  if (ChannelType::RoutingStrategy::register_deliver_subscribe) {
307  controller_->deregister_deliver(message::SUBSCRIBE | topic_);
308  }
309  if (ChannelType::RoutingStrategy::register_forward_unsubscribe) {
310  controller_->deregister_forward(message::UNSUBSCRIBE | topic_);
311  }
312  if (ChannelType::RoutingStrategy::register_deliver_unsubscribe) {
313  controller_->deregister_deliver(message::UNSUBSCRIBE | topic_);
314  }
315 
316  controller_->deregister_deliver(message::PUBLISH | topic_);
317  controller_->deregister_deliver(message::NOTIFY | topic_);
318  controller_->deregister_deliver(message::CONTROL | topic_);
319  }
320 
// Passes msg on to sendMessages unless the routing info's action is RoutingType::STOP.
321  void disseminateMessage(typename IMessage::Ptr msg) {
322  const bool forwardingAllowed = msg->routingInfo->action != message::RoutingInfo<NetworkType>::RoutingType::STOP;
323  if (forwardingAllowed) {
324  sendMessages(msg);
325  }
326  }
// Queues msg for sending to the receiver already stored in it: reallySendMsg is inserted
// into buffer_ and the resulting id is handed to the order strategy.
// NOTE(review): `action` is not declared in the visible lines; listing line 329 is elided here.
327  void sendDirect(typename IMessage::Ptr msg) {
328  uint64_t a = buffer_.insert(boost::bind(&Tree::reallySendMsg, this, msg));
330  ChannelType::OrderStrategy::configureOrderInfo(a, action, msg->orderInfo, msg->receiver);
331  }
332 
// Sends msg to every target node returned by the routing strategy.
// Sender and type are set once on msg; for each target the receiver is set, then NOTIFY
// messages rejected by the filter strategy and NOTIFY/PUBLISH messages that are no longer
// valid are skipped (and reported to the order strategy as removed). Every remaining target
// gets its own copy of the message, queued in buffer_ via reallySendMsg.
// NOTE(review): `action` is not declared in the visible lines; listing line 334 is elided here.
333  void sendMessages(typename IMessage::Ptr msg) {
335 
336  // Algos remember: Publish is really handled in deliver!
337  // Even if you're "Root" and self-subscribed, send that message to _yourself_
338  // because self-subscription and "spread of message" is handled in deliver::publish!
339  std::vector<typename NetworkType::Key> v = ChannelType::RoutingStrategy::getTargetNodes(action, msg->routingInfo, msg->receiver);
340  // Build NetworkHeader from routingInfo
341  msg->sender = self_;
342  msg->type = action | topic_;
343 
344  for (typename NetworkType::Key to : v) {
345  // Using the same Message and sending it to all receivers. Don't change it later!
346  msg->receiver = to;
347 
348  if (action == message::NOTIFY && !ChannelType::FilterStrategy::match(to, msg->filterInfo, msg->payload)) {
349  ChannelType::OrderStrategy::notifyRemovedMessage(msg->orderInfo, to);
350  continue;
351  }
352  if ((action == message::NOTIFY || action == message::PUBLISH) && !ChannelType::ValidityStrategy::isValid(msg->validityInfo)) {
353  ChannelType::OrderStrategy::notifyRemovedMessage(msg->orderInfo, to);
354  continue;
355  }
// Copy taken after msg->receiver was set, so msg2 carries this iteration's receiver.
356  typename IMessage::Ptr msg2 = boost::make_shared<IMessage>(*msg);
357  uint64_t a = buffer_.insert(boost::bind(&Tree::reallySendMsg, this, msg2));
358  ChannelType::OrderStrategy::configureOrderInfo(a, action, msg2->orderInfo, to);
359  }
360  }
361 
// Second send stage: queues the actual NetworkController::send call for msg in buffer_ and
// hands the resulting id to the deliver strategy.
// NOTE(review): `action` is not declared in the visible lines; listing line 363 is elided here.
362  void reallySendMsg(typename IMessage::Ptr msg) {
364  uint64_t a = buffer_.insert(boost::bind(&net::NetworkController<NetworkType>::send, controller_, boost::static_pointer_cast<message::NetworkMessage<NetworkType>>(msg)));
365  ChannelType::DeliverStrategy::configureDeliverInfo(a, action, msg->deliverInfo, msg->receiver, msg->ctrlType_);
366  }
367 
// Sends a control message built by one of the send*ControlMessage helpers.
//   msg       message to send; sender and type (CONTROL | topic_) are set here
//   receiver  destination key, used only for ControlTarget::SINGLE
//   target    ROOT sends directly to _root, ALL routes through sendMessages,
//             SINGLE sends directly to receiver; any other value trips the assert
// NOTE(review): `tmp` in the ALL branch is not declared in the visible lines; listing line 376 is elided.
368  void sendControlMessage(typename IMessage::Ptr msg, const typename NetworkType::Key & receiver, ControlTarget target) {
369  msg->sender = self_;
370  msg->type = message::CONTROL | topic_;
371 
372  if (target == ControlTarget::ROOT) {
373  msg->receiver = _root;
374  sendDirect(msg);
375  } else if (target == ControlTarget::ALL) {
377  ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
378  msg->type = tmp | topic_;
379  sendMessages(msg);
380  } else if (target == ControlTarget::SINGLE) {
381  msg->receiver = receiver;
382  sendDirect(msg);
383  } else {
384  assert(false && "ControlTarget unknown");
385  }
386  }
387 
// Send callback of the deliver strategy (bound in the constructor): wraps the deliver info p
// in a fresh message marked ControlType::DELIVER and passes it to sendControlMessage.
388  void sendDeliverControlMessage(message::DeliverInfo::Ptr p, const typename NetworkType::Key & receiver, ControlTarget target) {
// With WITH_SIM the message is created from a SimulationEventType carrying _simID -1 and this channel's id.
389 #ifdef WITH_SIM
390  SimulationEventType v;
391  v._simID = -1;
392  v._simChannel = channelID_;
393  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
394 #else
395  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
396 #endif /* WITH_SIM */
397 
398  msg->ctrlType_ = message::ControlType::DELIVER;
399  msg->deliverInfo = boost::static_pointer_cast<typename ChannelType::DeliverStrategy::DeliverInfoType>(p);
400 
401  sendControlMessage(msg, receiver, target);
402  }
403 
// Send callback of the order strategy (bound in the constructor): wraps the order info p
// in a fresh message marked ControlType::ORDER and passes it to sendControlMessage.
404  void sendOrderControlMessage(message::OrderInfo::Ptr p, const typename NetworkType::Key & receiver, ControlTarget target) {
// With WITH_SIM the message is created from a SimulationEventType carrying _simID -1 and this channel's id.
405 #ifdef WITH_SIM
406  SimulationEventType v;
407  v._simID = -1;
408  v._simChannel = channelID_;
409  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
410 #else
411  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
412 #endif /* WITH_SIM */
413 
414  msg->ctrlType_ = message::ControlType::ORDER;
415  msg->orderInfo = boost::static_pointer_cast<typename ChannelType::OrderStrategy::OrderInfoType>(p);
416 
417  sendControlMessage(msg, receiver, target);
418  }
419 
// Send callback of the routing strategy (bound in the constructor): wraps the routing info p
// in a fresh message marked ControlType::ROUTING and passes it to sendControlMessage.
420  void sendRoutingControlMessage(typename message::RoutingInfo<NetworkType>::Ptr p, const typename NetworkType::Key & receiver, ControlTarget target) {
// With WITH_SIM the message is created from a SimulationEventType carrying _simID -1 and this channel's id.
421 #ifdef WITH_SIM
422  SimulationEventType v;
423  v._simID = -1;
424  v._simChannel = channelID_;
425  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
426 #else
427  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
428 #endif /* WITH_SIM */
429 
430  msg->ctrlType_ = message::ControlType::ROUTING;
431  msg->routingInfo = boost::static_pointer_cast<typename ChannelType::RoutingStrategy::RoutingInfoType>(p);
432 
433  sendControlMessage(msg, receiver, target);
434  }
435 
// Common subscribe step: stores the address of callback in deliver_f, marks the routing
// strategy as self-subscribed, lets the routing and filter strategies fill a fresh message
// and sends it with sendMessages. subscribed_ is read here but not modified.
// NOTE(review): `tmp` is not declared in the visible lines; listing line 449 is elided here.
436  void subscribe_impl(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
437  deliver_f = &callback;
438 
439 #ifdef WITH_SIM
440  SimulationEventType v;
441  v._simID = -1;
442  v._simChannel = channelID_;
443  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
444 #else
445  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
446 #endif /* WITH_SIM */
447 
448  ChannelType::RoutingStrategy::selfSubscribed(true);
450  ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
451  msg->type = tmp | topic_;
452 
// The current value of subscribed_ is passed along with the predicate.
453  ChannelType::FilterStrategy::getSubscribePayload(predicate, subscribed_, msg->filterInfo);
454 
455  sendMessages(msg);
456  }
457 
// Job body used for periodic resubscription (bound in subscribe_impl(..., Int2Type<true>)).
// Returns false without doing anything when the tree is no longer subscribed; otherwise
// repeats the subscribe step with the given callback and predicate and returns true.
458  bool subs(BasicDeliverCallbackInterface<EventType> * callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
459  if (!subscribed_) {
460  return false;
461  }
462  subscribe_impl(*callback, predicate);
463  return true;
464  }
466 
// Subscribe variant selected when RoutingStrategy::periodicSubscribtion is true.
// Performs the subscribe step, replaces any previously scheduled resubscription job with a
// new repeated job that calls subs() every RoutingStrategy::periodic_ (scheduled with
// priority 5), and then sets subscribed_.
// NOTE(review): the job keeps the address of callback, so callback presumably has to stay
// alive while the job is scheduled - confirm with callers.
467  void subscribe_impl(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate, Int2Type<true>) {
// subscribed_ is still unchanged during this call; it is only set at the end of this function.
468  subscribe_impl(callback, predicate);
469  if (resubscribeID_ != UINT64_MAX) {
470  pssi_->scheduler_.stop(resubscribeID_);
471  }
472  resubscribeID_ = pssi_->scheduler_.runRepeated(ChannelType::RoutingStrategy::periodic_, boost::bind(&Tree::subs, this, &callback, predicate), 5);
473  subscribed_ = true;
474  }
475 
// Subscribe variant selected when RoutingStrategy::periodicSubscribtion is false:
// performs the subscribe step once and sets subscribed_; no job is scheduled.
476  void subscribe_impl(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate, Int2Type<false>) {
477  subscribe_impl(callback, predicate);
478  subscribed_ = true;
479  }
480 
// Common unsubscribe step: clears subscribed_ and deliver_f, marks the routing strategy as
// no longer self-subscribed, lets the routing and filter strategies fill a fresh message and
// sends it with sendMessages.
// NOTE(review): `tmp` is not declared in the visible lines; listing line 495 is elided here.
481  void unsubscribe_impl() {
482  subscribed_ = false;
483 
484 #ifdef WITH_SIM
485  SimulationEventType v;
486  v._simID = -1;
487  v._simChannel = channelID_;
488  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
489 #else
490  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
491 #endif /* WITH_SIM */
492 
493  deliver_f = nullptr;
494  ChannelType::RoutingStrategy::selfSubscribed(false);
496  ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
497  msg->type = tmp | topic_;
498  ChannelType::FilterStrategy::getUnsubscribePayload(msg->filterInfo);
499  sendMessages(msg);
500  }
501 
502  // for filter strategies: deregistering a single filter.
// Asks the filter strategy what removing `predicate` requires and acts on the answer:
//   CANCELUNSUBSCRIBE    nothing is sent
//   UNSUBSCRIBEFROMTREE  falls back to the full unsubscribe_impl()
//   FORWARDUNSUBSCRIBE   sends the message created here via sendMessages
// NOTE(review): `tmp` in the FORWARDUNSUBSCRIBE branch is not declared in the visible lines;
// listing line 530 is elided here.
503  void unsubscribe_impl(boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
504 #ifdef WITH_SIM
505  SimulationEventType v;
506  v._simID = -1;
507  v._simChannel = channelID_;
508  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>(v));
509 #else
510  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(factory_.template createMessage<EventType>());
511 #endif /* WITH_SIM */
512 
513  typename ChannelType::FilterStrategy::FilterUnsubscribeInformation unsubscribeAction = ChannelType::FilterStrategy::getUnsubscribePayload(msg->filterInfo, predicate);
514 
515  if (unsubscribeAction == ChannelType::FilterStrategy::CANCELUNSUBSCRIBE) {
516  // a subscriber of this node has registered the same filter,
517  // so do not forward the filter
518  return;
519  }
520 
521  if (unsubscribeAction == ChannelType::FilterStrategy::UNSUBSCRIBEFROMTREE) {
522  // the last filter has been deregistered => unsubscribe from the tree
523  unsubscribe_impl();
524  }
525 
526  if (unsubscribeAction == ChannelType::FilterStrategy::FORWARDUNSUBSCRIBE) {
527  // => forward unsubscribe, but don't unsubscribe from tree
528  // prevent routing strategies from unsubscribing from the tree by
529  // turning unsubscription of single filter into subscription
531  ChannelType::RoutingStrategy::configureRoutingInfo(tmp, msg->routingInfo, msg->receiver);
532  msg->type = tmp | topic_;
533  sendMessages(msg);
534  }
535  }
536 
// Tag-dispatched unsubscribe for Int2Type<true>: clears subscribed_ and runs the common
// unsubscribe step (which clears subscribed_ again as its first statement).
537  void unsubscribe_impl(Int2Type<true>) {
538  subscribed_ = false;
539  unsubscribe_impl();
540  }
541 
// Tag-dispatched unsubscribe for Int2Type<false>: same body as the Int2Type<true> overload -
// clears subscribed_ and runs the common unsubscribe step.
542  void unsubscribe_impl(Int2Type<false>) {
543  subscribed_ = false;
544  unsubscribe_impl();
545  }
546 
547  // Callback Methods from Network
// Forward callback registered with the network controller for SUBSCRIBE / UNSUBSCRIBE
// (see registerNetworkCallbacks). Lets the routing and filter strategies process the message
// and returns a ForwardInfo built from the two flags (stop, changes).
// Raises M2ETIS_THROW_FAILURE if the message's topic bits differ from topic_ or the action
// type is neither SUBSCRIBE nor UNSUBSCRIBE.
// NOTE(review): `tmp` is not declared in the visible lines; listing line 551 is elided here.
548  FIPtr forward(typename message::NetworkMessage<NetworkType>::Ptr msg) {
549  typename IMessage::Ptr toforward = boost::static_pointer_cast<IMessage>(msg);
550 
552 
553  if ((toforward->type & message::MSG_TYPE_MASK) != topic_) {
554  M2ETIS_THROW_FAILURE("Tree", "Tree got message for forwarding with incorrect channel topic", 1);
555  }
556  // Security::decrypt(message);
557  bool stop = false;
558  bool changes = false;
559  switch(tmp) {
560  case message::SUBSCRIBE: {
561  // Give Algo and Filter the opportunity to update their internals.
562  // Perhaps the Filter wants to merge the predicates?!
563  if (ChannelType::RoutingStrategy::processSubscribePayload(toforward->routingInfo, toforward->sender, toforward->receiver, tmp)) {
// The returned filter info is stored but not used afterwards (see FIXME below).
564  std::string filterinfo = ChannelType::FilterStrategy::processSubscribePayload(toforward->sender, toforward->filterInfo);
565  // FIXME what if Filter created a generalized version of the filter? How is this communicated to other nodes up in the tree?
566  }
567 
// Unless routing asked to stop, this node becomes the sender and the change is reported.
568  if (toforward->routingInfo->action != message::RoutingInfo<NetworkType>::RoutingType::STOP) {
569  changes = true;
570  msg->sender = self_;
571  } else {
572  stop = true;
573  }
574  break;
575  }
576  case message::UNSUBSCRIBE: {
577  ChannelType::FilterStrategy::processUnsubscribePayload(toforward->sender, toforward->filterInfo);
578  ChannelType::RoutingStrategy::processUnsubscribePayload(toforward->routingInfo, toforward->sender, toforward->receiver, tmp);
579  break;
580  }
581  default: {
582  M2ETIS_THROW_FAILURE("Tree", "Tree got message for forwarding with unknown action type", 1);
583  break;
584  }
585  }
586 
// tmp is handed to the routing strategy's process* calls above; write it back into the type.
587  toforward->type = tmp | topic_;
588 
589  return boost::make_shared<ForwardInfo>(stop, changes);
590  }
591  };
592 
593 } /* namespace pubsub */
594 } /* namespace m2etis */
595 
596 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
597  #pragma warning(pop)
598 #endif
599 
600 #endif /* __M2ETIS_PUBSUB_TREE_H__ */
601 
boost::shared_ptr< OrderInfo > Ptr
Definition: OrderInfo.h:31
boost::shared_ptr< M2Message< EventType > > Ptr
Definition: M2Message.h:42
virtual void deliverCallback(const typename m2etis::message::M2Message< EventType >::Ptr m)=0
deliver a callback
void register_deliver(message::MessageType nr, net_deliver_func f)
void unsubscribe()
Definition: Tree.h:127
NetworkType::Key getSelf() const
Definition: Tree.h:98
boost::shared_ptr< const ForwardInfo > FIPtr
Definition: ForwardInfo.h:34
boost::shared_ptr< DeliverInfo > Ptr
static const uint32_t MSG_TYPE_MASK
Definition: MessageType.h:31
uint16_t getTopic() const
Definition: Tree.h:110
Tree(uint16_t tree_name, const typename NetworkType::Key &self, const typename NetworkType::Key &rendezvous, const typename NetworkType::Key &root, PubSubSystemEnvironment *pssi, int cI)
Constructor.
Definition: Tree.h:72
void deregister_deliver(message::MessageType nr)
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors
Definition: Exceptions.h:33
unsigned short getChannel() const
Definition: Tree.h:106
NetworkType::Key getSelf() const
void deliver(uint64_t id, msgProcess proc)
Definition: MessageBuffer.h:42
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
virtual ~Tree()
Definition: Tree.h:89
void register_forward(message::MessageType nr, net_forward_func f)
Scheduler< util::RealTimeClock > scheduler_
PersistenceT PersistenceStrategy
Definition: ChannelType.h:45
void deregister_forward(message::MessageType nr)
void registerMessageType(message::MessageType type, const bool ack=true)
boost::shared_ptr< NetworkMessage > Ptr
uint64_t insert(const boost::function< void(void)> &func)
Definition: MessageBuffer.h:37
void stop(uint64_t id)
Definition: Scheduler.h:110
void subscribe(BasicDeliverCallbackInterface< EventType > &callback, boost::shared_ptr< filter::FilterExp< EventType > > predicate)
Definition: Tree.h:114
void publish(const typename message::M2Message< EventType >::Ptr msg)
Definition: Tree.h:118
Message Factory to create messages.
boost::shared_ptr< InternalMessage > Ptr
NetworkType::Key getRoot() const
Definition: Tree.h:102
void unsubscribe(boost::shared_ptr< filter::FilterExp< EventType > > predicate)
Definition: Tree.h:140
static const uint32_t ACTION_TYPE_MASK
Definition: MessageType.h:32