Skip to content

Commit

Permalink
ros sim via pytest fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
jpreiss committed Nov 7, 2021
1 parent f54eb63 commit 21300ff
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 57 deletions.
46 changes: 46 additions & 0 deletions ros_ws/src/crazyswarm/scripts/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import pytest

from pycrazyswarm import Crazyswarm


HAVE_ROS = subprocess.call("type roslaunch", shell=True) == 0

Expand All @@ -10,3 +12,47 @@ def pytest_runtest_setup(item):
markers = [mark.name for mark in item.iter_markers()]
if "ros" in markers and not HAVE_ROS:
pytest.skip("ROS is not installed.")


def pytest_addoption(parser):
parser.addoption("--simros", action="store_true", help="use ROS simulator")


@pytest.fixture
def crazyswarm_ctor(pytestconfig):
if pytestconfig.getoption("simros"):
assert HAVE_ROS, "cannot run --simros tests without ROS installed."
# ROS simulator - start CSW server first
ros_process = None
def ctor(**kwargs):
assert "crazyflies_yaml" in kwargs
# TODO: It might be cleaner to use some kind of filesystem mocking,
# but does it matter?
with open("../launch/crazyflies.yaml", "w") as f:
f.write(kwargs["crazyflies_yaml"])
nonlocal ros_process
ros_process = subprocess.Popen([
"roslaunch",
"crazyswarm",
"hover_swarm.launch",
"--sim"
])
return Crazyswarm(**kwargs)
yield ctor
# Control will NOT return here immediately after the test calls
# `ctor()`. Instead, pytest remembers that the fixture is in `yield`
# state and returns control _after the test finishes_. For details, see
# https://docs.pytest.org/en/latest/how-to/fixtures.html#yield-fixtures-recommended
ros_process.terminate()
else:
# Pycrazyswarm simulator
def ctor(**kwargs):
args = "--sim --vis null "
if "args" in kwargs:
args += kwargs["args"]
kwargs = kwargs.copy()
del kwargs["args"]
return Crazyswarm(args=args, **kwargs)
# The only reason we use `yield` here is for type consistency with the
# ROS config -- see comment in `if` case.
yield ctor
54 changes: 27 additions & 27 deletions ros_ws/src/crazyswarm/scripts/test_collisionAvoidance.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
Z = 1.0


def setUp(args):
@pytest.fixture
def setUp(crazyswarm_ctor):
crazyflies_yaml = """
crazyflies:
- id: 1
Expand All @@ -21,13 +22,15 @@ def setUp(args):
channel: 100
initialPosition: [1.0, 0.0, 0.0]
"""
swarm = Crazyswarm(crazyflies_yaml=crazyflies_yaml, args=args)
timeHelper = swarm.timeHelper
return swarm.allcfs, timeHelper
def setup(args=""):
swarm = crazyswarm_ctor(crazyflies_yaml=crazyflies_yaml, args=args)
timeHelper = swarm.timeHelper
return swarm.allcfs, timeHelper
return setup


def test_velocityMode_sidestepWorstCase():
args = "--sim --vis null --dt 0.05 --maxvel 1.0"
def test_velocityMode_sidestepWorstCase(setUp):
args = "--dt 0.05 --maxvel 1.0"
allcfs, timeHelper = setUp(args)
a, b = allcfs.crazyflies

Expand All @@ -48,7 +51,7 @@ def test_velocityMode_sidestepWorstCase():
assert False


def test_goToWithoutCA_CheckCollision():
def test_goToWithoutCA_CheckCollision(setUp):
args = "--sim --vis null"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
Expand All @@ -66,7 +69,7 @@ def test_goToWithoutCA_CheckCollision():
assert False


def test_goToWithCA_CheckCollision():
def test_goToWithCA_CheckCollision(setUp):
args = "--sim --vis null"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
Expand All @@ -84,7 +87,7 @@ def test_goToWithCA_CheckCollision():
timeHelper.sleep(timeHelper.dt)


