LCOV - code coverage report
Current view: top level - src/utils/foxtools - MFXWorkerThread.h (source / functions) Hit Total Coverage
Test: lcov.info Lines: 86 86 100.0 %
Date: 2024-04-27 15:34:54 Functions: 13 14 92.9 %

          Line data    Source code
       1             : /****************************************************************************/
       2             : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3             : // Copyright (C) 2004-2024 German Aerospace Center (DLR) and others.
       4             : // This program and the accompanying materials are made available under the
       5             : // terms of the Eclipse Public License 2.0 which is available at
       6             : // https://www.eclipse.org/legal/epl-2.0/
       7             : // This Source Code may also be made available under the following Secondary
       8             : // Licenses when the conditions for such availability set forth in the Eclipse
       9             : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10             : // or later which is available at
      11             : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12             : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13             : /****************************************************************************/
      14             : /// @file    MFXWorkerThread.h
      15             : /// @author  Michael Behrisch
      16             : /// @date    2014-07-13
      17             : ///
      18             : // A thread class together with a pool and a task for parallelized computation
      19             : /****************************************************************************/
      20             : #pragma once
      21             : #include <config.h>
      22             : 
      23             : // #define WORKLOAD_PROFILING
      24             : // interval (in finished batches) at which the maximum workload of the threads is reported, needs WORKLOAD_PROFILING
      25             : // undefine to use summary report only
      26             : #define WORKLOAD_INTERVAL 100
      27             : 
      28             : #include <list>
      29             : #include <vector>
      30             : #include "fxheader.h"
      31             : #ifdef WORKLOAD_PROFILING
      32             : #include <chrono>
      33             : #include <utils/common/MsgHandler.h>
      34             : #include <utils/common/ToString.h>
      35             : #endif
      36             : #include <utils/common/UtilExceptions.h>
      37             : 
      38             : 
      39             : // ===========================================================================
      40             : // class definitions
      41             : // ===========================================================================
      42             : /**
      43             :  * @class MFXWorkerThread
      44             :  * @brief A thread repeatedly calculating incoming tasks
      45             :  */
      46             : class MFXWorkerThread : public FXThread {
      47             : 
      48             : public:
      49             :     /**
      50             :      * @class MFXWorkerThread::Task
      51             :      * @brief Abstract superclass of a task to be run with an index to keep track of pending tasks
      52             :      */
      53             :     class Task {
      54             :     public:
      55             :         /// @brief Destructor
      56             :         virtual ~Task() {};
      57             : 
      58             :         /** @brief Abstract method which in subclasses should contain the computations to be performed.
      59             :          *
      60             :          * If there is data to be shared among several tasks (but not among several threads) it can be put in
      61             :          *  a thread class subclassing the MFXWorkerThread. The instance of the thread is then made available
      62             :          *  via the context parameter.
      63             :          *
      64             :          * @param[in] context The thread which runs the task
      65             :          */
      66             :         virtual void run(MFXWorkerThread* context) = 0;
      67             : 
      68             :         /** @brief Sets the running index of this task.
      69             :          *
      70             :          * Every task receives an index which is unique among all pending tasks of the same thread pool.
      71             :          *
      72             :          * @param[in] newIndex the index to assign
      73             :          */
      74             :         void setIndex(const int newIndex) {
      75    23051799 :             myIndex = newIndex;
      76             :         }
      77             :     private:
      78             :         /// @brief the index of the task, valid only after the task has been added to the pool (not initialized before setIndex is called)
      79             :         int myIndex;
      80             :     };
      81             : 
      82             :     /**
      83             :      * @class MFXWorkerThread::Pool
      84             :      * @brief A pool of worker threads which distributes the tasks and collects the results
      85             :      */
      86             :     class Pool {
      87             :     public:
      88             :         /** @brief Constructor
      89             :          *
      90             :          * May initialize the pool with a given number of workers. Each created worker registers itself with this pool via addWorker.
      91             :          *
      92             :          * @param[in] numThreads the number of threads to create
      93             :          */
      94       39744 :         Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
      95             : #ifdef WORKLOAD_PROFILING
      96             :             , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
      97             : #endif
      98             :         {
      99             : #ifdef WORKLOAD_PROFILING
     100             :             long long int timeDiff = 0;
     101             :             for (int i = 0; i < 100; i++) {
     102             :                 const auto begin = std::chrono::high_resolution_clock::now();
     103             :                 const auto end = std::chrono::high_resolution_clock::now();
     104             :                 timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
     105             :             }
     106             :             //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
     107             : #endif
     108       39760 :             while (numThreads > 0) {
     109          16 :                 new MFXWorkerThread(*this);
     110          16 :                 numThreads--;
     111             :             }
     112       39744 :         }
     113             : 
     114             :         /** @brief Destructor
     115             :          *
     116             :          * Stopping and deleting all workers by calling clear.
     117             :          */
     118       39267 :         virtual ~Pool() {
     119       39267 :             clear();
     120       39267 :         }
     121             : 
     122             :         /** @brief Stops and deletes all worker threads.
     123             :          */
     124       73488 :         void clear() {
     125       88701 :             for (MFXWorkerThread* const worker : myWorkers) {
     126       15213 :                 delete worker;
     127             :             }
     128             :             myWorkers.clear();
     129       73488 :         }
     130             : 
     131             :         /** @brief Adds the given thread to the pool.
     132             :          *
     133             :          * @param[in] w the thread to add
     134             :          */
     135             :         void addWorker(MFXWorkerThread* const w) {
     136       15477 :             myWorkers.push_back(w);
     137       15477 :         }
     138             : 
     139             :         /** @brief Gives a number to the given task and assigns it to the worker with the given index.
     140             :          * If the index is negative, assign to the next (round robin) one. NOTE(review): the round robin modulo below divides by zero if the pool has no workers.
     141             :          *
     142             :          * @param[in] t the task to add
     143             :          * @param[in] index index of the worker thread to use or -1 for an arbitrary one
     144             :          */
     145    23051799 :         void add(Task* const t, int index = -1) {
     146    23051799 :             if (index < 0) {
     147      130743 :                 index = myRunningIndex % myWorkers.size();
     148             :             }
     149             : #ifdef WORKLOAD_PROFILING
     150             :             if (myRunningIndex == 0) {
     151             :                 for (MFXWorkerThread* const worker : myWorkers) {
     152             :                     worker->startProfile();
     153             :                 }
     154             :                 myProfileStart = std::chrono::high_resolution_clock::now();
     155             :             }
     156             : #endif
     157    23051799 :             t->setIndex(myRunningIndex++);
     158    23051799 :             myWorkers[index]->add(t);
     159    23051799 :         }
     160             : 
     161             :         /** @brief Adds the given tasks to the list of finished tasks.
     162             :          *
     163             :          * Locks the internal mutex, appends the finished tasks (emptying the given list) and signals the condition waitAll blocks on. This is to be called by the worker thread only.
     164             :          *
     165             :          * @param[in] tasks the tasks to add
     166             :          */
     167    12736015 :         void addFinished(std::list<Task*>& tasks) {
     168    12736015 :             myMutex.lock();
     169             :             myFinishedTasks.splice(myFinishedTasks.end(), tasks);
     170    12736015 :             myCondition.signal();
     171    12736015 :             myMutex.unlock();
     172    12736015 :         }
     173             :         /// @brief stores a copy of the given exception unless one is already pending; waitAll rethrows and clears it
     174          24 :         void setException(ProcessError& e) {
     175          24 :             myMutex.lock();
     176          24 :             if (myException == nullptr) {
     177          48 :                 myException = new ProcessError(e);
     178             :             }
     179          24 :             myMutex.unlock();
     180          24 :         }
     181             : 
     182             :         /// @brief waits for all tasks to be finished, deletes them if requested, resets the running index and rethrows a pending worker exception
     183    13277210 :         void waitAll(const bool deleteFinished = true) {
     184    13277210 :             myMutex.lock();
     185    20111778 :             while ((int)myFinishedTasks.size() < myRunningIndex) {
     186     6834568 :                 myCondition.wait(myMutex);
     187             :             }
     188             : #ifdef WORKLOAD_PROFILING
     189             :             if (myRunningIndex > 0) {
     190             :                 const auto end = std::chrono::high_resolution_clock::now();
     191             :                 const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
     192             :                 double minLoad = std::numeric_limits<double>::max();
     193             :                 double maxLoad = 0.;
     194             :                 for (MFXWorkerThread* const worker : myWorkers) {
     195             :                     const double load = worker->endProfile(elapsed);
     196             :                     minLoad = MIN2(minLoad, load);
     197             :                     maxLoad = MAX2(maxLoad, load);
     198             :                 }
     199             : #ifdef WORKLOAD_INTERVAL
     200             :                 myTotalMaxLoad += maxLoad;
     201             :                 myTotalSpread += maxLoad / minLoad;
     202             :                 myNumBatches++;
     203             :                 if (myNumBatches % WORKLOAD_INTERVAL == 0) {
     204             :                     WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
     205             :                     myTotalMaxLoad = 0.;
     206             :                     myTotalSpread = 0.;
     207             :                 }
     208             : #endif
     209             :             }
     210             : #endif
     211    13277210 :             if (deleteFinished) {
     212     6793705 :                 for (Task* task : myFinishedTasks) {
     213      131007 :                     delete task;
     214             :                 }
     215             :             }
     216    13277210 :             ProcessError* toRaise = myException;
     217    13277210 :             myException = nullptr;
     218             :             myFinishedTasks.clear();
     219    13277210 :             myRunningIndex = 0;
     220    13277210 :             myMutex.unlock();
     221    13277210 :             if (toRaise != nullptr) {
     222             :                 ProcessError err = *toRaise;
     223          24 :                 delete toRaise;
     224          48 :                 throw err;
     225             :             }
     226    13277186 :         }
     227             : 
     228             :         /** @brief Checks whether there are currently more pending tasks than threads.
     229             :          *
     230             :          * This is only a rough estimate because the tasks are already assigned and there could be an idle thread even though the
     231             :          *  number of tasks is large. NOTE(review): reads the counters without holding the internal mutex.
     232             :          *
     233             :          * @return whether there are enough tasks to let all threads work
     234             :          */
     235             :         bool isFull() const {
     236          30 :             return myRunningIndex - (int)myFinishedTasks.size() >= size();
     237             :         }
     238             : 
     239             :         /** @brief Returns the number of threads in the pool.
     240             :          *
     241             :          * @return the number of threads
     242             :          */
     243             :         int size() const {
     244   175718753 :             return (int)myWorkers.size();
     245             :         }
     246             : 
     247             :         /// @brief locks the pool mutex
     248             :         void lock() {
     249             :             myPoolMutex.lock();
     250             :         }
     251             : 
     252             :         /// @brief unlocks the pool mutex
     253             :         void unlock() {
     254             :             myPoolMutex.unlock();
     255             :         }
     256             :         /// @brief returns the worker threads currently registered with the pool
     257             :         const std::vector<MFXWorkerThread*>& getWorkers() {
     258             :             return myWorkers;
     259             :         }
     260             :     private:
     261             :         /// @brief the current worker threads
     262             :         std::vector<MFXWorkerThread*> myWorkers;
     263             :         /// @brief the internal mutex guarding the finished tasks and the stored exception
     264             :         FXMutex myMutex;
     265             :         /// @brief the pool mutex for external sync
     266             :         FXMutex myPoolMutex;
     267             :         /// @brief the condition to wait on for finishing all tasks
     268             :         FXCondition myCondition;
     269             :         /// @brief list of finished tasks
     270             :         std::list<Task*> myFinishedTasks;
     271             :         /// @brief the running index for the next task
     272             :         int myRunningIndex;
     273             :         /// @brief the exception from a child thread
     274             :         ProcessError* myException;
     275             : #ifdef WORKLOAD_PROFILING
     276             :         /// @brief the number of finished batch runs
     277             :         int myNumBatches;
     278             :         /// @brief the sum over the maximum loads
     279             :         double myTotalMaxLoad;
     280             :         /// @brief the sum over the load spreads
     281             :         double myTotalSpread;
     282             :         /// @brief the time when profiling started
     283             :         std::chrono::high_resolution_clock::time_point myProfileStart;
     284             : #endif
     285             :     };
     286             : 
     287             : public:
     288             :     /** @brief Constructor
     289             :      *
     290             :      * Adds the thread to the given pool and starts it.
     291             :      *
     292             :      * @param[in] pool the pool this thread registers with and reports its finished tasks to
     293             :      */
     294       15477 :     MFXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
     295             : #ifdef WORKLOAD_PROFILING
     296             :         , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
     297             : #endif
     298             :     {
     299       15477 :         pool.addWorker(this);
     300       15477 :         start();
     301       15477 :     }
     302             : 
     303             :     /** @brief Destructor
     304             :      *
     305             :      * Stops the thread by calling stop, which also waits for it to end. With WORKLOAD_PROFILING a load summary is written.
     306             :      */
     307       15229 :     virtual ~MFXWorkerThread() {
     308       15213 :         stop();
     309             : #ifdef WORKLOAD_PROFILING
     310             :         const double load = 100. * myTotalBusyTime / myTotalTime;
     311             :         WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
     312             :                       " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
     313             :                       "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
     314             : #endif
     315       15229 :     }
     316             : 
     317             :     /** @brief Adds the given task to this thread to be calculated and signals the condition the thread waits on for new tasks
     318             :      *
     319             :      * @param[in] t the task to add
     320             :      */
     321    23051799 :     void add(Task* t) {
     322    23051799 :         myMutex.lock();
     323    23051799 :         myTasks.push_back(t);
     324    23051799 :         myCondition.signal();
     325    23051799 :         myMutex.unlock();
     326    23051799 :     }
     327             : 
     328             :     /** @brief Main execution method of this thread.
     329             :      *
     330             :      * Checks for new tasks, calculates them and puts them in the finished list of the pool until being stopped. A ProcessError thrown by a task is passed to the pool; the rest of that batch is then not run but still reported as finished.
     331             :      *
     332             :      * @return always 0
     333             :      */
     334       15472 :     FXint run() {
     335    12751487 :         while (!myStopped) {
     336    12750656 :             myMutex.lock();
     337    24160261 :             while (!myStopped && myTasks.empty()) {
     338    11409864 :                 myCondition.wait(myMutex);
     339             :             }
     340    12750397 :             if (myStopped) {
     341       14382 :                 myMutex.unlock();
     342       14382 :                 break;
     343             :             }
     344    12736015 :             myCurrentTasks.splice(myCurrentTasks.end(), myTasks);
     345    12736015 :             myMutex.unlock();
     346             :             try {
     347    35787790 :                 for (Task* const t : myCurrentTasks) {
     348             : #ifdef WORKLOAD_PROFILING
     349             :                     const auto before = std::chrono::high_resolution_clock::now();
     350             : #endif
     351    23051799 :                     t->run(this);
     352             : #ifdef WORKLOAD_PROFILING
     353             :                     const auto after = std::chrono::high_resolution_clock::now();
     354             :                     myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
     355             :                     myCounter++;
     356             : #endif
     357             :                 }
     358          24 :             } catch (ProcessError& e) {
     359          24 :                 myPool.setException(e);
     360          24 :             }
     361    12736015 :             myPool.addFinished(myCurrentTasks);
     362             :         }
     363       15213 :         return 0;
     364             :     }
     365             : 
     366             :     /** @brief Stops the thread
     367             :      *
     368             :      * The batch of tasks currently being run will be finished but all further tasks are left unprocessed. Waits (join) until the thread has ended.
     369             :      */
     370       30406 :     void stop() {
     371       30406 :         myMutex.lock();
     372       30406 :         myStopped = true;
     373       30406 :         myCondition.signal();
     374       30406 :         myMutex.unlock();
     375       30406 :         join();
     376       30406 :     }
     377             : 
     378             : #ifdef WORKLOAD_PROFILING
     379             :     void startProfile() { // resets the busy time accumulated for the current batch
     380             :         myBusyTime = 0;
     381             :     }
     382             :     /// @brief adds the given elapsed time and the batch busy time to the totals; returns the busy share in percent (100 if time is 0)
     383             :     double endProfile(const long long int time) {
     384             :         myTotalTime += time;
     385             :         myTotalBusyTime += myBusyTime;
     386             :         return time == 0 ? 100. : 100. * myBusyTime / time;
     387             :     }
     388             : #endif
     389             : 
     390             : private:
     391             :     /// @brief the pool for this thread
     392             :     Pool& myPool;
     393             :     /// @brief the mutex for the task list
     394             :     FXMutex myMutex;
     395             :     /// @brief the condition to wait on for new tasks
     396             :     FXCondition myCondition;
     397             :     /// @brief the list of pending tasks
     398             :     std::list<Task*> myTasks;
     399             :     /// @brief the list of tasks which are currently calculated
     400             :     std::list<Task*> myCurrentTasks;
     401             :     /// @brief whether the thread has been asked to stop (set by stop, checked by run)
     402             :     bool myStopped;
     403             : #ifdef WORKLOAD_PROFILING
     404             :     /// @brief counting completed tasks
     405             :     int myCounter;
     406             :     /// @brief the time spent in calculations during the current batch
     407             :     long long int myBusyTime;
     408             :     /// @brief the total busy time
     409             :     long long int myTotalBusyTime;
     410             :     /// @brief the total time while anyone had tasks
     411             :     long long int myTotalTime;
     412             : #endif
     413             : };

Generated by: LCOV version 1.14