22 #ifndef __M2ETIS_PUBSUB_DELIVER_NACKDELIVER_H__
23 #define __M2ETIS_PUBSUB_DELIVER_NACKDELIVER_H__
38 template<
class NetworkType,
int Retries, Amount Multiples>
44 periodicID_ = pssi->
scheduler_.
runRepeated(REQUESTTIME / 2, boost::bind(&NackDeliver::periodicCheck,
this), 1);
55 return processPublishAndNotify(ptr, sender);
62 return processPublishAndNotify(ptr, sender);
69 return processPublishAndNotify(ptr, sender);
76 return processPublishAndNotify(ptr, sender);
80 return processPublishAndNotify(ptr, sender);
89 info->
nr = lastID_[receiver]++;
90 info->dropped = dropped_[receiver];
91 dropped_[receiver].clear();
92 buffer_.insert(std::make_pair(std::make_pair(self_, info->nr), std::make_pair(
id,
BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + BUFFERTIME)));
104 typename std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> >::iterator it = buffer_.find(std::make_pair(self_, info->nr));
105 if (it != buffer_.end()) {
109 dropped_[sender].push_back(info->nr);
117 typename std::map<typename NetworkType::Key, uint64_t>::iterator it = lastMessages_.find(sender);
119 if (it != lastMessages_.end()) {
120 if (it->second > info->nr) {
121 if (missing_[sender].find(info->nr) == missing_[sender].end()) {
128 missing_[sender].erase(info->nr);
131 }
else if (it->second < info->nr) {
132 typename std::map<typename NetworkType::Key, std::map<uint64_t, std::pair<uint64_t, int>> >::iterator it2 = missing_.find(sender);
133 for (uint64_t i = it->second + 1; i < info->nr; ++i) {
134 it2->second[i] = std::make_pair(BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + REQUESTTIME / 2, 0);
136 it->second = info->nr;
146 lastMessages_[sender] = info->nr;
147 std::map<uint64_t, std::pair<uint64_t, int>> missings;
148 for (uint64_t i = 0; i < info->nr; ++i) {
149 missings[i] = std::make_pair(BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + REQUESTTIME / 2, 0);
151 missing_[sender] = missings;
154 for (uint64_t p : info->dropped) {
155 if (p < lastMessages_[sender]) {
156 missing_[sender].erase(p);
158 for (uint64_t i = lastMessages_[sender] + 1; i < p; ++i) {
159 missing_[sender][i] = std::make_pair(BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + REQUESTTIME / 2, 0);
161 lastMessages_[sender] = p;
170 bool periodicCheck() {
171 uint64_t cT = BaseDeliver<NetworkType>::pssi_->scheduler_.getTime();
172 for (
typename std::map<
typename NetworkType::Key, std::map<uint64_t, std::pair<uint64_t, int>> >::iterator it = missing_.begin(); it != missing_.end(); ++it) {
173 for (
typename std::map<uint64_t, std::pair<uint64_t, int>>::iterator it2 = it->second.begin(); it2 != it->second.end();) {
174 if (it2->second.first < cT) {
177 newInfo->nr = it2->first;
179 it2->second.second++;
180 it2->second.first = cT + REQUESTTIME;
183 typename std::map<uint64_t, std::pair<uint64_t, int>>::iterator itDelete = it2;
185 it->second.erase(itDelete);
192 for (
typename std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> >::iterator it = buffer_.begin(); it != buffer_.end();) {
193 if (it->second.second < cT) {
195 typename std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> >::iterator itDelete = it;
197 buffer_.erase(itDelete);
208 std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> > buffer_;
210 std::map<typename NetworkType::Key, std::map<uint64_t, std::pair<uint64_t, int>> > missing_;
212 std::map<typename NetworkType::Key, uint64_t> lastMessages_;
214 std::map<typename NetworkType::Key, std::vector<uint64_t>> dropped_;
216 std::map<typename NetworkType::Key, uint64_t> lastID_;
218 typename NetworkType::Key self_;
220 uint64_t periodicID_;
222 util::SegmentTree<uint64_t> delivered_;
224 const uint64_t REQUESTTIME = 100000;
226 const uint64_t BUFFERTIME = 10000000;
bool processUnsubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
bool processPublishPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
boost::shared_ptr< DeliverInfo > Ptr
bool processSubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
NackDeliver(PubSubSystemEnvironment *pssi, const typename NetworkType::Key &self)
boost::shared_ptr< NackDeliverInfo > Ptr
bool processControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes Control Messages (id got acked)
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Scheduler< util::RealTimeClock > scheduler_
bool processNotifyPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
void configureDeliverInfo(uint64_t id, const message::ActionType mtype, typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &receiver, message::ControlType ct)
creates the DeliverInfo
boost::function< void(message::DeliverInfo::Ptr, typename NetworkType::Key, ControlTarget)> sendCtrlMsg_
stores the function to send new messages This function will send a newly created control message cont...
message::NackDeliverInfo DeliverInfoType
boost::function< void(uint64_t, msgProcess)> process_
stores the function to be called for delivering
bool processOtherControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
creates the DeliverInfo