22 #ifndef __M2ETIS_PUBSUB_ROUTING_DIRECTBROADCASTROUTING_H__ 
   23 #define __M2ETIS_PUBSUB_ROUTING_DIRECTBROADCASTROUTING_H__ 
   42         template<
class NetworkType>
 
   46                 typedef std::vector<typename NetworkType::Key> 
KeyList;
 
   47                 typedef std::set<typename NetworkType::Key> 
KeySet;
 
   48                 typedef std::pair<uint64_t, typename NetworkType::Key> 
TimePair;
 
   86                 DirectBroadcastRouting(
const unsigned short topic_name, 
PubSubSystemEnvironment * pssi, 
const typename NetworkType::Key & root) : 
BaseRouting<NetworkType>(topic_name, pssi), periodic_(directbroadcast::
RESUBSCRIPTION_INTERVAL), purge_distance_(directbroadcast::
PURGE_DISTANCE), _pssi(pssi), _removed_subscribereventlistener(), purgeID_(), registerID_(), topic_name_(topic_name), self_(), selfSubscribed_(false), _subscriber(), _purging(true), _newSubs(), _nodes(), _root(root) {
 
   87                         purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&DirectBroadcastRouting::purgeList, 
this), 6);
 
  101                         if (self_ != _root) {
 
  102 #if I6E_PLATFORM == I6E_PLATFORM_WIN32 
  104 #elif I6E_PLATFORM == I6E_PLATFORM_LINUX 
  108 #if I6E_PLATFORM == I6E_PLATFORM_WIN32 
  110 #elif I6E_PLATFORM == I6E_PLATFORM_LINUX 
  114                                 _nodes.insert(self_);
 
  118                 void setSelf(
const typename NetworkType::Key & 
self) {
 
  120                         if (self_ == _root) {
 
  121                                 _nodes.insert(self_);
 
  126                         _removed_subscribereventlistener = listener;
 
  136                         return selfSubscribed_;
 
  163                                         M2ETIS_LOG_ERROR(
"DirectBroadcastRouting", 
"configureRoutingInfo called with wrong action type");
 
  193                                 if (!_nodes.empty()) {
 
  194                                         m.resize(_nodes.size());
 
  195                                         std::copy(_nodes.begin(), _nodes.end(), m.begin());
 
  198                                 m.erase(std::remove_if(m.begin(), m.end(), [
this](
const typename NetworkType::Key & other) {
 
  199                                         return self_ == other;
 
  202                                 m.resize(_nodes.size());
 
  203                                 std::copy(_nodes.begin(), _nodes.end(), m.begin());
 
  205                                 m.erase(std::remove_if(m.begin(), m.end(), [
this](
const typename NetworkType::Key & other) {
 
  206                                         return self_ == other;
 
  211                                 if (self_ == _root) {
 
  212                                         for (
auto n : _nodes) {
 
  223                                         static typename NetworkType::Key get_from_pair(
const TimePair & p) {
 
  228                                 m.resize(_subscriber.size());
 
  229                                 std::transform(_subscriber.begin(), _subscriber.end(), m.begin(), T::get_from_pair);
 
  232                                 if (
selfSubscribed() && std::find(m.begin(), m.end(), self_) == m.end()) {
 
  252                         typename TimeList::iterator iter;
 
  253                         for (iter = _subscriber.begin(); iter != _subscriber.end(); ++iter) {
 
  254                                 if (iter->second == sender) {
 
  264                                 _subscriber.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
 
  280                                 static bool test(
const typename NetworkType::Key & send, 
const TimePair & paar) {
 
  281                                         return paar.second == send;
 
  285                         _subscriber.erase(std::remove_if(_subscriber.begin(), _subscriber.end(), boost::bind(T::test, sender, _1)), _subscriber.end());
 
  318                                 if (self_ == _root) {
 
  320                                         if (_nodes.insert(sender).second) {
 
  321                                                 if (selfSubscribed_) {
 
  324                                                         _newSubs.push_back(sender);
 
  330                                                 newInfo->_nodes = _nodes;
 
  334                                         _nodes = rInfo->_nodes;
 
  335                                         if (selfSubscribed_) {
 
  338                                                 for (
auto n : _nodes) {
 
  340                                                         for (TimePair p : _subscriber) {
 
  347                                                                 _newSubs.push_back(n);
 
  350                                                 rInfo->_nodes.clear();
 
  364                 const unsigned short topic_name_;
 
  366                 typename NetworkType::Key self_;
 
  368                 bool selfSubscribed_;
 
  371                 TimeList _subscriber;
 
  374                 volatile bool _purging;
 
  375                 std::vector<typename NetworkType::Key> _newSubs;
 
  378                 typename NetworkType::Key _root;
 
  383                                 M2ETIS_LOG_ERROR(
"DirectBroadcastRouting", 
"Downcast error of routingInfo");
 
  391                                 static bool test(
const uint64_t & p, 
const uint64_t & jetzt, 
const TimePair & paar) {
 
  392                                         if ((jetzt - paar.first) > p) {
 
  399                         if (_subscriber.empty()) {
 
  408                         auto iter_first_erased_subscriber = std::remove_if(_subscriber.begin(), _subscriber.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
 
  410                         for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != _subscriber.end(); ++iter_subscriber) {
 
  414                         _subscriber.erase(iter_first_erased_subscriber, _subscriber.end());
 
boost::function< void(const typename NetworkType::Key)> _removed_subscribereventlistener
bool selfSubscribed() const 
uint64_t runOnce(uint64_t time, const boost::function< bool(void)> &func, int16_t priority)
adds new job running only once 
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &) const 
virtual ~DirectBroadcastRouting()
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
std::pair< uint64_t, typename NetworkType::Key > TimePair
creates connections between every single node and sends messages directly, so every node is a root no...
boost::function< void(typename message::RoutingInfo< NetworkType >::Ptr, typename NetworkType::Key, ControlTarget)> sendCtrlMsg_
DirectBroadcastRouting(const unsigned short topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
static const bool register_deliver_subscribe
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &) override
void setSelf(const typename NetworkType::Key &self)
static const bool register_forward_subscribe
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly 
std::set< typename NetworkType::Key > KeySet
static const bool register_deliver_unsubscribe
Scheduler< util::RealTimeClock > scheduler_
const uint64_t purge_distance_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
uint64_t getTime() const 
Will return the time since the Clock on the rendezvouz node has started. 
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &)
std::vector< TimePair > TimeList
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &msgType)
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &)
void selfSubscribed(const bool b)
static const uint64_t PURGE_DISTANCE
static const bool register_forward_unsubscribe
std::vector< typename NetworkType::Key > KeyList
message::DirectBroadcastRoutingInfo< NetworkType > RoutingInfoType
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
static const bool periodicSubscribtion
boost::shared_ptr< DirectBroadcastRoutingInfo< NetworkType > > Ptr
PubSubSystemEnvironment * _pssi
static const uint64_t RESUBSCRIPTION_INTERVAL