Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
Connection.cpp
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2012-2024 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
21// C++ TraCI client API implementation
22/****************************************************************************/
23#include <config.h>
24
25#include <thread>
26#include <chrono>
27#include <array>
29#include <libsumo/TraCIDefs.h>
30#include "Connection.h"
31
32
33namespace libtraci {
34// ===========================================================================
35// static member initializations
36// ===========================================================================
37Connection* Connection::myActive = nullptr;
38std::map<const std::string, Connection*> Connection::myConnections;
39
40
41// ===========================================================================
42// member method definitions
43// ===========================================================================
44Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
45 myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
46 if (pipe != nullptr) {
47 myProcessReader = new std::thread(&Connection::readOutput, this);
48 }
49 for (int i = 0; i <= numRetries; i++) {
50 try {
52 break;
53 } catch (tcpip::SocketException& e) {
55 if (i == numRetries) {
56 close();
57 throw libsumo::FatalTraCIError("Could not connect in " + toString(numRetries + 1) + " tries");
58 }
59 std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
60 std::cout << " Retrying in 1 second" << std::endl;
61 std::this_thread::sleep_for(std::chrono::seconds(1));
62 }
63 }
64}
65
66
67void
69 std::array<char, 256> buffer;
70 bool errout = false;
71 while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
72 std::stringstream result;
73 result << buffer.data();
74 std::string line;
75 while (std::getline(result, line)) {
76 if ((errout && (line.empty() || line[0] == ' ')) || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
77 std::cerr << line << std::endl;
78 errout = true;
79 } else {
80 std::cout << line << std::endl;
81 errout = false;
82 }
83 }
84 }
85}
86
87
88void
91 std::unique_lock<std::mutex> lock{ myMutex };
92 tcpip::Storage outMsg;
93 // command length
94 outMsg.writeUnsignedByte(1 + 1);
95 // command id
97 mySocket.sendExact(outMsg);
98
99 tcpip::Storage inMsg;
100 std::string acknowledgement;
101 check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
102 mySocket.close();
103 }
104 if (myProcessReader != nullptr) {
105 myProcessReader->join();
106 delete myProcessReader;
107 myProcessReader = nullptr;
108#ifdef WIN32
109 _pclose(myProcessPipe);
110#else
111 pclose(myProcessPipe);
112#endif
113 }
114 myConnections.erase(myLabel);
115 delete myActive;
116 myActive = nullptr;
117}
118
119
120void
122 std::unique_lock<std::mutex> lock{myMutex};
123 tcpip::Storage outMsg;
124 // command length
125 outMsg.writeUnsignedByte(1 + 1 + 8);
126 // command id
128 outMsg.writeDouble(time);
129 // send request message
130 mySocket.sendExact(outMsg);
131
132 tcpip::Storage inMsg;
134 mySubscriptionResults.clear();
136 int numSubs = inMsg.readInt();
137 while (numSubs-- > 0) {
138 const int responseID = check_commandGetResult(inMsg, 0, -1, true);
141 readVariableSubscription(responseID, inMsg);
142 } else {
143 readContextSubscription(responseID, inMsg);
144 }
145 }
146}
147
148
149void
151 std::unique_lock<std::mutex> lock{ myMutex };
152 tcpip::Storage outMsg;
153 // command length
154 outMsg.writeUnsignedByte(1 + 1 + 4);
155 // command id
157 // client index
158 outMsg.writeInt(order);
159 mySocket.sendExact(outMsg);
160
161 tcpip::Storage inMsg;
163}
164
165
166void
167Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
169 throw libsumo::FatalTraCIError("Connection already closed.");
170 }
171 myOutput.reset();
172 // command length
173 int length = 1 + 1;
174 if (varID >= 0) {
175 length += 1;
176 if (objID != nullptr) {
177 length += 4 + (int)objID->length();
178 }
179 }
180 if (add != nullptr) {
181 length += (int)add->size();
182 }
183 if (length <= 255) {
185 } else {
187 myOutput.writeInt(length + 4);
188 }
190 if (varID >= 0) {
192 if (objID != nullptr) {
193 myOutput.writeString(*objID);
194 }
195 }
196 // additional values
197 if (add != nullptr) {
199 }
200}
201
202
203void
204Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
205 int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
207 throw tcpip::SocketException("Socket is not initialised");
208 }
209 const bool isContext = domain != -1;
210 tcpip::Storage outMsg;
211 outMsg.writeUnsignedByte(domID); // command id
212 outMsg.writeDouble(beginTime);
213 outMsg.writeDouble(endTime);
214 outMsg.writeString(objID);
215 if (isContext) {
216 outMsg.writeUnsignedByte(domain);
217 outMsg.writeDouble(range);
218 }
219 if (vars.size() == 1 && vars.front() == -1) {
220 if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
221 // default for vehicles is edge id and lane position
222 outMsg.writeUnsignedByte(2);
225 } else {
226 // default for detectors is vehicle number, for all others (and contexts) id list
227 outMsg.writeUnsignedByte(1);
228 const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
234 }
235 } else {
236 outMsg.writeUnsignedByte((int)vars.size());
237 for (const int v : vars) {
238 outMsg.writeUnsignedByte(v);
239 const auto& paramEntry = params.find(v);
240 if (paramEntry != params.end()) {
241 outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
242 }
243 }
244 }
245 tcpip::Storage complete;
246 complete.writeUnsignedByte(0);
247 complete.writeInt(5 + (int)outMsg.size());
248 complete.writeStorage(outMsg);
249 std::unique_lock<std::mutex> lock{ myMutex };
250 // send message
251 mySocket.sendExact(complete);
252
253 tcpip::Storage inMsg;
254 check_resultState(inMsg, domID);
255 if (!vars.empty()) {
256 const int responseID = check_commandGetResult(inMsg, domID);
257 if (isContext) {
258 readContextSubscription(responseID, inMsg);
259 } else {
260 readVariableSubscription(responseID, inMsg);
261 }
262 }
263}
264
265
266void
267Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
268 mySocket.receiveExact(inMsg);
269 int cmdLength;
270 int cmdId;
271 int resultType;
272 int cmdStart;
273 std::string msg;
274 try {
275 cmdStart = inMsg.position();
276 cmdLength = inMsg.readUnsignedByte();
277 cmdId = inMsg.readUnsignedByte();
278 resultType = inMsg.readUnsignedByte();
279 msg = inMsg.readString();
280 } catch (std::invalid_argument&) {
281 throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
282 }
283 switch (resultType) {
285 throw libsumo::TraCIException(msg);
287 throw libsumo::TraCIException(".. Sent command is not implemented (" + toHex(command) + "), [description: " + msg + "]");
289 if (acknowledgement != nullptr) {
290 (*acknowledgement) = ".. Command acknowledged (" + toHex(command) + "), [description: " + msg + "]";
291 }
292 break;
293 default:
294 throw libsumo::TraCIException(".. Answered with unknown result code(" + toHex(resultType) + ") to command(" + toHex(command) + "), [description: " + msg + "]");
295 }
296 if (command != cmdId && !ignoreCommandId) {
297 throw libsumo::TraCIException("#Error: received status response to command: " + toHex(cmdId) + " but expected: " + toHex(command));
298 }
299 if ((cmdStart + cmdLength) != (int) inMsg.position()) {
300 throw libsumo::TraCIException("#Error: command at position " + toHex(cmdStart) + " has wrong length");
301 }
302}
303
304
305int
306Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
307 int length = inMsg.readUnsignedByte();
308 if (length == 0) {
309 length = inMsg.readInt();
310 }
311 int cmdId = inMsg.readUnsignedByte();
312 if (!ignoreCommandId && cmdId != (command + 0x10)) {
313 throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
314 }
315 if (expectedType >= 0) {
316 // not called from the TraCITestClient but from within the Connection
317 inMsg.readUnsignedByte(); // variableID
318 inMsg.readString(); // objectID
319 int valueDataType = inMsg.readUnsignedByte();
320 if (valueDataType != expectedType) {
321 throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
322 }
323 }
324 return cmdId;
325}
326
327
329Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add, int expectedType) {
330 createCommand(command, var, &id, add);
332 myInput.reset();
333 check_resultState(myInput, command);
334 if (expectedType >= 0) {
335 check_commandGetResult(myInput, command, expectedType);
336 }
337 return myInput;
338}
339
340
341void
349
350
351void
352Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
353 while (variableCount > 0) {
354
355 const int variableID = inMsg.readUnsignedByte();
356 const int status = inMsg.readUnsignedByte();
357 const int type = inMsg.readUnsignedByte();
358
359 if (status == libsumo::RTYPE_OK) {
360 switch (type) {
362 into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
363 break;
365 into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
366 break;
368 auto p = std::make_shared<libsumo::TraCIPosition>();
369 p->x = inMsg.readDouble();
370 p->y = inMsg.readDouble();
371 into[objectID][variableID] = p;
372 break;
373 }
375 auto p = std::make_shared<libsumo::TraCIPosition>();
376 p->x = inMsg.readDouble();
377 p->y = inMsg.readDouble();
378 p->z = inMsg.readDouble();
379 into[objectID][variableID] = p;
380 break;
381 }
382 case libsumo::TYPE_COLOR: {
383 auto c = std::make_shared<libsumo::TraCIColor>();
384 c->r = (unsigned char)inMsg.readUnsignedByte();
385 c->g = (unsigned char)inMsg.readUnsignedByte();
386 c->b = (unsigned char)inMsg.readUnsignedByte();
387 c->a = (unsigned char)inMsg.readUnsignedByte();
388 into[objectID][variableID] = c;
389 break;
390 }
392 into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
393 break;
395 auto sl = std::make_shared<libsumo::TraCIStringList>();
396 int n = inMsg.readInt();
397 for (int i = 0; i < n; ++i) {
398 sl->value.push_back(inMsg.readString());
399 }
400 into[objectID][variableID] = sl;
401 }
402 break;
404 int n = inMsg.readInt();
405 if (n == 2) {
406 inMsg.readUnsignedByte();
407 const std::string s = inMsg.readString();
408 const int secondType = inMsg.readUnsignedByte();
409 if (secondType == libsumo::TYPE_DOUBLE) {
410 auto r = std::make_shared<libsumo::TraCIRoadPosition>();
411 r->edgeID = s;
412 r->pos = inMsg.readDouble();
413 into[objectID][variableID] = r;
414 } else if (secondType == libsumo::TYPE_STRING) {
415 auto sl = std::make_shared<libsumo::TraCIStringList>();
416 sl->value.push_back(s);
417 sl->value.push_back(inMsg.readString());
418 into[objectID][variableID] = sl;
419 }
420 }
421 }
422 break;
423
424 // TODO Other data types
425
426 default:
427 throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
428 }
429 } else {
430 throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
431 }
432
433 variableCount--;
434 }
435}
436
437
438void
440 const std::string objectID = inMsg.readString();
441 const int variableCount = inMsg.readUnsignedByte();
442 readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
443}
444
445
446void
448 const std::string contextID = inMsg.readString();
449 inMsg.readUnsignedByte(); // context domain
450 const int variableCount = inMsg.readUnsignedByte();
451 int numObjects = inMsg.readInt();
452 // the following also instantiates the empty map to get comparable results with libsumo
453 // see also https://github.com/eclipse/sumo/issues/7288
454 libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
455 while (numObjects-- > 0) {
456 const std::string& objectID = inMsg.readString();
457 results[objectID]; // instantiate empty map for id lists
458 readVariables(inMsg, objectID, variableCount, results);
459 }
460}
461
462
463}
464
465
466/****************************************************************************/
An error which is not recoverable.
Definition TraCIDefs.h:155
static std::shared_ptr< tcpip::Storage > toStorage(const TraCIResult &v)
An error which allows to continue.
Definition TraCIDefs.h:144
void simulationStep(double time)
Sends a SimulationStep command.
Connection(const std::string &host, int port, int numRetries, const std::string &label, FILE *const pipe)
Constructor, connects to the specified SUMO server.
void close()
ends the simulation and closes the connection
void createCommand(int cmdID, int varID, const std::string *const objID, tcpip::Storage *add=nullptr) const
Sends a GetVariable / SetVariable request if mySocket is connected. Otherwise writes to myOutput only...
int check_commandGetResult(tcpip::Storage &inMsg, int command, int expectedType=-1, bool ignoreCommandId=false) const
Validates the result state of a command.
void addFilter(int var, tcpip::Storage *add=nullptr)
void readVariableSubscription(int responseID, tcpip::Storage &inMsg)
tcpip::Socket mySocket
The socket.
Definition Connection.h:180
std::map< int, libsumo::SubscriptionResults > mySubscriptionResults
Definition Connection.h:188
void check_resultState(tcpip::Storage &inMsg, int command, bool ignoreCommandId=false, std::string *acknowledgement=0)
Validates the result state of a command.
tcpip::Storage myInput
The reusable input storage.
Definition Connection.h:184
FILE *const myProcessPipe
Definition Connection.h:177
void readVariables(tcpip::Storage &inMsg, const std::string &objectID, int variableCount, libsumo::SubscriptionResults &into)
std::map< int, libsumo::ContextSubscriptionResults > myContextSubscriptionResults
Definition Connection.h:189
tcpip::Storage myOutput
The reusable output storage.
Definition Connection.h:182
void setOrder(int order)
Sends a SetOrder command.
void subscribe(int domID, const std::string &objID, double beginTime, double endTime, int domain, double range, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeContext or a SubscribeVariable request.
static std::map< const std::string, Connection * > myConnections
Definition Connection.h:192
const std::string myLabel
Definition Connection.h:176
void readContextSubscription(int responseID, tcpip::Storage &inMsg)
tcpip::Storage & doCommand(int command, int var=-1, const std::string &id="", tcpip::Storage *add=nullptr, int expectedType=-1)
static Connection * myActive
Definition Connection.h:191
static std::string toString(const T &t, std::streamsize accuracy=PRECISION)
Definition Connection.h:150
std::thread * myProcessReader
Definition Connection.h:178
std::string toHex(const T i, std::streamsize numDigits=2)
Definition Connection.h:159
bool receiveExact(Storage &)
Receive a complete TraCI message from Socket::socket_.
Definition socket.cpp:510
void sendExact(const Storage &)
Definition socket.cpp:413
bool has_client_connection() const
Definition socket.cpp:542
void connect()
Connects to host_:port_.
Definition socket.cpp:332
void close()
Definition socket.cpp:365
virtual std::string readString()
Definition storage.cpp:180
virtual void writeString(const std::string &s)
Definition storage.cpp:197
virtual unsigned int position() const
Definition storage.cpp:76
virtual void writeInt(int)
Definition storage.cpp:321
virtual void writeDouble(double)
Definition storage.cpp:354
virtual int readUnsignedByte()
Definition storage.cpp:155
virtual void writeUnsignedByte(int)
Definition storage.cpp:165
StorageType::size_type size() const
Definition storage.h:119
virtual void writeStorage(tcpip::Storage &store)
Definition storage.cpp:388
virtual double readDouble()
Definition storage.cpp:362
virtual int readInt()
Definition storage.cpp:311
TRACI_CONST int TYPE_COLOR
TRACI_CONST int LAST_STEP_VEHICLE_NUMBER
TRACI_CONST int POSITION_3D
TRACI_CONST int RTYPE_NOTIMPLEMENTED
TRACI_CONST int TRACI_ID_LIST
TRACI_CONST int VAR_ROAD_ID
TRACI_CONST int TYPE_COMPOUND
TRACI_CONST int RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE
TRACI_CONST int RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int POSITION_2D
TRACI_CONST int RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE
TRACI_CONST int CMD_CLOSE
TRACI_CONST int CMD_SETORDER
TRACI_CONST int TYPE_STRINGLIST
TRACI_CONST int TYPE_INTEGER
TRACI_CONST int RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE
TRACI_CONST int CMD_ADD_SUBSCRIPTION_FILTER
std::map< std::string, libsumo::TraCIResults > SubscriptionResults
{object->{variable->value}}
Definition TraCIDefs.h:337
TRACI_CONST int VAR_LANEPOSITION
TRACI_CONST int CMD_SUBSCRIBE_VEHICLE_VARIABLE
TRACI_CONST int TYPE_DOUBLE
TRACI_CONST int CMD_SUBSCRIBE_LANEAREA_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
TRACI_CONST int RTYPE_ERR
TRACI_CONST int CMD_SIMSTEP
TRACI_CONST int CMD_SUBSCRIBE_LANE_VARIABLE
TRACI_CONST int RTYPE_OK
std::map< int, std::shared_ptr< libsumo::TraCIResult > > TraCIResults
{variable->value}
Definition TraCIDefs.h:335
TRACI_CONST int CMD_SUBSCRIBE_EDGE_VARIABLE
TRACI_CONST int TYPE_STRING