22 #ifndef __M2ETIS_PUBSUB_ORDER_MTPORDER_H__
23 #define __M2ETIS_PUBSUB_ORDER_MTPORDER_H__
32 #include "boost/thread.hpp"
34 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
36 #pragma warning(disable : 4127)
57 template<
class NetworkType, u
int64_t Timeout, LateDeliver deli>
74 return !queue_.empty();
103 sendQueue_.push(std::make_pair(
id, info));
119 p->seqNr = nextSend_++;
124 assert(sendQueue_.size() > 0);
125 sendQueue_.front().second->seqNr = info->seqNr;
150 for (uint64_t i : info->missing_) {
151 droppedQueue_.push(i);
170 if (info->seqNr == nextRec_) {
175 }
else if (info->seqNr == UINT64_MAX) {
180 }
else if (info->seqNr < nextRec_) {
199 remHelp.
dropped(info->seqNr, receiver);
207 std::priority_queue<std::tuple<uint64_t, uint64_t, uint64_t>, std::vector<std::tuple<uint64_t, uint64_t, uint64_t>>, std::greater<std::tuple<uint64_t, uint64_t, uint64_t>>> queue_;
213 std::queue<std::pair<uint64_t, OrderInfoType::Ptr>> sendQueue_;
218 std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> droppedQueue_;
240 uint64_t periodicID_;
247 while (!queue_.empty()) {
248 if (std::get<SEQNR>(queue_.top()) == nextRec_) {
254 }
else if (std::get<TIME>(queue_.top()) + Timeout < cT) {
256 nextRec_ = std::get<SEQNR>(queue_.top());
257 }
else if (!droppedQueue_.empty() && nextRec_ == droppedQueue_.top()) {
283 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
boost::shared_ptr< OrderInfo > Ptr
void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
receives a message stores the message in the queue and wait for it's deliver
LateDeliver
what to do when recieving an old message after Timeout
void processNotifyPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
called for every NotifyMsg that arrived
bool hasPending()
pending messages if the buffer is non-empty
MTPOrder(PubSubSystemEnvironment *pssi, bool b)
void notifyRemovedMessage(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &receiver)
notified when message dropped by filter or validity strategy
boost::function< void(message::OrderInfo::Ptr, const typename NetworkType::Key &, ControlTarget)> sendMsg_
stores the function to send new messages This function will send a newly created control message cont...
This is the interface that should be used for all Order Strategies It specifies all functions that sh...
boost::shared_ptr< MTPOrderInfo > Ptr
bool processControlPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes a control message
void clear(const KEYType &rec)
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
std::vector< uint64_t > missing_
stores information about dropped messages
bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &rec)
used to manage sending a message All messages but Publish messages can be sent directly without any s...
Scheduler< util::RealTimeClock > scheduler_
std::vector< IDType > getDropped(const KEYType &rec)
void otherOrders(const std::vector< MTPOrder * > &)
processes a subscribe message
message::MTPOrderInfo OrderInfoType
boost::function< void(uint64_t, msgProcess)> function_
stores the function to be called for delivering
void processPublishPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
processes a subscribe message
void dropped(IDType id, const KEYType &rec)
void processSubscribePayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
processes a subscribe message
This class implements the MTP Order strategy The root node is used as a fixed sequencer. Every node that wants to send a message, request a sequenzer number (TOKEN) from the sequencer and afterwards sends his message along with this sequence number.