Line data Source code
1 : /****************************************************************************/
2 : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 : // Copyright (C) 2004-2025 German Aerospace Center (DLR) and others.
4 : // This program and the accompanying materials are made available under the
5 : // terms of the Eclipse Public License 2.0 which is available at
6 : // https://www.eclipse.org/legal/epl-2.0/
7 : // This Source Code may also be made available under the following Secondary
8 : // Licenses when the conditions for such availability set forth in the Eclipse
9 : // Public License 2.0 are satisfied: GNU General Public License, version 2
10 : // or later which is available at
11 : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 : /****************************************************************************/
14 : /// @file MFXWorkerThread.h
15 : /// @author Michael Behrisch
16 : /// @date 2014-07-13
17 : ///
18 : // A thread class together with a pool and a task for parallelized computation
19 : /****************************************************************************/
20 : #pragma once
21 : #include <config.h>
22 :
23 : // #define WORKLOAD_PROFILING
24 : // number of batches (waitAll calls with pending tasks) after which the average maximum workload of the threads is reported, needs WORKLOAD_PROFILING
25 : // undefine to use the summary report only
26 : #define WORKLOAD_INTERVAL 100
27 :
28 : #include <list>
29 : #include <vector>
30 : #include "fxheader.h"
31 : #ifdef WORKLOAD_PROFILING
32 : #include <chrono>
33 : #include <utils/common/MsgHandler.h>
34 : #include <utils/common/ToString.h>
35 : #endif
36 : #include <utils/common/UtilExceptions.h>
37 :
38 :
39 : // ===========================================================================
40 : // class definitions
41 : // ===========================================================================
42 : /**
43 : * @class MFXWorkerThread
44 : * @brief A thread repeatingly calculating incoming tasks
45 : */
46 : class MFXWorkerThread : public FXThread {
47 :
48 : public:
49 : /**
50 : * @class MFXWorkerThread::Task
51 : * @brief Abstract superclass of a task to be run with an index to keep track of pending tasks
52 : */
53 : class Task {
54 : public:
55 : /// @brief Desctructor
56 : virtual ~Task() {};
57 :
58 : /** @brief Abstract method which in subclasses should contain the computations to be performed.
59 : *
60 : * If there is data to be shared among several tasks (but not among several threads) it can be put in the
61 : * a thread class subclassing the MFXWorkerThread. the instance of the thread is then made available
62 : * via the context parameter.
63 : *
64 : * @param[in] context The thread which runs the task
65 : */
66 : virtual void run(MFXWorkerThread* context) = 0;
67 :
68 : /** @brief Sets the running index of this task.
69 : *
70 : * Every task receive an index which is unique among all pending tasks of the same thread pool.
71 : *
72 : * @param[in] newIndex the index to assign
73 : */
74 : void setIndex(const int newIndex) {
75 28965437 : myIndex = newIndex;
76 : }
77 : private:
78 : /// @brief the index of the task, valid only after the task has been added to the pool
79 : int myIndex;
80 : };
81 :
82 : /**
83 : * @class MFXWorkerThread::Pool
84 : * @brief A pool of worker threads which distributes the tasks and collects the results
85 : */
86 : class Pool {
87 : public:
88 : /** @brief Constructor
89 : *
90 : * May initialize the pool with a given number of workers.
91 : *
92 : * @param[in] numThreads the number of threads to create
93 : */
94 42868 : Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
95 : #ifdef WORKLOAD_PROFILING
96 : , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
97 : #endif
98 : {
99 : #ifdef WORKLOAD_PROFILING
100 : long long int timeDiff = 0;
101 : for (int i = 0; i < 100; i++) {
102 : const auto begin = std::chrono::high_resolution_clock::now();
103 : const auto end = std::chrono::high_resolution_clock::now();
104 : timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
105 : }
106 : //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
107 : #endif
108 42876 : while (numThreads > 0) {
109 8 : new MFXWorkerThread(*this);
110 8 : numThreads--;
111 : }
112 42868 : }
113 :
114 : /** @brief Destructor
115 : *
116 : * Stopping and deleting all workers by calling clear.
117 : */
118 42425 : virtual ~Pool() {
119 42425 : clear();
120 42425 : }
121 :
122 : /** @brief Stops and deletes all worker threads.
123 : */
124 80608 : void clear() {
125 97801 : for (MFXWorkerThread* const worker : myWorkers) {
126 17193 : delete worker;
127 : }
128 : myWorkers.clear();
129 80608 : }
130 :
131 : /** @brief Adds the given thread to the pool.
132 : *
133 : * @param[in] w the thread to add
134 : */
135 : void addWorker(MFXWorkerThread* const w) {
136 17477 : myWorkers.push_back(w);
137 17477 : }
138 :
139 : /** @brief Gives a number to the given task and assigns it to the worker with the given index.
140 : * If the index is negative, assign to the next (round robin) one.
141 : *
142 : * @param[in] t the task to add
143 : * @param[in] index index of the worker thread to use or -1 for an arbitrary one
144 : */
145 28965437 : void add(Task* const t, int index = -1) {
146 28965437 : if (index < 0) {
147 187739 : index = myRunningIndex % myWorkers.size();
148 : }
149 : #ifdef WORKLOAD_PROFILING
150 : if (myRunningIndex == 0) {
151 : for (MFXWorkerThread* const worker : myWorkers) {
152 : worker->startProfile();
153 : }
154 : myProfileStart = std::chrono::high_resolution_clock::now();
155 : }
156 : #endif
157 28965437 : t->setIndex(myRunningIndex++);
158 28965437 : myWorkers[index]->add(t);
159 28965437 : }
160 :
161 : /** @brief Adds the given tasks to the list of finished tasks.
162 : *
163 : * Locks the internal mutex and appends the finished tasks. This is to be called by the worker thread only.
164 : *
165 : * @param[in] tasks the tasks to add
166 : */
167 15529943 : void addFinished(std::list<Task*>& tasks) {
168 15529943 : myMutex.lock();
169 : myFinishedTasks.splice(myFinishedTasks.end(), tasks);
170 15529943 : myCondition.signal();
171 15529943 : myMutex.unlock();
172 15529943 : }
173 :
174 25 : void setException(ProcessError& e) {
175 25 : myMutex.lock();
176 25 : if (myException == nullptr) {
177 25 : myException = new ProcessError(e);
178 : }
179 25 : myMutex.unlock();
180 25 : }
181 :
182 : /// @brief waits for all tasks to be finished
183 21525171 : void waitAll(const bool deleteFinished = true) {
184 21525171 : myMutex.lock();
185 33812617 : while ((int)myFinishedTasks.size() < myRunningIndex) {
186 12287446 : myCondition.wait(myMutex);
187 : }
188 : #ifdef WORKLOAD_PROFILING
189 : if (myRunningIndex > 0) {
190 : const auto end = std::chrono::high_resolution_clock::now();
191 : const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
192 : double minLoad = std::numeric_limits<double>::max();
193 : double maxLoad = 0.;
194 : for (MFXWorkerThread* const worker : myWorkers) {
195 : const double load = worker->endProfile(elapsed);
196 : minLoad = MIN2(minLoad, load);
197 : maxLoad = MAX2(maxLoad, load);
198 : }
199 : #ifdef WORKLOAD_INTERVAL
200 : myTotalMaxLoad += maxLoad;
201 : myTotalSpread += maxLoad / minLoad;
202 : myNumBatches++;
203 : if (myNumBatches % WORKLOAD_INTERVAL == 0) {
204 : WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
205 : myTotalMaxLoad = 0.;
206 : myTotalSpread = 0.;
207 : }
208 : #endif
209 : }
210 : #endif
211 21525171 : if (deleteFinished) {
212 10981341 : for (Task* task : myFinishedTasks) {
213 191419 : delete task;
214 : }
215 : }
216 21525171 : ProcessError* toRaise = myException;
217 21525171 : myException = nullptr;
218 : myFinishedTasks.clear();
219 21525171 : myRunningIndex = 0;
220 21525171 : myMutex.unlock();
221 21525171 : if (toRaise != nullptr) {
222 25 : ProcessError err = *toRaise;
223 25 : delete toRaise;
224 50 : throw err;
225 : }
226 21525146 : }
227 :
228 : /** @brief Checks whether there are currently more pending tasks than threads.
229 : *
230 : * This is only a rough estimate because the tasks are already assigned and there could be an idle thread even though the
231 : * number of tasks is large.
232 : *
233 : * @return whether there are enough tasks to let all threads work
234 : */
235 : bool isFull() const {
236 31 : return myRunningIndex - (int)myFinishedTasks.size() >= size();
237 : }
238 :
239 : /** @brief Returns the number of threads in the pool.
240 : *
241 : * @return the number of threads
242 : */
243 : int size() const {
244 158914344 : return (int)myWorkers.size();
245 : }
246 :
247 : /// @brief locks the pool mutex
248 : void lock() {
249 : myPoolMutex.lock();
250 : }
251 :
252 : /// @brief unlocks the pool mutex
253 : void unlock() {
254 : myPoolMutex.unlock();
255 : }
256 :
257 : const std::vector<MFXWorkerThread*>& getWorkers() {
258 : return myWorkers;
259 : }
260 : private:
261 : /// @brief the current worker threads
262 : std::vector<MFXWorkerThread*> myWorkers;
263 : /// @brief the internal mutex for the task list
264 : FXMutex myMutex;
265 : /// @brief the pool mutex for external sync
266 : FXMutex myPoolMutex;
267 : /// @brief the semaphore to wait on for finishing all tasks
268 : FXCondition myCondition;
269 : /// @brief list of finished tasks
270 : std::list<Task*> myFinishedTasks;
271 : /// @brief the running index for the next task
272 : int myRunningIndex;
273 : /// @brief the exception from a child thread
274 : ProcessError* myException;
275 : #ifdef WORKLOAD_PROFILING
276 : /// @brief the number of finished batch runs
277 : int myNumBatches;
278 : /// @brief the sum over the maximum loads
279 : double myTotalMaxLoad;
280 : /// @brief the sum over the load spreads
281 : double myTotalSpread;
282 : /// @brief the time when profiling started
283 : std::chrono::high_resolution_clock::time_point myProfileStart;
284 : #endif
285 : };
286 :
287 : public:
288 : /** @brief Constructor
289 : *
290 : * Adds the thread to the given pool and starts it.
291 : *
292 : * @param[in] pool the pool for this thread
293 : */
294 17477 : MFXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
295 : #ifdef WORKLOAD_PROFILING
296 : , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
297 : #endif
298 : {
299 17477 : pool.addWorker(this);
300 17477 : start();
301 17477 : }
302 :
303 : /** @brief Destructor
304 : *
305 : * Stops the thread by calling stop.
306 : */
307 17201 : virtual ~MFXWorkerThread() {
308 17193 : stop();
309 : #ifdef WORKLOAD_PROFILING
310 : const double load = 100. * myTotalBusyTime / myTotalTime;
311 : WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
312 : " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
313 : "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
314 : #endif
315 17201 : }
316 :
317 : /** @brief Adds the given task to this thread to be calculated
318 : *
319 : * @param[in] t the task to add
320 : */
321 28965437 : void add(Task* t) {
322 28965437 : myMutex.lock();
323 28965437 : myTasks.push_back(t);
324 28965437 : myCondition.signal();
325 28965437 : myMutex.unlock();
326 28965437 : }
327 :
328 : /** @brief Main execution method of this thread.
329 : *
330 : * Checks for new tasks, calculates them and puts them in the finished list of the pool until being stopped.
331 : *
332 : * @return always 0
333 : */
334 17477 : FXint run() {
335 15547420 : while (!myStopped) {
336 15547366 : myMutex.lock();
337 29971463 : while (!myStopped && myTasks.empty()) {
338 14424381 : myCondition.wait(myMutex);
339 : }
340 15547082 : if (myStopped) {
341 17139 : myMutex.unlock();
342 17139 : break;
343 : }
344 : myCurrentTasks.splice(myCurrentTasks.end(), myTasks);
345 15529943 : myMutex.unlock();
346 : try {
347 44495355 : for (Task* const t : myCurrentTasks) {
348 : #ifdef WORKLOAD_PROFILING
349 : const auto before = std::chrono::high_resolution_clock::now();
350 : #endif
351 28965437 : t->run(this);
352 : #ifdef WORKLOAD_PROFILING
353 : const auto after = std::chrono::high_resolution_clock::now();
354 : myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
355 : myCounter++;
356 : #endif
357 : }
358 25 : } catch (ProcessError& e) {
359 25 : myPool.setException(e);
360 25 : }
361 15529943 : myPool.addFinished(myCurrentTasks);
362 : }
363 17193 : return 0;
364 : }
365 :
366 : /** @brief Stops the thread
367 : *
368 : * The currently running task will be finished but all further tasks are discarded.
369 : */
370 34370 : void stop() {
371 34370 : myMutex.lock();
372 34370 : myStopped = true;
373 34370 : myCondition.signal();
374 34370 : myMutex.unlock();
375 34370 : join();
376 34370 : }
377 :
378 : #ifdef WORKLOAD_PROFILING
379 : void startProfile() {
380 : myBusyTime = 0;
381 : }
382 :
383 : double endProfile(const long long int time) {
384 : myTotalTime += time;
385 : myTotalBusyTime += myBusyTime;
386 : return time == 0 ? 100. : 100. * myBusyTime / time;
387 : }
388 : #endif
389 :
390 : private:
391 : /// @brief the pool for this thread
392 : Pool& myPool;
393 : /// @brief the mutex for the task list
394 : FXMutex myMutex;
395 : /// @brief the semaphore when waiting for new tasks
396 : FXCondition myCondition;
397 : /// @brief the list of pending tasks
398 : std::list<Task*> myTasks;
399 : /// @brief the list of tasks which are currently calculated
400 : std::list<Task*> myCurrentTasks;
401 : /// @brief whether we are still running
402 : bool myStopped;
403 : #ifdef WORKLOAD_PROFILING
404 : /// @brief counting completed tasks
405 : int myCounter;
406 : /// @brief the time spent in calculations during the current batch
407 : long long int myBusyTime;
408 : /// @brief the total busy time
409 : long long int myTotalBusyTime;
410 : /// @brief the total time while anyone had tasks
411 : long long int myTotalTime;
412 : #endif
413 : };
|