22 #ifndef __M2ETIS_PUBSUB_ROUTING_DIRECTROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_DIRECTROUTING_H__
34 #include "boost/date_time/posix_time/posix_time_types.hpp"
40 template<
class NetworkType>
44 typedef std::vector<typename NetworkType::Key>
KeyList;
45 typedef std::pair<uint64_t, typename NetworkType::Key>
TimePair;
81 DirectRouting(
const unsigned short topic_name,
PubSubSystemEnvironment * pssi,
const typename NetworkType::Key & root) :
BaseRouting<NetworkType>(topic_name, pssi), periodic_(direct::
RESUBSCRIPTION_INTERVAL), purge_distance_(direct::
PURGE_DISTANCE), _pssi(pssi), removed_subscriber_eventlistener_(), topic_name_(topic_name), self_(), selfsubscribed_(false), subscriber_(), purging_(true), _root(root) {
82 purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&DirectRouting::purgeList,
this), 6);
90 void setSelf(
const typename NetworkType::Key &
self) {
95 removed_subscriber_eventlistener_ = listener;
105 return selfsubscribed_;
132 M2ETIS_LOG_ERROR(
"Direct Routing",
"configureRoutingInfo called with wrong action type");
153 if (self_ == _root) {
161 static typename NetworkType::Key get_from_pair(
const TimePair & p) {
166 m.resize(subscriber_.size());
167 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
183 if (sender != self_) {
185 typename TimeList::iterator iter;
186 for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
187 if (iter->second == sender) {
197 subscriber_.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
213 static bool test(
const typename NetworkType::Key & send,
const TimePair & paar) {
214 return paar.second == send;
218 subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
232 assert(self_ == _root);
233 if (self_ == _root) {
234 if (!subscriber_.empty()) {
263 const unsigned short topic_name_;
265 typename NetworkType::Key self_;
267 bool selfsubscribed_;
270 TimeList subscriber_;
273 mutable boost::mutex subscriber_mutex_;
274 volatile bool purging_;
276 typename NetworkType::Key _root;
289 static bool test(
const uint64_t & p,
const uint64_t & jetzt,
const TimePair & paar) {
290 if ((jetzt - paar.first) > p) {
297 if (subscriber_.empty()) {
306 auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
308 for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
312 subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &)
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
PubSubSystemEnvironment * _pssi
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &)
bool selfSubscribed() const
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &) override
static const bool register_deliver_unsubscribe
static const bool register_forward_subscribe
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
message::DirectRoutingInfo< NetworkType > RoutingInfoType
std::pair< uint64_t, typename NetworkType::Key > TimePair
const uint64_t purge_distance_
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
DirectRouting(const unsigned short topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
boost::shared_ptr< DirectRoutingInfo< NetworkType > > Ptr
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
std::vector< TimePair > TimeList
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
static const bool periodicSubscribtion
void setSelf(const typename NetworkType::Key &self)
static const bool register_deliver_subscribe
void selfSubscribed(const bool b)
static const uint64_t PURGE_DISTANCE
std::vector< typename NetworkType::Key > KeyList
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &) const
static const bool register_forward_unsubscribe
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &msgType)
static const uint64_t RESUBSCRIPTION_INTERVAL