-
Notifications
You must be signed in to change notification settings - Fork 1
/
particle_filter.py
331 lines (284 loc) · 14 KB
/
particle_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
"""
Code adapted from:
Wang S., Colas F., Liu M., Mondada F., Magnenat S. (2018) Localization of Inexpensive Robots with Low-Bandwidth
Sensors. In: Groß R. et al. (eds) Distributed Autonomous Robotic Systems. Springer Proceedings in Advanced
Robotics, vol 6. Springer, Cham
"""
import numpy as np
from numba import jit
import utils as ut
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
class MonteCarlo:
""" Class containing all the localisation functions, using a particle filter """
def __init__(self, ground_map_left, ground_map_right, particles_count, sigma_obs, prop_uniform, alpha_xy,
alpha_theta, state_init=None):
# sanity check on parameters
assert ground_map_left.dtype == np.double
assert ground_map_right.dtype == np.double
assert ground_map_left.shape[0] == ground_map_right.shape[0]
assert ground_map_left.shape[1] == ground_map_right.shape[1]
# copy parameters
self.ground_map_left = ground_map_left
self.ground_map_right = ground_map_right
self.N_uniform = int(prop_uniform * particles_count)
self.alpha_xy = alpha_xy
self.alpha_theta = alpha_theta
self.sigma_obs = sigma_obs
# setup limits
self.conf_theta = np.radians(10)
self.conf_xy = 3
# create initial particles
if state_init is None:
particles = np.random.uniform(0, 1, size=[particles_count, 3])
particles = particles * [ground_map_left.shape[0], ground_map_left.shape[1], np.pi * 2]
else:
particles = np.random.normal(state_init, np.asarray([1, 1, np.pi/20]), size=(particles_count, 3))
self.particles = particles
self.weights = np.zeros(particles_count)
self.estimated_particle = np.zeros_like(particles[0, :], dtype=float)
self.fig = None
def apply_obs_and_resample(self, left_color, right_color):
""" Apply observation (values measured on the map) in updating probability weights of each particle,
then resample according to these weights """
mapshape = self.ground_map_left.shape
self.particles, self.weights = self._apply_obs_and_resample(self.particles, self.particles.shape[0],
self.ground_map_left, self.ground_map_right,
self.sigma_obs, self.N_uniform, mapshape,
left_color, right_color)
@staticmethod
@jit(nopython=True)
def _apply_obs_and_resample(particles, particles_count, ground_map_left, ground_map_right,
sigma_obs, N_uniform, mapshape, left_color, right_color):
""" Optimised function, see apply_obs_and_resample for more info """
ratioA0 = 1.0877 # because the printed map doesn't have the theoretical dimension
particles_count = particles_count
weights = np.zeros(particles_count)
nb_ok = 0
for i in range(particles_count):
theta = particles[i, 2]
# compute position of sensors in world coordinates
rot = ut.rot_mat2(theta)
left_sensor_pos = rot.dot(np.array([7.2*ratioA0, 1.1*ratioA0])) + particles[i, 0:2]
right_sensor_pos = rot.dot(np.array([7.2*ratioA0, -1.1*ratioA0])) + particles[i, 0:2]
if not ut.is_in_bound(mapshape, left_sensor_pos) or not ut.is_in_bound(mapshape, right_sensor_pos):
# kill particle if out of map
weights[i] = 0.
else:
# otherwise, compute weight in function of ground color
# left sensor
ground_val_left = ground_map_left[ut.xyW2C(left_sensor_pos[0]), ut.xyW2C(left_sensor_pos[1])]
left_weight = ut.norm(left_color, ground_val_left, sigma_obs)
# right sensor
ground_val_right = ground_map_right[ut.xyW2C(right_sensor_pos[0]), ut.xyW2C(right_sensor_pos[1])]
right_weight = ut.norm(right_color, ground_val_right, sigma_obs)
# compute weight
weights[i] = left_weight * right_weight # + 1e-6
# update matching particles
if weights[i] > 0.5:
nb_ok += 1
# ratio matching particles
# print("Proportion of matching particles:", 1. * nb_ok / len(weights))
# Resample
resample_count = particles_count - N_uniform
if not(weights.sum() > 0.):
print("not(weights.sum() > 0.):", weights.sum())
print(weights / weights.sum())
weights += 1e-10 # added for numerical stability
else:
weights /= weights.sum()
# self.weights = weights
# resampled = particles[np.random.choice(particles_count, resample_count, p=weights)]
# workaround for numba
csum = np.cumsum(weights)
resampled = np.empty((resample_count, 3))
for k in range(resample_count):
resampled[k,:] = particles[np.searchsorted(csum, np.random.random(), side="right"), :]
# resampled = np.array(tmp)
# if N_uniform:
# new_particles = ut.weird(N_uniform, mapshape)
# particles[resample_count:] = new_particles
# particles[:resample_count] = resampled
# else:
particles[:resample_count] = resampled
# assert particles.shape[0] == particles_count # assert not supported by numba
if not(particles.shape[0] == particles_count):
print("Assert failure: not(particles.shape[0] == particles_count)")
# add adaptive noise to fight particle depletion
one_N3 = 1. / pow(particles_count, 1. / 3.)
range_x = np.float(mapshape[0]) * one_N3
range_y = np.float(mapshape[1]) * one_N3
range_theta = 2. * np.pi * one_N3
particles[:, 0] += np.random.uniform(-range_x / 2., range_x / 2., particles_count)
particles[:, 1] += np.random.uniform(-range_y / 2., range_y / 2., particles_count)
particles[:, 2] += np.random.uniform(-range_theta / 2., range_theta / 2., particles_count)
return particles, weights
def apply_command(self, d_x, d_y, d_theta):
""" Move the particles according to the measured odometry
:input d_x d_y d_theta measured by the odometry
"""
self.particles = self._apply_command(self.particles, self.particles.shape[0], self.alpha_theta, self.alpha_xy,
np.float(d_x), np.float(d_y), d_theta)
@staticmethod
@jit(nopython=True)
def _apply_command(particles, particles_count, alpha_theta, alpha_xy, d_x, d_y, d_theta):
""" Optimised function, see apply_command for more info """
d_xy = np.array([d_x, d_y])
particles = np.asarray(particles)
particles_count = particles_count
# error model
norm_xy = np.sqrt(d_x ** 2 + d_y ** 2)
e_theta = alpha_theta * abs(d_theta) + np.radians(0.25)
# assert e_theta > 0, e_theta
if not (e_theta > 0):
print("ERROR, e_theta <= 0")
e_xy = alpha_xy * norm_xy + 0.01
# assert e_xy > 0, e_xy
if not e_xy > 0:
print("ERROR, e_xy <= 0")
# apply command and sampled noise to each particle
for i in range(particles_count):
theta = particles[i, 2]
particles[i, 0:2] += ut.rot_mat2(theta).dot(d_xy) + np.random.normal(0., e_xy, 2)
particles[i, 2] = theta + d_theta + np.random.normal(0., e_theta)
return particles
def estimate_state(self):
""" Run the RANSAC algorithm to find the best approximation of the state
:return estimated_particle --> np.array(x,y,theta)
confidence --> a percentage
"""
estimated_particle, confidence = self._estimate_state(self.particles, self.particles.shape[0], self.conf_xy,
self.conf_theta)
self.estimated_particle = estimated_particle
return estimated_particle, confidence
@staticmethod
@jit(nopython=True)
def _estimate_state(particles, particles_count, conf_xy, conf_theta):
""" Optimised function, see estimate_state for more info """
# limits for considering participating to the state estimation
theta_lim = np.radians(5)
xy_lim = 1.5
# particles = self.particles
max_index = particles_count - 1
iterations_count = round(particles_count/100) # 500
tests_count = round(particles_count/100) # 500
# assert iterations_count <= max_index and tests_count <= max_index
# no replacement
tmp = []
for i in range(max_index):
tmp.append(i)
lin = np.array(tmp)
# np.array(np.linspace(0, max_index, max_index + 1), dtype=np.int)
iteration_indices = np.random.choice(lin, iterations_count, replace=False)
test_indices = np.random.choice(lin, tests_count, replace=False)
best_index = -1
support, best_support = 0, 0
# tries a certain number of times
for i in range(iterations_count):
index = iteration_indices[i]
x = particles[index, 0]
y = particles[index, 1]
theta = particles[index, 2]
support = 0
for j in range(tests_count):
o_index = test_indices[j]
o_x = particles[o_index, 0]
o_y = particles[o_index, 1]
o_theta = particles[o_index, 2]
# compute distance
dist_xy = np.sqrt((x - o_x) * (x - o_x) + (y - o_y) * (y - o_y))
dist_theta = ut.normalize_angle(theta - o_theta)
if dist_xy < xy_lim and dist_theta < theta_lim:
support += 1
# if it beats best, replace best
if support > best_support:
best_index = index
best_support = support
# then do the averaging for best index
x = particles[best_index, 0]
y = particles[best_index, 1]
theta = particles[best_index, 2]
count, conf_count, xs, ys, ths = 0, 0, 0, 0, 0
sins, coss = [], []
for j in range(tests_count):
o_index = test_indices[j]
o_x = particles[o_index, 0]
o_y = particles[o_index, 1]
o_theta = particles[o_index, 2]
dist_xy = np.sqrt((x - o_x) * (x - o_x) + (y - o_y) * (y - o_y))
dist_theta = ut.normalize_angle(theta - o_theta)
if dist_xy < xy_lim and dist_theta < theta_lim:
sins.append(np.sin(o_theta))
coss.append(np.cos(o_theta))
# ths += ut.normalize_angle(theta)
xs += o_x
ys += o_y
count += 1
if dist_xy < conf_xy and dist_theta < conf_theta:
conf_count += 1
# assert count > 0, count
x_m = xs / count
y_m = ys / count
theta_m = np.arctan2(np.sum(np.asarray(sins)), np.sum(np.asarray(coss))) # ths / count
return np.array([x_m, y_m, theta_m]), float(conf_count) / float(tests_count)
# mean = np.mean(self.particles, axis=0)
# self.estimated_particle = mean
# return mean[0], mean[1], mean[2], 42
def plot_state(self, base_filename=None, odom=np.array([-1, -1, -1]),
map_back=None, num_particles=-1, sens=None, path=None):
""" Write particles to an image """
ratioA0 = 1.0877 # because the printed map doesn't have the theoretical dimension
if base_filename is not None:
fig = Figure((3, 3), tight_layout=True)
canvas = FigureCanvas(fig)
else:
if self.fig is None:
self.fig = plt.figure()
fig = self.fig
else:
fig = self.fig
fig.clf()
ax = fig.gca()
ax.set_xlim([0, self.ground_map_left.shape[0]])
ax.set_ylim([0, self.ground_map_left.shape[1]])
x, y, theta = self.estimated_particle
if map_back is not None:
ax.imshow(np.uint8(map_back * 255))
if path is not None:
p = np.array(path)
ax.plot(p[0,0], p[0,1], 'rx', markersize=3)
ax.plot(p[1:,0], p[1:,1], 'bx', markersize=3)
if num_particles > 0:
idx = np.random.choice(self.particles.shape[0], num_particles)
for i in idx:
x, y, theta = self.particles[i, :]
ax.arrow(x, y, np.cos(theta), np.sin(theta), head_width=0.8, head_length=1, fc='k', ec='k', alpha=0.3)
if odom[0] != -1 and odom[1] != -1:
ax.arrow(odom[0], odom[1], np.cos(odom[2]) * 2, np.sin(odom[2]) * 2, head_width=1, head_length=1.2, fc='green',
ec='green')
ax.arrow(x, y, np.cos(theta) * 2, np.sin(theta) * 2, head_width=1, head_length=1.2, fc='blue', ec='blue')
if sens is not None:
# if gt[0] != -1 and gt[1] != -1:
# rot = ut.rot_mat2(gt[2])
# left_sensor_pos = rot.dot([7.2*ratioA0, 1.1*ratioA0]) + np.asarray(gt[0:2])
# right_sensor_pos = rot.dot([7.2*ratioA0, -1.1*ratioA0]) + np.asarray(gt[0:2])
# else:
rot = ut.rot_mat2(theta)
left_sensor_pos = rot.dot([7.2*ratioA0, 1.1*ratioA0]) + np.asarray([x, y])
right_sensor_pos = rot.dot([7.2*ratioA0, -1.1*ratioA0]) + np.asarray([x, y])
if sens[0] > 500:
col = 'wo'
else:
col = 'ko'
ax.plot(left_sensor_pos[0], left_sensor_pos[1], col, markersize=1)
if sens[1] > 500:
col = 'wo'
else:
col = 'ko'
ax.plot(right_sensor_pos[0], right_sensor_pos[1], col, markersize=1)
if base_filename is not None:
canvas.print_figure(base_filename + '.png', dpi=300)
# else:
# ax.show()
# self.fig.show()