def test_goToWithCA_CheckDestination():
def test_goToWithCA_CheckDestination(setUp):
args = "--sim --vis null"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
Expand All @@ -102,7 +105,7 @@ def test_goToWithCA_CheckDestination():
assert np.all(np.isclose(cf1.position(), goal1))


def test_goToWithCA_changeEllipsoid():
def test_goToWithCA_changeEllipsoid(setUp):
args = "--sim --vis null"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
Expand Down Expand Up @@ -132,8 +135,8 @@ def test_goToWithCA_changeEllipsoid():
assert collisionHappend


def test_goToWithCA_Intersection():
args = "--sim --vis null --dt 0.01"
def test_goToWithCA_Intersection(setUp):
args = "--dt 0.01"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -156,8 +159,8 @@ def test_goToWithCA_Intersection():
assert np.all(np.isclose(cf1.position(), goal1))


def test_goToWithoutCA_Intersection():
args = "--sim --vis null --dt 0.05"
def test_goToWithoutCA_Intersection(setUp):
args = "--dt 0.05"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -178,15 +181,12 @@ def test_goToWithoutCA_Intersection():
assert False


def test_goToWithCA_random():
def test_goToWithCA_random(setUp):
rows, cols = 3, 5
N = rows * cols
crazyflies_yaml = grid_yaml(rows, cols, spacing=0.5)

args = "--sim --vis null"
swarm = Crazyswarm(crazyflies_yaml=crazyflies_yaml, args=args)
timeHelper = swarm.timeHelper
allcfs = swarm.allcfs
allcfs, timeHelper = setUp()
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)

Expand Down Expand Up @@ -214,8 +214,8 @@ def test_goToWithCA_random():
assert np.all(np.isclose(cf.position(), goal, atol=1e-4))


def test_cmdPosition():
args = "--sim --vis null --maxvel 1.0"
def test_cmdPosition(setUp):
args = "--maxvel 1.0"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -237,8 +237,8 @@ def test_cmdPosition():
assert np.all(np.isclose(cf1.position(), goal1))


def test_boundingBox():
args = "--sim --vis null --maxvel 1.0"
def test_boundingBox(setUp):
args = "--maxvel 1.0"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -264,8 +264,8 @@ def test_boundingBox():


#@pytest.mark.xfail(reason="Bug in firmware")
def test_maxSpeed_zero():
args = "--sim --vis null --maxvel 1.0"
def test_maxSpeed_zero(setUp):
args = "--maxvel 1.0"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -286,8 +286,8 @@ def test_maxSpeed_zero():
assert np.all(np.isclose(cf1.position(), goal0))


def test_maxSpeed_limit():
args = "--sim --vis null --maxvel 1.0"
def test_maxSpeed_limit(setUp):
args = "--maxvel 1.0"
allcfs, timeHelper = setUp(args)
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand Down
39 changes: 24 additions & 15 deletions ros_ws/src/crazyswarm/scripts/test_highLevel.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#!/usr/bin/env python

import numpy as np
import pytest

from pycrazyswarm import *
import uav_trajectory


Z = 1.0

def setUp():

@pytest.fixture
def setUp(crazyswarm_ctor):
crazyflies_yaml = """
crazyflies:
- channel: 100
Expand All @@ -16,9 +21,12 @@ def setUp():
id: 10
initialPosition: [0.0, -1.0, 0.0]
"""
swarm = Crazyswarm(crazyflies_yaml=crazyflies_yaml, args="--sim --vis null")
timeHelper = swarm.timeHelper
return swarm.allcfs, timeHelper
def setup(args=""):
swarm = crazyswarm_ctor(crazyflies_yaml=crazyflies_yaml)
timeHelper = swarm.timeHelper
return swarm.allcfs, timeHelper
return setup


