22 #ifndef __M2ETIS_PUBSUB_CHANNEL_H__
23 #define __M2ETIS_PUBSUB_CHANNEL_H__
44 #include "boost/bind.hpp"
45 #include "boost/date_time/posix_time/posix_time_types.hpp"
46 #include "boost/foreach.hpp"
47 #include "boost/make_shared.hpp"
48 #include "boost/shared_ptr.hpp"
49 #include "boost/thread/locks.hpp"
51 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
53 #pragma warning(disable : 4127)
65 template<
class ChannelType,
class NetworkType,
class EventType>
70 typename NetworkType::Key _self;
71 typename NetworkType::Key _rendezvous;
73 const ChannelName topic_;
75 std::vector<TreeType *> trees_;
81 std::map<typename NetworkType::Key, uint64_t> _nodeList;
84 std::pair<BasicDeliverCallbackInterface<EventType> *, boost::shared_ptr<filter::FilterExp<EventType>>> _lastSubscribe;
86 uint64_t dynamicPeriodID_;
92 Channel(
const ChannelName topic_name,
const std::string & ip, uint16_t port,
const std::string & rendezvousIP, uint16_t known_hostport,
PubSubSystemEnvironment * pssi,
const std::vector<std::string> & rootList) :
ChannelType::PartitionStrategy(),
ChannelType::RendezvousStrategy(rootList), _self(ip +
std::string(
":") +
std::to_string(port)), _rendezvous(rendezvousIP +
std::string(
":") +
std::to_string(known_hostport)), factory_(message::MessageFactory<
ChannelType, NetworkType>()), topic_(topic_name), controller_(pssi->_factory->createNetworkController(NetworkType())), msgQueue_(), subscribeQueue_(), unsubscribeQueue_(), unsubscribe_(false), pssi_(pssi), _nodeList({std::make_pair(_self, 0)}), joined(
false), _hasSubscribe(
false), _lastSubscribe(), dynamicPeriodID_(UINT64_MAX) {
93 if (!ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
94 for (
auto name : ChannelType::PartitionStrategy::getTreeNames()) {
95 trees_.push_back(pssi->_tree_factory->template createTree<ChannelType, NetworkType, EventType>(_self, _rendezvous,
typename NetworkType::Key(ChannelType::RendezvousStrategy::getRoot() +
":" + std::to_string(known_hostport)), pssi, topic_));
99 ChannelType::PartitionStrategy::createRendezvousPartition(_rendezvous);
101 for (
auto name : ChannelType::PartitionStrategy::getTreeNames()) {
102 trees_.push_back(pssi->_tree_factory->template createTree<ChannelType, NetworkType, EventType>(_self, _rendezvous, _rendezvous, pssi, topic_));
114 if (_self != _rendezvous) {
115 pssi->scheduler_.runOnce(1000, boost::bind(&Channel::startJoin,
this), 1);
117 dynamicPeriodID_ = pssi->scheduler_.runRepeated(6000000, boost::bind(&Channel::purgeNodes,
this), 1);
124 if (dynamicPeriodID_ != UINT64_MAX) {
128 for (TreeType * t : trees_) {
133 if (_self != _rendezvous) {
135 SimulationEventType v;
137 v._simChannel = topic_;
144 msg->receiver = _rendezvous;
146 controller_->
send(msg);
153 return trees_[0]->getSelf().toStr();
161 subscribeQueue_.
push(std::make_pair(&callback, predicate));
166 unsubscribeQueue_.
push(predicate);
178 return factory_.template createMessage<EventType>();
187 return factory_.template transformToNetworkMessage<EventType>(msg);
191 return factory_.template transformToM2Message<EventType>(msg);
196 while (!subscribeQueue_.
empty()) {
197 _lastSubscribe = subscribeQueue_.
poll();
198 _hasSubscribe =
true;
200 for (
auto tree_index : ChannelType::PartitionStrategy::getSubscribeTrees(_lastSubscribe.second)) {
201 trees_[tree_index]->subscribe(*_lastSubscribe.first, _lastSubscribe.second);
206 for (
auto tree : trees_) {
210 unsubscribe_ =
false;
214 while (!unsubscribeQueue_.
empty()) {
215 boost::shared_ptr<filter::FilterExp<EventType> > deregisteredFilter = unsubscribeQueue_.
poll();
217 for (
auto tree_index : ChannelType::PartitionStrategy::getSubscribeTrees(deregisteredFilter)) {
218 trees_[tree_index]->unsubscribe(deregisteredFilter);
222 uint32_t msgPublished = 0;
225 std::vector<int>::size_type i = ChannelType::PartitionStrategy::getPublishTree(msg->
payload, _self);
226 if (i < trees_.size()) {
228 trees_[ChannelType::PartitionStrategy::getPublishTree(msg->
payload, _self)]->publish(msg);
241 typename IMessage::Ptr msg2 = boost::static_pointer_cast<IMessage>(msg);
246 if (_self == _rendezvous) {
247 if (_nodeList.find(msg2->sender) == _nodeList.end()) {
250 if (ChannelType::PartitionStrategy::createPartition(msg2->sender)) {
251 trees_.push_back(pssi_->
_tree_factory->template createTree<ChannelType, NetworkType, EventType>(_self, _rendezvous, msg2->sender, pssi_, topic_));
256 unsubscribePartitions();
261 for (std::pair<typename NetworkType::Key, uint64_t> node : _nodeList) {
262 msg2->_nodeList.push_back(node.first);
265 msg2->sender = _self;
269 if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
270 for (
size_t i = 0; i < trees_.size(); ++i) {
271 msg2->_trees.push_back(
typename IMessage::TreeHelper(trees_[i]->getTopic(), ChannelType::PartitionStrategy::getPredicate(i), trees_[i]->getRoot()));
275 for (
typename NetworkType::Key node : msg2->_nodeList) {
277 msg2->receiver = node;
278 controller_->
send(boost::make_shared<IMessage>(*msg2));
283 subscribe(*_lastSubscribe.first, _lastSubscribe.second);
291 for (
typename NetworkType::Key node : msg2->_nodeList) {
300 if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
301 bool newTree =
false;
302 for (
size_t i = 0; i < msg2->_trees.size(); ++i) {
304 for (
size_t j = 0; j < trees_.size(); ++j) {
305 if (trees_[j]->getTopic() == msg2->_trees[i].topic) {
306 ChannelType::PartitionStrategy::changePredicate(j, msg2->_trees[i].predicates);
307 ChannelType::PartitionStrategy::changeRoot(j, msg2->_trees[i].root);
313 ChannelType::PartitionStrategy::addPartition(msg2->_trees[i].predicates, msg2->_trees[i].root);
320 if (newTree && _hasSubscribe) {
323 unsubscribePartitions();
325 subscribe(*_lastSubscribe.first, _lastSubscribe.second);
329 _nodeList.erase(msg2->sender);
331 if (_self == _rendezvous) {
332 if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
333 for (
size_t i = 0; i < trees_.size(); ++i) {
334 if (trees_[i]->getRoot() == msg2->sender) {
335 msg2->_topics.insert(trees_[i]->getTopic());
339 removeTopics(msg2->_topics);
344 unsubscribePartitions();
346 subscribe(*_lastSubscribe.first, _lastSubscribe.second);
350 for (std::pair<typename NetworkType::Key, uint64_t> node : _nodeList) {
351 if (node.first != _self) {
352 msg2->receiver = node.first;
353 controller_->
send(boost::make_shared<IMessage>(*msg2));
357 if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
358 removeTopics(msg2->_topics);
363 unsubscribePartitions();
365 subscribe(*_lastSubscribe.first, _lastSubscribe.second);
379 dynamicPeriodID_ = pssi_->
scheduler_.
runRepeated(5000000, boost::bind(&Channel::updateState,
this), 1);
388 assert(_self == _rendezvous);
389 std::vector<typename NetworkType::Key> purges;
391 for (std::pair<typename NetworkType::Key, uint64_t> p : _nodeList) {
392 if (p.first != _rendezvous && pssi_->
clock_.
getTime() - p.second >= 6000000) {
393 purges.push_back(p.first);
397 for (
typename NetworkType::Key node : purges) {
398 _nodeList.erase(node);
401 for (
typename NetworkType::Key node : purges) {
403 SimulationEventType v;
405 v._simChannel = topic_;
414 if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
415 for (
size_t i = 0; i < trees_.size(); ++i) {
416 if (trees_[i]->getRoot() == node) {
417 msg->_topics.insert(trees_[i]->getTopic());
421 removeTopics(msg->_topics);
426 unsubscribePartitions();
428 subscribe(*_lastSubscribe.first, _lastSubscribe.second);
432 for (std::pair<typename NetworkType::Key, uint64_t> nodes : _nodeList) {
433 if (nodes.first != _self) {
434 msg->receiver = nodes.first;
435 controller_->
send(boost::make_shared<IMessage>(*msg));
448 SimulationEventType v;
450 v._simChannel = topic_;
457 msg->receiver = _rendezvous;
459 controller_->
send(msg);
467 void updateOrderTrees() {
468 std::vector<typename ChannelType::OrderStrategy *> v;
469 for (
size_t i = 0; i < trees_.size(); ++i) {
470 v.push_back(static_cast<typename ChannelType::OrderStrategy *>(trees_[i]));
472 for (
size_t i = 0; i < trees_.size(); i++) {
473 trees_[i]->otherOrders(v);
480 void removeTopics(
const std::set<uint16_t> & topics) {
481 for (uint16_t ttr : topics) {
482 for (
size_t j = 0; j < trees_.size(); ++j) {
483 if (trees_[j]->getTopic() == ttr) {
484 ChannelType::PartitionStrategy::removePartition(j);
486 trees_.erase(trees_.begin() + int(j));
496 void unsubscribePartitions() {
497 std::vector<unsigned int> subs = ChannelType::PartitionStrategy::getSubscribeTrees(_lastSubscribe.second);
501 for (
size_t i = 0; i < trees_.size(); ++i) {
502 if (i == subs[index]) {
505 trees_[i]->unsubscribe(_lastSubscribe.second);
514 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
boost::shared_ptr< M2Message< EventType > > Ptr
TreeFactory * _tree_factory
void setOffset(uint64_t offset)
sets offset of this Clock to adjust times over network
void register_deliver(message::MessageType nr, net_deliver_func f)
ChannelName getChannel() const
Gets a identification of the channel.
PayloadPtr payload
payload
bool empty() const
returns true if the queue is empty, otherwise false
void unsubscribe(boost::shared_ptr< filter::FilterExp< EventType >> predicate) override
Channel(const ChannelName topic_name, const std::string &ip, uint16_t port, const std::string &rendezvousIP, uint16_t known_hostport, PubSubSystemEnvironment *pssi, const std::vector< std::string > &rootList)
Constructor.
T front()
returns first entry of the queue
void send(typename message::NetworkMessage< NetworkType >::Ptr msg)
sends a message to the receiver defined within the message if the sender is the receiver, message is put into deliver method directly to remove overhead by serializing and trying to send message otherwise the message is forwarded to the corresponding wrapper
void push(const T &value)
pushes the given value into the queue
static const uint32_t PULL_PARSEMESSAGE
RendezvousT RendezvousStrategy
static const uint32_t EXPECTED_LATENCY
void subscribe(BasicDeliverCallbackInterface< EventType > &callback, boost::shared_ptr< filter::FilterExp< EventType > > predicate)
subscribes to the channel
static const uint32_t PUBLISH_MESSAGECOUNT_MAX
void publish(const typename message::M2Message< EventType >::Ptr msg)
publishes a message on the channel
uint64_t getRealTime() const
Will return the real time since the Clock has been started.
void deliver(typename message::NetworkMessage< NetworkType >::Ptr msg)
Channel receives messages from network in here, used for dynamic partitions.
message::M2Message< EventType >::Ptr createMessage() const
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Scheduler< util::RealTimeClock > scheduler_
void clear()
removes all elements in the queue
util::Clock< util::RealTimeClock > clock_
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
T poll()
remoes first entry of the queue and returns its value
M2Message< EventType >::Ptr createMessage(const EventType &w) const
creates a Message
boost::shared_ptr< NetworkMessage > Ptr
std::string getSelf() const
PartitionT PartitionStrategy
Message Factory to create messages.
boost::shared_ptr< InternalMessage > Ptr
message::M2Message< EventType >::Ptr createMessage(const EventType &payload) const
static const uint32_t ACTION_TYPE_MASK
void pop()
removes first entry of the queue