Line data Source code
1 : /****************************************************************************/
2 : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 : // Copyright (C) 2012-2024 German Aerospace Center (DLR) and others.
4 : // This program and the accompanying materials are made available under the
5 : // terms of the Eclipse Public License 2.0 which is available at
6 : // https://www.eclipse.org/legal/epl-2.0/
7 : // This Source Code may also be made available under the following Secondary
8 : // Licenses when the conditions for such availability set forth in the Eclipse
9 : // Public License 2.0 are satisfied: GNU General Public License, version 2
10 : // or later which is available at
11 : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 : /****************************************************************************/
14 : /// @file Connection.cpp
15 : /// @author Daniel Krajzewicz
16 : /// @author Mario Krumnow
17 : /// @author Jakob Erdmann
18 : /// @author Michael Behrisch
19 : /// @date 30.05.2012
20 : ///
21 : // C++ TraCI client API implementation
22 : /****************************************************************************/
23 : #include <config.h>
24 :
25 : #include <thread>
26 : #include <chrono>
27 : #include <array>
28 : #include <libsumo/StorageHelper.h>
29 : #include <libsumo/TraCIDefs.h>
30 : #include "Connection.h"
31 :
32 :
33 : namespace libtraci {
34 : // ===========================================================================
35 : // static member initializations
36 : // ===========================================================================
37 : Connection* Connection::myActive = nullptr;
38 : std::map<const std::string, Connection*> Connection::myConnections;
39 :
40 :
41 : // ===========================================================================
42 : // member method definitions
43 : // ===========================================================================
44 638 : Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
45 1916 : myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
46 638 : if (pipe != nullptr) {
47 582 : myProcessReader = new std::thread(&Connection::readOutput, this);
48 : }
49 1393 : for (int i = 0; i <= numRetries; i++) {
50 : try {
51 1393 : mySocket.connect();
52 : break;
53 757 : } catch (tcpip::SocketException& e) {
54 757 : mySocket.close();
55 757 : if (i == numRetries) {
56 2 : close();
57 4 : throw libsumo::FatalTraCIError("Could not connect in " + toString(numRetries + 1) + " tries");
58 : }
59 1510 : std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
60 : std::cout << " Retrying in 1 second" << std::endl;
61 755 : std::this_thread::sleep_for(std::chrono::seconds(1));
62 757 : }
63 : }
64 642 : }
65 :
66 :
67 : void
68 582 : Connection::readOutput() {
69 : std::array<char, 256> buffer;
70 : bool errout = false;
71 1486 : while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
72 904 : std::stringstream result;
73 904 : result << buffer.data();
74 : std::string line;
75 1808 : while (std::getline(result, line)) {
76 904 : if ((errout && (line.empty() || line[0] == ' ')) || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
77 : std::cerr << line << std::endl;
78 : errout = true;
79 : } else {
80 : std::cout << line << std::endl;
81 : errout = false;
82 : }
83 : }
84 904 : }
85 577 : }
86 :
87 :
88 : void
89 633 : Connection::close() {
90 633 : if (mySocket.has_client_connection()) {
91 631 : std::unique_lock<std::mutex> lock{ myMutex };
92 631 : tcpip::Storage outMsg;
93 : // command length
94 631 : outMsg.writeUnsignedByte(1 + 1);
95 : // command id
96 631 : outMsg.writeUnsignedByte(libsumo::CMD_CLOSE);
97 631 : mySocket.sendExact(outMsg);
98 :
99 631 : tcpip::Storage inMsg;
100 : std::string acknowledgement;
101 631 : check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
102 631 : mySocket.close();
103 631 : }
104 633 : if (myProcessReader != nullptr) {
105 577 : myProcessReader->join();
106 1154 : delete myProcessReader;
107 577 : myProcessReader = nullptr;
108 : #ifdef WIN32
109 : _pclose(myProcessPipe);
110 : #else
111 577 : pclose(myProcessPipe);
112 : #endif
113 : }
114 633 : myConnections.erase(myLabel);
115 633 : delete myActive;
116 633 : myActive = nullptr;
117 633 : }
118 :
119 :
120 : void
121 105270 : Connection::simulationStep(double time) {
122 105270 : std::unique_lock<std::mutex> lock{myMutex};
123 105270 : tcpip::Storage outMsg;
124 : // command length
125 105270 : outMsg.writeUnsignedByte(1 + 1 + 8);
126 : // command id
127 105270 : outMsg.writeUnsignedByte(libsumo::CMD_SIMSTEP);
128 105270 : outMsg.writeDouble(time);
129 : // send request message
130 105270 : mySocket.sendExact(outMsg);
131 :
132 105270 : tcpip::Storage inMsg;
133 105270 : check_resultState(inMsg, libsumo::CMD_SIMSTEP);
134 : mySubscriptionResults.clear();
135 : myContextSubscriptionResults.clear();
136 105270 : int numSubs = inMsg.readInt();
137 152593 : while (numSubs-- > 0) {
138 47323 : const int responseID = check_commandGetResult(inMsg, 0, -1, true);
139 47323 : if ((responseID >= libsumo::RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE && responseID <= libsumo::RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE) ||
140 47323 : (responseID >= libsumo::RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE && responseID <= libsumo::RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE)) {
141 18120 : readVariableSubscription(responseID, inMsg);
142 : } else {
143 29203 : readContextSubscription(responseID, inMsg);
144 : }
145 : }
146 210540 : }
147 :
148 :
149 : void
150 2 : Connection::setOrder(int order) {
151 2 : std::unique_lock<std::mutex> lock{ myMutex };
152 2 : tcpip::Storage outMsg;
153 : // command length
154 2 : outMsg.writeUnsignedByte(1 + 1 + 4);
155 : // command id
156 2 : outMsg.writeUnsignedByte(libsumo::CMD_SETORDER);
157 : // client index
158 2 : outMsg.writeInt(order);
159 2 : mySocket.sendExact(outMsg);
160 :
161 2 : tcpip::Storage inMsg;
162 2 : check_resultState(inMsg, libsumo::CMD_SETORDER);
163 4 : }
164 :
165 :
166 : void
167 644181 : Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
168 644181 : if (!mySocket.has_client_connection()) {
169 0 : throw libsumo::FatalTraCIError("Connection already closed.");
170 : }
171 644181 : myOutput.reset();
172 : // command length
173 : int length = 1 + 1;
174 644181 : if (varID >= 0) {
175 : length += 1;
176 643538 : if (objID != nullptr) {
177 643352 : length += 4 + (int)objID->length();
178 : }
179 : }
180 644181 : if (add != nullptr) {
181 39951 : length += (int)add->size();
182 : }
183 644181 : if (length <= 255) {
184 644173 : myOutput.writeUnsignedByte(length);
185 : } else {
186 8 : myOutput.writeUnsignedByte(0);
187 8 : myOutput.writeInt(length + 4);
188 : }
189 644181 : myOutput.writeUnsignedByte(cmdID);
190 644181 : if (varID >= 0) {
191 643538 : myOutput.writeUnsignedByte(varID);
192 643538 : if (objID != nullptr) {
193 643352 : myOutput.writeString(*objID);
194 : }
195 : }
196 : // additional values
197 644181 : if (add != nullptr) {
198 39951 : myOutput.writeStorage(*add);
199 : }
200 644181 : }
201 :
202 :
203 : void
204 1414 : Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
205 : int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
206 1414 : if (!mySocket.has_client_connection()) {
207 0 : throw tcpip::SocketException("Socket is not initialised");
208 : }
209 : const bool isContext = domain != -1;
210 1414 : tcpip::Storage outMsg;
211 1414 : outMsg.writeUnsignedByte(domID); // command id
212 1414 : outMsg.writeDouble(beginTime);
213 1414 : outMsg.writeDouble(endTime);
214 1414 : outMsg.writeString(objID);
215 1414 : if (isContext) {
216 934 : outMsg.writeUnsignedByte(domain);
217 934 : outMsg.writeDouble(range);
218 : }
219 1414 : if (vars.size() == 1 && vars.front() == -1) {
220 30 : if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
221 : // default for vehicles is edge id and lane position
222 3 : outMsg.writeUnsignedByte(2);
223 3 : outMsg.writeUnsignedByte(libsumo::VAR_ROAD_ID);
224 3 : outMsg.writeUnsignedByte(libsumo::VAR_LANEPOSITION);
225 : } else {
226 : // default for detectors is vehicle number, for all others (and contexts) id list
227 27 : outMsg.writeUnsignedByte(1);
228 27 : const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
229 27 : || domID == libsumo::CMD_SUBSCRIBE_LANEAREA_VARIABLE
230 : || domID == libsumo::CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
231 : || domID == libsumo::CMD_SUBSCRIBE_LANE_VARIABLE
232 : || domID == libsumo::CMD_SUBSCRIBE_EDGE_VARIABLE;
233 27 : outMsg.writeUnsignedByte(isDetector ? libsumo::LAST_STEP_VEHICLE_NUMBER : libsumo::TRACI_ID_LIST);
234 : }
235 : } else {
236 1384 : outMsg.writeUnsignedByte((int)vars.size());
237 2539 : for (const int v : vars) {
238 1155 : outMsg.writeUnsignedByte(v);
239 : const auto& paramEntry = params.find(v);
240 1155 : if (paramEntry != params.end()) {
241 48 : outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
242 : }
243 : }
244 : }
245 1414 : tcpip::Storage complete;
246 1414 : complete.writeUnsignedByte(0);
247 1414 : complete.writeInt(5 + (int)outMsg.size());
248 1414 : complete.writeStorage(outMsg);
249 1414 : std::unique_lock<std::mutex> lock{ myMutex };
250 : // send message
251 1414 : mySocket.sendExact(complete);
252 :
253 1414 : tcpip::Storage inMsg;
254 1414 : check_resultState(inMsg, domID);
255 1183 : if (!vars.empty()) {
256 846 : const int responseID = check_commandGetResult(inMsg, domID);
257 846 : if (isContext) {
258 370 : readContextSubscription(responseID, inMsg);
259 : } else {
260 476 : readVariableSubscription(responseID, inMsg);
261 : }
262 : }
263 3059 : }
264 :
265 :
266 : void
267 751498 : Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
268 751498 : mySocket.receiveExact(inMsg);
269 : int cmdLength;
270 : int cmdId;
271 : int resultType;
272 : int cmdStart;
273 : std::string msg;
274 : try {
275 751498 : cmdStart = inMsg.position();
276 751498 : cmdLength = inMsg.readUnsignedByte();
277 751498 : cmdId = inMsg.readUnsignedByte();
278 751498 : resultType = inMsg.readUnsignedByte();
279 751498 : msg = inMsg.readString();
280 0 : } catch (std::invalid_argument&) {
281 0 : throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
282 0 : }
283 751498 : switch (resultType) {
284 455 : case libsumo::RTYPE_ERR:
285 910 : throw libsumo::TraCIException(msg);
286 25 : case libsumo::RTYPE_NOTIMPLEMENTED:
287 50 : throw libsumo::TraCIException(".. Sent command is not implemented (" + toHex(command) + "), [description: " + msg + "]");
288 751018 : case libsumo::RTYPE_OK:
289 751018 : if (acknowledgement != nullptr) {
290 2373 : (*acknowledgement) = ".. Command acknowledged (" + toHex(command) + "), [description: " + msg + "]";
291 : }
292 : break;
293 0 : default:
294 0 : throw libsumo::TraCIException(".. Answered with unknown result code(" + toHex(resultType) + ") to command(" + toHex(command) + "), [description: " + msg + "]");
295 : }
296 751018 : if (command != cmdId && !ignoreCommandId) {
297 0 : throw libsumo::TraCIException("#Error: received status response to command: " + toHex(cmdId) + " but expected: " + toHex(command));
298 : }
299 751018 : if ((cmdStart + cmdLength) != (int) inMsg.position()) {
300 0 : throw libsumo::TraCIException("#Error: command at position " + toHex(cmdStart) + " has wrong length");
301 : }
302 751018 : }
303 :
304 :
305 : int
306 675541 : Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
307 675541 : int length = inMsg.readUnsignedByte();
308 675541 : if (length == 0) {
309 59467 : length = inMsg.readInt();
310 : }
311 675541 : int cmdId = inMsg.readUnsignedByte();
312 675541 : if (!ignoreCommandId && cmdId != (command + 0x10)) {
313 0 : throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
314 : }
315 675541 : if (expectedType >= 0) {
316 : // not called from the TraCITestClient but from within the Connection
317 627372 : inMsg.readUnsignedByte(); // variableID
318 627372 : inMsg.readString(); // objectID
319 627372 : int valueDataType = inMsg.readUnsignedByte();
320 627372 : if (valueDataType != expectedType) {
321 0 : throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
322 : }
323 : }
324 675541 : return cmdId;
325 : }
326 :
327 :
328 : tcpip::Storage&
329 643995 : Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add, int expectedType) {
330 643995 : createCommand(command, var, &id, add);
331 643995 : mySocket.sendExact(myOutput);
332 643995 : myInput.reset();
333 643995 : check_resultState(myInput, command);
334 643747 : if (expectedType >= 0) {
335 627372 : check_commandGetResult(myInput, command, expectedType);
336 : }
337 643747 : return myInput;
338 : }
339 :
340 :
341 : void
342 186 : Connection::addFilter(int var, tcpip::Storage* add) {
343 186 : std::unique_lock<std::mutex> lock{ myMutex };
344 186 : createCommand(libsumo::CMD_ADD_SUBSCRIPTION_FILTER, var, nullptr, add);
345 186 : mySocket.sendExact(myOutput);
346 186 : myInput.reset();
347 186 : check_resultState(myInput, libsumo::CMD_ADD_SUBSCRIPTION_FILTER);
348 185 : }
349 :
350 :
351 : void
352 113448 : Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
353 254007 : while (variableCount > 0) {
354 :
355 140559 : const int variableID = inMsg.readUnsignedByte();
356 140559 : const int status = inMsg.readUnsignedByte();
357 140559 : const int type = inMsg.readUnsignedByte();
358 :
359 140559 : if (status == libsumo::RTYPE_OK) {
360 140559 : switch (type) {
361 17847 : case libsumo::TYPE_DOUBLE:
362 17847 : into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
363 17847 : break;
364 16528 : case libsumo::TYPE_STRING:
365 33056 : into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
366 16528 : break;
367 92809 : case libsumo::POSITION_2D: {
368 : auto p = std::make_shared<libsumo::TraCIPosition>();
369 92809 : p->x = inMsg.readDouble();
370 92809 : p->y = inMsg.readDouble();
371 92809 : into[objectID][variableID] = p;
372 : break;
373 : }
374 28 : case libsumo::POSITION_3D: {
375 : auto p = std::make_shared<libsumo::TraCIPosition>();
376 28 : p->x = inMsg.readDouble();
377 28 : p->y = inMsg.readDouble();
378 28 : p->z = inMsg.readDouble();
379 28 : into[objectID][variableID] = p;
380 : break;
381 : }
382 0 : case libsumo::TYPE_COLOR: {
383 : auto c = std::make_shared<libsumo::TraCIColor>();
384 0 : c->r = (unsigned char)inMsg.readUnsignedByte();
385 0 : c->g = (unsigned char)inMsg.readUnsignedByte();
386 0 : c->b = (unsigned char)inMsg.readUnsignedByte();
387 0 : c->a = (unsigned char)inMsg.readUnsignedByte();
388 0 : into[objectID][variableID] = c;
389 : break;
390 : }
391 8354 : case libsumo::TYPE_INTEGER:
392 8354 : into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
393 8354 : break;
394 437 : case libsumo::TYPE_STRINGLIST: {
395 : auto sl = std::make_shared<libsumo::TraCIStringList>();
396 437 : int n = inMsg.readInt();
397 1677 : for (int i = 0; i < n; ++i) {
398 2480 : sl->value.push_back(inMsg.readString());
399 : }
400 437 : into[objectID][variableID] = sl;
401 : }
402 437 : break;
403 4556 : case libsumo::TYPE_COMPOUND: {
404 4556 : int n = inMsg.readInt();
405 4556 : if (n == 2) {
406 4556 : inMsg.readUnsignedByte();
407 4556 : const std::string s = inMsg.readString();
408 4556 : const int secondType = inMsg.readUnsignedByte();
409 4556 : if (secondType == libsumo::TYPE_DOUBLE) {
410 : auto r = std::make_shared<libsumo::TraCIRoadPosition>();
411 4535 : r->edgeID = s;
412 4535 : r->pos = inMsg.readDouble();
413 4535 : into[objectID][variableID] = r;
414 21 : } else if (secondType == libsumo::TYPE_STRING) {
415 : auto sl = std::make_shared<libsumo::TraCIStringList>();
416 21 : sl->value.push_back(s);
417 21 : sl->value.push_back(inMsg.readString());
418 21 : into[objectID][variableID] = sl;
419 : }
420 : }
421 : }
422 : break;
423 :
424 : // TODO Other data types
425 :
426 0 : default:
427 0 : throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
428 : }
429 : } else {
430 0 : throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
431 : }
432 :
433 140559 : variableCount--;
434 : }
435 113448 : }
436 :
437 :
438 : void
439 18596 : Connection::readVariableSubscription(int responseID, tcpip::Storage& inMsg) {
440 18596 : const std::string objectID = inMsg.readString();
441 18596 : const int variableCount = inMsg.readUnsignedByte();
442 18596 : readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
443 18596 : }
444 :
445 :
446 : void
447 29573 : Connection::readContextSubscription(int responseID, tcpip::Storage& inMsg) {
448 29573 : const std::string contextID = inMsg.readString();
449 29573 : inMsg.readUnsignedByte(); // context domain
450 29573 : const int variableCount = inMsg.readUnsignedByte();
451 29573 : int numObjects = inMsg.readInt();
452 : // the following also instantiates the empty map to get comparable results with libsumo
453 : // see also https://github.com/eclipse/sumo/issues/7288
454 29573 : libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
455 124425 : while (numObjects-- > 0) {
456 94852 : const std::string& objectID = inMsg.readString();
457 94852 : results[objectID]; // instantiate empty map for id lists
458 94852 : readVariables(inMsg, objectID, variableCount, results);
459 : }
460 29573 : }
461 :
462 :
463 : }
464 :
465 :
466 : /****************************************************************************/
|