22 #ifndef __M2ETIS_PUBSUB_ROUTING_DIRECTBROADCASTROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_DIRECTBROADCASTROUTING_H__
42 template<
class NetworkType>
46 typedef std::vector<typename NetworkType::Key>
KeyList;
47 typedef std::set<typename NetworkType::Key>
KeySet;
48 typedef std::pair<uint64_t, typename NetworkType::Key>
TimePair;
86 DirectBroadcastRouting(
const unsigned short topic_name,
PubSubSystemEnvironment * pssi,
const typename NetworkType::Key & root) :
BaseRouting<NetworkType>(topic_name, pssi), periodic_(directbroadcast::
RESUBSCRIPTION_INTERVAL), purge_distance_(directbroadcast::
PURGE_DISTANCE), _pssi(pssi), _removed_subscribereventlistener(), purgeID_(), registerID_(), topic_name_(topic_name), self_(), selfSubscribed_(false), _subscriber(), _purging(true), _newSubs(), _nodes(), _root(root) {
87 purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&DirectBroadcastRouting::purgeList,
this), 6);
101 if (self_ != _root) {
102 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
104 #elif I6E_PLATFORM == I6E_PLATFORM_LINUX
108 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
110 #elif I6E_PLATFORM == I6E_PLATFORM_LINUX
114 _nodes.insert(self_);
118 void setSelf(
const typename NetworkType::Key &
self) {
120 if (self_ == _root) {
121 _nodes.insert(self_);
126 _removed_subscribereventlistener = listener;
136 return selfSubscribed_;
163 M2ETIS_LOG_ERROR(
"DirectBroadcastRouting",
"configureRoutingInfo called with wrong action type");
193 if (!_nodes.empty()) {
194 m.resize(_nodes.size());
195 std::copy(_nodes.begin(), _nodes.end(), m.begin());
198 m.erase(std::remove_if(m.begin(), m.end(), [
this](
const typename NetworkType::Key & other) {
199 return self_ == other;
202 m.resize(_nodes.size());
203 std::copy(_nodes.begin(), _nodes.end(), m.begin());
205 m.erase(std::remove_if(m.begin(), m.end(), [
this](
const typename NetworkType::Key & other) {
206 return self_ == other;
211 if (self_ == _root) {
212 for (
auto n : _nodes) {
223 static typename NetworkType::Key get_from_pair(
const TimePair & p) {
228 m.resize(_subscriber.size());
229 std::transform(_subscriber.begin(), _subscriber.end(), m.begin(), T::get_from_pair);
232 if (
selfSubscribed() && std::find(m.begin(), m.end(), self_) == m.end()) {
252 typename TimeList::iterator iter;
253 for (iter = _subscriber.begin(); iter != _subscriber.end(); ++iter) {
254 if (iter->second == sender) {
264 _subscriber.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
280 static bool test(
const typename NetworkType::Key & send,
const TimePair & paar) {
281 return paar.second == send;
285 _subscriber.erase(std::remove_if(_subscriber.begin(), _subscriber.end(), boost::bind(T::test, sender, _1)), _subscriber.end());
318 if (self_ == _root) {
320 if (_nodes.insert(sender).second) {
321 if (selfSubscribed_) {
324 _newSubs.push_back(sender);
330 newInfo->_nodes = _nodes;
334 _nodes = rInfo->_nodes;
335 if (selfSubscribed_) {
338 for (
auto n : _nodes) {
340 for (TimePair p : _subscriber) {
347 _newSubs.push_back(n);
350 rInfo->_nodes.clear();
364 const unsigned short topic_name_;
366 typename NetworkType::Key self_;
368 bool selfSubscribed_;
371 TimeList _subscriber;
374 volatile bool _purging;
375 std::vector<typename NetworkType::Key> _newSubs;
378 typename NetworkType::Key _root;
383 M2ETIS_LOG_ERROR(
"DirectBroadcastRouting",
"Downcast error of routingInfo");
391 static bool test(
const uint64_t & p,
const uint64_t & jetzt,
const TimePair & paar) {
392 if ((jetzt - paar.first) > p) {
399 if (_subscriber.empty()) {
408 auto iter_first_erased_subscriber = std::remove_if(_subscriber.begin(), _subscriber.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
410 for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != _subscriber.end(); ++iter_subscriber) {
414 _subscriber.erase(iter_first_erased_subscriber, _subscriber.end());
boost::function< void(const typename NetworkType::Key)> _removed_subscribereventlistener
bool selfSubscribed() const
uint64_t runOnce(uint64_t time, const boost::function< bool(void)> &func, int16_t priority)
Adds a new job that runs only once.
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &) const
virtual ~DirectBroadcastRouting()
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
std::pair< uint64_t, typename NetworkType::Key > TimePair
creates connections between every single node and sends messages directly, so every node is a root no...
boost::function< void(typename message::RoutingInfo< NetworkType >::Ptr, typename NetworkType::Key, ControlTarget)> sendCtrlMsg_
DirectBroadcastRouting(const unsigned short topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
static const bool register_deliver_subscribe
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &) override
void setSelf(const typename NetworkType::Key &self)
static const bool register_forward_subscribe
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
Adds a new job that runs repeatedly.
std::set< typename NetworkType::Key > KeySet
static const bool register_deliver_unsubscribe
Scheduler< util::RealTimeClock > scheduler_
const uint64_t purge_distance_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
uint64_t getTime() const
Returns the time elapsed since the Clock on the rendezvous node was started.
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &)
std::vector< TimePair > TimeList
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &msgType)
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &)
void selfSubscribed(const bool b)
static const uint64_t PURGE_DISTANCE
static const bool register_forward_unsubscribe
std::vector< typename NetworkType::Key > KeyList
message::DirectBroadcastRoutingInfo< NetworkType > RoutingInfoType
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
static const bool periodicSubscribtion
boost::shared_ptr< DirectBroadcastRoutingInfo< NetworkType > > Ptr
PubSubSystemEnvironment * _pssi
static const uint64_t RESUBSCRIPTION_INTERVAL