22 #ifndef __M2ETIS_PUBSUB_ROUTING_SPREADITROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_SPREADITROUTING_H__
35 #include "boost/date_time/posix_time/posix_time_types.hpp"
41 template<
class NetworkType>
45 typedef std::pair<uint64_t, typename NetworkType::Key>
TimePair;
46 typedef std::vector<typename NetworkType::Key>
KeyList;
85 , removed_subscriber_eventlistener_()
86 , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
87 , purging_(true), _root(root) {
88 purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&SpreaditRouting::purgeList,
this), 0);
96 void setSelf(
const typename NetworkType::Key &
self) {
101 removed_subscriber_eventlistener_ = listener;
123 if (!parent_vector.empty()) {
124 rInfo->node_adress = parent_vector.at(0).second;
141 M2ETIS_THROW_FAILURE(
"Spreadit Routing",
"configureRoutingInfo called with wrong action type", -1);
157 m.push_back(rInfo->node_adress);
158 }
else if (!parent_vector.empty()) {
159 m.push_back(parent_vector.begin()->second);
161 if (self_ == _root) {
169 if (receiver ==
typename NetworkType::Key()) {
176 m.resize(subscriber_.size());
177 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
179 m.push_back(receiver);
197 m.resize(subscriber_.size());
198 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
206 static typename NetworkType::Key get_from_pair(
const TimePair & p) {
211 m.resize(subscriber_.size());
212 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
214 if (!parent_vector.empty()) {
215 m.push_back(parent_vector.at(0).second);
227 bool is_subscribe_successful =
true;
229 typename TimeList::iterator iter;
231 for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
232 if (iter->second == sender) {
244 if (bandwidth_capacity > subscriber_.size()) {
246 subscriber_.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
252 rInfo->node_adress =
typename NetworkType::Key(subscriber_.at(round_robin_pointer).second);
253 round_robin_pointer = (round_robin_pointer + 1) % subscriber_.size();
255 is_subscribe_successful =
false;
260 return is_subscribe_successful;
267 static bool test(
const typename NetworkType::Key & send,
const TimePair & paar) {
268 return paar.second == send;
272 subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
275 if (!parent_vector.empty()) {
276 if (sender == parent_vector.at(0).second) {
278 receiver = rInfo->node_adress;
290 if (subscriber_.empty()) {
299 if (parent_vector.empty()) {
300 parent_vector.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
302 parent_vector.begin()->second = sender;
308 receiver = rInfo->node_adress;
317 const unsigned int topic_name_;
319 typename NetworkType::Key self_;
324 unsigned int round_robin_pointer = 0;
327 TimeList subscriber_;
329 TimeList parent_vector;
332 volatile bool purging_;
334 typename NetworkType::Key _root;
349 static bool test(
const uint64_t & p,
const uint64_t & jetzt,
const TimePair & paar) {
350 if ((jetzt - paar.first) > p) {
357 if (subscriber_.empty() && parent_vector.empty()) {
367 auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
369 for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
373 subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
static const bool register_deliver_subscribe
boost::shared_ptr< SpreadItRoutingInfo< NetworkType > > Ptr
static const bool periodicSubscribtion
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
bool selfSubscribed() const
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType) override
message::SpreadItRoutingInfo< NetworkType > RoutingInfoType
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
#define M2ETIS_LOG_DEBUG(module, message)
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
PubSubSystemEnvironment * _pssi
void setSelf(const typename NetworkType::Key &self)
static const uint32_t ALLOWED_CHILDS
static const bool register_deliver_unsubscribe
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &)
static const bool register_forward_unsubscribe
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
uint64_t getTime() const
Will return the time since the Clock on the rendezvous node has started.
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
static const bool register_forward_subscribe
std::pair< uint64_t, typename NetworkType::Key > TimePair
const uint64_t purge_distance_
static const uint64_t PURGE_DISTANCE
std::vector< typename NetworkType::Key > KeyList
virtual ~SpreaditRouting()
std::vector< TimePair > TimeList
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &msgType)
SpreaditRouting(unsigned int topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
void selfSubscribed(const bool b)
static const uint64_t RESUBSCRIPTION_INTERVAL