LCOV - code coverage report
Current view: top level - src/libtraci - Connection.cpp (source / functions) Coverage Total Hit
Test: lcov.info Lines: 92.2 % 256 236
Test Date: 2024-12-21 15:45:41 Functions: 100.0 % 14 14

            Line data    Source code
       1              : /****************************************************************************/
       2              : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3              : // Copyright (C) 2012-2024 German Aerospace Center (DLR) and others.
       4              : // This program and the accompanying materials are made available under the
       5              : // terms of the Eclipse Public License 2.0 which is available at
       6              : // https://www.eclipse.org/legal/epl-2.0/
       7              : // This Source Code may also be made available under the following Secondary
       8              : // Licenses when the conditions for such availability set forth in the Eclipse
       9              : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10              : // or later which is available at
      11              : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12              : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13              : /****************************************************************************/
      14              : /// @file    Connection.cpp
      15              : /// @author  Daniel Krajzewicz
      16              : /// @author  Mario Krumnow
      17              : /// @author  Jakob Erdmann
      18              : /// @author  Michael Behrisch
      19              : /// @date    30.05.2012
      20              : ///
      21              : // C++ TraCI client API implementation
      22              : /****************************************************************************/
      23              : #include <config.h>
      24              : 
      25              : #include <thread>
      26              : #include <chrono>
      27              : #include <array>
      28              : #include <libsumo/StorageHelper.h>
      29              : #include <libsumo/TraCIDefs.h>
      30              : #include "Connection.h"
      31              : 
      32              : 
      33              : namespace libtraci {
      34              : // ===========================================================================
      35              : // static member initializations
      36              : // ===========================================================================
      37              : Connection* Connection::myActive = nullptr;
      38              : std::map<const std::string, Connection*> Connection::myConnections;
      39              : 
      40              : 
      41              : // ===========================================================================
      42              : // member method definitions
      43              : // ===========================================================================
      44          643 : Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
      45         1931 :     myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
      46          643 :     if (pipe != nullptr) {
      47          587 :         myProcessReader = new std::thread(&Connection::readOutput, this);
      48              :     }
      49         1403 :     for (int i = 0; i <= numRetries; i++) {
      50              :         try {
      51         1403 :             mySocket.connect();
      52              :             break;
      53          762 :         } catch (tcpip::SocketException& e) {
      54          762 :             mySocket.close();
      55          762 :             if (i == numRetries) {
      56            2 :                 close();
      57            4 :                 throw libsumo::FatalTraCIError("Could not connect in " + toString(numRetries + 1) + " tries");
      58              :             }
      59         1520 :             std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
      60              :             std::cout << " Retrying in 1 second" << std::endl;
      61          760 :             std::this_thread::sleep_for(std::chrono::seconds(1));
      62          762 :         }
      63              :     }
      64          647 : }
      65              : 
      66              : 
      67              : void
      68          587 : Connection::readOutput() {
      69              :     std::array<char, 256> buffer;
      70              :     bool errout = false;
      71         1478 :     while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
      72          891 :         std::stringstream result;
      73          891 :         result << buffer.data();
      74              :         std::string line;
      75         1782 :         while (std::getline(result, line)) {
      76          891 :             if ((errout && (line.empty() || line[0] == ' ')) || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
      77              :                 std::cerr << line << std::endl;
      78              :                 errout = true;
      79              :             } else {
      80              :                 std::cout << line << std::endl;
      81              :                 errout = false;
      82              :             }
      83              :         }
      84          891 :     }
      85          582 : }
      86              : 
      87              : 
      88              : void
      89          638 : Connection::close() {
      90          638 :     if (mySocket.has_client_connection()) {
      91          636 :         std::unique_lock<std::mutex> lock{ myMutex };
      92          636 :         tcpip::Storage outMsg;
      93              :         // command length
      94          636 :         outMsg.writeUnsignedByte(1 + 1);
      95              :         // command id
      96          636 :         outMsg.writeUnsignedByte(libsumo::CMD_CLOSE);
      97          636 :         mySocket.sendExact(outMsg);
      98              : 
      99          636 :         tcpip::Storage inMsg;
     100              :         std::string acknowledgement;
     101          636 :         check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
     102          636 :         mySocket.close();
     103          636 :     }
     104          638 :     if (myProcessReader != nullptr) {
     105          582 :         myProcessReader->join();
     106         1164 :         delete myProcessReader;
     107          582 :         myProcessReader = nullptr;
     108              : #ifdef WIN32
     109              :         _pclose(myProcessPipe);
     110              : #else
     111          582 :         pclose(myProcessPipe);
     112              : #endif
     113              :     }
     114          638 :     myConnections.erase(myLabel);
     115          638 :     delete myActive;
     116          638 :     myActive = nullptr;
     117          638 : }
     118              : 
     119              : 
     120              : void
     121       115794 : Connection::simulationStep(double time) {
     122       115794 :     std::unique_lock<std::mutex> lock{myMutex};
     123       115794 :     tcpip::Storage outMsg;
     124              :     // command length
     125       115794 :     outMsg.writeUnsignedByte(1 + 1 + 8);
     126              :     // command id
     127       115794 :     outMsg.writeUnsignedByte(libsumo::CMD_SIMSTEP);
     128       115794 :     outMsg.writeDouble(time);
     129              :     // send request message
     130       115794 :     mySocket.sendExact(outMsg);
     131              : 
     132       115794 :     tcpip::Storage inMsg;
     133       115794 :     check_resultState(inMsg, libsumo::CMD_SIMSTEP);
     134              :     mySubscriptionResults.clear();
     135              :     myContextSubscriptionResults.clear();
     136       115794 :     int numSubs = inMsg.readInt();
     137       163117 :     while (numSubs-- > 0) {
     138        47323 :         const int responseID = check_commandGetResult(inMsg, 0, -1, true);
     139        47323 :         if ((responseID >= libsumo::RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE && responseID <= libsumo::RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE) ||
     140        47323 :                 (responseID >= libsumo::RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE && responseID <= libsumo::RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE)) {
     141        18120 :             readVariableSubscription(responseID, inMsg);
     142              :         } else {
     143        29203 :             readContextSubscription(responseID, inMsg);
     144              :         }
     145              :     }
     146       231588 : }
     147              : 
     148              : 
     149              : void
     150            2 : Connection::setOrder(int order) {
     151            2 :     std::unique_lock<std::mutex> lock{ myMutex };
     152            2 :     tcpip::Storage outMsg;
     153              :     // command length
     154            2 :     outMsg.writeUnsignedByte(1 + 1 + 4);
     155              :     // command id
     156            2 :     outMsg.writeUnsignedByte(libsumo::CMD_SETORDER);
     157              :     // client index
     158            2 :     outMsg.writeInt(order);
     159            2 :     mySocket.sendExact(outMsg);
     160              : 
     161            2 :     tcpip::Storage inMsg;
     162            2 :     check_resultState(inMsg, libsumo::CMD_SETORDER);
     163            4 : }
     164              : 
     165              : 
     166              : void
     167       655533 : Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
     168       655533 :     if (!mySocket.has_client_connection()) {
     169            0 :         throw libsumo::FatalTraCIError("Connection already closed.");
     170              :     }
     171       655533 :     myOutput.reset();
     172              :     // command length
     173              :     int length = 1 + 1;
     174       655533 :     if (varID >= 0) {
     175              :         length += 1;
     176       654885 :         if (objID != nullptr) {
     177       654699 :             length += 4 + (int)objID->length();
     178              :         }
     179              :     }
     180       655533 :     if (add != nullptr) {
     181        39998 :         length += (int)add->size();
     182              :     }
     183       655533 :     if (length <= 255) {
     184       655524 :         myOutput.writeUnsignedByte(length);
     185              :     } else {
     186            9 :         myOutput.writeUnsignedByte(0);
     187            9 :         myOutput.writeInt(length + 4);
     188              :     }
     189       655533 :     myOutput.writeUnsignedByte(cmdID);
     190       655533 :     if (varID >= 0) {
     191       654885 :         myOutput.writeUnsignedByte(varID);
     192       654885 :         if (objID != nullptr) {
     193       654699 :             myOutput.writeString(*objID);
     194              :         }
     195              :     }
     196              :     // additional values
     197       655533 :     if (add != nullptr) {
     198        39998 :         myOutput.writeStorage(*add);
     199              :     }
     200       655533 : }
     201              : 
     202              : 
     203              : void
     204         1414 : Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
     205              :                       int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
     206         1414 :     if (!mySocket.has_client_connection()) {
     207            0 :         throw tcpip::SocketException("Socket is not initialised");
     208              :     }
     209              :     const bool isContext = domain != -1;
     210         1414 :     tcpip::Storage outMsg;
     211         1414 :     outMsg.writeUnsignedByte(domID); // command id
     212         1414 :     outMsg.writeDouble(beginTime);
     213         1414 :     outMsg.writeDouble(endTime);
     214         1414 :     outMsg.writeString(objID);
     215         1414 :     if (isContext) {
     216          934 :         outMsg.writeUnsignedByte(domain);
     217          934 :         outMsg.writeDouble(range);
     218              :     }
     219         1414 :     if (vars.size() == 1 && vars.front() == -1) {
     220           30 :         if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
     221              :             // default for vehicles is edge id and lane position
     222            3 :             outMsg.writeUnsignedByte(2);
     223            3 :             outMsg.writeUnsignedByte(libsumo::VAR_ROAD_ID);
     224            3 :             outMsg.writeUnsignedByte(libsumo::VAR_LANEPOSITION);
     225              :         } else {
     226              :             // default for detectors is vehicle number, for all others (and contexts) id list
     227           27 :             outMsg.writeUnsignedByte(1);
     228           27 :             const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
     229           27 :                                     || domID == libsumo::CMD_SUBSCRIBE_LANEAREA_VARIABLE
     230              :                                     || domID == libsumo::CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
     231              :                                     || domID == libsumo::CMD_SUBSCRIBE_LANE_VARIABLE
     232              :                                     || domID == libsumo::CMD_SUBSCRIBE_EDGE_VARIABLE;
     233           27 :             outMsg.writeUnsignedByte(isDetector ? libsumo::LAST_STEP_VEHICLE_NUMBER : libsumo::TRACI_ID_LIST);
     234              :         }
     235              :     } else {
     236         1384 :         outMsg.writeUnsignedByte((int)vars.size());
     237         2539 :         for (const int v : vars) {
     238         1155 :             outMsg.writeUnsignedByte(v);
     239              :             const auto& paramEntry = params.find(v);
     240         1155 :             if (paramEntry != params.end()) {
     241           48 :                 outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
     242              :             }
     243              :         }
     244              :     }
     245         1414 :     tcpip::Storage complete;
     246         1414 :     complete.writeUnsignedByte(0);
     247         1414 :     complete.writeInt(5 + (int)outMsg.size());
     248         1414 :     complete.writeStorage(outMsg);
     249         1414 :     std::unique_lock<std::mutex> lock{ myMutex };
     250              :     // send message
     251         1414 :     mySocket.sendExact(complete);
     252              : 
     253         1414 :     tcpip::Storage inMsg;
     254         1414 :     check_resultState(inMsg, domID);
     255         1183 :     if (!vars.empty()) {
     256          846 :         const int responseID = check_commandGetResult(inMsg, domID);
     257          846 :         if (isContext) {
     258          370 :             readContextSubscription(responseID, inMsg);
     259              :         } else {
     260          476 :             readVariableSubscription(responseID, inMsg);
     261              :         }
     262              :     }
     263         3059 : }
     264              : 
     265              : 
     266              : void
     267       773379 : Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
     268       773379 :     mySocket.receiveExact(inMsg);
     269              :     int cmdLength;
     270              :     int cmdId;
     271              :     int resultType;
     272              :     int cmdStart;
     273              :     std::string msg;
     274              :     try {
     275       773379 :         cmdStart = inMsg.position();
     276       773379 :         cmdLength = inMsg.readUnsignedByte();
     277       773379 :         cmdId = inMsg.readUnsignedByte();
     278       773379 :         resultType = inMsg.readUnsignedByte();
     279       773379 :         msg = inMsg.readString();
     280            0 :     } catch (std::invalid_argument&) {
     281            0 :         throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
     282            0 :     }
     283       773379 :     switch (resultType) {
     284          455 :         case libsumo::RTYPE_ERR:
     285          910 :             throw libsumo::TraCIException(msg);
     286           25 :         case libsumo::RTYPE_NOTIMPLEMENTED:
     287           50 :             throw libsumo::TraCIException(".. Sent command is not implemented (" + toHex(command) + "), [description: " + msg + "]");
     288       772899 :         case libsumo::RTYPE_OK:
     289       772899 :             if (acknowledgement != nullptr) {
     290         2388 :                 (*acknowledgement) = ".. Command acknowledged (" + toHex(command) + "), [description: " + msg + "]";
     291              :             }
     292              :             break;
     293            0 :         default:
     294            0 :             throw libsumo::TraCIException(".. Answered with unknown result code(" + toHex(resultType) + ") to command(" + toHex(command) + "), [description: " + msg + "]");
     295              :     }
     296       772899 :     if (command != cmdId && !ignoreCommandId) {
     297            0 :         throw libsumo::TraCIException("#Error: received status response to command: " + toHex(cmdId) + " but expected: " + toHex(command));
     298              :     }
     299       772899 :     if ((cmdStart + cmdLength) != (int) inMsg.position()) {
     300            0 :         throw libsumo::TraCIException("#Error: command at position " + toHex(cmdStart) + " has wrong length");
     301              :     }
     302       772899 : }
     303              : 
     304              : 
     305              : int
     306       686881 : Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
     307       686881 :     int length = inMsg.readUnsignedByte();
     308       686881 :     if (length == 0) {
     309        59468 :         length = inMsg.readInt();
     310              :     }
     311       686881 :     int cmdId = inMsg.readUnsignedByte();
     312       686881 :     if (!ignoreCommandId && cmdId != (command + 0x10)) {
     313            0 :         throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
     314              :     }
     315       686881 :     if (expectedType >= 0) {
     316              :         // not called from the TraCITestClient but from within the Connection
     317       638712 :         inMsg.readUnsignedByte(); // variableID
     318       638712 :         inMsg.readString(); // objectID
     319       638712 :         int valueDataType = inMsg.readUnsignedByte();
     320       638712 :         if (valueDataType != expectedType) {
     321            0 :             throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
     322              :         }
     323              :     }
     324       686881 :     return cmdId;
     325              : }
     326              : 
     327              : 
     328              : tcpip::Storage&
     329       655347 : Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add, int expectedType) {
     330       655347 :     createCommand(command, var, &id, add);
     331       655347 :     mySocket.sendExact(myOutput);
     332       655347 :     myInput.reset();
     333       655347 :     check_resultState(myInput, command);
     334       655099 :     if (expectedType >= 0) {
     335       638712 :         check_commandGetResult(myInput, command, expectedType);
     336              :     }
     337       655099 :     return myInput;
     338              : }
     339              : 
     340              : 
     341              : void
     342          186 : Connection::addFilter(int var, tcpip::Storage* add) {
     343          186 :     std::unique_lock<std::mutex> lock{ myMutex };
     344          186 :     createCommand(libsumo::CMD_ADD_SUBSCRIPTION_FILTER, var, nullptr, add);
     345          186 :     mySocket.sendExact(myOutput);
     346          186 :     myInput.reset();
     347          186 :     check_resultState(myInput, libsumo::CMD_ADD_SUBSCRIPTION_FILTER);
     348          185 : }
     349              : 
     350              : 
     351              : void
     352       113448 : Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
     353       254007 :     while (variableCount > 0) {
     354              : 
     355       140559 :         const int variableID = inMsg.readUnsignedByte();
     356       140559 :         const int status = inMsg.readUnsignedByte();
     357       140559 :         const int type = inMsg.readUnsignedByte();
     358              : 
     359       140559 :         if (status == libsumo::RTYPE_OK) {
     360       140559 :             switch (type) {
     361        17847 :                 case libsumo::TYPE_DOUBLE:
     362        17847 :                     into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
     363        17847 :                     break;
     364        16528 :                 case libsumo::TYPE_STRING:
     365        33056 :                     into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
     366        16528 :                     break;
     367        92809 :                 case libsumo::POSITION_2D: {
     368              :                     auto p = std::make_shared<libsumo::TraCIPosition>();
     369        92809 :                     p->x = inMsg.readDouble();
     370        92809 :                     p->y = inMsg.readDouble();
     371        92809 :                     into[objectID][variableID] = p;
     372              :                     break;
     373              :                 }
     374           28 :                 case libsumo::POSITION_3D: {
     375              :                     auto p = std::make_shared<libsumo::TraCIPosition>();
     376           28 :                     p->x = inMsg.readDouble();
     377           28 :                     p->y = inMsg.readDouble();
     378           28 :                     p->z = inMsg.readDouble();
     379           28 :                     into[objectID][variableID] = p;
     380              :                     break;
     381              :                 }
     382            0 :                 case libsumo::TYPE_COLOR: {
     383              :                     auto c = std::make_shared<libsumo::TraCIColor>();
     384            0 :                     c->r = (unsigned char)inMsg.readUnsignedByte();
     385            0 :                     c->g = (unsigned char)inMsg.readUnsignedByte();
     386            0 :                     c->b = (unsigned char)inMsg.readUnsignedByte();
     387            0 :                     c->a = (unsigned char)inMsg.readUnsignedByte();
     388            0 :                     into[objectID][variableID] = c;
     389              :                     break;
     390              :                 }
     391         8354 :                 case libsumo::TYPE_INTEGER:
     392         8354 :                     into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
     393         8354 :                     break;
     394          437 :                 case libsumo::TYPE_STRINGLIST: {
     395              :                     auto sl = std::make_shared<libsumo::TraCIStringList>();
     396          437 :                     int n = inMsg.readInt();
     397         1677 :                     for (int i = 0; i < n; ++i) {
     398         2480 :                         sl->value.push_back(inMsg.readString());
     399              :                     }
     400          437 :                     into[objectID][variableID] = sl;
     401              :                 }
     402          437 :                 break;
     403         4556 :                 case libsumo::TYPE_COMPOUND: {
     404         4556 :                     int n = inMsg.readInt();
     405         4556 :                     if (n == 2) {
     406         4556 :                         inMsg.readUnsignedByte();
     407         4556 :                         const std::string s = inMsg.readString();
     408         4556 :                         const int secondType = inMsg.readUnsignedByte();
     409         4556 :                         if (secondType == libsumo::TYPE_DOUBLE) {
     410              :                             auto r = std::make_shared<libsumo::TraCIRoadPosition>();
     411         4535 :                             r->edgeID = s;
     412         4535 :                             r->pos = inMsg.readDouble();
     413         4535 :                             into[objectID][variableID] = r;
     414           21 :                         } else if (secondType == libsumo::TYPE_STRING) {
     415              :                             auto sl = std::make_shared<libsumo::TraCIStringList>();
     416           21 :                             sl->value.push_back(s);
     417           21 :                             sl->value.push_back(inMsg.readString());
     418           21 :                             into[objectID][variableID] = sl;
     419              :                         }
     420              :                     }
     421              :                 }
     422              :                 break;
     423              : 
     424              :                 // TODO Other data types
     425              : 
     426            0 :                 default:
     427            0 :                     throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
     428              :             }
     429              :         } else {
     430            0 :             throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
     431              :         }
     432              : 
     433       140559 :         variableCount--;
     434              :     }
     435       113448 : }
     436              : 
     437              : 
     438              : void
     439        18596 : Connection::readVariableSubscription(int responseID, tcpip::Storage& inMsg) {
     440        18596 :     const std::string objectID = inMsg.readString();
     441        18596 :     const int variableCount = inMsg.readUnsignedByte();
     442        18596 :     readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
     443        18596 : }
     444              : 
     445              : 
     446              : void
     447        29573 : Connection::readContextSubscription(int responseID, tcpip::Storage& inMsg) {
     448        29573 :     const std::string contextID = inMsg.readString();
     449        29573 :     inMsg.readUnsignedByte(); // context domain
     450        29573 :     const int variableCount = inMsg.readUnsignedByte();
     451        29573 :     int numObjects = inMsg.readInt();
     452              :     // the following also instantiates the empty map to get comparable results with libsumo
     453              :     // see also https://github.com/eclipse/sumo/issues/7288
     454        29573 :     libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
     455       124425 :     while (numObjects-- > 0) {
     456        94852 :         const std::string& objectID = inMsg.readString();
     457        94852 :         results[objectID]; // instantiate empty map for id lists
     458        94852 :         readVariables(inMsg, objectID, variableCount, results);
     459              :     }
     460        29573 : }
     461              : 
     462              : 
     463              : }
     464              : 
     465              : 
     466              : /****************************************************************************/
        

Generated by: LCOV version 2.0-1