Eclipse SUMO - Simulation of Urban MObility
TaskQueue.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 // Copyright (C) 2020-2024 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
18 // Threadpool implementation,
19 // based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20 /****************************************************************************/
21 #pragma once
22 #include <config.h>
23 
24 #include <condition_variable>
25 #include <functional>
26 #include <queue>
27 #ifdef _MSC_VER
28 #pragma warning(push)
29 #pragma warning(disable: 4355 5204 5220) // mask warnings in MSVCs ppl-stdlib
30 #endif
31 #include <future>
32 #ifdef _MSC_VER
33 #pragma warning(pop)
34 #endif
35 
36 
/**
 * @class TaskBase
 * @brief Abstract interface of a unit of work that is run with a context of type C
 *
 * Concrete tasks derive from this class and implement exec(); the virtual
 * destructor allows them to be destroyed through a TaskBase pointer.
 */
template <typename C>
class TaskBase {
public:
    /// @brief Virtual destructor, needed for deletion via a base class pointer
    virtual ~TaskBase() = default;

    /** @brief Runs the task
     * @param[in] context The context object handed to the task
     */
    virtual void exec(const C& context) = 0;
};
43 
44 template <typename T, typename C>
45 class Task : public TaskBase<C> {
46 public:
47  Task(T&& t) : task(std::move(t)) {}
48  void exec(const C& context) override {
49  task(context);
50  }
51 
52  T task;
53 };
54 
55 template <typename C>
56 class TaskQueue {
57  using LockType = std::unique_lock<std::mutex>;
58 
59 public:
60  using TaskPtrType = std::unique_ptr<TaskBase<C> >;
61  TaskQueue() = default;
62  ~TaskQueue() = default;
63 
64  void setEnabled(bool enabled) {
65  {
66  LockType lock{ myMutex };
67  myEnabled = enabled;
68  }
69  if (!enabled) {
70  myReady.notify_all();
71  }
72  }
73 
74  bool isEnabled() const {
75  LockType lock{ myMutex };
76  return myEnabled;
77  }
78 
79  bool waitAndPop(TaskPtrType& task) {
80  LockType lock{ myMutex };
81  myReady.wait(lock, [this] { return !myEnabled || !myQueue.empty(); });
82  if (myEnabled && !myQueue.empty()) {
83  task = std::move(myQueue.front());
84  myQueue.pop();
85  return true;
86  }
87  return false;
88  }
89 
90  template <typename TaskT>
91  auto push(TaskT&& task) -> std::future<decltype(task(std::declval<C>()))> {
92  using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
93  auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
94  auto future = job->task.get_future();
95  {
96  LockType lock{ myMutex };
97  myQueue.emplace(std::move(job));
98  }
99 
100  myReady.notify_one();
101  return future;
102  }
103 
104  bool tryPop(TaskPtrType& task) {
105  LockType lock{ myMutex, std::try_to_lock };
106  if (!lock || !myEnabled || myQueue.empty()) {
107  return false;
108  }
109  task = std::move(myQueue.front());
110  myQueue.pop();
111  return true;
112  }
113 
114  template <typename TaskT>
115  auto tryPush(TaskT&& task, bool& success) -> std::future<decltype(task(std::declval<C>()))> {
116  std::future<decltype(task(std::declval<C>()))> future;
117  success = false;
118  {
119  LockType lock{ myMutex, std::try_to_lock };
120  if (!lock) {
121  return future;
122  }
123  using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
124  auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
125  future = job->task.get_future();
126  success = true;
127  myQueue.emplace(std::move(job));
128  }
129 
130  myReady.notify_one();
131  return future;
132  }
133 
134 private:
135  TaskQueue(const TaskQueue&) = delete;
136  TaskQueue& operator=(const TaskQueue&) = delete;
137 
138  std::queue<TaskPtrType> myQueue;
139  bool myEnabled = true;
140  mutable std::mutex myMutex;
141  std::condition_variable myReady;
142 };
virtual ~TaskBase()=default
virtual void exec(const C &context)=0
Definition: TaskQueue.h:45
T task
Definition: TaskQueue.h:52
Task(T &&t)
Definition: TaskQueue.h:47
void exec(const C &context) override
Definition: TaskQueue.h:48
auto tryPush(TaskT &&task, bool &success) -> std::future< decltype(task(std::declval< C >()))>
Definition: TaskQueue.h:115
bool waitAndPop(TaskPtrType &task)
Definition: TaskQueue.h:79
bool tryPop(TaskPtrType &task)
Definition: TaskQueue.h:104
std::condition_variable myReady
Definition: TaskQueue.h:141
TaskQueue()=default
TaskQueue & operator=(const TaskQueue &)=delete
std::queue< TaskPtrType > myQueue
Definition: TaskQueue.h:138
bool myEnabled
Definition: TaskQueue.h:139
bool isEnabled() const
Definition: TaskQueue.h:74
TaskQueue(const TaskQueue &)=delete
std::unique_lock< std::mutex > LockType
Definition: TaskQueue.h:57
auto push(TaskT &&task) -> std::future< decltype(task(std::declval< C >()))>
Definition: TaskQueue.h:91
std::unique_ptr< TaskBase< C > > TaskPtrType
Definition: TaskQueue.h:60
~TaskQueue()=default
std::mutex myMutex
Definition: TaskQueue.h:140
void setEnabled(bool enabled)
Definition: TaskQueue.h:64
Definition: json.hpp:4471