22 #ifndef __M2ETIS_PUBSUB_ORDER_GMSORDER_H__
23 #define __M2ETIS_PUBSUB_ORDER_GMSORDER_H__
30 #include "boost/any.hpp"
41 template<
class NetworkType,
unsigned int Timeout>
47 MsgInfo(uint64_t a, uint64_t b, uint64_t c) : seqNr(a), treeId(b), msgId(c) {
65 bool operator<(
const MsgInfo & b)
const {
66 return !(std::tie(seqNr, treeId, msgId) < std::tie(b.seqNr, b.treeId, b.msgId));
69 bool operator>(
const MsgInfo & b)
const {
80 for (
size_t i = 0; i <
others_.size(); ++i) {
104 std::vector<std::set<typename NetworkType::Key>>
subs_;
109 std::map<typename NetworkType::Key, uint64_t>
nextSend_;
113 std::map<typename NetworkType::Key, uint64_t>
nextRec_;
118 std::map<typename NetworkType::Key, uint64_t>
mainTree_;
148 std::map<typename NetworkType::Key, std::set<uint64_t>>
subOn_;
152 typedef std::pair<uint64_t, uint64_t>
BufId;
159 std::priority_queue<QueueTuple, std::vector<QueueTuple>, std::greater<QueueTuple>>
ctrlQueue_;
170 return !ctrlQueue_.empty();
191 if (masterTree == UINT64_MAX) {
193 for (
size_t i = 0; i < others_.size(); ++i) {
194 others_[i]->setMasterTree(myTree_);
197 }
else if (masterTree == myTree_) {
203 if (selfSub_.find(myTree_) == selfSub_.end()) {
204 selfSub_.insert(myTree_);
213 if (nextSend_.find(rec) == nextSend_.end()) {
216 info->seqNr = nextSend_[rec]++;
220 selfSub_.erase(myTree_);
221 if (myTree_ == masterTree) {
223 if (selfSub_.empty()) {
224 masterTree = UINT64_MAX;
226 masterTree = *(selfSub_.begin());
234 p->seqNr = 123456789;
252 info->realTree = info->realTree == UINT64_MAX ? 0 : info->realTree;
254 subOn_[sender].insert(info->realTree);
256 subs_[uint32_t(info->realTree)].insert(sender);
258 mainTree_[sender] = info->mT;
262 others_[0]->processSubscribePayload(ptr, sender);
276 info->msgNr = nextSend++;
278 others_[0]->sendCtrl(myTree_, info->msgNr);
297 info->realTree = info->realTree == UINT64_MAX ? 0 : info->realTree;
299 subOn_[sender].erase(info->realTree);
301 subs_[uint32_t(info->realTree)].erase(sender);
303 mainTree_[sender] = info->mT;
307 others_[0]->processControlPayload(ptr, sender);
313 if (masterTree != myTree_) {
335 while (b && !ctrlQueue_.empty()) {
337 MsgInfo head = std::get<0>(ctrlQueue_.top());
338 if (ctrlQueue_.size() > 0 && head.seqNr ==
nextFinished) {
339 if (head.msgId == UINT64_MAX) {
344 BufId bid = std::make_pair(head.treeId, head.msgId);
346 if (pubQueue_.find(bid) != pubQueue_.end()) {
348 others_[uint32_t(pubQueue_[bid].first)]->deliver(pubQueue_[bid].second);
349 pubQueue_.erase(bid);
357 nextFinished = std::get<0>(ctrlQueue_.top()).seqNr;
375 if (masterTree == UINT64_MAX || myTree_ == masterTree) {
377 pubQueue_[std::make_pair(info->realTree, info->msgNr)] = std::make_pair(info->realTree,
id);
381 others_[uint32_t(masterTree)]->receive(
id, ptr, sender);
394 for (
size_t i = 0; i < others_.size(); ++i) {
395 if (others_[i] ==
this) {
402 subs_ = std::vector<std::set<typename NetworkType::Key>>(others_.size());
412 std::set<uint64_t> v;
413 for (
typename std::set<typename NetworkType::Key>::iterator subsOnTree = subs_[uint32_t(treeId)].begin(); subsOnTree != subs_[uint32_t(treeId)].end(); ++subsOnTree) {
415 v.insert(mainTree_[*subsOnTree]);
418 for (std::set<uint64_t>::iterator it = v.begin(); it != v.end(); ++it) {
419 others_[uint32_t(*it)]->sendCtrl2(treeId, msgId);
431 p->realTree = treeId;
433 p->seqNr = 123456789;
boost::shared_ptr< OrderInfo > Ptr
std::priority_queue< QueueTuple, std::vector< QueueTuple >, std::greater< QueueTuple > > ctrlQueue_
Queue containing all recieved but not yet further processed ctrl messages.
void deliver(uint64_t a)
the Message with id a shall be delivered This is used to force other instances to deliver a message ...
std::tuple< MsgInfo, uint64_t > QueueTuple
uint64_t masterTree
tree on which Control Msgs are expected. Default: None
void sendCtrl2(uint64_t treeId, uint64_t msgId)
send a ctrlMsg on this tree
void notifyRemovedMessage(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
notified when message dropped by filter or validity strategy
bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &rec)
configures the info struct
std::map< typename NetworkType::Key, uint64_t > nextSend_
Receiver -> next Seq. Nr. to use.
uint64_t nextFinished
next ID to be expected
std::set< uint64_t > selfSub_
this node is subscribed on these trees
void updateQueue()
checks the queue for new messages to deliver
bool processControlPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes a control payload (Root)
uint64_t nextSend
next number to be used as the global Sequence number
std::pair< uint64_t, uint64_t > BufId
std::map< typename NetworkType::Key, uint64_t > nextRec_
Sender -> next Seq. Nr. to expect.
std::map< typename NetworkType::Key, std::set< uint64_t > > subOn_
which node is subscribed on which tree
boost::function< void(message::OrderInfo::Ptr, const typename NetworkType::Key &, ControlTarget)> sendMsg_
stores the function to send new messages This function will send a newly created control message cont...
bool hasPending()
returns whether pending messages exist
This is the interface that should be used for all Order Strategies It specifies all functions that sh...
boost::shared_ptr< GMSOrderInfo > Ptr
uint64_t myTree_
index number of this tree
void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
called by the Tree whenever a message is ready to be delivered This function stores the messages (to ...
std::pair< uint64_t, uint64_t > MsgIdent
void processUnsubscribePayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
void processSubscribePayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes a subscribe payload (Root) the message will be processed on tree 0
void setMasterTree(uint64_t t)
this node will only accept controlmessages from this tree
PubSubSystemEnvironment * pssi_
pointer to the PubSubSystemEnvironment
boost::function< void(uint64_t, msgProcess)> function_
stores the function to be called for delivering
GMSOrder(PubSubSystemEnvironment *pssi, bool isRoot)
std::map< MsgIdent, BufId > pubQueue_
stores all NOTIFY msgs that arrived
void processNotifyPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
called for every NotifyMsg that arrived
std::vector< std::set< typename NetworkType::Key > > subs_
for every tree, save all subscribers only neccessary on root node
message::GMSOrderInfo< NetworkType > OrderInfoType
implements the Order Strategy from Garcia-Molina and Spauster The Strategy ensures synchronisation al...
void processPublishPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
processes a publish payload
std::vector< GMSOrder * > others_
list of all Order classes from the other trees
std::map< typename NetworkType::Key, uint64_t > mainTree_
Receiver -> Tree that should be used for the Control msg only used in root node.
void sendCtrl(uint64_t treeId, uint64_t msgId)
the tree shall publish CtrlMsgs on all trees that have subscribers This function is only used on Tree...
void otherOrders(const std::vector< GMSOrder * > &others)
stores the instances of the OrderStrategy from the other Channels Also sets the myTree_ variable and ...