Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
WorkStealingThreadPool.h
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2020-2024 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// Threadpool implementation,
19// based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20/****************************************************************************/
21#pragma once
22#include <config.h>
23
24#include <algorithm>
25#include <thread>
26#include "TaskQueue.h"
27
28
29template<typename CONTEXT = int>
31public:
32
33 explicit WorkStealingThreadPool(const bool workSteal, const std::vector<CONTEXT>& context)
34 : myQueues{ context.size() }, myTryoutCount(workSteal ? 1 : 0) {
35 size_t index = 0;
36 for (const CONTEXT& c : context) {
37 if (workSteal) {
38 myThreads.emplace_back([this, index, c] { workStealRun(index, c); });
39 } else {
40 myThreads.emplace_back([this, index, c] { run(index, c); });
41 }
42 index++;
43 }
44 }
45
47 for (auto& queue : myQueues) {
48 queue.setEnabled(false);
49 }
50 for (auto& thread : myThreads) {
51 thread.join();
52 }
53 }
54
55 template<typename TaskT>
56 auto executeAsync(TaskT&& task, int idx = -1) -> std::future<decltype(task(std::declval<CONTEXT>()))> {
57 const auto index = idx == -1 ? myQueueIndex++ : idx;
58 if (myTryoutCount > 0) {
59 for (size_t n = 0; n != myQueues.size() * myTryoutCount; ++n) {
60 // Here we need not to std::forward just copy task.
61 // Because if the universal reference of task has bound to an r-value reference
62 // then std::forward will have the same effect as std::move and thus task is not required to contain a valid task.
63 // Universal reference must only be std::forward'ed a exactly zero or one times.
64 bool success = false;
65 auto result = myQueues[(index + n) % myQueues.size()].tryPush(task, success);
66
67 if (success) {
68 return result;
69 }
70 }
71 }
72 return myQueues[index % myQueues.size()].push(std::forward<TaskT>(task));
73 }
74
75 void waitAll() {
76 std::vector<std::future<void>> results;
77 for (int n = 0; n != (int)myQueues.size(); ++n) {
78 results.push_back(executeAsync([](CONTEXT) {}, n));
79 }
80 for (auto& r : results) {
81 r.wait();
82 }
83 }
84
85private:
86 void run(size_t queueIndex, const CONTEXT& context) {
87 while (myQueues[queueIndex].isEnabled()) {
89 if (myQueues[queueIndex].waitAndPop(task)) {
90 task->exec(context);
91 }
92 }
93 }
94
95 void workStealRun(size_t queueIndex, const CONTEXT& context) {
96 while (myQueues[queueIndex].isEnabled()) {
98 for (size_t n = 0; n != myQueues.size()*myTryoutCount; ++n) {
99 if (myQueues[(queueIndex + n) % myQueues.size()].tryPop(task)) {
100 break;
101 }
102 }
103 if (!task && !myQueues[queueIndex].waitAndPop(task)) {
104 return;
105 }
106 task->exec(context);
107 }
108 }
109
110private:
111 std::vector<TaskQueue<CONTEXT> > myQueues;
112 std::atomic<size_t> myQueueIndex{ 0 };
113 const size_t myTryoutCount;
114 std::vector<std::thread> myThreads;
115};
std::unique_ptr< TaskBase< C > > TaskPtrType
Definition TaskQueue.h:60
std::vector< TaskQueue< CONTEXT > > myQueues
auto executeAsync(TaskT &&task, int idx=-1) -> std::future< decltype(task(std::declval< CONTEXT >()))>
void workStealRun(size_t queueIndex, const CONTEXT &context)
std::vector< std::thread > myThreads
WorkStealingThreadPool(const bool workSteal, const std::vector< CONTEXT > &context)
void run(size_t queueIndex, const CONTEXT &context)
std::atomic< size_t > myQueueIndex