Eclipse SUMO - Simulation of Urban MObility
MFXWorkerThread.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 // Copyright (C) 2004-2024 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
18 // A thread class together with a pool and a task for parallelized computation
19 /****************************************************************************/
20 #pragma once
21 #include <config.h>
22 
23 // #define WORKLOAD_PROFILING
24 // at which interval report maximum workload of the threads, needs WORKLOAD_PROFILING
25 // undefine to use summary report only
26 #define WORKLOAD_INTERVAL 100
27 
28 #include <list>
29 #include <vector>
30 #include "fxheader.h"
31 #ifdef WORKLOAD_PROFILING
32 #include <chrono>
34 #include <utils/common/ToString.h>
35 #endif
37 
38 
39 // ===========================================================================
40 // class definitions
41 // ===========================================================================
46 class MFXWorkerThread : public FXThread {
47 
48 public:
53  class Task {
54  public:
56  virtual ~Task() {};
57 
66  virtual void run(MFXWorkerThread* context) = 0;
67 
74  void setIndex(const int newIndex) {
75  myIndex = newIndex;
76  }
77  private:
79  int myIndex;
80  };
81 
86  class Pool {
87  public:
94  Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
95 #ifdef WORKLOAD_PROFILING
96  , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
97 #endif
98  {
99 #ifdef WORKLOAD_PROFILING
100  long long int timeDiff = 0;
101  for (int i = 0; i < 100; i++) {
102  const auto begin = std::chrono::high_resolution_clock::now();
103  const auto end = std::chrono::high_resolution_clock::now();
104  timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
105  }
106  //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
107 #endif
108  while (numThreads > 0) {
109  new MFXWorkerThread(*this);
110  numThreads--;
111  }
112  }
113 
118  virtual ~Pool() {
119  clear();
120  }
121 
124  void clear() {
125  for (MFXWorkerThread* const worker : myWorkers) {
126  delete worker;
127  }
128  myWorkers.clear();
129  }
130 
135  void addWorker(MFXWorkerThread* const w) {
136  myWorkers.push_back(w);
137  }
138 
145  void add(Task* const t, int index = -1) {
146  if (index < 0) {
147  index = myRunningIndex % myWorkers.size();
148  }
149 #ifdef WORKLOAD_PROFILING
150  if (myRunningIndex == 0) {
151  for (MFXWorkerThread* const worker : myWorkers) {
152  worker->startProfile();
153  }
154  myProfileStart = std::chrono::high_resolution_clock::now();
155  }
156 #endif
157  t->setIndex(myRunningIndex++);
158  myWorkers[index]->add(t);
159  }
160 
167  void addFinished(std::list<Task*>& tasks) {
168  myMutex.lock();
169  myFinishedTasks.splice(myFinishedTasks.end(), tasks);
170  myCondition.signal();
171  myMutex.unlock();
172  }
173 
175  myMutex.lock();
176  if (myException == nullptr) {
177  myException = new ProcessError(e);
178  }
179  myMutex.unlock();
180  }
181 
183  void waitAll(const bool deleteFinished = true) {
184  myMutex.lock();
185  while ((int)myFinishedTasks.size() < myRunningIndex) {
186  myCondition.wait(myMutex);
187  }
188 #ifdef WORKLOAD_PROFILING
189  if (myRunningIndex > 0) {
190  const auto end = std::chrono::high_resolution_clock::now();
191  const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
192  double minLoad = std::numeric_limits<double>::max();
193  double maxLoad = 0.;
194  for (MFXWorkerThread* const worker : myWorkers) {
195  const double load = worker->endProfile(elapsed);
196  minLoad = MIN2(minLoad, load);
197  maxLoad = MAX2(maxLoad, load);
198  }
199 #ifdef WORKLOAD_INTERVAL
200  myTotalMaxLoad += maxLoad;
201  myTotalSpread += maxLoad / minLoad;
202  myNumBatches++;
203  if (myNumBatches % WORKLOAD_INTERVAL == 0) {
204  WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
205  myTotalMaxLoad = 0.;
206  myTotalSpread = 0.;
207  }
208 #endif
209  }
210 #endif
211  if (deleteFinished) {
212  for (Task* task : myFinishedTasks) {
213  delete task;
214  }
215  }
216  ProcessError* toRaise = myException;
217  myException = nullptr;
218  myFinishedTasks.clear();
219  myRunningIndex = 0;
220  myMutex.unlock();
221  if (toRaise != nullptr) {
222  ProcessError err = *toRaise;
223  delete toRaise;
224  throw err;
225  }
226  }
227 
235  bool isFull() const {
236  return myRunningIndex - (int)myFinishedTasks.size() >= size();
237  }
238 
243  int size() const {
244  return (int)myWorkers.size();
245  }
246 
248  void lock() {
249  myPoolMutex.lock();
250  }
251 
253  void unlock() {
254  myPoolMutex.unlock();
255  }
256 
257  const std::vector<MFXWorkerThread*>& getWorkers() {
258  return myWorkers;
259  }
260  private:
262  std::vector<MFXWorkerThread*> myWorkers;
264  FXMutex myMutex;
266  FXMutex myPoolMutex;
268  FXCondition myCondition;
270  std::list<Task*> myFinishedTasks;
275 #ifdef WORKLOAD_PROFILING
277  int myNumBatches;
279  double myTotalMaxLoad;
281  double myTotalSpread;
283  std::chrono::high_resolution_clock::time_point myProfileStart;
284 #endif
285  };
286 
287 public:
294  MFXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
295 #ifdef WORKLOAD_PROFILING
296  , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
297 #endif
298  {
299  pool.addWorker(this);
300  start();
301  }
302 
307  virtual ~MFXWorkerThread() {
308  stop();
309 #ifdef WORKLOAD_PROFILING
310  const double load = 100. * myTotalBusyTime / myTotalTime;
311  WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
312  " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
313  "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
314 #endif
315  }
316 
321  void add(Task* t) {
322  myMutex.lock();
323  myTasks.push_back(t);
324  myCondition.signal();
325  myMutex.unlock();
326  }
327 
334  FXint run() {
335  while (!myStopped) {
336  myMutex.lock();
337  while (!myStopped && myTasks.empty()) {
338  myCondition.wait(myMutex);
339  }
340  if (myStopped) {
341  myMutex.unlock();
342  break;
343  }
344  myCurrentTasks.splice(myCurrentTasks.end(), myTasks);
345  myMutex.unlock();
346  try {
347  for (Task* const t : myCurrentTasks) {
348 #ifdef WORKLOAD_PROFILING
349  const auto before = std::chrono::high_resolution_clock::now();
350 #endif
351  t->run(this);
352 #ifdef WORKLOAD_PROFILING
353  const auto after = std::chrono::high_resolution_clock::now();
354  myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
355  myCounter++;
356 #endif
357  }
358  } catch (ProcessError& e) {
359  myPool.setException(e);
360  }
362  }
363  return 0;
364  }
365 
370  void stop() {
371  myMutex.lock();
372  myStopped = true;
373  myCondition.signal();
374  myMutex.unlock();
375  join();
376  }
377 
378 #ifdef WORKLOAD_PROFILING
379  void startProfile() {
380  myBusyTime = 0;
381  }
382 
383  double endProfile(const long long int time) {
384  myTotalTime += time;
385  myTotalBusyTime += myBusyTime;
386  return time == 0 ? 100. : 100. * myBusyTime / time;
387  }
388 #endif
389 
390 private:
394  FXMutex myMutex;
396  FXCondition myCondition;
398  std::list<Task*> myTasks;
400  std::list<Task*> myCurrentTasks;
402  bool myStopped;
403 #ifdef WORKLOAD_PROFILING
405  int myCounter;
407  long long int myBusyTime;
409  long long int myTotalBusyTime;
411  long long int myTotalTime;
412 #endif
413 };
#define WORKLOAD_INTERVAL
#define WRITE_MESSAGE(msg)
Definition: MsgHandler.h:297
T MIN2(T a, T b)
Definition: StdDefs.h:76
T MAX2(T a, T b)
Definition: StdDefs.h:82
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition: ToString.h:46
A pool of worker threads which distributes the tasks and collects the results.
std::list< Task * > myFinishedTasks
list of finished tasks
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
std::vector< MFXWorkerThread * > myWorkers
the current worker threads
bool isFull() const
Checks whether there are currently at least as many pending tasks as threads.
void lock()
locks the pool mutex
int myRunningIndex
the running index for the next task
void clear()
Stops and deletes all worker threads.
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index....
virtual ~Pool()
Destructor.
void addFinished(std::list< Task * > &tasks)
Adds the given tasks to the list of finished tasks.
const std::vector< MFXWorkerThread * > & getWorkers()
void setException(ProcessError &e)
FXCondition myCondition
the semaphore to wait on for finishing all tasks
ProcessError * myException
the exception from a child thread
void addWorker(MFXWorkerThread *const w)
Adds the given thread to the pool.
void unlock()
unlocks the pool mutex
FXMutex myPoolMutex
the pool mutex for external sync
Pool(int numThreads=0)
Constructor.
int size() const
Returns the number of threads in the pool.
FXMutex myMutex
the internal mutex for the task list
Abstract superclass of a task to be run with an index to keep track of pending tasks.
int myIndex
the index of the task, valid only after the task has been added to the pool
virtual void run(MFXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
virtual ~Task()
Destructor.
void setIndex(const int newIndex)
Sets the running index of this task.
A thread repeatedly calculating incoming tasks.
FXMutex myMutex
the mutex for the task list
FXint run()
Main execution method of this thread.
Pool & myPool
the pool for this thread
MFXWorkerThread(Pool &pool)
Constructor.
bool myStopped
whether we are still running
void add(Task *t)
Adds the given task to this thread to be calculated.
std::list< Task * > myCurrentTasks
the list of tasks which are currently calculated
void stop()
Stops the thread.
FXCondition myCondition
the semaphore when waiting for new tasks
virtual ~MFXWorkerThread()
Destructor.
std::list< Task * > myTasks
the list of pending tasks