i6engine  1.0
Scheduler.h
Go to the documentation of this file.
1 /*
2 * i6engine
3 * Copyright (2016) Daniel Bonrath, Michael Baer, All rights reserved.
4 *
5 * This file is part of i6engine; i6engine is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19 
25 #ifndef __I6ENGINE_CORE_SCHEDULER_H__
26 #define __I6ENGINE_CORE_SCHEDULER_H__
27 
28 #include <queue>
29 #include <set>
30 
31 #include "i6engine/utils/Clock.h"
33 
36 
37 #include "boost/thread.hpp"
38 
39 namespace i6e {
40 namespace core {
41 
42  template<class ClockUpdater>
43  class Scheduler {
44  public:
48  typedef struct Job {
52  Job(const boost::function<bool(void)> & f, uint64_t t, JobPriorities p, uint64_t d, uint64_t i = UINT64_MAX) : func(f), time(t), priority(p), interval(i), id(d) {
53  }
54 
58  boost::function<bool(void)> func;
59 
63  uint64_t time;
64 
68  JobPriorities priority; // the lower, the better
69 
73  uint64_t interval;
74 
78  uint64_t id;
79 
83  bool operator<(const Job & other) const {
84  if (time != other.time) {
85  return time > other.time;
86  }
87  if (priority != other.priority) {
88  return priority > other.priority;
89  }
90  return interval > other.interval;
91  }
92  } Job;
93 
97  explicit Scheduler(utils::Clock<ClockUpdater> & c) : _running(true), _clock(c), _queue(), _lock(), _id(), _workerThreads(), _removeIDs() {
98  for (uint16_t i = 0; i < SCHEDULER_THREAD_AMOUNT; i++) {
99  uint64_t tid = _clock.registerTimer();
100  _workerThreads.push_back(std::make_pair(tid, new boost::thread(boost::bind(&Scheduler<ClockUpdater>::worker, this, tid))));
101  }
102  }
103 
108  _running = false;
109  _lock.lock();
110  while (!_queue.empty()) {
111  _queue.pop();
112  }
113  _lock.unlock();
114  for (uint16_t i = 0; i < SCHEDULER_THREAD_AMOUNT; i++) {
115  _clock.updateWaitTime(_workerThreads[i].first, 0);
116  _workerThreads[i].second->join();
117  delete _workerThreads[i].second;
118  }
119  }
120 
129  uint64_t runOnce(uint64_t time, const boost::function<bool(void)> & f, JobPriorities priority) {
130  if (time <= 0) {
131  ISIXE_THROW_API("Scheduler", "time need to be > 0");
132  }
133 
134  Job j(f, _clock.getTime() + time, priority, _id++);
135  boost::mutex::scoped_lock sl(_lock);
136  _queue.push(j);
137  if (_queue.top().time == j.time) {
138  for (uint16_t i = 0; i < SCHEDULER_THREAD_AMOUNT; i++) {
139  _clock.updateWaitTime(_workerThreads[i].first, j.time);
140  }
141  }
142 
143  return j.id;
144  }
145 
154  uint64_t runRepeated(uint64_t interval, const boost::function<bool(void)> & f, JobPriorities priority) {
155  if (interval <= 0) {
156  ISIXE_THROW_API("Scheduler", "interval has to be greater than 0, otherwise there would be an instant call");
157  }
158 
159  Job j(f, _clock.getTime() + interval, priority, _id++, interval);
160  boost::mutex::scoped_lock sl(_lock);
161  _queue.push(j);
162  if (_queue.top().time == j.time) {
163  for (uint16_t i = 0; i < SCHEDULER_THREAD_AMOUNT; i++) {
164  _clock.updateWaitTime(_workerThreads[i].first, j.time);
165  }
166  }
167  return j.id;
168  }
169 
174  uint64_t getTimeLeft(uint64_t id) const {
175  _lock.lock();
176  std::priority_queue<Job> copy = _queue;
177  _lock.unlock();
178 
179  while (!copy.empty()) {
180  Job j = copy.top();
181  copy.pop();
182 
183  if (j.id == id) {
184  return j.time - _clock.getTime();
185  }
186  }
187 
188  return UINT64_MAX;
189  }
190 
194  void removeTimer(JobPriorities priority) {
195  boost::mutex::scoped_lock sl(_lock);
196  std::priority_queue<Job> copy = _queue;
197 
198  while (!_queue.empty()) {
199  _queue.pop();
200  }
201 
202  while (!copy.empty()) {
203  Job j = copy.top();
204  copy.pop();
205 
206  if (j.priority != priority) {
207  _queue.push(j);
208  }
209  }
210  }
211 
216  bool stop(uint64_t id) {
217  boost::mutex::scoped_lock sl(_lock);
218  _removeIDs.insert(id);
219  return true;
220  }
221 
222  private:
223  // TODO: (Michael) convert to a threadpool if needed
227  void worker(uint64_t tid) {
228  while (_running) {
229  _lock.lock();
230  while (!_queue.empty() && _queue.top().time <= _clock.getTime()) {
231  Job j = _queue.top();
232  _queue.pop();
233  if (_removeIDs.find(j.id) != _removeIDs.end()) {
234  _removeIDs.erase(j.id);
235  continue;
236  }
237  _lock.unlock();
238  bool b = j.func();
239  _lock.lock();
240  if (j.interval != UINT64_MAX && b) {
241  j.time = _clock.getTime() + j.interval;
242  if (_removeIDs.find(j.id) != _removeIDs.end()) {
243  _removeIDs.erase(j.id);
244  } else {
245  _queue.push(j);
246  }
247  }
248  }
249  uint64_t t = _clock.getTime() + 1000000; // sleep 1 second if no task is there
250  if (!_queue.empty()) {
251  t = _queue.top().time;
252  }
253  _lock.unlock();
254  if (!_clock.waitForTime(tid, t)) {
255  break;
256  }
257  }
258  }
259 
263  std::atomic<bool> _running;
264 
268  utils::Clock<ClockUpdater> & _clock;
269 
273  std::priority_queue<Job> _queue;
274 
278  mutable boost::mutex _lock;
279 
280  std::atomic<uint64_t> _id;
281 
285  std::vector<std::pair<uint64_t, boost::thread *>> _workerThreads;
286 
287  std::set<uint64_t> _removeIDs;
288 
292  Scheduler(const Scheduler &) = delete;
293  Scheduler & operator=(const Scheduler &) = delete;
294  };
295 
296 } /* namespace core */
297 } /* namespace i6e */
298 
299 #endif /* __I6ENGINE_CORE_SCHEDULER_H__ */
300 
const uint16_t SCHEDULER_THREAD_AMOUNT
uint64_t id
id of this job
Definition: Scheduler.h:78
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &f, JobPriorities priority)
starts a timer repeating in the given interval
Definition: Scheduler.h:154
uint64_t interval
interval in which this job is repeated, UINT64_MAX if only once
Definition: Scheduler.h:73
#define ISIXE_THROW_API(module, message)
Definition: Exceptions.h:45
void updateWaitTime(uint64_t timerID, uint64_t time)
updates the time a timer is waiting for
Definition: Clock.h:109
bool operator<(const Job &other) const
less-than operator, needed for priority_queue
Definition: Scheduler.h:83
uint64_t getTimeLeft(uint64_t id) const
returns time left until execution of the job; if no job with the given id is found, UINT64_MAX (-1 converted to uint64_t) is returned ...
Definition: Scheduler.h:174
uint64_t runOnce(uint64_t time, const boost::function< bool(void)> &f, JobPriorities priority)
starts a timer being scheduled after the given time
Definition: Scheduler.h:129
uint64_t registerTimer()
registers a new timer waiting for this clock
Definition: Clock.h:87
bool waitForTime(uint64_t timerID, uint64_t time)
lets a timer wait for the given time
Definition: Clock.h:123
this struct represents a single timer event
Definition: Scheduler.h:48
uint64_t time
time this job will be scheduled
Definition: Scheduler.h:63
struct i6e::core::Scheduler::Job Job
this struct represents a single timer event
Job(const boost::function< bool(void)> &f, uint64_t t, JobPriorities p, uint64_t d, uint64_t i=UINT64_MAX)
constructor of the timer
Definition: Scheduler.h:52
uint64_t getTime() const
Will return the time since the Clock has been started.
Definition: Clock.h:80
~Scheduler()
destructor
Definition: Scheduler.h:107
bool stop(uint64_t id)
removes the Timer registered for the given id; returns true if the Timer was found, otherwise false ...
Definition: Scheduler.h:216
Scheduler(utils::Clock< ClockUpdater > &c)
constructor for scheduler taking a clock
Definition: Scheduler.h:97
boost::function< bool(void)> func
method called by this timer
Definition: Scheduler.h:58
void removeTimer(JobPriorities priority)
removes all timers with given priority
Definition: Scheduler.h:194
JobPriorities priority
priority of this job
Definition: Scheduler.h:68