Hi Jakob (et al.),
I needed the same - to run multiple sumo instances in parallel.
Unfortunately Python bindings for Traci do not support that since the
traci package is written as a state machine.
I've worked on it a bit and adapted the traci to support multiple
coexistent connections. The changes are extensive, but trivial. I have
wrapped the IO-related stuff in a class TraciAdapter, whose constructor
is now derived from what originally was traci.init(). Then I wrapped
functionality of vehicle and route modules in classes Vehicle and Route
and I instantiate them in TraciAdapter's constructor.
Except that now you have to access traci objects through instances of
TraciAdapter, the interface remains the same otherwise.
My experiments are limited to adding a vehicle and to collecting data
about it. I worked only on concerned parts of the package. I am sending
you the code as mere proof of concept, in case you are interested in
such a functionality.
Matej.
PS: I tried to send you full package, but our mailserver blacklists
zipped attachements, so I am sending you the three afffected files only.
PS2: simplified example that controls 2 sumo instances, adds a vehicle
to each and dumps their speeds as they progress in time
import traci
tad=traci.TraciAdapter(port)
tad.route.add('probe_route', edges)
tad.vehicle.add('probe', 'probe_route')
tad1=traci.TraciAdapter(port+1)
tad1.route.add('probe_route', edges)
tad1.vehicle.add('probe', 'probe_route')
while(True):
tad.simulationStep()
tad1.simulationStep()
print tad.vehicle.getSpeed('probe'), tad1.vehicle.getSpeed('probe')
On 17.12.2015 15:54, Jakob Erdmann wrote:
Yes. You can run multiple instances of sumo at the same time. It is even
possible to control multiple instances from the same TraCI script as longs
as you are careful with the port numbers.
regards,
Jakob
2015-12-17 14:39 GMT+01:00 Phuong Nguyen <[email protected]>:
Hi,
I'm trying to optimize a traffic scenario using optimization algorithm and
sumo. In the optimization process, I need to call sumo to run the scenario
simulation so many time. Can a number of the simulations run parallel?
Thanks so much.
--
Ms. Nguyen Thi Mai Phuong
Division of Science Management and International Relations,
Department of Network and Communications,
Thai Nguyen University of Information and Communication Technology,
Thai Nguyen city, Thai Nguyen province, Vietnam.
Email:[email protected]
Tel: 0985 18 38 48
------------------------------------------------------------------------------
_______________________________________________
sumo-user mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/sumo-user
------------------------------------------------------------------------------
_______________________________________________
sumo-user mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/sumo-user
# -*- coding: utf-8 -*-
"""
@file __init__.py
@author Michael Behrisch
@author Lena Kalleske
@author Mario Krumnow
@author Daniel Krajzewicz
@author Jakob Erdmann
@date 2008-10-09
@version $Id: __init__.py 18717 2015-08-25 12:39:40Z behrisch $
Python implementation of the TraCI interface.
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2008-2015 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import print_function
import socket
import time
import struct
from . import constants
_RESULTS = {0x00: "OK", 0x01: "Not implemented", 0xFF: "Error"}
_DEBUG = False
def _STEPS2TIME(step):
"""Conversion from time steps in milliseconds to seconds as float"""
return step / 1000.
def _TIME2STEPS(time):
"""Conversion from (float) time in seconds to milliseconds as int"""
return int(time * 1000)
class TraCIException(Exception):
"""Exception class for all TraCI errors which keep the connection intact"""
def __init__(self, command, errorType, desc):
Exception.__init__(self, desc)
self._command = command
self._type = errorType
def getCommand(self):
return self._command
def getType(self):
return self._type
class FatalTraCIError(Exception):
"""Exception class for all TraCI errors which do not allow for continuation"""
def __init__(self, desc):
Exception.__init__(self, desc)
class Message:
""" A named tuple for internal usage.
Simple "struct" for the composed message string
together with a list of TraCI commands which are inside.
"""
def __init__(self):
self.string = ""
self.queue = []
class Storage:
def __init__(self, content):
self._content = content
self._pos = 0
def read(self, format):
oldPos = self._pos
self._pos += struct.calcsize(format)
return struct.unpack(format, self._content[oldPos:self._pos])
def readInt(self):
return self.read("!i")[0]
def readDouble(self):
return self.read("!d")[0]
def readLength(self):
length = self.read("!B")[0]
if length > 0:
return length
return self.read("!i")[0]
def readString(self):
length = self.read("!i")[0]
return self.read("!%ss" % length)[0]
def readStringList(self):
n = self.read("!i")[0]
list = []
for i in range(n):
list.append(self.readString())
return list
def readShape(self):
length = self.read("!B")[0]
return [self.read("!dd") for i in range(length)]
def ready(self):
return self._pos < len(self._content)
def printDebug(self):
if _DEBUG:
for char in self._content[self._pos:]:
print("%03i %02x %s" % (ord(char), ord(char), char))
class SubscriptionResults:
def __init__(self, valueFunc):
self._results = {}
self._contextResults = {}
self._valueFunc = valueFunc
def _parse(self, varID, data):
if not varID in self._valueFunc:
raise FatalTraCIError("Unknown variable %02x." % varID)
return self._valueFunc[varID](data)
def reset(self):
self._results.clear()
self._contextResults.clear()
def add(self, refID, varID, data):
if refID not in self._results:
self._results[refID] = {}
self._results[refID][varID] = self._parse(varID, data)
def get(self, refID=None):
if refID == None:
return self._results
return self._results.get(refID, None)
def addContext(self, refID, domain, objID, varID=None, data=None):
if refID not in self._contextResults:
self._contextResults[refID] = {}
if objID not in self._contextResults[refID]:
self._contextResults[refID][objID] = {}
if varID != None and data != None:
self._contextResults[refID][objID][
varID] = domain._parse(varID, data)
def getContext(self, refID=None):
if refID == None:
return self._contextResults
return self._contextResults.get(refID, None)
def __repr__(self):
return "<%s, %s>" % (self._results, self._contextResults)
def getParameterAccessors(cmdGetID, cmdSetID):
def getParameter(self, objID, param):
"""getParameter(string, string) -> string
Returns the value of the given parameter for the given objID
"""
self._beginMessage(
cmdGetID, constants.VAR_PARAMETER, objID, 1 + 4 + len(param))
self._message.string += struct.pack("!Bi",
constants.TYPE_STRING, len(param)) + param
result = self._checkResult(cmdGetID, constants.VAR_PARAMETER, objID)
return result.readString()
def setParameter(self, objID, param, value):
"""setParameter(string, string, string) -> string
Sets the value of the given parameter to value for the given objID
"""
self._beginMessage(cmdSetID, constants.VAR_PARAMETER, objID,
1 + 4 + 1 + 4 + len(param) + 1 + 4 + len(value))
self._message.string += struct.pack("!Bi", constants.TYPE_COMPOUND, 2)
self._message.string += struct.pack("!Bi",
constants.TYPE_STRING, len(param)) + param
self._message.string += struct.pack("!Bi",
constants.TYPE_STRING, len(value)) + value
self._sendExact()
return getParameter, setParameter
class TraciAdapter:
from . import inductionloop, multientryexit, trafficlights
from . import lane, vehicle, vehicletype, person, route, areal
from . import poi, polygon, junction, edge, simulation, gui
def _recvExact(self):
try:
result = ""
while len(result) < 4:
t = self._connections[""].recv(4 - len(result))
if not t:
return None
result += t
length = struct.unpack("!i", result)[0] - 4
result = ""
while len(result) < length:
t = self._connections[""].recv(length - len(result))
if not t:
return None
result += t
return Storage(result)
except socket.error:
return None
def _sendExact(self):
length = struct.pack("!i", len(self._message.string) + 4)
self._connections[""].send(length + self._message.string)
result = self._recvExact()
if not result:
self._connections[""].close()
del self._connections[""]
raise FatalTraCIError("connection closed by SUMO")
for self.command in self._message.queue:
prefix = result.read("!BBB")
err = result.readString()
if prefix[2] or err:
self._message.string = ""
self._message.queue = []
raise TraCIException(prefix[1], _RESULTS[prefix[2]], err)
elif prefix[1] != self.command:
raise FatalTraCIError("Received answer %s for command %s." % (prefix[1],
self.command))
elif prefix[1] == constants.CMD_STOP:
length = result.read("!B")[0] - 1
result.read("!%sx" % length)
self._message.string = ""
self._message.queue = []
return result
def _beginMessage(self, cmdID, varID, objID, length=0):
self._message.queue.append(cmdID)
length += 1 + 1 + 1 + 4 + len(objID)
if length <= 255:
self._message.string += struct.pack("!BBBi", length,
cmdID, varID, len(objID)) + str(objID)
else:
self._message.string += struct.pack("!BiBBi", 0, length + 4,
cmdID, varID, len(objID)) + str(objID)
def _sendReadOneStringCmd(self, cmdID, varID, objID):
self._beginMessage(cmdID, varID, objID)
return self._checkResult(cmdID, varID, objID)
def _sendIntCmd(self, cmdID, varID, objID, value):
self._beginMessage(cmdID, varID, objID, 1 + 4)
self._message.string += struct.pack("!Bi", constants.TYPE_INTEGER, value)
self._sendExact()
def _sendDoubleCmd(self, cmdID, varID, objID, value):
self._beginMessage(cmdID, varID, objID, 1 + 8)
self._message.string += struct.pack("!Bd", constants.TYPE_DOUBLE, value)
self._sendExact()
def _sendByteCmd(self, cmdID, varID, objID, value):
self._beginMessage(cmdID, varID, objID, 1 + 1)
self._message.string += struct.pack("!BB", constants.TYPE_BYTE, value)
self._sendExact()
def _sendStringCmd(self, cmdID, varID, objID, value):
self._beginMessage(cmdID, varID, objID, 1 + 4 + len(value))
self._message.string += struct.pack("!Bi", constants.TYPE_STRING,
len(value)) + str(value)
self._sendExact()
def _checkResult(self, cmdID, varID, objID):
result = self._sendExact()
result.readLength()
response, retVarID = result.read("!BB")
objectID = result.readString()
if response - cmdID != 16 or retVarID != varID or objectID != objID:
raise FatalTraCIError("Received answer %s,%s,%s for command %s,%s,%s."
% (response, retVarID, objectID, cmdID, varID, objID))
result.read("!B") # Return type of the variable
return result
def _readSubscription(self, result):
result.printDebug() # to enable this you also need to set _DEBUG to True
result.readLength()
response = result.read("!B")[0]
isVariableSubscription = response >= constants.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE and response <= constants.RESPONSE_SUBSCRIBE_PERSON_VARIABLE
objectID = result.readString()
if not isVariableSubscription:
domain = result.read("!B")[0]
numVars = result.read("!B")[0]
if isVariableSubscription:
while numVars > 0:
varID = result.read("!B")[0]
status, varType = result.read("!BB")
if status:
print("Error!", result.readString())
elif response in _modules:
_modules[response].subscriptionResults.add(
objectID, varID, result)
else:
raise FatalTraCIError(
"Cannot handle subscription response %02x for %s." % (response, objectID))
numVars -= 1
else:
objectNo = result.read("!i")[0]
for o in range(objectNo):
oid = result.readString()
if numVars == 0:
_modules[response].subscriptionResults.addContext(
objectID, _modules[domain].subscriptionResults, oid)
for v in range(numVars):
varID = result.read("!B")[0]
status, varType = result.read("!BB")
if status:
print("Error!", result.readString())
elif response in _modules:
_modules[response].subscriptionResults.addContext(
objectID, _modules[domain].subscriptionResults, oid, varID, result)
else:
raise FatalTraCIError(
"Cannot handle subscription response %02x for %s." % (response, objectID))
return objectID, response
def _subscribe(self, cmdID, begin, end, objID, varIDs, parameters=None):
self._message.queue.append(cmdID)
length = 1 + 1 + 4 + 4 + 4 + len(objID) + 1 + len(varIDs)
if parameters:
for v in varIDs:
if v in parameters:
length += len(parameters[v])
if length <= 255:
self._message.string += struct.pack("!B", length)
else:
self._message.string += struct.pack("!Bi", 0, length + 4)
self._message.string += struct.pack("!Biii",
cmdID, begin, end, len(objID)) + objID
self._message.string += struct.pack("!B", len(varIDs))
for v in varIDs:
self._message.string += struct.pack("!B", v)
if parameters and v in parameters:
self._message.string += parameters[v]
result = self._sendExact()
objectID, response = _readSubscription(result)
if response - cmdID != 16 or objectID != objID:
raise FatalTraCIError("Received answer %02x,%s for subscription command %02x,%s." % (
response, objectID, cmdID, objID))
def _subscribeContext(self, cmdID, begin, end, objID, domain, dist, varIDs):
self._message.queue.append(cmdID)
length = 1 + 1 + 4 + 4 + 4 + len(objID) + 1 + 8 + 1 + len(varIDs)
if length <= 255:
self._message.string += struct.pack("!B", length)
else:
self._message.string += struct.pack("!Bi", 0, length + 4)
self._message.string += struct.pack("!Biii",
cmdID, begin, end, len(objID)) + objID
self._message.string += struct.pack("!BdB", domain, dist, len(varIDs))
for v in varIDs:
self._message.string += struct.pack("!B", v)
result = self._sendExact()
objectID, response = _readSubscription(result)
if response - cmdID != 16 or objectID != objID:
raise FatalTraCIError("Received answer %02x,%s for context subscription command %02x,%s." % (
response, objectID, cmdID, objID))
def __init__(self, port=8813, numRetries=10, host="localhost", label="default"):
self._modules = {
# constants.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE: inductionloop,
# constants.RESPONSE_SUBSCRIBE_MULTI_ENTRY_EXIT_DETECTOR_VARIABLE:
# multientryexit,
# constants.RESPONSE_SUBSCRIBE_AREAL_DETECTOR_VARIABLE: areal,
# constants.RESPONSE_SUBSCRIBE_TL_VARIABLE: trafficlights,
# constants.RESPONSE_SUBSCRIBE_LANE_VARIABLE: lane,
# constants.RESPONSE_SUBSCRIBE_VEHICLE_VARIABLE: vehicle,
# constants.RESPONSE_SUBSCRIBE_PERSON_VARIABLE: person,
# constants.RESPONSE_SUBSCRIBE_VEHICLETYPE_VARIABLE: vehicletype,
# constants.RESPONSE_SUBSCRIBE_ROUTE_VARIABLE: route,
# constants.RESPONSE_SUBSCRIBE_POI_VARIABLE: poi,
# constants.RESPONSE_SUBSCRIBE_POLYGON_VARIABLE: polygon,
# constants.RESPONSE_SUBSCRIBE_JUNCTION_VARIABLE: junction,
# constants.RESPONSE_SUBSCRIBE_EDGE_VARIABLE: edge,
# constants.RESPONSE_SUBSCRIBE_SIM_VARIABLE: simulation,
# constants.RESPONSE_SUBSCRIBE_GUI_VARIABLE: gui,
# constants.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_CONTEXT: inductionloop,
# constants.RESPONSE_SUBSCRIBE_MULTI_ENTRY_EXIT_DETECTOR_CONTEXT:
# multientryexit,
# constants.RESPONSE_SUBSCRIBE_AREAL_DETECTOR_CONTEXT: areal,
# constants.RESPONSE_SUBSCRIBE_TL_CONTEXT: trafficlights,
# constants.RESPONSE_SUBSCRIBE_LANE_CONTEXT: lane,
# constants.RESPONSE_SUBSCRIBE_VEHICLE_CONTEXT: vehicle,
# constants.RESPONSE_SUBSCRIBE_PERSON_CONTEXT: person,
# constants.RESPONSE_SUBSCRIBE_VEHICLETYPE_CONTEXT: vehicletype,
# constants.RESPONSE_SUBSCRIBE_ROUTE_CONTEXT: route,
# constants.RESPONSE_SUBSCRIBE_POI_CONTEXT: poi,
# constants.RESPONSE_SUBSCRIBE_POLYGON_CONTEXT: polygon,
# constants.RESPONSE_SUBSCRIBE_JUNCTION_CONTEXT: junction,
# constants.RESPONSE_SUBSCRIBE_EDGE_CONTEXT: edge,
# constants.RESPONSE_SUBSCRIBE_SIM_CONTEXT: simulation,
# constants.RESPONSE_SUBSCRIBE_GUI_CONTEXT: gui,
# constants.CMD_GET_INDUCTIONLOOP_VARIABLE: inductionloop,
# constants.CMD_GET_MULTI_ENTRY_EXIT_DETECTOR_VARIABLE:
# multientryexit,
# constants.CMD_GET_AREAL_DETECTOR_VARIABLE: areal,
# constants.CMD_GET_TL_VARIABLE: trafficlights,
# constants.CMD_GET_LANE_VARIABLE: lane,
constants.CMD_GET_VEHICLE_VARIABLE: vehicle.Vehicle(self),
# constants.CMD_GET_VEHICLETYPE_VARIABLE: vehicletype,
constants.CMD_GET_ROUTE_VARIABLE: route.Route(self),
# constants.CMD_GET_POI_VARIABLE: poi,
# constants.CMD_GET_POLYGON_VARIABLE: polygon,
# constants.CMD_GET_JUNCTION_VARIABLE: junction,
# constants.CMD_GET_EDGE_VARIABLE: edge,
# constants.CMD_GET_SIM_VARIABLE: simulation,
# constants.CMD_GET_GUI_VARIABLE: gui
}
self._connections = {}
self._message = Message()
for wait in range(1, numRetries + 2):
try:
self._connections[""] = self._connections[label] = socket.socket()
self._connections[label].setsockopt(socket.IPPROTO_TCP,
socket.TCP_NODELAY, 1)
self._connections[label].connect((host, port))
break
except socket.error:
time.sleep(wait)
self.vehicle = self._modules[constants.CMD_GET_VEHICLE_VARIABLE]
self.route = self._modules[constants.CMD_GET_ROUTE_VARIABLE]
def simulationStep(self, step=0):
"""
Make a simulation step and simulate up to the given millisecond in sim time.
If the given value is 0 or absent, exactly one step is performed.
Values smaller than or equal to the current sim time result in no action.
"""
self._message.queue.append(constants.CMD_SIMSTEP2)
self._message.string += struct.pack("!BBi", 1 +
1 + 4, constants.CMD_SIMSTEP2, step)
result = self._sendExact()
for module in self._modules.values():
module.subscriptionResults.reset()
numSubs = result.readInt()
responses = []
while numSubs > 0:
responses.append(_readSubscription(result))
numSubs -= 1
return responses
def getVersion(self):
command = constants.CMD_GETVERSION
self._message.queue.append(command)
self._message.string += struct.pack("!BB", 1 + 1, command)
result = self._sendExact()
result.readLength()
response = result.read("!B")[0]
if response != command:
raise FatalTraCIError(
"Received answer %s for command %s." % (response, command))
return result.readInt(), result.readString()
def close(self):
if "" in self._connections:
self._message.queue.append(constants.CMD_CLOSE)
self._message.string += struct.pack("!BB", 1 + 1, constants.CMD_CLOSE)
self._sendExact()
self._connections[""].close()
del self._connections[""]
def switch(self, label):
self._connections[""] = self._connections[label]
# -*- coding: utf-8 -*-
"""
@file vehicle.py
@author Michael Behrisch
@author Lena Kalleske
@author Mario Krumnow
@author Lena Kalleske
@author Jakob Erdmann
@author Laura Bieker
@author Daniel Krajzewicz
@date 2011-03-09
@version $Id: vehicle.py 18756 2015-08-31 19:16:33Z behrisch $
Python implementation of the TraCI interface.
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2011-2015 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
import struct
import traci
import traci.constants as tc
DEPART_TRIGGERED = -1
DEPART_CONTAINER_TRIGGERED = -2
DEPART_NOW = -3
DEPART_SPEED_RANDOM = -2
DEPART_SPEED_MAX = -3
STOP_DEFAULT = 0
STOP_PARKING = 1
STOP_TRIGGERED = 2
STOP_CONTAINER_TRIGGERED = 4
STOP_BUS_STOP = 8
STOP_CONTAINER_STOP = 16
def _readBestLanes(self, result):
result.read("!iB")
nbLanes = result.read("!i")[0] # Length
lanes = []
for i in range(nbLanes):
result.read("!B")
laneID = result.readString()
length, occupation, offset = result.read("!BdBdBb")[1::2]
allowsContinuation = result.read("!BB")[1]
nextLanesNo = result.read("!Bi")[1]
nextLanes = []
for j in range(nextLanesNo):
nextLanes.append(result.readString())
lanes.append(
[laneID, length, occupation, offset, allowsContinuation, nextLanes])
return lanes
def _readLeader(self, result):
result.read("!iB")
vehicleID = result.readString()
result.read("!B")
dist = result.readDouble()
if vehicleID:
return vehicleID, dist
return None
_RETURN_VALUE_FUNC = {tc.ID_LIST: traci.Storage.readStringList,
tc.ID_COUNT: traci.Storage.readInt,
tc.VAR_SPEED: traci.Storage.readDouble,
tc.VAR_SPEED_WITHOUT_TRACI: traci.Storage.readDouble,
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_ANGLE: traci.Storage.readDouble,
tc.VAR_ROAD_ID: traci.Storage.readString,
tc.VAR_LANE_ID: traci.Storage.readString,
tc.VAR_LANE_INDEX: traci.Storage.readInt,
tc.VAR_TYPE: traci.Storage.readString,
tc.VAR_ROUTE_ID: traci.Storage.readString,
tc.VAR_ROUTE_INDEX: traci.Storage.readInt,
tc.VAR_COLOR: lambda result: result.read("!BBBB"),
tc.VAR_LANEPOSITION: traci.Storage.readDouble,
tc.VAR_CO2EMISSION: traci.Storage.readDouble,
tc.VAR_COEMISSION: traci.Storage.readDouble,
tc.VAR_HCEMISSION: traci.Storage.readDouble,
tc.VAR_PMXEMISSION: traci.Storage.readDouble,
tc.VAR_NOXEMISSION: traci.Storage.readDouble,
tc.VAR_FUELCONSUMPTION: traci.Storage.readDouble,
tc.VAR_NOISEEMISSION: traci.Storage.readDouble,
tc.VAR_EDGE_TRAVELTIME: traci.Storage.readDouble,
tc.VAR_EDGE_EFFORT: traci.Storage.readDouble,
tc.VAR_ROUTE_VALID: lambda result: bool(result.read("!B")[0]),
tc.VAR_EDGES: traci.Storage.readStringList,
tc.VAR_SIGNALS: traci.Storage.readInt,
tc.VAR_LENGTH: traci.Storage.readDouble,
tc.VAR_MAXSPEED: traci.Storage.readDouble,
tc.VAR_ALLOWED_SPEED: traci.Storage.readDouble,
tc.VAR_VEHICLECLASS: traci.Storage.readString,
tc.VAR_SPEED_FACTOR: traci.Storage.readDouble,
tc.VAR_SPEED_DEVIATION: traci.Storage.readDouble,
tc.VAR_EMISSIONCLASS: traci.Storage.readString,
tc.VAR_WAITING_TIME: traci.Storage.readDouble,
tc.VAR_WIDTH: traci.Storage.readDouble,
tc.VAR_MINGAP: traci.Storage.readDouble,
tc.VAR_SHAPECLASS: traci.Storage.readString,
tc.VAR_ACCEL: traci.Storage.readDouble,
tc.VAR_DECEL: traci.Storage.readDouble,
tc.VAR_IMPERFECTION: traci.Storage.readDouble,
tc.VAR_TAU: traci.Storage.readDouble,
tc.VAR_BEST_LANES: _readBestLanes,
tc.VAR_LEADER: _readLeader,
tc.DISTANCE_REQUEST: traci.Storage.readDouble,
tc.VAR_STOPSTATE: lambda result: result.read("!B")[0],
tc.VAR_DISTANCE: traci.Storage.readDouble}
class Vehicle:
def __init__(self, adapter):
self.ad = adapter
self.subscriptionResults = traci.SubscriptionResults(_RETURN_VALUE_FUNC)
def _getUniversal(self, varID, vehID):
result = self.ad._sendReadOneStringCmd(
tc.CMD_GET_VEHICLE_VARIABLE, varID, vehID)
return _RETURN_VALUE_FUNC[varID](result)
def getIDList(self):
"""getIDList() -> list(string)
Returns a list of ids of all vehicles currently running within the scenario.
"""
return self._getUniversal(tc.ID_LIST, "")
def getIDCount(self):
"""getIDCount() -> integer
Returns the number of vehicle in the network.
"""
return self._getUniversal(tc.ID_COUNT, "")
def getSpeed(self, vehID):
"""getSpeed(string) -> double
Returns the speed in m/s of the named vehicle within the last step.
"""
return self._getUniversal(tc.VAR_SPEED, vehID)
def getSpeedWithoutTraCI(self, vehID):
"""getSpeedWithoutTraCI(string) -> double
.
"""
return self._getUniversal(tc.VAR_SPEED_WITHOUT_TRACI, vehID)
def getPosition(self, vehID):
"""getPosition(string) -> (double, double)
Returns the position of the named vehicle within the last step [m,m].
"""
return self._getUniversal(tc.VAR_POSITION, vehID)
def getAngle(self, vehID):
"""getAngle(string) -> double
Returns the angle in degrees of the named vehicle within the last step.
"""
return self._getUniversal(tc.VAR_ANGLE, vehID)
def getRoadID(self, vehID):
"""getRoadID(string) -> string
Returns the id of the edge the named vehicle was at within the last step.
"""
return self._getUniversal(tc.VAR_ROAD_ID, vehID)
def getLaneID(self, vehID):
"""getLaneID(string) -> string
Returns the id of the lane the named vehicle was at within the last step.
"""
return self._getUniversal(tc.VAR_LANE_ID, vehID)
def getLaneIndex(self, vehID):
"""getLaneIndex(string) -> integer
Returns the index of the lane the named vehicle was at within the last step.
"""
return self._getUniversal(tc.VAR_LANE_INDEX, vehID)
def getTypeID(self, vehID):
"""getTypeID(string) -> string
Returns the id of the type of the named vehicle.
"""
return self._getUniversal(tc.VAR_TYPE, vehID)
def getRouteID(self, vehID):
"""getRouteID(string) -> string
Returns the id of the route of the named vehicle.
"""
return self._getUniversal(tc.VAR_ROUTE_ID, vehID)
def getRouteIndex(self, vehID):
"""getRouteIndex(string) -> int
Returns the index of the current edge within the vehicles route or -1 if the
vehicle has not yet departed
"""
return self._getUniversal(tc.VAR_ROUTE_INDEX, vehID)
def getRoute(self, vehID):
"""getRoute(string) -> list(string)
Returns the ids of the edges the vehicle's route is made of.
"""
return self._getUniversal(tc.VAR_EDGES, vehID)
def getLanePosition(self, vehID):
"""getLanePosition(string) -> double
The position of the vehicle along the lane measured in m.
"""
return self._getUniversal(tc.VAR_LANEPOSITION, vehID)
def getColor(self, vehID):
"""getColor(string) -> (integer, integer, integer, integer)
Returns the vehicle's rgba color.
"""
return self._getUniversal(tc.VAR_COLOR, vehID)
def getCO2Emission(self, vehID):
"""getCO2Emission(string) -> double
Returns the CO2 emission in mg for the last time step.
"""
return self._getUniversal(tc.VAR_CO2EMISSION, vehID)
def getCOEmission(self, vehID):
"""getCOEmission(string) -> double
Returns the CO emission in mg for the last time step.
"""
return self._getUniversal(tc.VAR_COEMISSION, vehID)
def getHCEmission(self, vehID):
"""getHCEmission(string) -> double
Returns the HC emission in mg for the last time step.
"""
return self._getUniversal(tc.VAR_HCEMISSION, vehID)
def getPMxEmission(self, vehID):
"""getPMxEmission(string) -> double
Returns the particular matter emission in mg for the last time step.
"""
return self._getUniversal(tc.VAR_PMXEMISSION, vehID)
def getNOxEmission(self, vehID):
"""getNOxEmission(string) -> double
Returns the NOx emission in mg for the last time step.
"""
return self._getUniversal(tc.VAR_NOXEMISSION, vehID)
def getFuelConsumption(self, vehID):
"""getFuelConsumption(string) -> double
Returns the fuel consumption in ml for the last time step.
"""
return self._getUniversal(tc.VAR_FUELCONSUMPTION, vehID)
def getNoiseEmission(self, vehID):
"""getNoiseEmission(string) -> double
Returns the noise emission in db for the last time step.
"""
return self._getUniversal(tc.VAR_NOISEEMISSION, vehID)
def getPersonNumber(self, vehID):
"""getPersonNumber(string) -> integer
.
"""
return self._getUniversal(tc.VAR_PERSON_NUMBER, vehID)
def getAdaptedTraveltime(self, vehID, time, edgeID):
"""getAdaptedTraveltime(string, double, string) -> double
.
"""
self.ad._beginMessage(tc.CMD_GET_VEHICLE_VARIABLE,
tc.VAR_EDGE_TRAVELTIME, vehID, 1 + 4 + 1 + 4 + 1 + 4 + len(edgeID))
self.ad._message.string += struct.pack("!BiBiBi", tc.TYPE_COMPOUND, 2, tc.TYPE_INTEGER, time,
tc.TYPE_STRING, len(edgeID)) + str(edgeID)
return self.ad._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.VAR_EDGE_TRAVELTIME, vehID).readDouble()
def getEffort(self, vehID, time, edgeID):
"""getEffort(string, double, string) -> double
.
"""
self.ad._beginMessage(tc.CMD_GET_VEHICLE_VARIABLE,
tc.VAR_EDGE_EFFORT, vehID, 1 + 4 + 1 + 4 + 1 + 4 + len(edgeID))
self.ad._message.string += struct.pack("!BiBiBi", tc.TYPE_COMPOUND, 2, tc.TYPE_INTEGER, time,
tc.TYPE_STRING, len(edgeID)) + str(edgeID)
return self.ad._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.VAR_EDGE_EFFORT, vehID).readDouble()
def isRouteValid(self, vehID):
return self._getUniversal(tc.VAR_ROUTE_VALID, vehID)
def getSignals(self, vehID):
"""getSignals(string) -> integer
Returns an integer encoding the state of a vehicle's signals.
"""
return self._getUniversal(tc.VAR_SIGNALS, vehID)
def getLength(self, vehID):
"""getLength(string) -> double
Returns the length in m of the given vehicle.
"""
return self._getUniversal(tc.VAR_LENGTH, vehID)
def getMaxSpeed(self, vehID):
"""getMaxSpeed(string) -> double
Returns the maximum speed in m/s of this vehicle.
"""
return self._getUniversal(tc.VAR_MAXSPEED, vehID)
def getAllowedSpeed(self, vehID):
"""getAllowedSpeed(string) -> double
Returns the maximum allowed speed on the current lane regarding speed factor in m/s for this vehicle.
"""
return self._getUniversal(tc.VAR_ALLOWED_SPEED, vehID)
def getVehicleClass(self, vehID):
"""getVehicleClass(string) -> string
Returns the vehicle class of this vehicle.
"""
return self._getUniversal(tc.VAR_VEHICLECLASS, vehID)
def getSpeedFactor(self, vehID):
"""getSpeedFactor(string) -> double
Returns the chosen speed factor for this vehicle.
"""
return self._getUniversal(tc.VAR_SPEED_FACTOR, vehID)
def getSpeedDeviation(self, vehID):
"""getSpeedDeviation(string) -> double
Returns the maximum speed deviation of the vehicle type.
"""
return self._getUniversal(tc.VAR_SPEED_DEVIATION, vehID)
def getEmissionClass(self, vehID):
"""getEmissionClass(string) -> string
Returns the emission class of this vehicle.
"""
return self._getUniversal(tc.VAR_EMISSIONCLASS, vehID)
def getWaitingTime(self, vehID):
"""getWaitingTime() -> double
The waiting time of a vehicle is defined as the time (in seconds) spent with a
speed below 0.1m/s since the last time it was faster than 0.1m/s.
(basically, the waiting time of a vehicle is reset to 0 every time it moves).
"""
return self._getUniversal(tc.VAR_WAITING_TIME, vehID)
def getWidth(self, vehID):
"""getWidth(string) -> double
Returns the width in m of this vehicle.
"""
return self._getUniversal(tc.VAR_WIDTH, vehID)
def getMinGap(self, vehID):
"""getMinGap(string) -> double
Returns the offset (gap to front vehicle if halting) of this vehicle.
"""
return self._getUniversal(tc.VAR_MINGAP, vehID)
def getShapeClass(self, vehID):
"""getShapeClass(string) -> string
Returns the shape class of this vehicle.
"""
return self._getUniversal(tc.VAR_SHAPECLASS, vehID)
def getAccel(self, vehID):
"""getAccel(string) -> double
Returns the maximum acceleration possibility in m/s^2 of this vehicle.
"""
return self._getUniversal(tc.VAR_ACCEL, vehID)
def getDecel(self, vehID):
"""getDecel(string) -> double
Returns the maximum deceleration possibility in m/s^2 of this vehicle.
"""
return self._getUniversal(tc.VAR_DECEL, vehID)
def getImperfection(self, vehID):
"""getImperfection(string) -> double
.
"""
return self._getUniversal(tc.VAR_IMPERFECTION, vehID)
def getTau(self, vehID):
"""getTau(string) -> double
Returns the driver's reaction time in s for this vehicle.
"""
return self._getUniversal(tc.VAR_TAU, vehID)
def getBestLanes(self, vehID):
"""getBestLanes(string) ->
Information about the wish to use subsequent edges' lanes.
"""
return self._getUniversal(tc.VAR_BEST_LANES, vehID)
def getLeader(self, vehID, dist=0.):
"""getLeader(string, double) -> (string, double)
Return the leading vehicle id together with the distance.
The dist parameter defines the maximum lookahead, 0 calculates a lookahead from the brake gap.
"""
self.ad._beginMessage(
tc.CMD_GET_VEHICLE_VARIABLE, tc.VAR_LEADER, vehID, 1 + 8)
self.ad._message.string += struct.pack("!Bd", tc.TYPE_DOUBLE, dist)
return _readLeader(self.ad._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.VAR_LEADER, vehID))
def subscribeLeader(self, vehID, dist=0., begin=0, end=2**31 - 1):
"""subscribeLeader(string, double) -> None
Subscribe for the leading vehicle id together with the distance.
The dist parameter defines the maximum lookahead, 0 calculates a lookahead from the brake gap.
"""
self.ad._subscribe(tc.CMD_SUBSCRIBE_VEHICLE_VARIABLE, begin, end, vehID,
(tc.VAR_LEADER,), {tc.VAR_LEADER: struct.pack("!Bd", tc.TYPE_DOUBLE, dist)})
def getDrivingDistance(self, vehID, edgeID, pos, laneID=0):
"""getDrivingDistance(string, string, double, integer) -> double
.
"""
self.ad._beginMessage(tc.CMD_GET_VEHICLE_VARIABLE, tc.DISTANCE_REQUEST,
vehID, 1 + 4 + 1 + 4 + len(edgeID) + 8 + 1 + 1)
self.ad._message.string += struct.pack("!BiBi", tc.TYPE_COMPOUND, 2,
tc.POSITION_ROADMAP, len(edgeID)) + str(edgeID)
self.ad._message.string += struct.pack("!dBB",
pos, laneID, tc.REQUEST_DRIVINGDIST)
return self.ad._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.DISTANCE_REQUEST, vehID).readDouble()
def getDrivingDistance2D(self, vehID, x, y):
"""getDrivingDistance2D(string, double, double) -> integer
.
"""
self.ad._beginMessage(
tc.CMD_GET_VEHICLE_VARIABLE, tc.DISTANCE_REQUEST, vehID, 1 + 4 + 1 + 8 + 8 + 1)
self.ad._message.string += struct.pack("!BiBddB", tc.TYPE_COMPOUND, 2,
tc.POSITION_2D, x, y, tc.REQUEST_DRIVINGDIST)
return self.ad._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.DISTANCE_REQUEST, vehID).readDouble()
def getDistance(self, vehID):
"""getDistance(string) -> double
Returns the distance to the starting point like an odometer
"""
return self._getUniversal(tc.VAR_DISTANCE, vehID)
def getStopState(self, vehID):
"""getStopState(string) -> integer
Returns information in regard to stopping:
The returned integer is defined as 1 * stopped + 2 * parking
+ 4 * personTriggered + 8 * containerTriggered + 16 * isBusStop
+ 32 * isContainerStop
with each of these flags defined as 0 or 1
"""
return self._getUniversal(tc.VAR_STOPSTATE, vehID)
def isStopped(self, vehID):
"""isStopped(string) -> bool
Return whether the vehicle is stopped
"""
return (self.getStopState(vehID) & 1) == 1
def isStoppedParking(self, vehID):
"""isStoppedParking(string) -> bool
Return whether the vehicle is parking (implies stopped)
"""
return (self.getStopState(vehID) & 2) == 2
def isStoppedTriggered(self, vehID):
"""isStoppedTriggered(string) -> bool
Return whether the vehicle is stopped and waiting for a person or container
"""
return (self.getStopState(vehID) & 12) > 0
def isAtBusStop(self, vehID):
"""isAtBusStop(string) -> bool
Return whether the vehicle is stopped at a bus stop
"""
return (self.getStopState(vehID) & 16) == 16
def isAtContainerStop(self, vehID):
"""isAtContainerStop(string) -> bool
Return whether the vehicle is stopped at a container stop
"""
return (self.getStopState(vehID) & 32) == 32
def subscribe(self, vehID, varIDs=(tc.VAR_ROAD_ID, tc.VAR_LANEPOSITION), begin=0, end=2**31 - 1):
"""subscribe(string, list(integer), double, double) -> None
Subscribe to one or more vehicle values for the given interval.
"""
self.ad._subscribe(
tc.CMD_SUBSCRIBE_VEHICLE_VARIABLE, begin, end, vehID, varIDs)
def getSubscriptionResults(self, vehID=None):
"""getSubscriptionResults(string) -> dict(integer: <value_type>)
Returns the subscription results for the last time step and the given vehicle.
If no vehicle id is given, all subscription results are returned in a dict.
If the vehicle id is unknown or the subscription did for any reason return no data,
'None' is returned.
It is not possible to retrieve older subscription results than the ones
from the last time step.
"""
return self.subscriptionResults.get(vehID)
def subscribeContext(self, vehID, domain, dist, varIDs=(tc.VAR_ROAD_ID, tc.VAR_LANEPOSITION), begin=0, end=2**31 - 1):
self.ad._subscribeContext(
tc.CMD_SUBSCRIBE_VEHICLE_CONTEXT, begin, end, vehID, domain, dist, varIDs)
def getContextSubscriptionResults(self, vehID=None):
return self.subscriptionResults.getContext(vehID)
def setMaxSpeed(self, vehID, speed):
"""setMaxSpeed(string, double) -> None
Sets the maximum speed in m/s for this vehicle.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_MAXSPEED, vehID, speed)
def setStop(self, vehID, edgeID, pos=1., laneIndex=0, duration=2**31 - 1, flags=STOP_DEFAULT, startPos=tc.INVALID_DOUBLE_VALUE, until=-1):
"""setStop(string, string, double, integer, integer, integer, double, integer) -> None
Adds or modifies a stop with the given parameters. The duration and the until attribute are
in milliseconds.
"""
self.ad._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_STOP,
vehID, 1 + 4 + 1 + 4 + len(edgeID) + 1 + 8 + 1 + 1 + 1 + 4 + 1 + 1 + 1 + 8 + 1 + 4)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_COMPOUND, 7)
self.ad._message.string += struct.pack("!Bi",
tc.TYPE_STRING, len(edgeID)) + str(edgeID)
self.ad._message.string += struct.pack("!BdBBBiBB", tc.TYPE_DOUBLE, pos,
tc.TYPE_BYTE, laneIndex, tc.TYPE_INTEGER, duration, tc.TYPE_BYTE, flags)
self.ad._message.string += struct.pack("!BdBi",
tc.TYPE_DOUBLE, startPos, tc.TYPE_INTEGER, until)
self.ad._sendExact()
def setBusStop(self, vehID, stopID, duration=2**31 - 1, until=-1, flags=STOP_DEFAULT):
"""setBusStop(string, string, integer, integer, integer) -> None
Adds or modifies a bus stop with the given parameters. The duration and the until attribute are
in milliseconds.
"""
setStop(vehID, stopID, duration=duration,
until=until, flags=flags | STOP_BUS_STOP)
def setContainerStop(self, vehID, stopID, duration=2**31 - 1, until=-1, flags=STOP_DEFAULT):
"""setContainerStop(string, string, integer, integer, integer) -> None
Adds or modifies a container stop with the given parameters. The duration and the until attribute are
in milliseconds.
"""
setStop(vehID, stopID, duration=duration, until=until,
flags=flags | STOP_CONTAINER_STOP)
def resume(self, vehID):
"""resume(string) -> None
Resumes the vehicle from the current stop (throws an error if the vehicle is not stopped).
"""
self.ad._beginMessage(
tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_RESUME, vehID, 1 + 4)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_COMPOUND, 0)
self.ad._sendExact()
def changeLane(self, vehID, laneIndex, duration):
self.ad._beginMessage(
tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_CHANGELANE, vehID, 1 + 4 + 1 + 1 + 1 + 4)
self.ad._message.string += struct.pack(
"!BiBBBi", tc.TYPE_COMPOUND, 2, tc.TYPE_BYTE, laneIndex, tc.TYPE_INTEGER, duration)
self.ad._sendExact()
def slowDown(self, vehID, speed, duration):
self.ad._beginMessage(
tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_SLOWDOWN, vehID, 1 + 4 + 1 + 8 + 1 + 4)
self.ad._message.string += struct.pack(
"!BiBdBi", tc.TYPE_COMPOUND, 2, tc.TYPE_DOUBLE, speed, tc.TYPE_INTEGER, duration)
self.ad._sendExact()
def changeTarget(self, vehID, edgeID):
self.ad._sendStringCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_CHANGETARGET, vehID, edgeID)
def setType(self, vehID, typeID):
"""setType(string, string) -> None
Sets the id of the type for the named vehicle.
"""
self.ad._sendStringCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_TYPE, vehID, typeID)
def setRouteID(self, vehID, routeID):
"""setRouteID(string, string) -> None
Changes the vehicles route to the route with the given id.
"""
self.ad._sendStringCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_ROUTE_ID, vehID, routeID)
def setRoute(self, vehID, edgeList):
"""
setRoute(string, list) -> None
changes the vehicle route to given edges list.
The first edge in the list has to be the one that the vehicle is at at the moment.
example usage:
setRoute('1', ['1', '2', '4', '6', '7'])
this changes route for vehicle id 1 to edges 1-2-4-6-7
"""
if isinstance(edgeList, str):
edgeList = [edgeList]
self.ad._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_ROUTE, vehID,
1 + 4 + sum(map(len, edgeList)) + 4 * len(edgeList))
self.ad._message.string += struct.pack("!Bi",
tc.TYPE_STRINGLIST, len(edgeList))
for edge in edgeList:
self.ad._message.string += struct.pack("!i", len(edge)) + str(edge)
self.ad._sendExact()
def setAdaptedTraveltime(self, vehID, begTime, endTime, edgeID, time):
"""setAdaptedTraveltime(string, double, string, double) -> None
.
"""
self.ad._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_EDGE_TRAVELTIME,
vehID, 1 + 4 + 1 + 4 + 1 + 4 + 1 + 4 + len(edgeID) + 1 + 8)
self.ad._message.string += struct.pack("!BiBiBiBi", tc.TYPE_COMPOUND, 4, tc.TYPE_INTEGER, begTime,
tc.TYPE_INTEGER, endTime, tc.TYPE_STRING, len(edgeID)) + str(edgeID)
self.ad._message.string += struct.pack("!Bd", tc.TYPE_DOUBLE, time)
self.ad._sendExact()
def setEffort(self, vehID, begTime, endTime, edgeID, effort):
"""setEffort(string, double, string, double) -> None
.
"""
self.ad._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_EDGE_EFFORT,
vehID, 1 + 4 + 1 + 4 + 1 + 4 + 1 + 4 + len(edgeID) + 1 + 4)
self.ad._message.string += struct.pack("!BiBiBiBi", tc.TYPE_COMPOUND, 4, tc.TYPE_INTEGER, begTime,
tc.TYPE_INTEGER, endTime, tc.TYPE_STRING, len(edgeID)) + str(edgeID)
self.ad._message.string += struct.pack("!Bd", tc.TYPE_DOUBLE, effort)
self.ad._sendExact()
def rerouteTraveltime(self, vehID, currentTravelTimes=True):
"""rerouteTraveltime(string, bool) -> None Reroutes a vehicle. If
currentTravelTimes is True (default) then the current traveltime of the
edges is loaded and used for rerouting. If currentTravelTimes is False,
travel times loaded from a weight file are used. In the absence of loaded
weights, the minimum travel time is used (speed limit).
"""
if currentTravelTimes:
for edge in self.ad.edge.getIDList():
self.ad.edge.adaptTraveltime(edge, self.ad.edge.getTraveltime(edge))
self.ad._beginMessage(
tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_REROUTE_TRAVELTIME, vehID, 1 + 4)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_COMPOUND, 0)
self.ad._sendExact()
def rerouteEffort(self, vehID):
self.ad._beginMessage(
tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_REROUTE_EFFORT, vehID, 1 + 4)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_COMPOUND, 0)
self.ad._sendExact()
def setSignals(self, vehID, signals):
"""setSignals(string, integer) -> None
Sets an integer encoding the state of the vehicle's signals.
"""
self.ad._sendIntCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_SIGNALS, vehID, signals)
def moveTo(self, vehID, laneID, pos):
self.ad._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE,
tc.VAR_MOVE_TO, vehID, 1 + 4 + 1 + 4 + len(laneID) + 1 + 8)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_COMPOUND, 2)
self.ad._message.string += struct.pack("!Bi",
tc.TYPE_STRING, len(laneID)) + str(laneID)
self.ad._message.string += struct.pack("!Bd", tc.TYPE_DOUBLE, pos)
self.ad._sendExact()
def setSpeed(self, vehID, speed):
"""setSpeed(string, double) -> None
Sets the speed in m/s for the named vehicle within the last step.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_SPEED, vehID, speed)
def setColor(self, vehID, color):
"""setColor(string, (integer, integer, integer, integer))
sets color for vehicle with the given ID.
i.e. (255,0,0,0) for the color red.
The fourth integer (alpha) is only used when drawing vehicles with raster images
"""
self.ad._beginMessage(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_COLOR, vehID, 1 + 1 + 1 + 1 + 1)
self.ad._message.string += struct.pack("!BBBBB", tc.TYPE_COLOR, int(
color[0]), int(color[1]), int(color[2]), int(color[3]))
self.ad._sendExact()
def setLength(self, vehID, length):
"""setLength(string, double) -> None
Sets the length in m for the given vehicle.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_LENGTH, vehID, length)
def setVehicleClass(self, vehID, clazz):
"""setVehicleClass(string, string) -> None
Sets the vehicle class for this vehicle.
"""
self.ad._sendStringCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_VEHICLECLASS, vehID, clazz)
def setSpeedFactor(self, vehID, factor):
"""setSpeedFactor(string, double) -> None
.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_SPEED_FACTOR, vehID, factor)
def setSpeedDeviation(self, vehID, deviation):
"""setSpeedDeviation(string, double) -> None
Sets the maximum speed deviation for this vehicle.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_SPEED_DEVIATION, vehID, deviation)
def setEmissionClass(self, vehID, clazz):
"""setEmissionClass(string, string) -> None
Sets the emission class for this vehicle.
"""
self.ad._sendStringCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_EMISSIONCLASS, vehID, clazz)
def setWidth(self, vehID, width):
"""setWidth(string, double) -> None
Sets the width in m for this vehicle.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_WIDTH, vehID, width)
def setMinGap(self, vehID, minGap):
"""setMinGap(string, double) -> None
Sets the offset (gap to front vehicle if halting) for this vehicle.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_MINGAP, vehID, minGap)
def setShapeClass(self, vehID, clazz):
"""setShapeClass(string, string) -> None
Sets the shape class for this vehicle.
"""
self.ad._sendStringCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_SHAPECLASS, vehID, clazz)
def setAccel(self, vehID, accel):
"""setAccel(string, double) -> None
Sets the maximum acceleration in m/s^2 for this vehicle.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_ACCEL, vehID, accel)
def setDecel(self, vehID, decel):
"""setDecel(string, double) -> None
Sets the maximum deceleration in m/s^2 for this vehicle.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_DECEL, vehID, decel)
def setImperfection(self, vehID, imperfection):
"""setImperfection(string, double) -> None
.
"""
self.ad._sendDoubleCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_IMPERFECTION, vehID, imperfection)
def setTau(self, vehID, tau):
"""setTau(string, double) -> None
Sets the driver's reaction time in s for this vehicle.
"""
self.ad._sendDoubleCmd(tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_TAU, vehID, tau)
def setLaneChangeMode(self, vehID, lcm):
"""setLaneChangeMode(string, integer) -> None
Sets the vehicle's lane change mode as a bitset.
"""
self.ad._sendIntCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_LANECHANGE_MODE, vehID, lcm)
def setSpeedMode(self, vehID, sm):
"""setLaneChangeMode(string, integer) -> None
Sets the vehicle's speed mode as a bitset.
"""
self.ad._sendIntCmd(
tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_SPEEDSETMODE, vehID, sm)
def add(self, vehID, routeID, depart=DEPART_NOW, pos=0, speed=0, lane=0, typeID="DEFAULT_VEHTYPE"):
self.ad._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.ADD, vehID,
1 + 4 + 1 + 4 + len(typeID) + 1 + 4 + len(routeID) + 1 + 4 + 1 + 8 + 1 + 8 + 1 + 1)
if depart > 0:
depart *= 1000
self.ad._message.string += struct.pack("!Bi", tc.TYPE_COMPOUND, 6)
self.ad._message.string += struct.pack("!Bi",
tc.TYPE_STRING, len(typeID)) + str(typeID)
self.ad._message.string += struct.pack("!Bi",
tc.TYPE_STRING, len(routeID)) + str(routeID)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_INTEGER, depart)
self.ad._message.string += struct.pack("!BdBd",
tc.TYPE_DOUBLE, pos, tc.TYPE_DOUBLE, speed)
self.ad._message.string += struct.pack("!BB", tc.TYPE_BYTE, lane)
self.ad._sendExact()
def addFull(self, vehID, routeID, typeID="DEFAULT_VEHTYPE", depart=None,
departLane="0", departPos="base", departSpeed="0",
arrivalLane="current", arrivalPos="max", arrivalSpeed="current",
fromTaz="", toTaz="", line="", personCapacity=0, personNumber=0):
messageString = struct.pack("!Bi", tc.TYPE_COMPOUND, 14)
if depart is None:
depart = str(self.ad.simulation.getCurrentTime() / 1000.)
for val in (routeID, typeID, depart, departLane, departPos, departSpeed,
arrivalLane, arrivalPos, arrivalSpeed, fromTaz, toTaz, line):
messageString += struct.pack("!Bi",
tc.TYPE_STRING, len(val)) + str(val)
messageString += struct.pack("!Bi", tc.TYPE_INTEGER, personCapacity)
messageString += struct.pack("!Bi", tc.TYPE_INTEGER, personNumber)
self.ad._beginMessage(
tc.CMD_SET_VEHICLE_VARIABLE, tc.ADD_FULL, vehID, len(messageString))
self.ad._message.string += messageString
self.ad._sendExact()
def remove(self, vehID, reason=tc.REMOVE_VAPORIZED):
'''Remove vehicle with the given ID for the give reason.
Reasons are defined in module constants and start with REMOVE_'''
self.ad._sendByteCmd(tc.CMD_SET_VEHICLE_VARIABLE, tc.REMOVE, vehID, reason)
def moveToVTD(self, vehID, edgeID, lane, x, y, angle):
self.ad._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_MOVE_TO_VTD,
vehID, 1 + 4 + 1 + 4 + len(edgeID) + 1 + 4 + 1 + 8 + 1 + 8 + 1 + 8)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_COMPOUND, 5)
self.ad._message.string += struct.pack("!Bi",
tc.TYPE_STRING, len(edgeID)) + str(edgeID)
self.ad._message.string += struct.pack("!Bi", tc.TYPE_INTEGER, lane)
self.ad._message.string += struct.pack("!Bd", tc.TYPE_DOUBLE, x)
self.ad._message.string += struct.pack("!Bd", tc.TYPE_DOUBLE, y)
self.ad._message.string += struct.pack("!Bd", tc.TYPE_DOUBLE, angle)
self.ad._sendExact()
getParameter, setParameter = traci.getParameterAccessors(
tc.CMD_GET_VEHICLE_VARIABLE, tc.CMD_SET_VEHICLE_VARIABLE)
# -*- coding: utf-8 -*-
"""
@file route.py
@author Michael Behrisch
@author Lena Kalleske
@date 2008-10-09
@version $Id: route.py 18106 2015-03-19 08:08:16Z behrisch $
Python implementation of the TraCI interface.
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2008-2015 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
import traci
import struct
import traci.constants as tc
class Route:
def __init__(self, adapter):
self.getParameter, self.setParameter = traci.getParameterAccessors(
tc.CMD_GET_ROUTE_VARIABLE, tc.CMD_SET_ROUTE_VARIABLE)
self._RETURN_VALUE_FUNC = {tc.ID_LIST: traci.Storage.readStringList,
tc.ID_COUNT: traci.Storage.readInt,
tc.VAR_EDGES: traci.Storage.readStringList}
self.ad = adapter
self.subscriptionResults = traci.SubscriptionResults(self._RETURN_VALUE_FUNC)
def _getUniversal(self, varID, routeID):
result = self.ad._sendReadOneStringCmd(
tc.CMD_GET_ROUTE_VARIABLE, varID, routeID)
return _RETURN_VALUE_FUNC[varID](result)
def getIDList(self):
"""getIDList() -> list(string)
Returns a list of all routes in the network.
"""
return _getUniversal(tc.ID_LIST, "")
def getIDCount(self):
"""getIDCount() -> integer
Returns the number of currently loaded routes.
"""
return _getUniversal(tc.ID_COUNT, "")
def getEdges(self, routeID):
"""getEdges(string) -> list(string)
Returns a list of all edges in the route.
"""
return _getUniversal(tc.VAR_EDGES, routeID)
def subscribe(self, routeID, varIDs=(tc.ID_LIST,), begin=0, end=2**31 - 1):
"""subscribe(string, list(integer), double, double) -> None
Subscribe to one or more route values for the given interval.
"""
self.ad._subscribe(
tc.CMD_SUBSCRIBE_ROUTE_VARIABLE, begin, end, routeID, varIDs)
def getSubscriptionResults(self, routeID=None):
"""getSubscriptionResults(string) -> dict(integer: <value_type>)
Returns the subscription results for the last time step and the given route.
If no route id is given, all subscription results are returned in a dict.
If the route id is unknown or the subscription did for any reason return no data,
'None' is returned.
It is not possible to retrieve older subscription results than the ones
from the last time step.
"""
return subscriptionResults.get(routeID)
def subscribeContext(self, routeID, domain, dist, varIDs=(tc.ID_LIST,), begin=0, end=2**31 - 1):
self.ad._subscribeContext(
tc.CMD_SUBSCRIBE_ROUTE_CONTEXT, begin, end, routeID, domain, dist, varIDs)
def getContextSubscriptionResults(self, routeID=None):
return subscriptionResults.getContext(routeID)
def add(self, routeID, edges):
self.ad._beginMessage(tc.CMD_SET_ROUTE_VARIABLE, tc.ADD, routeID,
1 + 4 + sum(map(len, edges)) + 4 * len(edges))
self.ad._message.string += struct.pack("!Bi", tc.TYPE_STRINGLIST, len(edges))
for e in edges:
self.ad._message.string += struct.pack("!i", len(e)) + str(e)
self.ad._sendExact()
------------------------------------------------------------------------------
_______________________________________________
sumo-user mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/sumo-user