Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
TaskQueue.h
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2020-2024 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// Threadpool implementation,
19// based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20/****************************************************************************/
21#pragma once
22#include <config.h>
23
24#include <condition_variable>
25#include <functional>
26#include <queue>
27#ifdef _MSC_VER
28#pragma warning(push)
29#pragma warning(disable: 4355 5204 5220) // mask warnings in MSVCs ppl-stdlib
30#endif
31#include <future>
32#ifdef _MSC_VER
33#pragma warning(pop)
34#endif
35
36
/**
 * @brief Abstract interface of a unit of work that is executed with a context of type C
 *
 * Concrete tasks derive from this class and implement exec().
 */
template <typename C>
class TaskBase {
public:
    /** @brief Runs the task
     * @param[in] context The context object handed to the task
     */
    virtual void exec(const C& context) = 0;

    /// @brief Virtual destructor, so that tasks can be destroyed through a base class pointer
    virtual ~TaskBase() = default;
};
43
44template <typename T, typename C>
45class Task : public TaskBase<C> {
46public:
47 Task(T&& t) : task(std::move(t)) {}
48 void exec(const C& context) override {
49 task(context);
50 }
51
53};
54
55template <typename C>
56class TaskQueue {
57 using LockType = std::unique_lock<std::mutex>;
58
59public:
60 using TaskPtrType = std::unique_ptr<TaskBase<C> >;
61 TaskQueue() = default;
62 ~TaskQueue() = default;
63
64 void setEnabled(bool enabled) {
65 {
66 LockType lock{ myMutex };
67 myEnabled = enabled;
68 }
69 if (!enabled) {
70 myReady.notify_all();
71 }
72 }
73
74 bool isEnabled() const {
75 LockType lock{ myMutex };
76 return myEnabled;
77 }
78
79 bool waitAndPop(TaskPtrType& task) {
80 LockType lock{ myMutex };
81 myReady.wait(lock, [this] { return !myEnabled || !myQueue.empty(); });
82 if (myEnabled && !myQueue.empty()) {
83 task = std::move(myQueue.front());
84 myQueue.pop();
85 return true;
86 }
87 return false;
88 }
89
90 template <typename TaskT>
91 auto push(TaskT&& task) -> std::future<decltype(task(std::declval<C>()))> {
92 using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
93 auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
94 auto future = job->task.get_future();
95 {
96 LockType lock{ myMutex };
97 myQueue.emplace(std::move(job));
98 }
99
100 myReady.notify_one();
101 return future;
102 }
103
104 bool tryPop(TaskPtrType& task) {
105 LockType lock{ myMutex, std::try_to_lock };
106 if (!lock || !myEnabled || myQueue.empty()) {
107 return false;
108 }
109 task = std::move(myQueue.front());
110 myQueue.pop();
111 return true;
112 }
113
114 template <typename TaskT>
115 auto tryPush(TaskT&& task, bool& success) -> std::future<decltype(task(std::declval<C>()))> {
116 std::future<decltype(task(std::declval<C>()))> future;
117 success = false;
118 {
119 LockType lock{ myMutex, std::try_to_lock };
120 if (!lock) {
121 return future;
122 }
123 using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
124 auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
125 future = job->task.get_future();
126 success = true;
127 myQueue.emplace(std::move(job));
128 }
129
130 myReady.notify_one();
131 return future;
132 }
133
134private:
135 TaskQueue(const TaskQueue&) = delete;
136 TaskQueue& operator=(const TaskQueue&) = delete;
137
138 std::queue<TaskPtrType> myQueue;
139 bool myEnabled = true;
140 mutable std::mutex myMutex;
141 std::condition_variable myReady;
142};
virtual ~TaskBase()=default
virtual void exec(const C &context)=0
T task
Definition TaskQueue.h:52
Task(T &&t)
Definition TaskQueue.h:47
void exec(const C &context) override
Definition TaskQueue.h:48
auto tryPush(TaskT &&task, bool &success) -> std::future< decltype(task(std::declval< C >()))>
Definition TaskQueue.h:115
bool waitAndPop(TaskPtrType &task)
Definition TaskQueue.h:79
bool tryPop(TaskPtrType &task)
Definition TaskQueue.h:104
std::condition_variable myReady
Definition TaskQueue.h:141
TaskQueue()=default
std::queue< TaskPtrType > myQueue
Definition TaskQueue.h:138
bool myEnabled
Definition TaskQueue.h:139
bool isEnabled() const
Definition TaskQueue.h:74
TaskQueue(const TaskQueue &)=delete
std::unique_lock< std::mutex > LockType
Definition TaskQueue.h:57
auto push(TaskT &&task) -> std::future< decltype(task(std::declval< C >()))>
Definition TaskQueue.h:91
TaskQueue & operator=(const TaskQueue &)=delete
std::unique_ptr< TaskBase< C > > TaskPtrType
Definition TaskQueue.h:60
~TaskQueue()=default
std::mutex myMutex
Definition TaskQueue.h:140
void setEnabled(bool enabled)
Definition TaskQueue.h:64
Definition json.hpp:4471