22 #ifndef __M2ETIS_PUBSUB_ROUTING_SCRIBEROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_SCRIBEROUTING_H__
34 #include "boost/thread.hpp"
40 template<
class NetworkType>
44 typedef std::vector<typename NetworkType::Key>
KeyList;
45 typedef std::pair<uint64_t, typename NetworkType::Key>
TimePair;
83 , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
84 , purging_(true), _root(root) {
85 purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&ScribeRouting::purgeList,
this), 0);
93 void setSelf(
const typename NetworkType::Key &
self) {
98 removed_subscriber_eventlistener_ = listener;
127 M2ETIS_LOG_ERROR(
"Scribe Routing",
"configureRoutingInfo called with wrong action type");
143 if (self_ == _root) {
162 m.resize(subscriber_.size());
163 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
169 if (sender == self_) {
174 typename TimeList::iterator iter;
175 for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
176 if (iter->second == sender) {
186 subscriber_.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
198 static bool test(
const typename NetworkType::Key & send,
const TimePair & paar) {
199 return paar.second == send;
203 subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
225 const uint16_t topic_name_;
227 typename NetworkType::Key self_;
232 TimeList subscriber_;
235 boost::thread purger_;
236 volatile bool purging_;
238 typename NetworkType::Key _root;
255 if ((jetzt - paar.first) > p) {
262 if (subscriber_.empty()) {
271 auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
273 for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
277 subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
message::ScribeRoutingInfo< NetworkType > RoutingInfoType
static const bool register_forward_unsubscribe
static const bool register_forward_subscribe
boost::shared_ptr< ScribeRoutingInfo< NetworkType > > Ptr
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
std::pair< uint64_t, typename NetworkType::Key > TimePair
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
#define M2ETIS_LOG_INFO(module, message)
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
bool selfSubscribed() const
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
ScribeRouting(const uint16_t topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
std::vector< typename NetworkType::Key > KeyList
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver)
#define M2ETIS_LOG_ERROR(module, message)
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
static const bool register_deliver_subscribe
void setSelf(const typename NetworkType::Key &self)
static const bool periodicSubscribtion
const uint64_t purge_distance_
static const bool register_deliver_unsubscribe
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
static const uint64_t PURGE_DISTANCE
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const
PubSubSystemEnvironment * _pssi
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
std::vector< TimePair > TimeList
static const uint64_t RESUBSCRIPTION_INTERVAL
void selfSubscribed(const bool b)