22 #ifndef __M2ETIS_PUBSUB_SCHEDULER_H__
23 #define __M2ETIS_PUBSUB_SCHEDULER_H__
31 #include "boost/thread.hpp"
36 template<
class ClockUpdater>
40 Job(
const boost::function<
bool(
void)> & f, uint64_t t, int16_t p, uint64_t d, uint64_t i = UINT64_MAX) :
func(f),
time(t),
priority(p),
interval(i),
id(d) {
42 boost::function<bool(void)>
func;
49 if (time != other.
time) {
50 return time > other.
time;
78 uint64_t
runOnce(uint64_t time,
const boost::function<
bool(
void)> & func, int16_t priority) {
80 Job j(func, clock_.
getTime() + time, priority, id_++);
94 uint64_t
runRepeated(uint64_t interval,
const boost::function<
bool(
void)> & func, int16_t priority) {
96 Job j(func, clock_.
getTime() + interval, priority, id_++, interval);
125 while (!queue_.empty()) {
139 while (!queue_.empty() && queue_.top().time <= clock_.
getTime()) {
140 Job j = queue_.top();
143 bool b = stopMap_.find(j.id) != stopMap_.end();
146 stopMap_.erase(j.id);
154 if (b && j.interval != UINT64_MAX) {
156 j.time = cTime + j.interval;
168 volatile bool running_;
170 util::Clock<ClockUpdater> & clock_;
172 std::priority_queue<Job> queue_;
176 boost::thread worker_;
178 std::atomic<uint64_t> id_;
180 std::map<uint64_t, bool> stopMap_;
184 uint64_t tID = clock_.registerTimer();
188 while (running_ && !queue_.empty() && queue_.top().time <= clock_.getTime()) {
189 Job j = queue_.top();
191 bool b = stopMap_.find(j.id) != stopMap_.end();
194 stopMap_.erase(j.id);
206 if (b && j.interval != UINT64_MAX) {
207 uint64_t cTime = clock_.getTime();
208 j.time = cTime + j.interval;
212 uint64_t t = clock_.getTime() + 1000000;
213 if (!queue_.empty()) {
214 t = queue_.top().time;
217 if (!running_ || !clock_.waitForTime(tID, t)) {
struct m2etis::pubsub::Scheduler::Job Job
uint64_t runOnce(uint64_t time, const boost::function< bool(void)> &func, int16_t priority)
adds new job running only once
boost::function< bool(void)> func
Scheduler(util::Clock< ClockUpdater > &c)
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
void Stop()
Stops the whole Scheduler and removes all tasks.
bool operator<(const Job &other) const
uint64_t getTime() const
Will return the time since the Clock on the rendezvous node has started.
uint64_t getTime() const
returns current time
Job(const boost::function< bool(void)> &f, uint64_t t, int16_t p, uint64_t d, uint64_t i=UINT64_MAX)