22 #ifndef __M2ETIS_PUBSUB_ROUTING_HIERARCHICALSPREADITROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_HIERARCHICALSPREADITROUTING_H__
35 #include "boost/date_time/posix_time/posix_time_types.hpp"
41 template<
class NetworkType,
unsigned int ChildAmount>
45 typedef std::pair<long, typename NetworkType::Key>
TimePair;
46 typedef std::vector<typename NetworkType::Key>
KeyList;
85 , removed_subscriber_eventlistener_()
86 , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
87 , purging_(true), _root(root) {
88 purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&HierarchicalSpreaditRouting::purgeList,
this), 0);
96 void setSelf(
const typename NetworkType::Key &
self) {
101 removed_subscriber_eventlistener_ = listener;
123 if (!parent_vector.empty()) {
124 rInfo->node_adress = parent_vector.at(0).second;
130 rInfo->selfSend =
true;
131 rInfo->sender = self_;
136 rInfo->sender = self_;
144 M2ETIS_THROW_FAILURE(
"HierarchicalSpreaditRouting",
"configureRoutingInfo called with wrong action type", -1);
160 m.push_back(rInfo->node_adress);
161 }
else if (!parent_vector.empty()) {
162 m.push_back(parent_vector.begin()->second);
164 if (self_ == _root) {
172 if (receiver ==
typename NetworkType::Key()) {
179 m.resize(subscriber_.size());
180 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
182 m.push_back(receiver);
189 if (!parent_vector.empty()) {
190 if (rInfo->sender != parent_vector.begin()->second) {
191 m.push_back(parent_vector.begin()->second);
194 if (self_ != _root) {
199 for (TimePair p : subscriber_) {
200 if (p.second != rInfo->sender) {
201 m.push_back(p.second);
204 if (rInfo->selfSend) {
207 rInfo->sender = self_;
215 static typename NetworkType::Key get_from_pair(
const TimePair & p) {
220 m.resize(subscriber_.size());
221 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
223 if (!parent_vector.empty()) {
224 m.push_back(parent_vector.at(0).second);
236 bool is_subscribe_successful =
true;
238 typename TimeList::iterator iter;
240 for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
241 if (iter->second == sender) {
253 if (bandwidth_capacity > subscriber_.size()) {
255 subscriber_.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
261 rInfo->node_adress =
typename NetworkType::Key(subscriber_.at(round_robin_pointer).second);
262 round_robin_pointer = (round_robin_pointer + 1) % subscriber_.size();
264 is_subscribe_successful =
false;
269 return is_subscribe_successful;
276 static bool test(
const typename NetworkType::Key & send,
const TimePair & paar) {
277 return paar.second == send;
281 subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
284 if (!parent_vector.empty()) {
285 if (sender == parent_vector.at(0).second) {
287 receiver = rInfo->node_adress;
299 if ((subscriber_.empty() && (parent_vector.empty() && self_ == _root)) || sender == self_) {
308 if (parent_vector.empty()) {
309 parent_vector.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
311 parent_vector.begin()->second = sender;
317 receiver = rInfo->node_adress;
326 const unsigned int topic_name_;
328 typename NetworkType::Key self_;
332 unsigned int bandwidth_capacity = ChildAmount;
333 unsigned int round_robin_pointer = 0;
336 TimeList subscriber_;
338 TimeList parent_vector;
341 volatile bool purging_;
343 typename NetworkType::Key _root;
358 static bool test(
const long & p,
const long & jetzt,
const TimePair & paar) {
359 if ((jetzt - paar.first) > p) {
366 if (subscriber_.empty() && parent_vector.empty()) {
376 auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
378 for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
382 subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
const long purge_distance_
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType) override
std::vector< TimePair > TimeList
static const bool register_deliver_subscribe
static const bool register_deliver_unsubscribe
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors
bool selfSubscribed() const
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
#define M2ETIS_LOG_DEBUG(module, message)
HierarchicalSpreaditRouting(unsigned int topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver)
PubSubSystemEnvironment * _pssi
void setSelf(const typename NetworkType::Key &self)
static const bool register_forward_unsubscribe
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
Adds a new job that runs repeatedly.
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
virtual ~HierarchicalSpreaditRouting()
std::pair< long, typename NetworkType::Key > TimePair
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
static const bool periodicSubscribtion
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
uint64_t getTime() const
Returns the time elapsed since the Clock on the rendezvous node was started.
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
static const bool register_forward_subscribe
void selfSubscribed(const bool b)
boost::shared_ptr< HierarchicalSpreadItRoutingInfo< NetworkType > > Ptr
static const uint64_t PURGE_DISTANCE
std::vector< typename NetworkType::Key > KeyList
message::HierarchicalSpreadItRoutingInfo< NetworkType > RoutingInfoType
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const
static const uint64_t RESUBSCRIPTION_INTERVAL