Eclipse SUMO - Simulation of Urban MObility
Connection.cpp
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 // Copyright (C) 2012-2024 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
21 // C++ TraCI client API implementation
22 /****************************************************************************/
23 #include <config.h>
24 
25 #include <thread>
26 #include <chrono>
27 #include <array>
28 #include <libsumo/StorageHelper.h>
29 #include <libsumo/TraCIDefs.h>
30 #include "Connection.h"
31 
32 
33 namespace libtraci {
34 // ===========================================================================
35 // static member initializations
36 // ===========================================================================
// Pointer to one Connection instance; starts as nullptr. close() deletes the
// object it points to and resets it to nullptr.
// NOTE(review): presumably the connection API calls are currently routed to -- confirm in Connection.h
37 Connection* Connection::myActive = nullptr;
// Connections keyed by a string; close() erases the entry stored under myLabel.
38 std::map<const std::string, Connection*> Connection::myConnections;
39 
40 
41 // ===========================================================================
42 // member method definitions
43 // ===========================================================================
44 Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
45  myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
46  if (pipe != nullptr) {
47  myProcessReader = new std::thread(&Connection::readOutput, this);
48  }
49  for (int i = 0; i <= numRetries; i++) {
50  try {
51  mySocket.connect();
52  break;
53  } catch (tcpip::SocketException& e) {
54  mySocket.close();
55  if (i == numRetries) {
56  close();
57  throw libsumo::FatalTraCIError("Could not connect in " + toString(numRetries + 1) + " tries");
58  }
59  std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
60  std::cout << " Retrying in 1 second" << std::endl;
61  std::this_thread::sleep_for(std::chrono::seconds(1));
62  }
63  }
64 }
65 
66 
// Copies everything readable from myProcessPipe to this process' own streams
// until fgets() reports end of input or an error.
// Lines starting with "Error:" or "Warning:" go to std::cerr, as do directly
// following lines that are empty or start with a blank; everything else goes
// to std::cout.
// NOTE(review): listing line 68 (the function signature) is missing from this
// extract; presumably this is Connection::readOutput(), which the constructor
// runs in a std::thread -- confirm against the repository.
67 void
69  std::array<char, 256> buffer;
// true while the previously printed line went to std::cerr
70  bool errout = false;
// fgets() returns at most buffer.size() - 1 characters per call, so a longer
// line arrives in several pieces and each piece is classified on its own
71  while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
72  std::stringstream result;
73  result << buffer.data();
74  std::string line;
75  while (std::getline(result, line)) {
76  if ((errout && (line.empty() || line[0] == ' ')) || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
77  std::cerr << line << std::endl;
78  errout = true;
79  } else {
80  std::cout << line << std::endl;
81  errout = false;
82  }
83  }
84  }
85 }
86 
87 
// Sends a two-byte command, checks its status against libsumo::CMD_CLOSE,
// closes the socket, and then releases the reader thread, the process pipe and
// the registry entry of this connection.
// NOTE(review): listing lines 89-90 (signature and the condition opening the
// block closed on line 103) and line 96 (the write of the command id) are
// missing from this extract; the index at the end of the page lists this as
// "void close()" -- confirm against the repository.
88 void
// the lock is held until the closing brace on listing line 103
91  std::unique_lock<std::mutex> lock{ myMutex };
92  tcpip::Storage outMsg;
93  // command length
94  outMsg.writeUnsignedByte(1 + 1);
95  // command id
97  mySocket.sendExact(outMsg);
98 
99  tcpip::Storage inMsg;
// the acknowledgement text is filled in by check_resultState but not used afterwards
100  std::string acknowledgement;
101  check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
102  mySocket.close();
103  }
104  if (myProcessReader != nullptr) {
// wait for the reader thread to finish before the pipe it reads from is closed
105  myProcessReader->join();
106  delete myProcessReader;
107  myProcessReader = nullptr;
108 #ifdef WIN32
109  _pclose(myProcessPipe);
110 #else
111  pclose(myProcessPipe);
112 #endif
113  }
114  myConnections.erase(myLabel);
// NOTE(review): this deletes whatever myActive points to, which is not checked
// to be this object -- confirm that callers only close the active connection
115  delete myActive;
116  myActive = nullptr;
117 }
118 
119 
// Sends a command carrying the given time as a double, then replaces the
// stored subscription results with those contained in the answer.
// NOTE(review): several listing lines are missing from this extract: 121 (the
// signature; the page index lists "void simulationStep(double time)"), 127
// (write of the command id), 133 and 135 (presumably the status check and the
// clearing of the context results), 139-140 (the condition selecting between
// the two read calls below) -- confirm against the repository.
120 void
122  std::unique_lock<std::mutex> lock{myMutex};
123  tcpip::Storage outMsg;
124  // command length
// 1 + 1 + 8: two single-byte fields plus the 8-byte double written below
125  outMsg.writeUnsignedByte(1 + 1 + 8);
126  // command id
128  outMsg.writeDouble(time);
129  // send request message
130  mySocket.sendExact(outMsg);
131 
132  tcpip::Storage inMsg;
134  mySubscriptionResults.clear();
// the answer starts with the number of subscription responses that follow
136  int numSubs = inMsg.readInt();
137  while (numSubs-- > 0) {
// ignoreCommandId is true here, so any response id is accepted and returned
138  const int responseID = check_commandGetResult(inMsg, 0, -1, true);
141  readVariableSubscription(responseID, inMsg);
142  } else {
143  readContextSubscription(responseID, inMsg);
144  }
145  }
146 }
147 
148 
// Sends a command carrying the integer `order` while holding myMutex.
// NOTE(review): listing lines 150 (the signature; the page index lists
// "void setOrder(int order)"), 156 (write of the command id) and 162
// (presumably the check of the server's status answer) are missing from this
// extract -- confirm against the repository.
149 void
151  std::unique_lock<std::mutex> lock{ myMutex };
152  tcpip::Storage outMsg;
153  // command length
// 1 + 1 + 4: two single-byte fields plus the 4-byte integer written below
154  outMsg.writeUnsignedByte(1 + 1 + 4);
155  // command id
157  // client index
158  outMsg.writeInt(order);
159  mySocket.sendExact(outMsg);
160 
161  tcpip::Storage inMsg;
163 }
164 
165 
// Assembles a command message in myOutput; nothing is sent by the visible code.
//
// cmdID  - command id (its write is on a line missing from this extract)
// varID  - variable id; a negative value omits both the variable and the object id
// objID  - object id, written only when varID >= 0 and the pointer is not null
// add    - optional storage appended after the header, may be nullptr
//
// Throws libsumo::FatalTraCIError("Connection already closed.").
// NOTE(review): listing lines 168 (the condition guarding the throw), 186,
// 189 and 191 (presumably the writes of the extended-length marker, the
// command id and the variable id) are missing from this extract -- confirm
// against the repository.
166 void
167 Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
169  throw libsumo::FatalTraCIError("Connection already closed.");
170  }
171  myOutput.reset();
172  // command length
// two bytes are always counted (presumably the length byte and the command id)
173  int length = 1 + 1;
174  if (varID >= 0) {
175  length += 1;
176  if (objID != nullptr) {
// 4 bytes for the string length prefix plus the characters themselves
177  length += 4 + (int)objID->length();
178  }
179  }
180  if (add != nullptr) {
181  length += (int)add->size();
182  }
// lengths that fit into one byte are written directly; larger ones are
// written as an int that additionally accounts for its own 4 bytes
183  if (length <= 255) {
184  myOutput.writeUnsignedByte(length);
185  } else {
187  myOutput.writeInt(length + 4);
188  }
190  if (varID >= 0) {
192  if (objID != nullptr) {
193  myOutput.writeString(*objID);
194  }
195  }
196  // additional values
197  if (add != nullptr) {
198  myOutput.writeStorage(*add);
199  }
200 }
201 
202 
// Builds a subscription request, sends it and stores the results contained in
// the answer.
//
// domID      - command id of the subscription; also the id expected in the answer
// objID      - id of the object subscribed to
// beginTime, endTime - written to the request after the command id
// domain     - context domain; -1 selects a plain variable subscription
// range      - context range, written only for context subscriptions
// vars       - variable ids; the single entry -1 requests the defaults
// params     - optional per-variable parameters, serialised after the variable id
//
// Throws tcpip::SocketException("Socket is not initialised").
// NOTE(review): listing lines 206 (the condition guarding that throw), 223-224
// (the two default vehicle variables) and 229-233 (rest of the isDetector
// expression and the write of the chosen default variable) are missing from
// this extract -- confirm against the repository.
203 void
204 Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
205  int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
207  throw tcpip::SocketException("Socket is not initialised");
208  }
209  const bool isContext = domain != -1;
210  tcpip::Storage outMsg;
211  outMsg.writeUnsignedByte(domID); // command id
212  outMsg.writeDouble(beginTime);
213  outMsg.writeDouble(endTime);
214  outMsg.writeString(objID);
215  if (isContext) {
216  outMsg.writeUnsignedByte(domain);
217  outMsg.writeDouble(range);
218  }
219  if (vars.size() == 1 && vars.front() == -1) {
220  if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
221  // default for vehicles is edge id and lane position
222  outMsg.writeUnsignedByte(2);
225  } else {
226  // default for detectors is vehicle number, for all others (and contexts) id list
227  outMsg.writeUnsignedByte(1);
228  const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
234  }
235  } else {
236  outMsg.writeUnsignedByte((int)vars.size());
237  for (const int v : vars) {
238  outMsg.writeUnsignedByte(v);
239  const auto& paramEntry = params.find(v);
240  if (paramEntry != params.end()) {
241  outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
242  }
243  }
244  }
// the request always uses the extended header: a 0 byte followed by the total
// length as an int, where 5 covers that byte and the 4-byte length itself
245  tcpip::Storage complete;
246  complete.writeUnsignedByte(0);
247  complete.writeInt(5 + (int)outMsg.size());
248  complete.writeStorage(outMsg);
// the mutex is taken only now, after the message has been assembled
249  std::unique_lock<std::mutex> lock{ myMutex };
250  // send message
251  mySocket.sendExact(complete);
252 
253  tcpip::Storage inMsg;
254  check_resultState(inMsg, domID);
// with an empty variable list no response body is read
255  if (!vars.empty()) {
256  const int responseID = check_commandGetResult(inMsg, domID);
257  if (isContext) {
258  readContextSubscription(responseID, inMsg);
259  } else {
260  readVariableSubscription(responseID, inMsg);
261  }
262  }
263 }
264 
265 
// Receives one message from the socket into inMsg and validates the status
// block at its start.
//
// inMsg            - storage that receives the message; left positioned after the status block
// command          - command id the status is expected to refer to
// ignoreCommandId  - when true, a differing command id in the status is accepted
// acknowledgement  - optional out-parameter that receives a description when the status is RTYPE_OK
//
// Throws libsumo::TraCIException when the status cannot be read, reports an
// error or an unknown result code, refers to another command (unless
// ignoreCommandId is set) or has an inconsistent length.
// NOTE(review): listing line 286 is missing from this extract; presumably it
// is the case label for libsumo::RTYPE_NOTIMPLEMENTED that belongs to the
// "not implemented" throw -- confirm against the repository.
266 void
267 Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
268  mySocket.receiveExact(inMsg);
269  int cmdLength;
270  int cmdId;
271  int resultType;
272  int cmdStart;
273  std::string msg;
274  try {
// remember where the status block starts for the length check at the end
275  cmdStart = inMsg.position();
276  cmdLength = inMsg.readUnsignedByte();
277  cmdId = inMsg.readUnsignedByte();
278  resultType = inMsg.readUnsignedByte();
279  msg = inMsg.readString();
280  } catch (std::invalid_argument&) {
281  throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
282  }
283  switch (resultType) {
284  case libsumo::RTYPE_ERR:
285  throw libsumo::TraCIException(msg);
287  throw libsumo::TraCIException(".. Sent command is not implemented (" + toHex(command) + "), [description: " + msg + "]");
288  case libsumo::RTYPE_OK:
289  if (acknowledgement != nullptr) {
290  (*acknowledgement) = ".. Command acknowledged (" + toHex(command) + "), [description: " + msg + "]";
291  }
292  break;
293  default:
294  throw libsumo::TraCIException(".. Answered with unknown result code(" + toHex(resultType) + ") to command(" + toHex(command) + "), [description: " + msg + "]");
295  }
296  if (command != cmdId && !ignoreCommandId) {
297  throw libsumo::TraCIException("#Error: received status response to command: " + toHex(cmdId) + " but expected: " + toHex(command));
298  }
// the declared length must match the number of bytes actually consumed
299  if ((cmdStart + cmdLength) != (int) inMsg.position()) {
300  throw libsumo::TraCIException("#Error: command at position " + toHex(cmdStart) + " has wrong length");
301  }
302 }
303 
304 
305 int
306 Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
307  int length = inMsg.readUnsignedByte();
308  if (length == 0) {
309  length = inMsg.readInt();
310  }
311  int cmdId = inMsg.readUnsignedByte();
312  if (!ignoreCommandId && cmdId != (command + 0x10)) {
313  throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
314  }
315  if (expectedType >= 0) {
316  // not called from the TraCITestClient but from within the Connection
317  inMsg.readUnsignedByte(); // variableID
318  inMsg.readString(); // objectID
319  int valueDataType = inMsg.readUnsignedByte();
320  if (valueDataType != expectedType) {
321  throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
322  }
323  }
324  return cmdId;
325 }
326 
327 
// Builds a command via createCommand(), validates the status of the answer
// and, when expectedType >= 0, also the response header and value type.
// Returns myInput, which is reset before the answer is read.
// NOTE(review): listing lines 328 (the return type; the page index lists
// "tcpip::Storage &") and 331 (presumably the send of myOutput) are missing
// from this extract -- confirm against the repository.
329 Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add, int expectedType) {
330  createCommand(command, var, &id, add);
332  myInput.reset();
333  check_resultState(myInput, command);
334  if (expectedType >= 0) {
335  check_commandGetResult(myInput, command, expectedType);
336  }
337  return myInput;
338 }
339 
340 
// Takes myMutex and resets myInput; the remaining statements are not visible.
// NOTE(review): listing lines 342 (the signature; the page index lists
// "void addFilter(int var, tcpip::Storage *add=nullptr)"), 344-345 and 347 are
// missing from this extract, so how the command is built, sent and checked
// cannot be told from here -- confirm against the repository.
341 void
343  std::unique_lock<std::mutex> lock{ myMutex };
346  myInput.reset();
348 }
349 
350 
// Reads variableCount variable entries from inMsg and stores each value in
// into[objectID][variableID].
//
// inMsg          - storage positioned at the first variable entry
// objectID       - key of the object the values belong to
// variableCount  - number of entries to read
// into           - result map that receives the values
//
// Throws libsumo::TraCIException for an entry whose status is not RTYPE_OK or
// whose value type is not handled by the switch.
// NOTE(review): the case labels on listing lines 361, 364, 391 and 394 are
// missing from this extract; judging by the result types constructed they are
// presumably TYPE_DOUBLE, TYPE_STRING, TYPE_INTEGER and TYPE_STRINGLIST --
// confirm against the repository.
351 void
352 Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
353  while (variableCount > 0) {
354 
// every entry starts with variable id, status and value type, one byte each
355  const int variableID = inMsg.readUnsignedByte();
356  const int status = inMsg.readUnsignedByte();
357  const int type = inMsg.readUnsignedByte();
358 
359  if (status == libsumo::RTYPE_OK) {
360  switch (type) {
362  into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
363  break;
365  into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
366  break;
367  case libsumo::POSITION_2D: {
368  auto p = std::make_shared<libsumo::TraCIPosition>();
369  p->x = inMsg.readDouble();
370  p->y = inMsg.readDouble();
371  into[objectID][variableID] = p;
372  break;
373  }
374  case libsumo::POSITION_3D: {
375  auto p = std::make_shared<libsumo::TraCIPosition>();
376  p->x = inMsg.readDouble();
377  p->y = inMsg.readDouble();
378  p->z = inMsg.readDouble();
379  into[objectID][variableID] = p;
380  break;
381  }
382  case libsumo::TYPE_COLOR: {
383  auto c = std::make_shared<libsumo::TraCIColor>();
384  c->r = (unsigned char)inMsg.readUnsignedByte();
385  c->g = (unsigned char)inMsg.readUnsignedByte();
386  c->b = (unsigned char)inMsg.readUnsignedByte();
387  c->a = (unsigned char)inMsg.readUnsignedByte();
388  into[objectID][variableID] = c;
389  break;
390  }
392  into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
393  break;
395  auto sl = std::make_shared<libsumo::TraCIStringList>();
// the list is sent as an element count followed by that many strings
396  int n = inMsg.readInt();
397  for (int i = 0; i < n; ++i) {
398  sl->value.push_back(inMsg.readString());
399  }
400  into[objectID][variableID] = sl;
401  }
402  break;
403  case libsumo::TYPE_COMPOUND: {
// only compounds with exactly two items are decoded; for any other count
// nothing further is read and no value is stored
404  int n = inMsg.readInt();
405  if (n == 2) {
// type byte of the first item, skipped; the item is then read as a string
406  inMsg.readUnsignedByte();
407  const std::string s = inMsg.readString();
408  const int secondType = inMsg.readUnsignedByte();
409  if (secondType == libsumo::TYPE_DOUBLE) {
410  auto r = std::make_shared<libsumo::TraCIRoadPosition>();
411  r->edgeID = s;
412  r->pos = inMsg.readDouble();
413  into[objectID][variableID] = r;
414  } else if (secondType == libsumo::TYPE_STRING) {
415  auto sl = std::make_shared<libsumo::TraCIStringList>();
416  sl->value.push_back(s);
417  sl->value.push_back(inMsg.readString());
418  into[objectID][variableID] = sl;
419  }
420  }
421  }
422  break;
423 
424  // TODO Other data types
425 
426  default:
427  throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
428  }
429  } else {
430  throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
431  }
432 
433  variableCount--;
434  }
435 }
436 
437 
// Reads the object id and the variable count from inMsg and stores the
// variables that follow in mySubscriptionResults[responseID].
// NOTE(review): listing line 439 (the signature) is missing from this extract;
// the page index lists
// "void readVariableSubscription(int responseID, tcpip::Storage &inMsg)" --
// confirm against the repository.
438 void
440  const std::string objectID = inMsg.readString();
441  const int variableCount = inMsg.readUnsignedByte();
442  readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
443 }
444 
445 
// Reads a context subscription response from inMsg and stores the variables
// of every contained object in
// myContextSubscriptionResults[responseID][contextID].
// NOTE(review): listing line 447 (the signature) is missing from this extract;
// the page index lists
// "void readContextSubscription(int responseID, tcpip::Storage &inMsg)" --
// confirm against the repository.
446 void
448  const std::string contextID = inMsg.readString();
449  inMsg.readUnsignedByte(); // context domain
// the variable count applies to every object that follows
450  const int variableCount = inMsg.readUnsignedByte();
451  int numObjects = inMsg.readInt();
452  // the following also instantiates the empty map to get comparable results with libsumo
453  // see also https://github.com/eclipse/sumo/issues/7288
454  libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
455  while (numObjects-- > 0) {
456  const std::string& objectID = inMsg.readString();
457  results[objectID]; // instantiate empty map for id lists
458  readVariables(inMsg, objectID, variableCount, results);
459  }
460 }
461 
462 
463 }
464 
465 
466 /****************************************************************************/
An error which is not recoverable.
Definition: TraCIDefs.h:155
static std::shared_ptr< tcpip::Storage > toStorage(const TraCIResult &v)
Definition: StorageHelper.h:33
An error which allows to continue.
Definition: TraCIDefs.h:144
void simulationStep(double time)
Sends a SimulationStep command.
Definition: Connection.cpp:121
Connection(const std::string &host, int port, int numRetries, const std::string &label, FILE *const pipe)
Constructor, connects to the specified SUMO server.
Definition: Connection.cpp:44
void close()
ends the simulation and closes the connection
Definition: Connection.cpp:89
void createCommand(int cmdID, int varID, const std::string *const objID, tcpip::Storage *add=nullptr) const
Sends a GetVariable / SetVariable request if mySocket is connected. Otherwise writes to myOutput only...
Definition: Connection.cpp:167
int check_commandGetResult(tcpip::Storage &inMsg, int command, int expectedType=-1, bool ignoreCommandId=false) const
Validates the result state of a command.
Definition: Connection.cpp:306
void addFilter(int var, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:342
void readVariableSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:439
tcpip::Socket mySocket
The socket.
Definition: Connection.h:180
std::map< int, libsumo::SubscriptionResults > mySubscriptionResults
Definition: Connection.h:188
void check_resultState(tcpip::Storage &inMsg, int command, bool ignoreCommandId=false, std::string *acknowledgement=0)
Validates the result state of a command.
Definition: Connection.cpp:267
tcpip::Storage myInput
The reusable input storage.
Definition: Connection.h:184
FILE *const myProcessPipe
Definition: Connection.h:177
void readVariables(tcpip::Storage &inMsg, const std::string &objectID, int variableCount, libsumo::SubscriptionResults &into)
Definition: Connection.cpp:352
std::mutex myMutex
Definition: Connection.h:186
std::map< int, libsumo::ContextSubscriptionResults > myContextSubscriptionResults
Definition: Connection.h:189
tcpip::Storage myOutput
The reusable output storage.
Definition: Connection.h:182
void setOrder(int order)
Sends a SetOrder command.
Definition: Connection.cpp:150
void subscribe(int domID, const std::string &objID, double beginTime, double endTime, int domain, double range, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeContext or a SubscribeVariable request.
Definition: Connection.cpp:204
static std::map< const std::string, Connection * > myConnections
Definition: Connection.h:192
const std::string myLabel
Definition: Connection.h:176
void readContextSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:447
tcpip::Storage & doCommand(int command, int var=-1, const std::string &id="", tcpip::Storage *add=nullptr, int expectedType=-1)
Definition: Connection.cpp:329
static Connection * myActive
Definition: Connection.h:191
static std::string toString(const T &t, std::streamsize accuracy=PRECISION)
Definition: Connection.h:150
std::thread * myProcessReader
Definition: Connection.h:178
std::string toHex(const T i, std::streamsize numDigits=2)
Definition: Connection.h:159
bool receiveExact(Storage &)
Receive a complete TraCI message from Socket::socket_.
Definition: socket.cpp:536
void sendExact(const Storage &)
Definition: socket.cpp:439
bool has_client_connection() const
Definition: socket.cpp:568
void connect()
Connects to host_:port_.
Definition: socket.cpp:367
void close()
Definition: socket.cpp:391
virtual std::string readString()
Definition: storage.cpp:180
virtual void writeString(const std::string &s)
Definition: storage.cpp:197
virtual unsigned int position() const
Definition: storage.cpp:76
virtual void writeInt(int)
Definition: storage.cpp:321
virtual void writeDouble(double)
Definition: storage.cpp:354
virtual int readUnsignedByte()
Definition: storage.cpp:155
void reset()
Definition: storage.cpp:85
virtual void writeUnsignedByte(int)
Definition: storage.cpp:165
StorageType::size_type size() const
Definition: storage.h:119
virtual void writeStorage(tcpip::Storage &store)
Definition: storage.cpp:388
virtual double readDouble()
Definition: storage.cpp:362
virtual int readInt()
Definition: storage.cpp:311
TRACI_CONST int TYPE_COLOR
TRACI_CONST int LAST_STEP_VEHICLE_NUMBER
TRACI_CONST int POSITION_3D
TRACI_CONST int RTYPE_NOTIMPLEMENTED
TRACI_CONST int TRACI_ID_LIST
TRACI_CONST int VAR_ROAD_ID
TRACI_CONST int TYPE_COMPOUND
TRACI_CONST int RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE
TRACI_CONST int RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int POSITION_2D
TRACI_CONST int RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE
TRACI_CONST int CMD_CLOSE
TRACI_CONST int CMD_SETORDER
TRACI_CONST int TYPE_STRINGLIST
TRACI_CONST int TYPE_INTEGER
TRACI_CONST int RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE
TRACI_CONST int CMD_ADD_SUBSCRIPTION_FILTER
std::map< std::string, libsumo::TraCIResults > SubscriptionResults
{object->{variable->value}}
Definition: TraCIDefs.h:337
TRACI_CONST int VAR_LANEPOSITION
TRACI_CONST int CMD_SUBSCRIBE_VEHICLE_VARIABLE
TRACI_CONST int TYPE_DOUBLE
TRACI_CONST int CMD_SUBSCRIBE_LANEAREA_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
TRACI_CONST int RTYPE_ERR
TRACI_CONST int CMD_SIMSTEP
TRACI_CONST int CMD_SUBSCRIBE_LANE_VARIABLE
TRACI_CONST int RTYPE_OK
std::map< int, std::shared_ptr< libsumo::TraCIResult > > TraCIResults
{variable->value}
Definition: TraCIDefs.h:335
TRACI_CONST int CMD_SUBSCRIBE_EDGE_VARIABLE
TRACI_CONST int TYPE_STRING