22 #ifndef __M2ETIS_PUBSUB_ORDER_DETMERGEORDER_H__
23 #define __M2ETIS_PUBSUB_ORDER_DETMERGEORDER_H__
40 template<
class NetworkType,
class Config>
49 return !(*(a.first.get()) < *(b.first.get()));
75 return !queue_.empty();
90 lastEv->c = std::max(int64_t(0), int64_t(lastEv->r + lastEv->c - rt));
92 for (uint32_t t = 0; t < Config::eps * 2; ++t) {
93 if (t + rt - lastEv->r < 2 * Config::eps) {
94 lastEv->kn[t] = lastEv->kn[t + size_t(rt - lastEv->r)];
101 info->
ts->r = lastEv->r;
102 info->
ts->c = lastEv->c;
103 info->
ts->kn = lastEv->kn;
149 lastEv->c = std::max(int64_t(0), std::max(int64_t(lastEv->r + lastEv->c - rt), int64_t(int64_t(info->ts->r) + info->ts->c - rt)));
151 for(uint32_t t = 0; t < Config::eps * 2; ++t) {
156 if (t + rt - lastEv->r < 2 * Config::eps && t + rt - lastEv->r >= 0) {
157 v2 = lastEv->kn[t + size_t(rt - lastEv->r)];
159 if (t + rt - info->ts->r < 2 * Config::eps && t + rt - info->ts->r >= 0) {
160 v3 = info->ts->kn[t + size_t(rt - info->ts->r)];
162 lastEv->kn[t] = max(v1, max(v2, v3));
166 queue_.push(std::make_pair(info,
id));
172 rt = a / Config::raster;
174 while (!queue_.empty() && queue_.top().first->ts->r + queue_.top().first->ts->c + Config::delta + Config::eps <= rt) {
191 T max(
const T & a,
const T & b) {
204 volatile uint64_t rt;
211 std::priority_queue<std::pair<typename OrderInfoType::Ptr, uint64_t>, std::vector<std::pair<typename OrderInfoType::Ptr, uint64_t>>, Compare> queue_;
216 PubSubSystemEnvironment * pssi_;
218 uint64_t periodicID_;
boost::shared_ptr< OrderInfo > Ptr
boost::shared_ptr< DetMergeOrderInfo > Ptr
void notifyRemovedMessage(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
notified when message dropped by filter or validity strategy
void processSubscribePayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
this strategy doesn't care about subscribes
bool hasPending()
returns whether there are messages waiting
timestamp_p ts
timestamp for this OrderInfo
Implements the Deterministic Merge Order Strategy.
DetMergeOrder(PubSubSystemEnvironment *pssi, bool b)
Constructor.
void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
called when a message is received. This function updates the internal state of this strategy and push...
This is the interface that should be used for all Order Strategies. It specifies all functions that sh...
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Scheduler< util::RealTimeClock > scheduler_
uint64_t getTime() const
returns current time
boost::function< void(uint64_t, msgProcess)> function_
stores the function to be called for delivering
bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
configure infos. This function sets all necessary information for this message
void processPublishPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
this strategy doesn't care about publish payloads here
void otherOrders(const std::vector< DetMergeOrder * > &)
this strategy doesn't need the other trees
void processNotifyPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
called for every NotifyMsg that arrived
boost::shared_ptr< Timestamp > timestamp_p
virtual ~DetMergeOrder()
Destructor.
message::DetMergeOrderInfo< Config > OrderInfoType
bool processControlPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
process control messages. They are only used to sync