Line data Source code
1 : /****************************************************************************/ 2 : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo 3 : // Copyright (C) 2004-2024 German Aerospace Center (DLR) and others. 4 : // This program and the accompanying materials are made available under the 5 : // terms of the Eclipse Public License 2.0 which is available at 6 : // https://www.eclipse.org/legal/epl-2.0/ 7 : // This Source Code may also be made available under the following Secondary 8 : // Licenses when the conditions for such availability set forth in the Eclipse 9 : // Public License 2.0 are satisfied: GNU General Public License, version 2 10 : // or later which is available at 11 : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html 12 : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later 13 : /****************************************************************************/ 14 : /// @file MFXWorkerThread.h 15 : /// @author Michael Behrisch 16 : /// @date 2014-07-13 17 : /// 18 : // A thread class together with a pool and a task for parallelized computation 19 : /****************************************************************************/ 20 : #pragma once 21 : #include <config.h> 22 : 23 : // #define WORKLOAD_PROFILING 24 : // at which interval report maximum workload of the threads, needs WORKLOAD_PROFILING 25 : // undefine to use summary report only 26 : #define WORKLOAD_INTERVAL 100 27 : 28 : #include <list> 29 : #include <vector> 30 : #include "fxheader.h" 31 : #ifdef WORKLOAD_PROFILING 32 : #include <chrono> 33 : #include <utils/common/MsgHandler.h> 34 : #include <utils/common/ToString.h> 35 : #endif 36 : #include <utils/common/UtilExceptions.h> 37 : 38 : 39 : // =========================================================================== 40 : // class definitions 41 : // =========================================================================== 42 : /** 43 : * @class MFXWorkerThread 44 : * @brief A thread repeatingly calculating incoming tasks 45 : */ 46 : class MFXWorkerThread : public FXThread { 47 : 48 : public: 49 : /** 50 : * @class MFXWorkerThread::Task 51 : * @brief Abstract superclass of a task to be run with an index to keep track of pending tasks 52 : */ 53 : class Task { 54 : public: 55 : /// @brief Desctructor 56 : virtual ~Task() {}; 57 : 58 : /** @brief Abstract method which in subclasses should contain the computations to be performed. 59 : * 60 : * If there is data to be shared among several tasks (but not among several threads) it can be put in the 61 : * a thread class subclassing the MFXWorkerThread. the instance of the thread is then made available 62 : * via the context parameter. 63 : * 64 : * @param[in] context The thread which runs the task 65 : */ 66 : virtual void run(MFXWorkerThread* context) = 0; 67 : 68 : /** @brief Sets the running index of this task. 69 : * 70 : * Every task receive an index which is unique among all pending tasks of the same thread pool. 71 : * 72 : * @param[in] newIndex the index to assign 73 : */ 74 : void setIndex(const int newIndex) { 75 23051799 : myIndex = newIndex; 76 : } 77 : private: 78 : /// @brief the index of the task, valid only after the task has been added to the pool 79 : int myIndex; 80 : }; 81 : 82 : /** 83 : * @class MFXWorkerThread::Pool 84 : * @brief A pool of worker threads which distributes the tasks and collects the results 85 : */ 86 : class Pool { 87 : public: 88 : /** @brief Constructor 89 : * 90 : * May initialize the pool with a given number of workers. 91 : * 92 : * @param[in] numThreads the number of threads to create 93 : */ 94 39744 : Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr) 95 : #ifdef WORKLOAD_PROFILING 96 : , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.) 97 : #endif 98 : { 99 : #ifdef WORKLOAD_PROFILING 100 : long long int timeDiff = 0; 101 : for (int i = 0; i < 100; i++) { 102 : const auto begin = std::chrono::high_resolution_clock::now(); 103 : const auto end = std::chrono::high_resolution_clock::now(); 104 : timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count(); 105 : } 106 : //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl; 107 : #endif 108 39760 : while (numThreads > 0) { 109 16 : new MFXWorkerThread(*this); 110 16 : numThreads--; 111 : } 112 39744 : } 113 : 114 : /** @brief Destructor 115 : * 116 : * Stopping and deleting all workers by calling clear. 117 : */ 118 39267 : virtual ~Pool() { 119 39267 : clear(); 120 39267 : } 121 : 122 : /** @brief Stops and deletes all worker threads. 123 : */ 124 73488 : void clear() { 125 88701 : for (MFXWorkerThread* const worker : myWorkers) { 126 15213 : delete worker; 127 : } 128 : myWorkers.clear(); 129 73488 : } 130 : 131 : /** @brief Adds the given thread to the pool. 132 : * 133 : * @param[in] w the thread to add 134 : */ 135 : void addWorker(MFXWorkerThread* const w) { 136 15477 : myWorkers.push_back(w); 137 15477 : } 138 : 139 : /** @brief Gives a number to the given task and assigns it to the worker with the given index. 140 : * If the index is negative, assign to the next (round robin) one. 141 : * 142 : * @param[in] t the task to add 143 : * @param[in] index index of the worker thread to use or -1 for an arbitrary one 144 : */ 145 23051799 : void add(Task* const t, int index = -1) { 146 23051799 : if (index < 0) { 147 130743 : index = myRunningIndex % myWorkers.size(); 148 : } 149 : #ifdef WORKLOAD_PROFILING 150 : if (myRunningIndex == 0) { 151 : for (MFXWorkerThread* const worker : myWorkers) { 152 : worker->startProfile(); 153 : } 154 : myProfileStart = std::chrono::high_resolution_clock::now(); 155 : } 156 : #endif 157 23051799 : t->setIndex(myRunningIndex++); 158 23051799 : myWorkers[index]->add(t); 159 23051799 : } 160 : 161 : /** @brief Adds the given tasks to the list of finished tasks. 162 : * 163 : * Locks the internal mutex and appends the finished tasks. This is to be called by the worker thread only. 164 : * 165 : * @param[in] tasks the tasks to add 166 : */ 167 12736015 : void addFinished(std::list<Task*>& tasks) { 168 12736015 : myMutex.lock(); 169 : myFinishedTasks.splice(myFinishedTasks.end(), tasks); 170 12736015 : myCondition.signal(); 171 12736015 : myMutex.unlock(); 172 12736015 : } 173 : 174 24 : void setException(ProcessError& e) { 175 24 : myMutex.lock(); 176 24 : if (myException == nullptr) { 177 48 : myException = new ProcessError(e); 178 : } 179 24 : myMutex.unlock(); 180 24 : } 181 : 182 : /// @brief waits for all tasks to be finished 183 13277210 : void waitAll(const bool deleteFinished = true) { 184 13277210 : myMutex.lock(); 185 20111778 : while ((int)myFinishedTasks.size() < myRunningIndex) { 186 6834568 : myCondition.wait(myMutex); 187 : } 188 : #ifdef WORKLOAD_PROFILING 189 : if (myRunningIndex > 0) { 190 : const auto end = std::chrono::high_resolution_clock::now(); 191 : const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count(); 192 : double minLoad = std::numeric_limits<double>::max(); 193 : double maxLoad = 0.; 194 : for (MFXWorkerThread* const worker : myWorkers) { 195 : const double load = worker->endProfile(elapsed); 196 : minLoad = MIN2(minLoad, load); 197 : maxLoad = MAX2(maxLoad, load); 198 : } 199 : #ifdef WORKLOAD_INTERVAL 200 : myTotalMaxLoad += maxLoad; 201 : myTotalSpread += maxLoad / minLoad; 202 : myNumBatches++; 203 : if (myNumBatches % WORKLOAD_INTERVAL == 0) { 204 : WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL)); 205 : myTotalMaxLoad = 0.; 206 : myTotalSpread = 0.; 207 : } 208 : #endif 209 : } 210 : #endif 211 13277210 : if (deleteFinished) { 212 6793705 : for (Task* task : myFinishedTasks) { 213 131007 : delete task; 214 : } 215 : } 216 13277210 : ProcessError* toRaise = myException; 217 13277210 : myException = nullptr; 218 : myFinishedTasks.clear(); 219 13277210 : myRunningIndex = 0; 220 13277210 : myMutex.unlock(); 221 13277210 : if (toRaise != nullptr) { 222 : ProcessError err = *toRaise; 223 24 : delete toRaise; 224 48 : throw err; 225 : } 226 13277186 : } 227 : 228 : /** @brief Checks whether there are currently more pending tasks than threads. 229 : * 230 : * This is only a rough estimate because the tasks are already assigned and there could be an idle thread even though the 231 : * number of tasks is large. 232 : * 233 : * @return whether there are enough tasks to let all threads work 234 : */ 235 : bool isFull() const { 236 30 : return myRunningIndex - (int)myFinishedTasks.size() >= size(); 237 : } 238 : 239 : /** @brief Returns the number of threads in the pool. 240 : * 241 : * @return the number of threads 242 : */ 243 : int size() const { 244 175718753 : return (int)myWorkers.size(); 245 : } 246 : 247 : /// @brief locks the pool mutex 248 : void lock() { 249 : myPoolMutex.lock(); 250 : } 251 : 252 : /// @brief unlocks the pool mutex 253 : void unlock() { 254 : myPoolMutex.unlock(); 255 : } 256 : 257 : const std::vector<MFXWorkerThread*>& getWorkers() { 258 : return myWorkers; 259 : } 260 : private: 261 : /// @brief the current worker threads 262 : std::vector<MFXWorkerThread*> myWorkers; 263 : /// @brief the internal mutex for the task list 264 : FXMutex myMutex; 265 : /// @brief the pool mutex for external sync 266 : FXMutex myPoolMutex; 267 : /// @brief the semaphore to wait on for finishing all tasks 268 : FXCondition myCondition; 269 : /// @brief list of finished tasks 270 : std::list<Task*> myFinishedTasks; 271 : /// @brief the running index for the next task 272 : int myRunningIndex; 273 : /// @brief the exception from a child thread 274 : ProcessError* myException; 275 : #ifdef WORKLOAD_PROFILING 276 : /// @brief the number of finished batch runs 277 : int myNumBatches; 278 : /// @brief the sum over the maximum loads 279 : double myTotalMaxLoad; 280 : /// @brief the sum over the load spreads 281 : double myTotalSpread; 282 : /// @brief the time when profiling started 283 : std::chrono::high_resolution_clock::time_point myProfileStart; 284 : #endif 285 : }; 286 : 287 : public: 288 : /** @brief Constructor 289 : * 290 : * Adds the thread to the given pool and starts it. 291 : * 292 : * @param[in] pool the pool for this thread 293 : */ 294 15477 : MFXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false) 295 : #ifdef WORKLOAD_PROFILING 296 : , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0) 297 : #endif 298 : { 299 15477 : pool.addWorker(this); 300 15477 : start(); 301 15477 : } 302 : 303 : /** @brief Destructor 304 : * 305 : * Stops the thread by calling stop. 306 : */ 307 15229 : virtual ~MFXWorkerThread() { 308 15213 : stop(); 309 : #ifdef WORKLOAD_PROFILING 310 : const double load = 100. * myTotalBusyTime / myTotalTime; 311 : WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) + 312 : " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) + 313 : "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task."); 314 : #endif 315 15229 : } 316 : 317 : /** @brief Adds the given task to this thread to be calculated 318 : * 319 : * @param[in] t the task to add 320 : */ 321 23051799 : void add(Task* t) { 322 23051799 : myMutex.lock(); 323 23051799 : myTasks.push_back(t); 324 23051799 : myCondition.signal(); 325 23051799 : myMutex.unlock(); 326 23051799 : } 327 : 328 : /** @brief Main execution method of this thread. 329 : * 330 : * Checks for new tasks, calculates them and puts them in the finished list of the pool until being stopped. 331 : * 332 : * @return always 0 333 : */ 334 15472 : FXint run() { 335 12751487 : while (!myStopped) { 336 12750656 : myMutex.lock(); 337 24160261 : while (!myStopped && myTasks.empty()) { 338 11409864 : myCondition.wait(myMutex); 339 : } 340 12750397 : if (myStopped) { 341 14382 : myMutex.unlock(); 342 14382 : break; 343 : } 344 12736015 : myCurrentTasks.splice(myCurrentTasks.end(), myTasks); 345 12736015 : myMutex.unlock(); 346 : try { 347 35787790 : for (Task* const t : myCurrentTasks) { 348 : #ifdef WORKLOAD_PROFILING 349 : const auto before = std::chrono::high_resolution_clock::now(); 350 : #endif 351 23051799 : t->run(this); 352 : #ifdef WORKLOAD_PROFILING 353 : const auto after = std::chrono::high_resolution_clock::now(); 354 : myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count(); 355 : myCounter++; 356 : #endif 357 : } 358 24 : } catch (ProcessError& e) { 359 24 : myPool.setException(e); 360 24 : } 361 12736015 : myPool.addFinished(myCurrentTasks); 362 : } 363 15213 : return 0; 364 : } 365 : 366 : /** @brief Stops the thread 367 : * 368 : * The currently running task will be finished but all further tasks are discarded. 369 : */ 370 30406 : void stop() { 371 30406 : myMutex.lock(); 372 30406 : myStopped = true; 373 30406 : myCondition.signal(); 374 30406 : myMutex.unlock(); 375 30406 : join(); 376 30406 : } 377 : 378 : #ifdef WORKLOAD_PROFILING 379 : void startProfile() { 380 : myBusyTime = 0; 381 : } 382 : 383 : double endProfile(const long long int time) { 384 : myTotalTime += time; 385 : myTotalBusyTime += myBusyTime; 386 : return time == 0 ? 100. : 100. * myBusyTime / time; 387 : } 388 : #endif 389 : 390 : private: 391 : /// @brief the pool for this thread 392 : Pool& myPool; 393 : /// @brief the mutex for the task list 394 : FXMutex myMutex; 395 : /// @brief the semaphore when waiting for new tasks 396 : FXCondition myCondition; 397 : /// @brief the list of pending tasks 398 : std::list<Task*> myTasks; 399 : /// @brief the list of tasks which are currently calculated 400 : std::list<Task*> myCurrentTasks; 401 : /// @brief whether we are still running 402 : bool myStopped; 403 : #ifdef WORKLOAD_PROFILING 404 : /// @brief counting completed tasks 405 : int myCounter; 406 : /// @brief the time spent in calculations during the current batch 407 : long long int myBusyTime; 408 : /// @brief the total busy time 409 : long long int myTotalBusyTime; 410 : /// @brief the total time while anyone had tasks 411 : long long int myTotalTime; 412 : #endif 413 : };