def _collectRelativePositions(timeHelper, cf, duration):
t0 = timeHelper.time()
Expand All @@ -29,7 +37,7 @@ def _collectRelativePositions(timeHelper, cf, duration):
return np.stack(positions)


def test_takeOff():
def test_takeOff(setUp):
allcfs, timeHelper = setUp()
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -38,7 +46,7 @@ def test_takeOff():
pos = cf.initialPosition + np.array([0, 0, Z])
assert np.all(np.isclose(cf.position(), pos, atol=0.0001))

def test_goTo_nonRelative():
def test_goTo_nonRelative(setUp):
allcfs, timeHelper = setUp()
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -52,7 +60,7 @@ def test_goTo_nonRelative():
pos = cf.initialPosition + np.array([1, 1, Z])
assert np.all(np.isclose(cf.position(), pos))

def test_goTo_relative():
def test_goTo_relative(setUp):
allcfs, timeHelper = setUp()
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -64,7 +72,7 @@ def test_goTo_relative():
pos = cf.initialPosition + np.array([1.0,1.0,2*Z])
assert np.all(np.isclose(cf.position(), pos))

def test_landing():
def test_landing(setUp):
allcfs, timeHelper = setUp()
allcfs.takeoff(targetHeight=Z, duration=1.0+Z)
timeHelper.sleep(1.5+Z)
Expand All @@ -76,7 +84,7 @@ def test_landing():
pos = cf.initialPosition + np.array([0, 0, 0.02])
assert np.all(np.isclose(cf.position(), pos, atol=0.0001))

def test_uploadTrajectory_timescale():
def test_uploadTrajectory_timescale(setUp):
allcfs, timeHelper = setUp()
cf = allcfs.crazyflies[0]

Expand All @@ -98,7 +106,7 @@ def test_uploadTrajectory_timescale():
timeHelper.sleep(0.75 * traj.duration)
assert np.linalg.norm(cf.position() - cf.initialPosition) <= 0.001

def test_uploadTrajectory_fig8Bounds():
def test_uploadTrajectory_fig8Bounds(setUp):
allcfs, timeHelper = setUp()
cf = allcfs.crazyflies[0]

Expand All @@ -118,7 +126,7 @@ def test_uploadTrajectory_fig8Bounds():
assert 0.4 < np.amax(ys) < 0.6
assert -0.4 > np.amin(ys) > -0.6

def test_uploadTrajectory_reverse():
def test_uploadTrajectory_reverse(setUp):
allcfs, timeHelper = setUp()
cf = allcfs.crazyflies[0]

Expand All @@ -139,7 +147,7 @@ def test_uploadTrajectory_reverse():
dists = np.linalg.norm(positions - positions2, axis=1)
assert not np.any(dists > 0.2)

def test_uploadTrajectory_broadcast():
def test_uploadTrajectory_broadcast(setUp):
allcfs, timeHelper = setUp()
cf0, cf1 = allcfs.crazyflies

Expand All @@ -158,7 +166,7 @@ def test_uploadTrajectory_broadcast():
assert np.all(np.isclose(relativeInitial, relative))
timeHelper.sleep(timeHelper.dt + 1e-6)

def test_setGroupMask():
def test_setGroupMask(setUp):
allcfs, timeHelper = setUp()
cf0, cf1 = allcfs.crazyflies
cf0.setGroupMask(1)
Expand All @@ -174,5 +182,6 @@ def test_setGroupMask():
timeHelper.sleep(1.5+Z)

pos1 = cf1.initialPosition + np.array([0, 0, Z])
assert np.all(np.isclose(cf0.position(), pos0, atol=0.0001))
assert np.all(np.isclose(cf1.position(), pos1, atol=0.0001))
assert np.all(np.isclose(cf0.position(), pos0, atol=0.0001))
assert np.all(np.isclose(cf1.position(), pos1, atol=0.0001))

Loading

0 comments on commit 21300ff

Please sign in to comment.