-
Notifications
You must be signed in to change notification settings - Fork 191
/
traffic_signal.py
executable file
·313 lines (261 loc) · 13.5 KB
/
traffic_signal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""This module contains the TrafficSignal class, which represents a traffic signal in the simulation."""
import os
import sys
from typing import Callable, List, Union
if "SUMO_HOME" in os.environ:
tools = os.path.join(os.environ["SUMO_HOME"], "tools")
sys.path.append(tools)
else:
raise ImportError("Please declare the environment variable 'SUMO_HOME'")
import numpy as np
from gymnasium import spaces
class TrafficSignal:
"""This class represents a Traffic Signal controlling an intersection.
It is responsible for retrieving information and changing the traffic phase using the Traci API.
IMPORTANT: It assumes that the traffic phases defined in the .net file are of the form:
[green_phase, yellow_phase, green_phase, yellow_phase, ...]
Currently it is not supporting all-red phases (but should be easy to implement it).
# Observation Space
The default observation for each traffic signal agent is a vector:
obs = [phase_one_hot, min_green, lane_1_density,...,lane_n_density, lane_1_queue,...,lane_n_queue]
- ```phase_one_hot``` is a one-hot encoded vector indicating the current active green phase
- ```min_green``` is a binary variable indicating whether min_green seconds have already passed in the current phase
- ```lane_i_density``` is the number of vehicles in incoming lane i dividided by the total capacity of the lane
- ```lane_i_queue``` is the number of queued (speed below 0.1 m/s) vehicles in incoming lane i divided by the total capacity of the lane
You can change the observation space by implementing a custom observation class. See :py:class:`sumo_rl.environment.observations.ObservationFunction`.
# Action Space
Action space is discrete, corresponding to which green phase is going to be open for the next delta_time seconds.
# Reward Function
The default reward function is 'diff-waiting-time'. You can change the reward function by implementing a custom reward function and passing to the constructor of :py:class:`sumo_rl.environment.env.SumoEnvironment`.
"""
# Default min gap of SUMO (see https://sumo.dlr.de/docs/Simulation/Safety.html). Should this be parameterized?
MIN_GAP = 2.5
def __init__(
self,
env,
ts_id: str,
delta_time: int,
yellow_time: int,
min_green: int,
max_green: int,
begin_time: int,
reward_fn: Union[str, Callable],
sumo,
):
"""Initializes a TrafficSignal object.
Args:
env (SumoEnvironment): The environment this traffic signal belongs to.
ts_id (str): The id of the traffic signal.
delta_time (int): The time in seconds between actions.
yellow_time (int): The time in seconds of the yellow phase.
min_green (int): The minimum time in seconds of the green phase.
max_green (int): The maximum time in seconds of the green phase.
begin_time (int): The time in seconds when the traffic signal starts operating.
reward_fn (Union[str, Callable]): The reward function. Can be a string with the name of the reward function or a callable function.
sumo (Sumo): The Sumo instance.
"""
self.id = ts_id
self.env = env
self.delta_time = delta_time
self.yellow_time = yellow_time
self.min_green = min_green
self.max_green = max_green
self.green_phase = 0
self.is_yellow = False
self.time_since_last_phase_change = 0
self.next_action_time = begin_time
self.last_measure = 0.0
self.last_reward = None
self.reward_fn = reward_fn
self.sumo = sumo
if type(self.reward_fn) is str:
if self.reward_fn in TrafficSignal.reward_fns.keys():
self.reward_fn = TrafficSignal.reward_fns[self.reward_fn]
else:
raise NotImplementedError(f"Reward function {self.reward_fn} not implemented")
self.observation_fn = self.env.observation_class(self)
self._build_phases()
self.lanes = list(
dict.fromkeys(self.sumo.trafficlight.getControlledLanes(self.id))
) # Remove duplicates and keep order
self.out_lanes = [link[0][1] for link in self.sumo.trafficlight.getControlledLinks(self.id) if link]
self.out_lanes = list(set(self.out_lanes))
self.lanes_length = {lane: self.sumo.lane.getLength(lane) for lane in self.lanes + self.out_lanes}
self.observation_space = self.observation_fn.observation_space()
self.action_space = spaces.Discrete(self.num_green_phases)
def _build_phases(self):
phases = self.sumo.trafficlight.getAllProgramLogics(self.id)[0].phases
if self.env.fixed_ts:
self.num_green_phases = len(phases) // 2 # Number of green phases == number of phases (green+yellow) divided by 2
return
self.green_phases = []
self.yellow_dict = {}
for phase in phases:
state = phase.state
if "y" not in state and (state.count("r") + state.count("s") != len(state)):
self.green_phases.append(self.sumo.trafficlight.Phase(60, state))
self.num_green_phases = len(self.green_phases)
self.all_phases = self.green_phases.copy()
for i, p1 in enumerate(self.green_phases):
for j, p2 in enumerate(self.green_phases):
if i == j:
continue
yellow_state = ""
for s in range(len(p1.state)):
if (p1.state[s] == "G" or p1.state[s] == "g") and (p2.state[s] == "r" or p2.state[s] == "s"):
yellow_state += "y"
else:
yellow_state += p1.state[s]
self.yellow_dict[(i, j)] = len(self.all_phases)
self.all_phases.append(self.sumo.trafficlight.Phase(self.yellow_time, yellow_state))
programs = self.sumo.trafficlight.getAllProgramLogics(self.id)
logic = programs[0]
logic.type = 0
logic.phases = self.all_phases
self.sumo.trafficlight.setProgramLogic(self.id, logic)
self.sumo.trafficlight.setRedYellowGreenState(self.id, self.all_phases[0].state)
@property
def time_to_act(self):
"""Returns True if the traffic signal should act in the current step."""
return self.next_action_time == self.env.sim_step
def update(self):
"""Updates the traffic signal state.
If the traffic signal should act, it will set the next green phase and update the next action time.
"""
self.time_since_last_phase_change += 1
if self.is_yellow and self.time_since_last_phase_change == self.yellow_time:
# self.sumo.trafficlight.setPhase(self.id, self.green_phase)
self.sumo.trafficlight.setRedYellowGreenState(self.id, self.all_phases[self.green_phase].state)
self.is_yellow = False
def set_next_phase(self, new_phase: int):
"""Sets what will be the next green phase and sets yellow phase if the next phase is different than the current.
Args:
new_phase (int): Number between [0 ... num_green_phases]
"""
new_phase = int(new_phase)
if self.green_phase == new_phase or self.time_since_last_phase_change < self.yellow_time + self.min_green:
# self.sumo.trafficlight.setPhase(self.id, self.green_phase)
self.sumo.trafficlight.setRedYellowGreenState(self.id, self.all_phases[self.green_phase].state)
self.next_action_time = self.env.sim_step + self.delta_time
else:
# self.sumo.trafficlight.setPhase(self.id, self.yellow_dict[(self.green_phase, new_phase)]) # turns yellow
self.sumo.trafficlight.setRedYellowGreenState(
self.id, self.all_phases[self.yellow_dict[(self.green_phase, new_phase)]].state
)
self.green_phase = new_phase
self.next_action_time = self.env.sim_step + self.delta_time
self.is_yellow = True
self.time_since_last_phase_change = 0
def compute_observation(self):
"""Computes the observation of the traffic signal."""
return self.observation_fn()
def compute_reward(self):
"""Computes the reward of the traffic signal."""
self.last_reward = self.reward_fn(self)
return self.last_reward
def _pressure_reward(self):
return self.get_pressure()
def _average_speed_reward(self):
return self.get_average_speed()
def _queue_reward(self):
return -self.get_total_queued()
def _diff_waiting_time_reward(self):
ts_wait = sum(self.get_accumulated_waiting_time_per_lane()) / 100.0
reward = self.last_measure - ts_wait
self.last_measure = ts_wait
return reward
def _observation_fn_default(self):
phase_id = [1 if self.green_phase == i else 0 for i in range(self.num_green_phases)] # one-hot encoding
min_green = [0 if self.time_since_last_phase_change < self.min_green + self.yellow_time else 1]
density = self.get_lanes_density()
queue = self.get_lanes_queue()
observation = np.array(phase_id + min_green + density + queue, dtype=np.float32)
return observation
def get_accumulated_waiting_time_per_lane(self) -> List[float]:
"""Returns the accumulated waiting time per lane.
Returns:
List[float]: List of accumulated waiting time of each intersection lane.
"""
wait_time_per_lane = []
for lane in self.lanes:
veh_list = self.sumo.lane.getLastStepVehicleIDs(lane)
wait_time = 0.0
for veh in veh_list:
veh_lane = self.sumo.vehicle.getLaneID(veh)
acc = self.sumo.vehicle.getAccumulatedWaitingTime(veh)
if veh not in self.env.vehicles:
self.env.vehicles[veh] = {veh_lane: acc}
else:
self.env.vehicles[veh][veh_lane] = acc - sum(
[self.env.vehicles[veh][lane] for lane in self.env.vehicles[veh].keys() if lane != veh_lane]
)
wait_time += self.env.vehicles[veh][veh_lane]
wait_time_per_lane.append(wait_time)
return wait_time_per_lane
def get_average_speed(self) -> float:
"""Returns the average speed normalized by the maximum allowed speed of the vehicles in the intersection.
Obs: If there are no vehicles in the intersection, it returns 1.0.
"""
avg_speed = 0.0
vehs = self._get_veh_list()
if len(vehs) == 0:
return 1.0
for v in vehs:
avg_speed += self.sumo.vehicle.getSpeed(v) / self.sumo.vehicle.getAllowedSpeed(v)
return avg_speed / len(vehs)
def get_pressure(self):
"""Returns the pressure (#veh leaving - #veh approaching) of the intersection."""
return sum(self.sumo.lane.getLastStepVehicleNumber(lane) for lane in self.out_lanes) - sum(
self.sumo.lane.getLastStepVehicleNumber(lane) for lane in self.lanes
)
def get_out_lanes_density(self) -> List[float]:
"""Returns the density of the vehicles in the outgoing lanes of the intersection."""
lanes_density = [
self.sumo.lane.getLastStepVehicleNumber(lane)
/ (self.lanes_length[lane] / (self.MIN_GAP + self.sumo.lane.getLastStepLength(lane)))
for lane in self.out_lanes
]
return [min(1, density) for density in lanes_density]
def get_lanes_density(self) -> List[float]:
"""Returns the density [0,1] of the vehicles in the incoming lanes of the intersection.
Obs: The density is computed as the number of vehicles divided by the number of vehicles that could fit in the lane.
"""
lanes_density = [
self.sumo.lane.getLastStepVehicleNumber(lane)
/ (self.lanes_length[lane] / (self.MIN_GAP + self.sumo.lane.getLastStepLength(lane)))
for lane in self.lanes
]
return [min(1, density) for density in lanes_density]
def get_lanes_queue(self) -> List[float]:
"""Returns the queue [0,1] of the vehicles in the incoming lanes of the intersection.
Obs: The queue is computed as the number of vehicles halting divided by the number of vehicles that could fit in the lane.
"""
lanes_queue = [
self.sumo.lane.getLastStepHaltingNumber(lane)
/ (self.lanes_length[lane] / (self.MIN_GAP + self.sumo.lane.getLastStepLength(lane)))
for lane in self.lanes
]
return [min(1, queue) for queue in lanes_queue]
def get_total_queued(self) -> int:
"""Returns the total number of vehicles halting in the intersection."""
return sum(self.sumo.lane.getLastStepHaltingNumber(lane) for lane in self.lanes)
def _get_veh_list(self):
veh_list = []
for lane in self.lanes:
veh_list += self.sumo.lane.getLastStepVehicleIDs(lane)
return veh_list
@classmethod
def register_reward_fn(cls, fn: Callable):
"""Registers a reward function.
Args:
fn (Callable): The reward function to register.
"""
if fn.__name__ in cls.reward_fns.keys():
raise KeyError(f"Reward function {fn.__name__} already exists")
cls.reward_fns[fn.__name__] = fn
reward_fns = {
"diff-waiting-time": _diff_waiting_time_reward,
"average-speed": _average_speed_reward,
"queue": _queue_reward,
"pressure": _pressure_reward,
}