22 #ifndef __M2ETIS_PUBSUB_ROUTING_SCRIBEROUTING_H__ 
   23 #define __M2ETIS_PUBSUB_ROUTING_SCRIBEROUTING_H__ 
   34 #include "boost/thread.hpp" 
   40         template<
class NetworkType>
 
   44                 typedef std::vector<typename NetworkType::Key> 
KeyList;
 
   45                 typedef std::pair<uint64_t, typename NetworkType::Key> 
TimePair;
 
   83                         , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
 
   84                         , purging_(true), _root(root) {
 
   85                         purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&ScribeRouting::purgeList, 
this), 0);
 
   93                 void setSelf(
const typename NetworkType::Key & 
self) {
 
   98             removed_subscriber_eventlistener_ = listener;
 
  127                                         M2ETIS_LOG_ERROR(
"Scribe Routing", 
"configureRoutingInfo called with wrong action type");
 
  143                                 if (self_ == _root) {
 
  162                                 m.resize(subscriber_.size());
 
  163                                 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
 
  169                         if (sender == self_) { 
 
  174                         typename TimeList::iterator iter;
 
  175                         for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
 
  176                                 if (iter->second == sender) {
 
  186                                 subscriber_.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
 
  198                                 static bool test(
const typename NetworkType::Key & send, 
const TimePair & paar) {
 
  199                                         return paar.second == send;
 
  203                         subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
 
  225                 const uint16_t topic_name_;
 
  227                 typename NetworkType::Key self_;
 
  232                 TimeList subscriber_;
 
  235                 boost::thread purger_;
 
  236                 volatile bool purging_;
 
  238                 typename NetworkType::Key _root;
 
  255                                         if ((jetzt - paar.first) > p) {
 
  262                         if (subscriber_.empty()) {
 
  271                         auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
 
  273                         for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
 
  277                         subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
 
message::ScribeRoutingInfo< NetworkType > RoutingInfoType
static const bool register_forward_unsubscribe
static const bool register_forward_subscribe
boost::shared_ptr< ScribeRoutingInfo< NetworkType > > Ptr
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
std::pair< uint64_t, typename NetworkType::Key > TimePair
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
#define M2ETIS_LOG_INFO(module, message)
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
bool selfSubscribed() const 
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly 
ScribeRouting(const uint16_t topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
std::vector< typename NetworkType::Key > KeyList
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver)
#define M2ETIS_LOG_ERROR(module, message)
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
uint64_t getTime() const 
Will return the time since the Clock on the rendezvouz node has started. 
static const bool register_deliver_subscribe
void setSelf(const typename NetworkType::Key &self)
static const bool periodicSubscribtion
const uint64_t purge_distance_
static const bool register_deliver_unsubscribe
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
static const uint64_t PURGE_DISTANCE
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const 
PubSubSystemEnvironment * _pssi
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
std::vector< TimePair > TimeList
static const uint64_t RESUBSCRIPTION_INTERVAL
void selfSubscribed(const bool b)