22 #ifndef __M2ETIS_PUBSUB_DELIVER_ACKDELIVER_H__
23 #define __M2ETIS_PUBSUB_DELIVER_ACKDELIVER_H__
41 bool operator()(
const std::pair<uint64_t, std::pair<uint64_t, int>> & a,
const std::pair<uint64_t, std::pair<uint64_t, int>> & b) {
46 template<
class NetworkType,
int Retries, Amount Multiples>
90 newInfo->
nr = info->nr;
92 if (delivered_[sender].contains(info->nr)) {
99 delivered_[sender].insert(info->nr);
108 newInfo->
nr = info->nr;
110 if (delivered_[sender].contains(info->nr)) {
113 delivered_[sender].insert(info->nr);
126 queue_.push(std::make_pair(cT + 100000, std::make_pair(
id, 0)));
138 acked_.insert(info->nr);
147 while (!queue_.empty() && queue_.top().first < cT) {
149 std::set<uint64_t>::const_iterator it = acked_.find(queue_.top().second.first);
150 if (it != acked_.end()) {
154 if (queue_.top().second.second < Retries) {
156 queue_.push(std::make_pair(cT + 100000, std::make_pair(queue_.top().second.first, queue_.top().second.second + 1)));
170 std::priority_queue<std::pair<uint64_t, std::pair<uint64_t, int>>, std::vector<std::pair<uint64_t, std::pair<uint64_t, int>>>,
Comp> queue_;
175 std::set<uint64_t> acked_;
177 std::map<typename NetworkType::Key, util::SegmentTree<uint64_t>> delivered_;
179 uint64_t periodicID_;
bool periodicCheck()
checks the queue for un-acked messages
bool process(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
message::AckDeliverInfo DeliverInfoType
boost::shared_ptr< DeliverInfo > Ptr
bool processNotifyPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
void configureDeliverInfo(uint64_t id, const message::ActionType mtype, typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &receiver, message::ControlType ct)
creates the DeliverInfo
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
AckDeliver(PubSubSystemEnvironment *pssi, const typename NetworkType::Key &self)
bool processPublishPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Scheduler< util::RealTimeClock > scheduler_
bool processSubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
bool processOtherControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
bool processControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes Control Messages (id got acked)
boost::function< void(message::DeliverInfo::Ptr, typename NetworkType::Key, ControlTarget)> sendCtrlMsg_
stores the function to send new messages. This function will send a newly created control message cont...
bool processUnsubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
boost::shared_ptr< AckDeliverInfo > Ptr
boost::function< void(uint64_t, msgProcess)> process_
stores the function to be called for delivering
bool operator()(const std::pair< uint64_t, std::pair< uint64_t, int >> &a, const std::pair< uint64_t, std::pair< uint64_t, int >> &b)