LCOV - code coverage report
Current view: top level - src/utils/foxtools - MFXWorkerThread.h (source / functions) Coverage Total Hit
Test: lcov.info Lines: 100.0 % 86 86
Test Date: 2025-11-13 15:38:19 Functions: 92.9 % 14 13

            Line data    Source code
       1              : /****************************************************************************/
       2              : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3              : // Copyright (C) 2004-2025 German Aerospace Center (DLR) and others.
       4              : // This program and the accompanying materials are made available under the
       5              : // terms of the Eclipse Public License 2.0 which is available at
       6              : // https://www.eclipse.org/legal/epl-2.0/
       7              : // This Source Code may also be made available under the following Secondary
       8              : // Licenses when the conditions for such availability set forth in the Eclipse
       9              : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10              : // or later which is available at
      11              : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12              : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13              : /****************************************************************************/
      14              : /// @file    MFXWorkerThread.h
      15              : /// @author  Michael Behrisch
      16              : /// @date    2014-07-13
      17              : ///
      18              : // A thread class together with a pool and a task for parallelized computation
      19              : /****************************************************************************/
      20              : #pragma once
      21              : #include <config.h>
      22              : 
      23              : // #define WORKLOAD_PROFILING
      24              : // interval (in batches) at which the maximum workload of the threads is reported; needs WORKLOAD_PROFILING
      25              : // undefine it to get the summary report only
      26              : #define WORKLOAD_INTERVAL 100
      27              : 
      28              : #include <list>
      29              : #include <vector>
      30              : #include "fxheader.h"
      31              : #ifdef WORKLOAD_PROFILING
      32              : #include <chrono>
      33              : #include <utils/common/MsgHandler.h>
      34              : #include <utils/common/ToString.h>
      35              : #endif
      36              : #include <utils/common/UtilExceptions.h>
      37              : 
      38              : 
      39              : // ===========================================================================
      40              : // class definitions
      41              : // ===========================================================================
      42              : /**
      43              :  * @class MFXWorkerThread
      44              :  * @brief A thread repeatedly processing incoming tasks
      45              :  */
      46              : class MFXWorkerThread : public FXThread {
      47              : 
      48              : public:
      49              :     /**
      50              :      * @class MFXWorkerThread::Task
      51              :      * @brief Abstract superclass of a task to be run, carrying an index to keep track of pending tasks
      52              :      */
      53              :     class Task {
      54              :     public:
      55              :         /// @brief Destructor
      56              :         virtual ~Task() {};
      57              : 
      58              :         /** @brief Abstract method which in subclasses should contain the computations to be performed.
      59              :          *
      60              :          * If there is data to be shared among several tasks (but not among several threads) it can be put in
      61              :          *  a thread class subclassing MFXWorkerThread. The instance of the thread is then made available
      62              :          *  via the context parameter.
      63              :          *
      64              :          * @param[in] context The thread which runs the task
      65              :          */
      66              :         virtual void run(MFXWorkerThread* context) = 0;
      67              : 
      68              :         /** @brief Sets the running index of this task.
      69              :          *
      70              :          * Every task receives an index which is unique among all pending tasks of the same thread pool.
      71              :          *
      72              :          * @param[in] newIndex the index to assign
      73              :          */
      74              :         void setIndex(const int newIndex) {
      75     28965437 :             myIndex = newIndex;
      76              :         }
      77              :     private:
      78              :         /// @brief the index of the task; only assigned by setIndex, hence valid only after the task has been added to the pool
      79              :         int myIndex;
      80              :     };
      81              : 
      82              :     /**
      83              :      * @class MFXWorkerThread::Pool
      84              :      * @brief A pool of worker threads which distributes the tasks and collects the results
      85              :      */
      86              :     class Pool {
      87              :     public:
      88              :         /** @brief Constructor
      89              :          *
      90              :          * Creates numThreads workers; each new MFXWorkerThread adds itself to this pool and starts from within its own constructor. A value <= 0 leaves the pool empty.
      91              :          *
      92              :          * @param[in] numThreads the number of threads to create
      93              :          */
      94        42868 :         Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
      95              : #ifdef WORKLOAD_PROFILING
      96              :             , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
      97              : #endif
      98              :         {
      99              : #ifdef WORKLOAD_PROFILING
     100              :             long long int timeDiff = 0; // summed cost of 100 back-to-back clock reads; only used by the commented-out output below
     101              :             for (int i = 0; i < 100; i++) {
     102              :                 const auto begin = std::chrono::high_resolution_clock::now();
     103              :                 const auto end = std::chrono::high_resolution_clock::now();
     104              :                 timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
     105              :             }
     106              :             //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
     107              : #endif
     108        42876 :             while (numThreads > 0) {
     109            8 :                 new MFXWorkerThread(*this); // pointer is not lost: the thread registers itself via addWorker() and is deleted in clear()
     110            8 :                 numThreads--;
     111              :             }
     112        42868 :         }
     113              : 
     114              :         /** @brief Destructor
     115              :          *
     116              :          * Stops and deletes all workers by calling clear(). Tasks still in the finished list and a stored exception are not deleted here.
     117              :          */
     118        42425 :         virtual ~Pool() {
     119        42425 :             clear();
     120        42425 :         }
     121              : 
     122              :         /** @brief Stops and deletes all worker threads (the worker destructor calls stop()) and empties the worker list.
     123              :          */
     124        80608 :         void clear() {
     125        97801 :             for (MFXWorkerThread* const worker : myWorkers) {
     126        17193 :                 delete worker;
     127              :             }
     128              :             myWorkers.clear();
     129        80608 :         }
     130              : 
     131              :         /** @brief Adds the given thread to the pool (called by the MFXWorkerThread constructor; no locking is done here).
     132              :          *
     133              :          * @param[in] w the thread to add
     134              :          */
     135              :         void addWorker(MFXWorkerThread* const w) {
     136        17477 :             myWorkers.push_back(w);
     137        17477 :         }
     138              : 
     139              :         /** @brief Gives a number to the given task and assigns it to the worker with the given index.
     140              :          * If the index is negative, the worker is chosen round robin (running index modulo the number of workers), which requires a non-empty pool.
     141              :          * A non-negative index is used as is, without range check. The running index is modified without holding the internal mutex.
     142              :          * @param[in] t the task to add
     143              :          * @param[in] index index of the worker thread to use or -1 for an arbitrary one
     144              :          */
     145     28965437 :         void add(Task* const t, int index = -1) {
     146     28965437 :             if (index < 0) {
     147       187739 :                 index = myRunningIndex % myWorkers.size();
     148              :             }
     149              : #ifdef WORKLOAD_PROFILING
     150              :             if (myRunningIndex == 0) { // first task of a new batch: reset the per-worker busy times and take the start time
     151              :                 for (MFXWorkerThread* const worker : myWorkers) {
     152              :                     worker->startProfile();
     153              :                 }
     154              :                 myProfileStart = std::chrono::high_resolution_clock::now();
     155              :             }
     156              : #endif
     157     28965437 :             t->setIndex(myRunningIndex++);
     158     28965437 :             myWorkers[index]->add(t);
     159     28965437 :         }
     160              : 
     161              :         /** @brief Adds the given tasks to the list of finished tasks.
     162              :          *
     163              :          * Locks the internal mutex, moves the finished tasks over (leaving the given list empty) and signals the condition waitAll() waits on. This is to be called by the worker thread only.
     164              :          *
     165              :          * @param[in] tasks the tasks to add
     166              :          */
     167     15529943 :         void addFinished(std::list<Task*>& tasks) {
     168     15529943 :             myMutex.lock();
     169              :             myFinishedTasks.splice(myFinishedTasks.end(), tasks);
     170     15529943 :             myCondition.signal();
     171     15529943 :             myMutex.unlock();
     172     15529943 :         }
     173              : 
     174           25 :         void setException(ProcessError& e) { // remembers a copy of the first exception reported since the last waitAll(); later ones are dropped
     175           25 :             myMutex.lock();
     176           25 :             if (myException == nullptr) {
     177           25 :                 myException = new ProcessError(e); // heap copy, handed over to and deleted by waitAll()
     178              :             }
     179           25 :             myMutex.unlock();
     180           25 :         }
     181              : 
     182              :         /// @brief waits for all tasks added since the last call to be finished, resets the running index and rethrows a copy of a ProcessError stored by a worker; if deleteFinished is false the finished tasks are only removed from the list, not deleted
     183     21525171 :         void waitAll(const bool deleteFinished = true) {
     184     21525171 :             myMutex.lock();
     185     33812617 :             while ((int)myFinishedTasks.size() < myRunningIndex) { // every added task increments myRunningIndex, so all are done once the finished list is that long
     186     12287446 :                 myCondition.wait(myMutex);
     187              :             }
     188              : #ifdef WORKLOAD_PROFILING
     189              :             if (myRunningIndex > 0) {
     190              :                 const auto end = std::chrono::high_resolution_clock::now();
     191              :                 const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
     192              :                 double minLoad = std::numeric_limits<double>::max();
     193              :                 double maxLoad = 0.;
     194              :                 for (MFXWorkerThread* const worker : myWorkers) {
     195              :                     const double load = worker->endProfile(elapsed);
     196              :                     minLoad = MIN2(minLoad, load);
     197              :                     maxLoad = MAX2(maxLoad, load);
     198              :                 }
     199              : #ifdef WORKLOAD_INTERVAL
     200              :                 myTotalMaxLoad += maxLoad;
     201              :                 myTotalSpread += maxLoad / minLoad; // NOTE(review): minLoad is 0 when a worker was idle in this batch, giving inf/NaN
     202              :                 myNumBatches++;
     203              :                 if (myNumBatches % WORKLOAD_INTERVAL == 0) {
     204              :                     WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
     205              :                     myTotalMaxLoad = 0.;
     206              :                     myTotalSpread = 0.;
     207              :                 }
     208              : #endif
     209              :             }
     210              : #endif
     211     21525171 :             if (deleteFinished) {
     212     10981341 :                 for (Task* task : myFinishedTasks) {
     213       191419 :                     delete task;
     214              :                 }
     215              :             }
     216     21525171 :             ProcessError* toRaise = myException; // take over the stored exception so it can be thrown after the mutex is released
     217     21525171 :             myException = nullptr;
     218              :             myFinishedTasks.clear();
     219     21525171 :             myRunningIndex = 0;
     220     21525171 :             myMutex.unlock();
     221     21525171 :             if (toRaise != nullptr) {
     222           25 :                 ProcessError err = *toRaise; // copy first, so the heap object can be deleted before throwing
     223           25 :                 delete toRaise;
     224           50 :                 throw err;
     225              :             }
     226     21525146 :         }
     227              : 
     228              :         /** @brief Checks whether there are currently at least as many pending tasks as threads.
     229              :          *
     230              :          * This is only a rough estimate because the tasks are already assigned and there could be an idle thread even though the
     231              :          *  number of tasks is large. The finished list is read here without holding the internal mutex.
     232              :          *
     233              :          * @return whether there are enough tasks to let all threads work
     234              :          */
     235              :         bool isFull() const {
     236           31 :             return myRunningIndex - (int)myFinishedTasks.size() >= size();
     237              :         }
     238              : 
     239              :         /** @brief Returns the number of threads in the pool.
     240              :          *
     241              :          * @return the number of threads, i.e. the size of the worker list converted to int
     242              :          */
     243              :         int size() const {
     244    158914344 :             return (int)myWorkers.size();
     245              :         }
     246              : 
     247              :         /// @brief locks the pool mutex (myPoolMutex), which is separate from the internal task list mutex and not used by any other method of this class
     248              :         void lock() {
     249              :             myPoolMutex.lock();
     250              :         }
     251              : 
     252              :         /// @brief unlocks the pool mutex (myPoolMutex), the counterpart of lock()
     253              :         void unlock() {
     254              :             myPoolMutex.unlock();
     255              :         }
     256              : 
     257              :         const std::vector<MFXWorkerThread*>& getWorkers() { // read-only reference to the list of worker threads (no copy, no locking)
     258              :             return myWorkers;
     259              :         }
     260              :     private:
     261              :         /// @brief the current worker threads (deleted in clear())
     262              :         std::vector<MFXWorkerThread*> myWorkers;
     263              :         /// @brief the internal mutex guarding the finished task list and the stored exception
     264              :         FXMutex myMutex;
     265              :         /// @brief the pool mutex for external sync (only touched by lock() and unlock())
     266              :         FXMutex myPoolMutex;
     267              :         /// @brief the condition to wait on until all tasks are finished (signalled by addFinished)
     268              :         FXCondition myCondition;
     269              :         /// @brief list of finished tasks
     270              :         std::list<Task*> myFinishedTasks;
     271              :         /// @brief the running index for the next task; equals the number of tasks added since the last waitAll
     272              :         int myRunningIndex;
     273              :         /// @brief copy of the first exception reported by a worker thread since the last waitAll, or nullptr
     274              :         ProcessError* myException;
     275              : #ifdef WORKLOAD_PROFILING
     276              :         /// @brief the number of finished batch runs
     277              :         int myNumBatches;
     278              :         /// @brief the sum over the maximum loads (reset after every WORKLOAD_INTERVAL batches)
     279              :         double myTotalMaxLoad;
     280              :         /// @brief the sum over the load spreads, i.e. maximum load divided by minimum load per batch
     281              :         double myTotalSpread;
     282              :         /// @brief the time when profiling of the current batch started
     283              :         std::chrono::high_resolution_clock::time_point myProfileStart;
     284              : #endif
     285              :     };
     286              : 
     287              : public:
     288              :     /** @brief Constructor
     289              :      *
     290              :      * Adds the thread to the given pool and starts it.
     291              :      *
     292              :      * @param[in] pool the pool for this thread; it is stored by reference
     293              :      */
     294        17477 :     MFXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
     295              : #ifdef WORKLOAD_PROFILING
     296              :         , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
     297              : #endif
     298              :     {
     299        17477 :         pool.addWorker(this);
     300        17477 :         start();
     301        17477 :     }
     302              : 
     303              :     /** @brief Destructor
     304              :      *
     305              :      * Stops the thread by calling stop(); with WORKLOAD_PROFILING it also writes a summary message for this thread.
     306              :      */
     307        17201 :     virtual ~MFXWorkerThread() {
     308        17193 :         stop();
     309              : #ifdef WORKLOAD_PROFILING
     310              :         const double load = 100. * myTotalBusyTime / myTotalTime; // NOTE(review): myTotalTime (and myCounter below) may still be 0, giving inf/NaN
     311              :         WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
     312              :                       " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
     313              :                       "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
     314              : #endif
     315        17201 :     }
     316              : 
     317              :     /** @brief Adds the given task to this thread to be calculated (appends it to the pending list under the mutex and signals the condition run() waits on)
     318              :      *
     319              :      * @param[in] t the task to add
     320              :      */
     321     28965437 :     void add(Task* t) {
     322     28965437 :         myMutex.lock();
     323     28965437 :         myTasks.push_back(t);
     324     28965437 :         myCondition.signal();
     325     28965437 :         myMutex.unlock();
     326     28965437 :     }
     327              : 
     328              :     /** @brief Main execution method of this thread.
     329              :      *
     330              :      * Checks for new tasks, calculates them and puts them in the finished list of the pool until being stopped.
     331              :      * If a task throws a ProcessError, it is passed to the pool, the remaining tasks of the current batch are not run and the whole batch is still reported as finished. Other exception types are not caught here.
     332              :      * @return always 0
     333              :      */
     334        17477 :     FXint run() {
     335     15547420 :         while (!myStopped) { // read without the mutex; checked again below while holding it
     336     15547366 :             myMutex.lock();
     337     29971463 :             while (!myStopped && myTasks.empty()) {
     338     14424381 :                 myCondition.wait(myMutex);
     339              :             }
     340     15547082 :             if (myStopped) {
     341        17139 :                 myMutex.unlock();
     342        17139 :                 break;
     343              :             }
     344              :             myCurrentTasks.splice(myCurrentTasks.end(), myTasks); // take over all pending tasks at once so they can be run without holding the mutex
     345     15529943 :             myMutex.unlock();
     346              :             try {
     347     44495355 :                 for (Task* const t : myCurrentTasks) {
     348              : #ifdef WORKLOAD_PROFILING
     349              :                     const auto before = std::chrono::high_resolution_clock::now();
     350              : #endif
     351     28965437 :                     t->run(this);
     352              : #ifdef WORKLOAD_PROFILING
     353              :                     const auto after = std::chrono::high_resolution_clock::now();
     354              :                     myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
     355              :                     myCounter++;
     356              : #endif
     357              :                 }
     358           25 :             } catch (ProcessError& e) {
     359           25 :                 myPool.setException(e);
     360           25 :             }
     361     15529943 :             myPool.addFinished(myCurrentTasks); // also empties myCurrentTasks for the next round
     362              :         }
     363        17193 :         return 0;
     364              :     }
     365              : 
     366              :     /** @brief Stops the thread
     367              :      *
     368              :      * Sets the stop flag under the mutex, signals the condition and calls join(). The batch currently being processed is finished, but tasks still waiting in the pending list are not run.
     369              :      */
     370        34370 :     void stop() {
     371        34370 :         myMutex.lock();
     372        34370 :         myStopped = true;
     373        34370 :         myCondition.signal();
     374        34370 :         myMutex.unlock();
     375        34370 :         join();
     376        34370 :     }
     377              : 
     378              : #ifdef WORKLOAD_PROFILING
     379              :     void startProfile() { // resets the busy time accumulated for the current batch
     380              :         myBusyTime = 0;
     381              :     }
     382              : 
     383              :     double endProfile(const long long int time) { // adds the batch to the totals and returns the busy time in percent of the given elapsed time (100 if that time is 0)
     384              :         myTotalTime += time;
     385              :         myTotalBusyTime += myBusyTime;
     386              :         return time == 0 ? 100. : 100. * myBusyTime / time;
     387              :     }
     388              : #endif
     389              : 
     390              : private:
     391              :     /// @brief the pool for this thread
     392              :     Pool& myPool;
     393              :     /// @brief the mutex for the pending task list and the stop flag
     394              :     FXMutex myMutex;
     395              :     /// @brief the condition to wait on for new tasks (signalled by add and stop)
     396              :     FXCondition myCondition;
     397              :     /// @brief the list of pending tasks
     398              :     std::list<Task*> myTasks;
     399              :     /// @brief the list of tasks which are currently being calculated
     400              :     std::list<Task*> myCurrentTasks;
     401              :     /// @brief whether the thread has been asked to stop (set by stop())
     402              :     bool myStopped;
     403              : #ifdef WORKLOAD_PROFILING
     404              :     /// @brief counting completed tasks
     405              :     int myCounter;
     406              :     /// @brief the time (in microseconds) spent in calculations during the current batch
     407              :     long long int myBusyTime;
     408              :     /// @brief the total busy time
     409              :     long long int myTotalBusyTime;
     410              :     /// @brief the total time while anyone had tasks
     411              :     long long int myTotalTime;
     412              : #endif
     413              : };
        

Generated by: LCOV version 2.0-1