Tutorials¶
Before We Start¶
So, you would like your robot to actually do something non-trivial?
Trivial? Ah, a sequence of timed actions - move forward 3s, rotate 90 degrees, move forward 3s, emit a greeting. This is open-loop and can be pre-programmed in a single script easily. Trivial. Shift gears!
Non-Trivial? Hmm, you’d like to dynamically plan navigational routes (waypoints), choose between actions depending on whether blocking obstacles are sensed, interrupt the current action if the battery is low … and this is just getting started. In short, decision making with priority interrupts and closed loops with peripheral systems (e.g. via sensing, HMI devices, web services). Now you’re talking!
Most roboticists will start scripting, but quickly run into a complexity barrier. They’ll often then reach for state machines which are great for control systems, but run into yet another complexity barrier attempting to handle priority interrupts and an exponentially increasing profusion of wires between states. Which brings you here, to behavour trees! Before we proceed though…
Where is the Robot? Ostensibly you’ll need one, at some point. More often than not though, it’s not available or it’s just not practical for rapid application development. Might be it’s only partially assembled, or new features are being developed in parallel (deadlines!). On the other hand, it may be available, but you cannot get enough time-share on the robot or it is not yet stable, resulting in a stream of unrelated issues lower down in the robotic stack that impede application development. So you make the sensible decision of moving to simulation.
Simulation or Mocked Robots? If you already have a robot simulation, it’s a great place to start. In the long run though, the investment of time to build a mock robot layer should, in most cases, pay itself off with a faster development cycle. Why? Testing an application is mostly about provoking and testing the many permutations and combinations o decision making. It’s not about the 20 minutes of travel from point A to point B in the building. With a mocked robot layer, you can emulate that travel at ludicrous speed and provide easy handles for mocking the problems that can arise.
So this is where the tutorials begin, with a very simple, mocked robot. They will then proceed to build up a behaviour tree application, one step at a time.
The Mock Robot¶
The tutorials here all run atop a very simple mock robot that encapsulates the following list of mocked components:
- Battery
- LED Strip
- Docking Action Server
- Move Base Action Server
- Rotation Action Server
- Safety Sensors Pipeline
Note
It should always be possible for the mock robot to be replaced by a gazebo simulated robot or the actual robot. Each of these underlying systems must implement exactly the same ROS API interface.
The tutorials take care of launching the mock robot, but it can be also launched on its own with:
$ ros2 launch py_trees_ros_tutorials mock_robot_launch.py
Tutorial 1 - Data Gathering¶
About¶
In this, the first of the tutorials, we start out with a behaviour that collects battery data from a subscriber and stores the result on the blackboard for other behaviours to utilise.
Data gathering up front via subscribers is a useful convention for a number of reasons:
- Freeze incoming data for remaining behaviours in the tree tick so that decision making is consistent across the entire tree
- Avoid redundantly invoking multiple subscribers to the same topic when not necessary
- Python access to the blackboard is easier than ROS middleware handling
Typically data gatherers will be assembled underneath a parallel at or near the very root of the tree so they may always trigger their update() method and be processed before any decision making behaviours elsewhere in the tree.
Tree¶
$ py-trees-render -b py_trees_ros_tutorials.one_data_gathering.tutorial_create_root
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | def tutorial_create_root() -> py_trees.behaviour.Behaviour:
"""
Create a basic tree and start a 'Topics2BB' work sequence that
will become responsible for data gathering behaviours.
Returns:
the root of the tree
"""
root = py_trees.composites.Parallel(
name="Tutorial One",
policy=py_trees.common.ParallelPolicy.SuccessOnAll(
synchronise=False
)
)
topics2bb = py_trees.composites.Sequence("Topics2BB")
battery2bb = py_trees_ros.battery.ToBlackboard(
name="Battery2BB",
topic_name="/battery/state",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
threshold=30.0
)
priorities = py_trees.composites.Selector("Tasks")
idle = py_trees.behaviours.Running(name="Idle")
flipper = py_trees.behaviours.Periodic(name="Flip Eggs", n=2)
root.add_child(topics2bb)
topics2bb.add_child(battery2bb)
root.add_child(priorities)
priorities.add_child(flipper)
priorities.add_child(idle)
return root
|
Along with the data gathering side, you’ll also notice the dummy branch for
priority jobs (complete with idle behaviour that is always
RUNNING
). This configuration is typical
of the data gathering pattern.
Behaviours¶
The tree makes use of the py_trees_ros.battery.ToBlackboard
behaviour.
This behaviour will cause the entire tree will tick over with
SUCCESS
so long as there is data incoming.
If there is no data incoming, it will simply
block and prevent the rest of the tree from acting.
Running¶
# Launch the tutorial
$ ros2 launch py_trees_ros_tutorials tutorial_one_data_gathering_launch.py
# In a different shell, introspect the entire blackboard
$ py-trees-blackboard-watcher
# Or selectively get the battery percentage
$ py-trees-blackboard-watcher --list
$ py-trees-blackboard-watcher /battery.percentage
Tutorial 2 - Battery Check¶
About¶
Here we add the first decision. What to do if the battery is low? For this, we’ll get the mocked robot to flash a notification over it’s led strip.
Tree¶
$ py-trees-render -b py_trees_ros_tutorials.two_battery_check.tutorial_create_root
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | """
Create a basic tree with a battery to blackboard writer and a
battery check that flashes the LEDs on the mock robot if the
battery level goes low.
Returns:
the root of the tree
"""
root = py_trees.composites.Parallel(
name="Tutorial Two",
policy=py_trees.common.ParallelPolicy.SuccessOnAll(
synchronise=False
)
)
topics2bb = py_trees.composites.Sequence("Topics2BB")
battery2bb = py_trees_ros.battery.ToBlackboard(
name="Battery2BB",
topic_name="/battery/state",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
threshold=30.0
)
tasks = py_trees.composites.Selector("Tasks")
flash_led_strip = behaviours.FlashLedStrip(
name="FlashLEDs",
colour="red"
)
def check_battery_low_on_blackboard(blackboard: py_trees.blackboard.Blackboard) -> bool:
return blackboard.battery_low_warning
battery_emergency = py_trees.decorators.EternalGuard(
name="Battery Low?",
condition=check_battery_low_on_blackboard,
blackboard_keys={"battery_low_warning"},
child=flash_led_strip
)
idle = py_trees.behaviours.Running(name="Idle")
root.add_child(topics2bb)
topics2bb.add_child(battery2bb)
root.add_child(tasks)
tasks.add_children([battery_emergency, idle])
return root
|
Here we’ve added a high priority branch for dealing with a low battery
that causes the hardware strip to flash. The py_trees.decorators.EternalGuard
enables a continuous check of the battery reading and subsequent termination of
the flashing strip as soon as the battery level has recovered sufficiently.
We could have equivalently made use of the py_trees.idioms.eternal_guard
idiom,
which yields a more verbose, but explicit tree and would also allow direct use of
the py_trees.blackboard.CheckBlackboardVariable
class as the conditional check.
Behaviours¶
This tree makes use of the py_trees_ros_tutorials.behaviours.FlashLedStrip
behaviour.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | class FlashLedStrip(py_trees.behaviour.Behaviour):
"""
This behaviour simply shoots a command off to the LEDStrip to flash
a certain colour and returns :attr:`~py_trees.common.Status.RUNNING`.
Note that this behaviour will never return with
:attr:`~py_trees.common.Status.SUCCESS` but will send a clearing
command to the LEDStrip if it is cancelled or interrupted by a higher
priority behaviour.
Publishers:
* **/led_strip/command** (:class:`std_msgs.msg.String`)
* colourised string command for the led strip ['red', 'green', 'blue']
Args:
name: name of the behaviour
topic_name : name of the battery state topic
colour: colour to flash ['red', 'green', blue']
"""
def __init__(
self,
name: str,
topic_name: str="/led_strip/command",
colour: str="red"
):
super(FlashLedStrip, self).__init__(name=name)
self.topic_name = topic_name
self.colour = colour
def setup(self, **kwargs):
"""
Setup the publisher which will stream commands to the mock robot.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("{}.setup()".format(self.qualified_name))
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.publisher = self.node.create_publisher(
msg_type=std_msgs.String,
topic=self.topic_name,
qos_profile=py_trees_ros.utilities.qos_profile_latched()
)
self.feedback_message = "publisher created"
def update(self) -> py_trees.common.Status:
"""
Annoy the led strip to keep firing every time it ticks over (the led strip will clear itself
if no command is forthcoming within a certain period of time).
This behaviour will only finish if it is terminated or priority interrupted from above.
Returns:
Always returns :attr:`~py_trees.common.Status.RUNNING`
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
self.publisher.publish(std_msgs.String(data=self.colour))
self.feedback_message = "flashing {0}".format(self.colour)
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
Shoot off a clearing command to the led strip.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug(
"{}.terminate({})".format(
self.qualified_name,
"{}->{}".format(self.status, new_status) if self.status != new_status else "{}".format(new_status)
)
)
self.publisher.publish(std_msgs.String(data=""))
self.feedback_message = "cleared"
|
This is a typical ROS behaviour that accepts a ROS node on setup. This delayed style is preferred since it allows simple construction of the behaviour, in a tree, sans all of the ROS plumbing - useful when rendering dot graphs of the tree without having a ROS runtime around.
The rest of the behaviour too, is fairly conventional:
- ROS plumbing (i.e. the publisher) instantiated in setup()
- Flashing notifications published in update()
- The reset notification published when the behaviour is terminated
Running¶
$ ros2 launch py_trees_ros_tutorials tutorial_two_battery_check_launch.py
Then play with the battery slider in the qt dashboard to trigger the decision branching in the tree.
Tutorial 3 - Introspect the Blackboard¶
About¶
Tutorial three is a repeat of Tutorial 2 - Battery Check. The purpose of this
tutorial however is to introduce the tools provided to
allow introspection of the blackboard from ROS. Publishers and services
are provided by py_trees_ros.blackboard.Exchange
which is embedded in a py_trees_ros.trees.BehaviourTree
. Interaction
with the exchange is over a set of services and dynamically created topics
via the the py-trees-blackboard-watcher command line utility.
Running¶
$ ros2 launch py_trees_ros_tutorials tutorial_three_introspect_the_blackboard_launch.py
In another shell:
# watch the entire board
$ py-trees-blackboard-watcher
# watch with the recent activity log (activity stream)
$ py-trees-blackboard-watcher --activity
# watch variables associated with behaviours on the most recent tick's visited path
$ py-trees-blackboard-watcher --visited
# list variables available to watch
$ py-trees-blackboard-watcher --list
# watch a simple variable (slide the battery level on the dashboard to trigger a change)
$ py-trees-blackboard-watcher /battery_low_warning
# watch a variable with nested attributes
$ py-trees-blackboard-watcher /battery.percentage
Tutorial 4 - Introspecting the Tree¶
About¶
Again, this is a repeat of Tutorial 2 - Battery Check. In addition to services and
topics for the blackboard, the
py_trees_ros.trees.BehaviourTree
class provides services and topics
for introspection of the tree state itself as well as a command line utility,
py-trees-tree-watcher, to interact with these services and topics.
Running¶
Launch the tutorial:
$ ros2 launch py_trees_ros_tutorials tutorial_four_introspect_the_tree_launch.py
Using py-trees-tree-watcher
on a private snapshot stream:
# stream the tree state on changes
$ py-trees-tree-watcher
# stream the tree state on changes with statistics
$ py-trees-tree-watcher -s
# stream the tree state on changes with most recent blackboard activity
$ py-trees-tree-watcher -a
# stream the tree state on changes with visited blackboard variables
$ py-trees-tree-watcher -b
# serialise to a dot graph (.dot/.png/.svg) and view in xdot if available
$ py-trees-tree-watcher --dot-graph
# not necessary here, but if there are multiple trees to choose from
$ py-trees-tree-watcher --namespace=/tree/snapshot_streams
Using py-trees-tree-watcher
on the default snapshot stream (~/snapshots
):
# enable the default snapshot stream
$ ros2 param set /tree default_snapshot_stream True
$ ros2 param set /tree default_snapshot_blackboard_data True
$ ros2 param set /tree default_snapshot_blackboard_activity True
# connect to the stream
$ py-trees-tree-watcher -a -s -b /tree/snapshots
Using py_trees_ros_viewer to configure and visualise the stream:
# install
$ sudo apt install ros-<rosdistro>-py-trees-ros-viewer
# start the viewer
$ py-trees-tree-viewer
Tutorial 5 - Action Clients¶
About¶
This tutorial inserts a task between emergency and fallback (idle) behaviours to perform some actual work - rotate 360 degrees in place to scan a room whilst simultaneously notifying the user (via flashing led strip) of it’s actions. The task is triggered from the qt dashboard.
The rotation is performed with a ROS action which are almost the defacto
means of interfacing with the control systems of ROS robots.
Here we introduce the py_trees_ros.actions.ActionClient
behaviour - a simple means of sequentially interacting with an action server such
that a goal always executes to completion or is cancelled before another
goal is sent (a client-side kind of preemption).
It also demonstrates the value of coordinating subsystems from the behaviour tree. In this case, both action controllers and notification subsystems are managed from the tree to perform a task. This frees control subsystems from having to be dependent on each other and simultaneously aware of higher level application logic. Instead, the application logic is centralised in one place, the tree, where it can be easily monitored, logged, and reconstructed in a slightly different form for another application without requiring changes in the underlying control subsystems.
Tree¶
$ py-trees-render -b py_trees_ros_tutorials.five_action_clients.tutorial_create_root
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | def tutorial_create_root() -> py_trees.behaviour.Behaviour:
"""
Insert a task between battery emergency and idle behaviours that
controls a rotation action controller and notifications simultaenously
to scan a room.
Returns:
the root of the tree
"""
root = py_trees.composites.Parallel(
name="Tutorial Five",
policy=py_trees.common.ParallelPolicy.SuccessOnAll(
synchronise=False
)
)
topics2bb = py_trees.composites.Sequence("Topics2BB")
scan2bb = py_trees_ros.subscribers.EventToBlackboard(
name="Scan2BB",
topic_name="/dashboard/scan",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
variable_name="event_scan_button"
)
battery2bb = py_trees_ros.battery.ToBlackboard(
name="Battery2BB",
topic_name="/battery/state",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
threshold=30.0
)
tasks = py_trees.composites.Selector("Tasks")
flash_red = behaviours.FlashLedStrip(
name="Flash Red",
colour="red"
)
# Emergency Tasks
def check_battery_low_on_blackboard(blackboard: py_trees.blackboard.Blackboard) -> bool:
return blackboard.battery_low_warning
battery_emergency = py_trees.decorators.EternalGuard(
name="Battery Low?",
condition=check_battery_low_on_blackboard,
blackboard_keys={"battery_low_warning"},
child=flash_red
)
# Worker Tasks
scan = py_trees.composites.Sequence(name="Scan")
is_scan_requested = py_trees.behaviours.CheckBlackboardVariableValue(
name="Scan?",
check=py_trees.common.ComparisonExpression(
variable="event_scan_button",
value=True,
operator=operator.eq
)
)
scan_preempt = py_trees.composites.Selector(name="Preempt?")
is_scan_requested_two = py_trees.decorators.SuccessIsRunning(
name="SuccessIsRunning",
child=py_trees.behaviours.CheckBlackboardVariableValue(
name="Scan?",
check=py_trees.common.ComparisonExpression(
variable="event_scan_button",
value=True,
operator=operator.eq
)
)
)
scanning = py_trees.composites.Parallel(
name="Scanning",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
scan_rotate = py_trees_ros.actions.ActionClient(
name="Rotate",
action_type=py_trees_actions.Rotate,
action_name="rotate",
action_goal=py_trees_actions.Rotate.Goal(), # noqa
generate_feedback_message=lambda msg: "{:.2f}%%".format(msg.feedback.percentage_completed)
)
flash_blue = behaviours.FlashLedStrip(
name="Flash Blue",
colour="blue"
)
scan_celebrate = py_trees.composites.Parallel(
name="Celebrate",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
flash_green = behaviours.FlashLedStrip(name="Flash Green", colour="green")
scan_pause = py_trees.timers.Timer("Pause", duration=3.0)
# Fallback task
idle = py_trees.behaviours.Running(name="Idle")
root.add_child(topics2bb)
topics2bb.add_children([scan2bb, battery2bb])
root.add_child(tasks)
tasks.add_children([battery_emergency, scan, idle])
scan.add_children([is_scan_requested, scan_preempt, scan_celebrate])
scan_preempt.add_children([is_scan_requested_two, scanning])
scanning.add_children([scan_rotate, flash_blue])
scan_celebrate.add_children([flash_green, scan_pause])
return root
|
Data Gathering¶
The Scan2BB behaviour collects incoming requests from the qt dashboard and drops them
onto the blackboard. This is your usual py_trees_ros.subscribers.EventToBlackboard
behaviour which will only register the result True on the blackboard if
there was an incoming message between the last and the current tick.
The Scanning Branch¶
The entire scanning branch is protected by a guard (the blackbox represents the lower part of the tree) which checks the blackboard to determine whether Scan2BB had recorded an incoming rqeuest. Once the scan event is received, this branch proceeds to work until it either finishes, or is pre-empted by the higher priority low battery branch.
Action Clients¶
This tree makes use of the py_trees_ros.actions.ActionClient
for the ‘Rotate’ behaviour.
- Goal details are configured at construction and cannot be changed thereafter
- New goals are sent on initialise()
- Monitoring of feedback and result response occurs in update()
- If the behaviour is interrupted, the goal will be cancelled in terminate()
This obviously places constraints on it’s usage. In particular, goal details cannot be assembled dynamically/elsewhere, nor can it send a new goal while a preceding goal is still active - the behaviour lifecycle forces it through terminate() before a new goal can be sent.
These constraints however, are fine in most situations and result in a very simple behaviour that almost always does what you need without perspiring inordinately on tree design ramifications.
Instantiating the action client, configured for rotations:
1 2 3 4 5 6 7 | name="Scanning",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
scan_rotate = py_trees_ros.actions.ActionClient(
name="Rotate",
action_type=py_trees_actions.Rotate,
action_name="rotate",
|
The notification behaviour (FlashLedStrip) runs in parallel with the action. Composing in this manner from the behaviour tree centralises design concepts (in this case, notifications) and decouples the need for the control subsystems to be aware each other and the application logic.
A Kind of Preemption¶
The higher priority branch in the scanning action enables a kind of pre-emption on the scanning action from the client side. If a new request comes in, it will trigger the secondary scan event check, invalidating whatever scanning action was currently running. This will clear the led command and cancel the rotate action. On the next tick, the scan event check will fail (it was consumed on the last tick) and the scanning will restart.
The decorator is used to signal farther up in the tree that the action is still running, even when being preempted.
Note
This is not true pre-emption since it cancels the rotate action and restarts it. It is however, exactly the pattern that is required in many instances. If you are looking for more complex logic, e.g. enabling interactions with a manipulation action server with which you would like to leave pre-emptions up to the server, then this will require either decomposing the separate parts of the action client behaviour (i.e. separate send goal, monitoring and cancelling) into separate behaviours or construct a more complex behaviour that manages the entire process itself. PR’s welcome!
Handling Failure¶
If the rotate action should fail, then the whole branch will also fail,
subsequently dropping the robot back to its idle state. A failure
event could be generated by monitoring either the status of the ‘Scanning’
parallel or the py_trees.trees.BehaviourTree.tip()
of the
tree and reacting to it’s state change.
Running¶
$ ros2 launch py_trees_ros_tutorials tutorial_five_action_clients_launch.py
Send scan requests from the qt dashboard.
Tutorial 6 - Context Switching¶
About¶
This tutorial inserts a context switching behaviour to run in tandem with the
scan rotation. A context switching behaviour will alter the runtime system
in some way when it is entered (i.e. in initialise()
)
and reset the runtime system to it’s original context
on terminate()
). Refer to context switch
for more detail.
In this example it will enable a hypothetical safety sensor pipeline, necessary necessary for dangerous but slow moving rotational maneuvres not required for normal modes of travel (suppose we have a large rectangular robot that is ordinarily blind to the sides - it may need to take advantage of noisy sonars to the sides or rotate forward facing sensing into position before engaging).
Tree¶
$ py-trees-render -b py_trees_ros_tutorials.six_context_switching.tutorial_create_root
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | def tutorial_create_root() -> py_trees.behaviour.Behaviour:
"""
Insert a task between battery emergency and idle behaviours that
controls a rotation action controller and notifications simultaenously
to scan a room.
Returns:
the root of the tree
"""
root = py_trees.composites.Parallel(
name="Tutorial Six",
policy=py_trees.common.ParallelPolicy.SuccessOnAll(
synchronise=False
)
)
topics2bb = py_trees.composites.Sequence("Topics2BB")
scan2bb = py_trees_ros.subscribers.EventToBlackboard(
name="Scan2BB",
topic_name="/dashboard/scan",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
variable_name="event_scan_button"
)
battery2bb = py_trees_ros.battery.ToBlackboard(
name="Battery2BB",
topic_name="/battery/state",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
threshold=30.0
)
tasks = py_trees.composites.Selector("Tasks")
flash_red = behaviours.FlashLedStrip(
name="Flash Red",
colour="red"
)
# Emergency Tasks
def check_battery_low_on_blackboard(blackboard: py_trees.blackboard.Blackboard) -> bool:
return blackboard.battery_low_warning
battery_emergency = py_trees.decorators.EternalGuard(
name="Battery Low?",
condition=check_battery_low_on_blackboard,
blackboard_keys={"battery_low_warning"},
child=flash_red
)
# Worker Tasks
scan = py_trees.composites.Sequence(name="Scan")
is_scan_requested = py_trees.behaviours.CheckBlackboardVariableValue(
name="Scan?",
check=py_trees.common.ComparisonExpression(
variable="event_scan_button",
value=True,
operator=operator.eq
)
)
scan_preempt = py_trees.composites.Selector(name="Preempt?")
is_scan_requested_two = py_trees.decorators.SuccessIsRunning(
name="SuccessIsRunning",
child=py_trees.behaviours.CheckBlackboardVariableValue(
name="Scan?",
check=py_trees.common.ComparisonExpression(
variable="event_scan_button",
value=True,
operator=operator.eq
)
)
)
scanning = py_trees.composites.Parallel(
name="Scanning",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
scan_context_switch = behaviours.ScanContext("Context Switch")
scan_rotate = py_trees_ros.actions.ActionClient(
name="Rotate",
action_type=py_trees_actions.Rotate,
action_name="rotate",
action_goal=py_trees_actions.Rotate.Goal(),
generate_feedback_message=lambda msg: "{:.2f}%%".format(msg.feedback.percentage_completed)
)
flash_blue = behaviours.FlashLedStrip(
name="Flash Blue",
colour="blue"
)
scan_celebrate = py_trees.composites.Parallel(
name="Celebrate",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
flash_green = behaviours.FlashLedStrip(name="Flash Green", colour="green")
scan_pause = py_trees.timers.Timer("Pause", duration=3.0)
# Fallback task
idle = py_trees.behaviours.Running(name="Idle")
root.add_child(topics2bb)
topics2bb.add_children([scan2bb, battery2bb])
root.add_child(tasks)
tasks.add_children([battery_emergency, scan, idle])
scan.add_children([is_scan_requested, scan_preempt, scan_celebrate])
scan_preempt.add_children([is_scan_requested_two, scanning])
scanning.add_children([scan_context_switch, scan_rotate, flash_blue])
scan_celebrate.add_children([flash_green, scan_pause])
return root
|
Behaviour¶
The py_trees_ros_tutorials.behaviours.ScanContext
is the
context switching behaviour constructed for this tutorial.
initialise()
: trigger a sequence service calls to cache and set the /safety_sensors/enabled parameter to Trueupdate()
: complete the chain of service calls & maintain the contextterminate()
: reset the parameter to the cached value
Context Switching¶
On entry into the parallel, the ScanContext
behaviour will cache and switch
the safety sensors parameter. While in the parallel it will return with
RUNNING
indefinitely. When the rotation
action succeeds or fails, it will terminate the parallel and subsequently
the ScanContext
will terminate,
resetting the safety sensors parameter to it’s original value.
Running¶
# Launch the tutorial
$ ros2 launch py_trees_ros_tutorials tutorial_six_context_switching_launch.py
# In another shell, watch the parameter as a context switch occurs
$ watch -n 1 ros2 param get /safety_sensors enabled
# Trigger scan requests from the qt dashboard
Tutorial 7 - Docking, Cancelling, Failing¶
About¶
This tutorial adds additional complexity to the scanning application in order to introduce a few patterns typical of most applications - cancellations, recovery and result handling.
Specifically, there is now an undocking-move combination pre-scanning and a move-docking combination post-scanning. When cancelling, the robot should recover it’s initial state so it is ready to accept future requests. In this case, the robot must move home and dock, even when cancelled.
Additionally, the application should report out on it’s result upon completion.
Note
Preemption has been dropped from the application for simplicity. It could be reinserted, but care would be required to handle undocking and docking appropriately.
Tree¶
$ py-trees-render -b py_trees_ros_tutorials.seven_docking_cancelling_failing.tutorial_create_root
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | def tutorial_create_root() -> py_trees.behaviour.Behaviour:
"""
Insert a task between battery emergency and idle behaviours that
controls a rotation action controller and notifications simultaenously
to scan a room.
Returns:
the root of the tree
"""
root = py_trees.composites.Parallel(
name="Tutorial Seven",
policy=py_trees.common.ParallelPolicy.SuccessOnAll(
synchronise=False
)
)
topics2bb = py_trees.composites.Sequence("Topics2BB")
scan2bb = py_trees_ros.subscribers.EventToBlackboard(
name="Scan2BB",
topic_name="/dashboard/scan",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
variable_name="event_scan_button"
)
cancel2bb = py_trees_ros.subscribers.EventToBlackboard(
name="Cancel2BB",
topic_name="/dashboard/cancel",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
variable_name="event_cancel_button"
)
battery2bb = py_trees_ros.battery.ToBlackboard(
name="Battery2BB",
topic_name="/battery/state",
qos_profile=py_trees_ros.utilities.qos_profile_unlatched(),
threshold=30.0
)
tasks = py_trees.composites.Selector("Tasks")
flash_red = behaviours.FlashLedStrip(
name="Flash Red",
colour="red"
)
# Emergency Tasks
def check_battery_low_on_blackboard(blackboard: py_trees.blackboard.Blackboard) -> bool:
return blackboard.battery_low_warning
battery_emergency = py_trees.decorators.EternalGuard(
name="Battery Low?",
condition=check_battery_low_on_blackboard,
blackboard_keys={"battery_low_warning"},
child=flash_red
)
# Worker Tasks
scan = py_trees.composites.Sequence(name="Scan")
is_scan_requested = py_trees.behaviours.CheckBlackboardVariableValue(
name="Scan?",
check=py_trees.common.ComparisonExpression(
variable="event_scan_button",
value=True,
operator=operator.eq
)
)
scan_or_die = py_trees.composites.Selector(name="Scan or Die")
die = py_trees.composites.Sequence(name="Die")
failed_notification = py_trees.composites.Parallel(
name="Notification",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
failed_flash_green = behaviours.FlashLedStrip(name="Flash Red", colour="red")
failed_pause = py_trees.timers.Timer("Pause", duration=3.0)
result_failed_to_bb = py_trees.behaviours.SetBlackboardVariable(
name="Result2BB\n'failed'",
variable_name='scan_result',
variable_value='failed'
)
ere_we_go = py_trees.composites.Sequence(name="Ere we Go")
undock = py_trees_ros.actions.ActionClient(
name="UnDock",
action_type=py_trees_actions.Dock,
action_name="dock",
action_goal=py_trees_actions.Dock.Goal(dock=False),
generate_feedback_message=lambda msg: "undocking"
)
scan_or_be_cancelled = py_trees.composites.Selector("Scan or Be Cancelled")
cancelling = py_trees.composites.Sequence("Cancelling?")
is_cancel_requested = py_trees.behaviours.CheckBlackboardVariableValue(
name="Cancel?",
check=py_trees.common.ComparisonExpression(
variable="event_cancel_button",
value=True,
operator=operator.eq
)
)
move_home_after_cancel = py_trees_ros.actions.ActionClient(
name="Move Home",
action_type=py_trees_actions.MoveBase,
action_name="move_base",
action_goal=py_trees_actions.MoveBase.Goal(),
generate_feedback_message=lambda msg: "moving home"
)
result_cancelled_to_bb = py_trees.behaviours.SetBlackboardVariable(
name="Result2BB\n'cancelled'",
variable_name='scan_result',
variable_value='cancelled'
)
move_out_and_scan = py_trees.composites.Sequence("Move Out and Scan")
move_base = py_trees_ros.actions.ActionClient(
name="Move Out",
action_type=py_trees_actions.MoveBase,
action_name="move_base",
action_goal=py_trees_actions.MoveBase.Goal(),
generate_feedback_message=lambda msg: "moving out"
)
scanning = py_trees.composites.Parallel(
name="Scanning",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
scan_context_switch = behaviours.ScanContext("Context Switch")
scan_rotate = py_trees_ros.actions.ActionClient(
name="Rotate",
action_type=py_trees_actions.Rotate,
action_name="rotate",
action_goal=py_trees_actions.Rotate.Goal(),
generate_feedback_message=lambda msg: "{:.2f}%%".format(msg.feedback.percentage_completed)
)
scan_flash_blue = behaviours.FlashLedStrip(name="Flash Blue", colour="blue")
move_home_after_scan = py_trees_ros.actions.ActionClient(
name="Move Home",
action_type=py_trees_actions.MoveBase,
action_name="move_base",
action_goal=py_trees_actions.MoveBase.Goal(),
generate_feedback_message=lambda msg: "moving home"
)
result_succeeded_to_bb = py_trees.behaviours.SetBlackboardVariable(
name="Result2BB\n'succeeded'",
variable_name='scan_result',
variable_value='succeeded'
)
celebrate = py_trees.composites.Parallel(
name="Celebrate",
policy=py_trees.common.ParallelPolicy.SuccessOnOne()
)
celebrate_flash_green = behaviours.FlashLedStrip(name="Flash Green", colour="green")
celebrate_pause = py_trees.timers.Timer("Pause", duration=3.0)
dock = py_trees_ros.actions.ActionClient(
name="Dock",
action_type=py_trees_actions.Dock,
action_name="dock",
action_goal=py_trees_actions.Dock.Goal(dock=True), # noqa
generate_feedback_message=lambda msg: "docking"
)
class SendResult(py_trees.behaviour.Behaviour):
def __init__(self, name: str):
super().__init__(name="Send Result")
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(
key="scan_result",
access=py_trees.common.Access.READ
)
def update(self):
print(console.green +
"********** Result: {} **********".format(self.blackboard.scan_result) +
console.reset
)
return py_trees.common.Status.SUCCESS
send_result = SendResult(name="Send Result")
# Fallback task
idle = py_trees.behaviours.Running(name="Idle")
root.add_child(topics2bb)
topics2bb.add_children([scan2bb, cancel2bb, battery2bb])
root.add_child(tasks)
tasks.add_children([battery_emergency, scan, idle])
scan.add_children([is_scan_requested, scan_or_die, send_result])
scan_or_die.add_children([ere_we_go, die])
die.add_children([failed_notification, result_failed_to_bb])
failed_notification.add_children([failed_flash_green, failed_pause])
ere_we_go.add_children([undock, scan_or_be_cancelled, dock, celebrate])
scan_or_be_cancelled.add_children([cancelling, move_out_and_scan])
cancelling.add_children([is_cancel_requested, move_home_after_cancel, result_cancelled_to_bb])
move_out_and_scan.add_children([move_base, scanning, move_home_after_scan, result_succeeded_to_bb])
scanning.add_children([scan_context_switch, scan_rotate, scan_flash_blue])
celebrate.add_children([celebrate_flash_green, celebrate_pause])
return root
|
Succeeding¶
Assuming everything works perfectly, then the subtree will sequentially progress to completion through undocking, move out, rotate, move home and docking actions as illustrated in the dot graph above. However, nothing ever works perfectly, so …
Failing¶
If any step of the ‘Ere we Go’ sequence fails the mock robot robot will simply stop, drop into the post-failure (‘Die’) subtree and commence post-failure actions. In this case this consists of both an alarm signal (flashing red) and communication of failure to the user (echoes to the screen, but could have been, for example, a middleware response to the user’s application).
These actions are merely post-failure notifications that would ostensibly result in manual (human assisted) recovery of the situation. To attempt an automated recovery, there are two options:
- Global Recovery - use the blackboard as a means of transferring information about the failure from the relevant behaviour (UnDock, Move Out, Move Home, Dock) to the post-failure subtree. Introspect the data and determine the right course of action in the post-failure subtree.
- Local Recovery - use a selector with each of the individual behaviours to immediately generate a recovery subtree specifically adapted to the behaviour that failed. This recovery subtree should also return
FAILURE
so the parent sequence also returnsFAILURE
. The ‘Die’ subtree is then merely for common post-failure actions (e.g. notification and response).
The latter is technically preferable as the decision logic is entirely visible in the tree connections, but it does cause an explosion in the scale of the tree and it’s maintenance.
Note
It is interesting to observe that although the application is considered to have
failed, the ‘Scan or Die’ operation will return with SUCCESS
after which post-failure actions will kick in.
Here, application failure is recorded in the ‘Result2BB’ behaviour which is later
transmitted back to the user in the final stages of the application.
Application failure is handled via the actions of behaviours, not the state of the tree.
Tip
Decision logic in the tree is for routing decision making, not routing application failure/success, nor logical errors. Overloading tree decision logic with more than one purpose will constrain your application design to the point of non-usefulness.
Cancelling¶
In this tutorial, the application listens continuously for cancellation requests and will cancel the operation if it is currently between undocking and docking actions.
Note
The approach demonstrated in this tutorial is simple, but sufficient as an example. Interactions are only one-way - from the user to the application. It neither prevents the user from requesting nor does it provide an informative response if the request is invalid (i.e. if the application is not running or already cancelling). It also falls short of caching and handling cancel requests across the entire application. These cases are easy to handle with additional logic in the tree - consider it a homework exercise :)
Cancelling begins with catching incoming cancel requests:
Cancelling is a high priority subtree, but here we make sure that the post-cancelling workflow integrates with the non-cancelling workflow so that the robot returns to it’s initial location and state.
Results¶
As noted earlier, it is typically important to keep application result logic separate from the decision tree logic. To do so, the blackboard is used to record the application result and an application result agnostic behaviour is used to communicate the result back to the user in the final stage of the application’s lifecycle.
Running¶
# Launch the tutorial
$ ros2 launch py_trees_ros_tutorials tutorial_seven_docking_cancelling_failing_launch.py
# In another shell
$ py-trees-tree-watcher -b
# Trigger scan/cancel requests from the qt dashboard
Tutorial 8 - Dynamic Application Loading¶
About¶
The previous tutorial enables execution of a specific job upon request. You will inevitably grow the functionality of the robot beyond this and a very common use case for the trees is to switch the context of the robot between ‘applications’ - calibration, tests, demos, scheduled tasks from a fleet server, etc.
While these contexts could be entirely managed by the tree simultaneously, the exclusivity of the applications lends itself far more easily to the following paradigm:
- Construct a tree on bringup for ticking over basic functionality while idling
- Dynamically insert/prune application subtrees on demand, rejecting requests when already busy
This mirrors both the way smart phones operate (which also happens to be a reasonable mode of operation for robots due to similar resource contention arguments) and the conventional use of roslaunch files to bringup a core and later bootstrap / tear down application level processes on demand.
This tutorial uses a wrapper class around py_trees_ros.trees.BehaviourTree
to handle:
- Construction of the core tree
- A job (application) request callback
- Insertion of the application subtree in the request callback (if not busy)
- Pruning of the application subtree in a post-tick handler (if finished)
- A status report service for external clients of the tree
Note
Only the basics are demonstrated here, but you could imagine extensions to this class that would make it truly useful in an application driven robotics system - abstractions so application modules need not be known in advance, application subtrees delivered as python code, more detailed tree introspection in status reports (given it’s responsibility to be the decision making engine for the robot, it is the best snapshot of the robot’s current activity). You’re only limited by your imagination!
Core Tree (Dot Graph)¶
$ py-trees-render -b py_trees_ros_tutorials.eight_dynamic_application_loading.tutorial_create_root
Application Subtree (Dot Graph)¶
$ py-trees-render --with-blackboard-variables py_trees_ros_tutorials.eight_dynamic_application_loading.tutorial_create_scan_subtree
Dynamic Application Tree (Class)¶
1 2 3 4 5 | class DynamicApplicationTree(py_trees_ros.trees.BehaviourTree):
"""
Wraps the ROS behaviour tree manager in a class that manages loading
and unloading of jobs.
"""
|
1 2 3 4 5 6 7 8 9 10 11 12 | def __init__(self):
"""
Create the core tree and add post tick handlers for post-execution
management of the tree.
"""
super().__init__(
root=tutorial_create_root(),
unicode_tree_debug=True
)
self.add_post_tick_handler(
self.prune_application_subtree_if_done
)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | def setup(self, timeout: float):
"""
Setup the tree and connect additional application management / status
report subscribers and services.
Args:
timeout: time (s) to wait (use common.Duration.INFINITE to block indefinitely)
"""
super().setup(timeout=timeout)
self._report_service = self.node.create_service(
srv_type=py_trees_srvs.StatusReport,
srv_name="~/report",
callback=self.deliver_status_report,
qos_profile=rclpy.qos.qos_profile_services_default
)
self._job_subscriber = self.node.create_subscription(
msg_type=std_msgs.Empty,
topic="/dashboard/scan",
callback=self.receive_incoming_job,
qos_profile=py_trees_ros.utilities.qos_profile_unlatched()
)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | def receive_incoming_job(self, msg: std_msgs.Empty):
"""
Incoming job callback.
Args:
msg: incoming goal message
Raises:
Exception: be ready to catch if any of the behaviours raise an exception
"""
if self.busy():
self.node.get_logger().warning("rejecting new job, last job is still active")
else:
scan_subtree = tutorial_create_scan_subtree()
try:
py_trees.trees.setup(
root=scan_subtree,
node=self.node
)
except Exception as e:
console.logerror(console.red + "failed to setup the scan subtree, aborting [{}]".format(str(e)) + console.reset)
sys.exit(1)
self.insert_subtree(scan_subtree, self.priorities.id, 1)
self.node.get_logger().info("inserted job subtree")
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | def prune_application_subtree_if_done(self, tree):
"""
Check if a job is running and if it has finished. If so, prune the job subtree from the tree.
Additionally, make a status report upon introspection of the tree.
Args:
tree (:class:`~py_trees.trees.BehaviourTree`): tree to investigate/manipulate.
"""
# executing
if self.busy():
job = self.priorities.children[-2]
# finished
if job.status == py_trees.common.Status.SUCCESS or job.status == py_trees.common.Status.FAILURE:
self.node.get_logger().info("{0}: finished [{1}]".format(job.name, job.status))
for node in job.iterate():
node.shutdown()
tree.prune_subtree(job.id)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
def deliver_status_report(
self,
unused_request: py_trees_srvs.StatusReport.Request, # noqa
response: py_trees_srvs.StatusReport.Response # noqa
):
"""
Prepare a status report for an external service client.
Args:
unused_request: empty request message
"""
# last result value or none
last_result = self.blackboard_exchange.blackboard.get(name="scan_result")
if self.busy():
response.report = "executing"
elif self.root.tip().has_parent_with_name("Battery Emergency"):
response.report = "battery [last result: {}]".format(last_result)
else:
response.report = "idle [last result: {}]".format(last_result)
return response
|
Note
In the code above, there is a conspicuous absence of thread locks. This is possible due to the use of ROS2’s single threaded executors to handle service and subscriber callbacks along with the tree’s tick tock that operates from within ROS2 timer callbacks. If using a behaviour tree, as is exemplified here, to handle robot application logic, you should never need to go beyond single threaded execution and thus avoid the complexity and bugs that come along with having to handle concurrency (this is a considerable improvement on the situation for ROS1).
Running¶
# Launch the tutorial
$ ros2 launch py_trees_ros_tutorials tutorial_eight_dynamic_application_laoding_launch.py
# In another shell, catch the tree snapshots
$ py-trees-tree-watcher -b
# Trigger scan/cancel requests from the qt dashboard
Tutorial 9 - Bagging Trees¶
Coming soon…