#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
# https://raw.github.com/splintered-reality/py_trees_ros/license/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
The :class:`py_trees_ros.trees.BehaviourTree` class
extends the core :class:`py_trees.trees.BehaviourTree` class
with both blackboard and (tree) snapshot streaming services.
Interact with these services via the :ref:`py-trees-blackboard-watcher` and
:ref:`py-trees-tree-watcher` command line utilities.
"""
##############################################################################
# Imports
##############################################################################
import collections
import enum
import functools
import os
import math
import statistics
import subprocess
import tempfile
import time
import typing
import uuid
import diagnostic_msgs.msg as diagnostic_msgs # noqa
import py_trees
import py_trees.console as console
import py_trees_ros_interfaces.msg as py_trees_msgs # noqa
import py_trees_ros_interfaces.srv as py_trees_srvs # noqa
import rcl_interfaces.msg as rcl_interfaces_msgs
import rclpy
import unique_identifier_msgs.msg as unique_identifier_msgs
from . import blackboard
from . import conversions
from . import exceptions
from . import utilities
from . import visitors
##############################################################################
# Tree Management
##############################################################################
[docs]class SnapshotStream(object):
"""
SnapshotStream instances are responsible for creating / destroying
a snapshot stream as well as the configurable curation of snapshots
that are published on it.
"""
_counter = 0
"""Incremental counter guaranteeing unique watcher names"""
[docs] class Parameters(object):
"""
Reconfigurable parameters for the snapshot stream.
Args:
blackboard_data: publish blackboard variables on the visited path
blackboard_activity: enable and publish blackboard activity in the last tick
snapshot_period: period between snapshots (use /inf to only publish on tree status changes)
"""
def __init__(
self,
blackboard_data: bool=False,
blackboard_activity: bool=False,
snapshot_period: float=py_trees.common.Duration.INFINITE
):
self.blackboard_data = blackboard_data
self.blackboard_activity = blackboard_activity
self.snapshot_period = py_trees.common.Duration.INFINITE.value if snapshot_period == py_trees.common.Duration.INFINITE else snapshot_period
def __init__(
self,
node: rclpy.node.Node,
topic_name: str=None,
parameters: 'SnapshotStream.Parameters'=None,
):
"""
Create the publisher, ready for streaming.
Args:
node: node to hook ros communications on
topic_name: snapshot stream name, uniquely generated if None
parameters: configuration of the snapshot stream
"""
self.node = node
self.topic_name = SnapshotStream.expand_topic_name(self.node, topic_name)
self.publisher = self.node.create_publisher(
msg_type=py_trees_msgs.BehaviourTree,
topic=self.topic_name,
qos_profile=utilities.qos_profile_latched()
)
self.parameters = parameters if parameters is not None else SnapshotStream.Parameters()
self.last_snapshot_timestamp = None
self.statistics = None
[docs] @staticmethod
def expand_topic_name(node: rclpy.node.Node, topic_name: str) -> str:
"""
Custom name expansion depending on the topic name provided. This is part of the
stream configuration on request which either provides no hint (automatic name
generation), a simple hint (relative name expanded under the snapshot streams
namespace) or a complete hint (absolute name that does not need expansion).
Args:
node: node used to reference the absolute namespace if the hint is not absolute
topic_name: hint for the topic name
Returns:
the expanded topic name
"""
if topic_name is None or not topic_name:
expanded_topic_name = rclpy.expand_topic_name.expand_topic_name(
topic_name="~/snapshot_streams/_snapshots_" + str(SnapshotStream._counter),
node_name=node.get_name(),
node_namespace=node.get_namespace()
)
SnapshotStream._counter += 1
return expanded_topic_name
elif topic_name.startswith("~"):
return rclpy.expand_topic_name.expand_topic_name(
topic_name=topic_name,
node_name=node.get_name(),
node_namespace=node.get_namespace()
)
elif not topic_name.startswith("/"):
return rclpy.expand_topic_name.expand_topic_name(
topic_name="~/snapshot_streams/" + topic_name,
node_name=node.get_name(),
node_namespace=node.get_namespace()
)
else:
return topic_name
[docs] def publish(
self,
root: py_trees.behaviour.Behaviour,
changed: bool,
statistics: py_trees_msgs.Statistics,
visited_behaviour_ids: typing.Set[uuid.UUID],
visited_blackboard_client_ids: typing.Set[uuid.UUID]
):
""""
Publish a snapshot, including only what has been parameterised.
Args:
root: the tree
changed: whether the tree status / graph changed or not
statistics: add tree statistics to the snapshot data
visited_behaviour_ids: behaviours on the visited path
visited_blackboard_client_ids: blackboard clients belonging to behaviours on the visited path
"""
if self.last_snapshot_timestamp is None:
changed = True
self.last_snapshot_timestamp = time.monotonic()
current_timestamp = time.monotonic()
elapsed_time = current_timestamp - self.last_snapshot_timestamp
if_its_close_enough = 0.98 # https://github.com/splintered-reality/py_trees_ros/issues/144
if (not changed and elapsed_time < if_its_close_enough * self.parameters.snapshot_period):
return
tree_message = py_trees_msgs.BehaviourTree()
tree_message.changed = changed
# tree
for behaviour in root.iterate():
msg = conversions.behaviour_to_msg(behaviour)
msg.is_active = True if behaviour.id in visited_behaviour_ids else False
tree_message.behaviours.append(msg)
# blackboard
if self.parameters.blackboard_data:
visited_keys = py_trees.blackboard.Blackboard.keys_filtered_by_clients(
client_ids=visited_blackboard_client_ids
)
for key in visited_keys:
try:
value = str(py_trees.blackboard.Blackboard.get(key))
except KeyError:
value = "-"
tree_message.blackboard_on_visited_path.append(
diagnostic_msgs.KeyValue(
key=key,
value=value
)
)
# activity stream
# TODO: checking the stream is not None is redundant, perhaps use it as an exception check?
if self.parameters.blackboard_activity and py_trees.blackboard.Blackboard.activity_stream is not None:
tree_message.blackboard_activity = conversions.activity_stream_to_msgs()
# other
if statistics is not None:
tree_message.statistics = statistics
self.publisher.publish(tree_message)
self.last_snapshot_timestamp = current_timestamp
[docs] def shutdown(self):
"""
Shutdown the (temporarily) created publisher.
"""
self.node.destroy_publisher(self.publisher)
[docs]class BehaviourTree(py_trees.trees.BehaviourTree):
"""
Extend the :class:`py_trees.trees.BehaviourTree` class with
a few bells and whistles for ROS.
ROS Parameters:
* **default_snapshot_stream**: enable/disable the default snapshots stream in ~/snapshots` (default: False)
* **default_snapshot_period**: periodically publish (default: :data:`math.inf`)
* **default_snapshot_blackboard_data**: include tracking and visited variables (default: True)
* **default_snapshot_blackboard_activity**: include the blackboard activity (default: False)
* **setup_timeout**: time (s) to wait (default: :data:`math.inf`)
* if :data:`math.inf`, it will block indefinitely
ROS Publishers:
* **~/snapshots** (:class:`py_trees_interfaces.msg.BehaviourTree`): the default snapshot stream, if enabled
ROS Services:
* **~/blackboard_streams/close** (:class:`py_trees_ros_interfaces.srv.CloselackboardWatcher`)
* **~/blackboard_streams/get_variables** (:class:`py_trees_ros_interfaces.srv.GetBlackboardVariables`)
* **~/blackboard_streams/open** (:class:`py_trees_ros_interfaces.srv.OpenBlackboardWatcher`)
* **~/snapshot_streams/close** (:class:`py_trees_ros_interfaces.srv.CloseSnapshotsStream`)
* **~/snapshot_streams/open** (:class:`py_trees_ros_interfaces.srv.OpenSnapshotsStream`)
* **~/snapshot_streams/reconfigure** (:class:`py_trees_ros_interfaces.srv.ReconfigureSnapshotsStream`)
Topics and services are not intended for direct use, but facilitate the operation of the
utilities :ref:`py-trees-tree-watcher` and :ref:`py-trees-blackboard-watcher`.
Args:
root: root node of the tree
unicode_tree_debug: print to console the visited ascii tree after every tick
Raises:
AssertionError: if incoming root variable is not the correct type
"""
def __init__(self,
root: py_trees.behaviour.Behaviour,
unicode_tree_debug: bool=False):
super(BehaviourTree, self).__init__(root)
if unicode_tree_debug:
self.snapshot_visitor = py_trees.visitors.DisplaySnapshotVisitor()
else:
self.snapshot_visitor = py_trees.visitors.SnapshotVisitor()
self.visitors.append(self.snapshot_visitor)
self.statistics = None
self.tick_start_time = None
self.time_series = collections.deque([])
self.tick_interval_series = collections.deque([])
self.tick_duration_series = collections.deque([])
self.pre_tick_handlers.append(self._statistics_pre_tick_handler)
self.post_tick_handlers.append(self._statistics_post_tick_handler)
self.timer = None
# delay ROS artifacts so we can construct the tree without a ROS connection
self.node = None
self.snapshot_streams = {}
[docs] def setup(
self,
node: typing.Optional[rclpy.node.Node]=None,
node_name: str="tree",
timeout: float=py_trees.common.Duration.INFINITE,
visitor: typing.Optional[py_trees.visitors.VisitorBase]=None,
**kwargs: int
):
"""
Setup the publishers, exchange and add ROS relevant pre/post tick handlers to the tree.
Ultimately relays this call down to all the behaviours in the tree.
Args:
node: Optional ROS Node object. If None (default), creates its own node.
node_name: Name of ROS node created. Only takes effect if `node` is None.
timeout: time (s) to wait (use common.Duration.INFINITE to block indefinitely)
visitor: runnable entities on each node after it's setup
**kwargs: distribute args to this behaviour and in turn, to it's children
.. note:
This method declares parameters for the snapshot_period and setup_timeout.
These parameters take precedence over the period and timeout args provided here.
If parameters are not configured at runtime, then the period and timeout args
provided here will initialise the declared parameters.
Raises:
rclpy.exceptions.NotInitializedException: rclpy not yet initialised
TypeError: node argument is not a valid rclpy.node.Node object
Exception: be ready to catch if any of the behaviours raise an exception
"""
# node creation - can raise rclpy.exceptions.NotInitializedException
if node:
# Use existing node if one is passed in, and is of the correct type.
if isinstance(node, rclpy.node.Node):
self.node = node
else:
raise TypeError(f"invalid node object [received: {type(node)}][expected: rclpy.node.Node]")
else:
# Node creation - can raise rclpy.exceptions.NotInitializedException
self.node = rclpy.create_node(node_name=node_name)
if visitor is None:
visitor = visitors.SetupLogger(node=self.node)
self.default_snapshot_stream_topic_name = SnapshotStream.expand_topic_name(
node=self.node,
topic_name="~/snapshots"
)
########################################
# ROS Comms
########################################
self.snapshot_stream_services = utilities.Services(
node=self.node,
service_details=[
("close", "~/snapshot_streams/close", py_trees_srvs.CloseSnapshotStream, self._close_snapshot_stream),
("open", "~/snapshot_streams/open", py_trees_srvs.OpenSnapshotStream, self._open_snapshot_stream),
("reconfigure", "~/snapshot_streams/reconfigure", py_trees_srvs.ReconfigureSnapshotStream, self._reconfigure_snapshot_stream),
],
introspection_topic_name="snapshot_streams/services"
)
self.blackboard_exchange = blackboard.Exchange()
self.blackboard_exchange.setup(self.node)
################################################################################
# Parameters
################################################################################
self.node.add_on_set_parameters_callback(
callback=self._set_parameters_callback
)
########################################
# default_snapshot_stream
########################################
self.node.declare_parameter(
name='default_snapshot_stream',
value=False,
descriptor=rcl_interfaces_msgs.ParameterDescriptor(
name="default_snapshot_stream",
type=rcl_interfaces_msgs.ParameterType.PARAMETER_BOOL, # noqa
description="enable/disable the default snapshot stream in ~/snapshots",
additional_constraints="",
read_only=False,
)
)
########################################
# default_snapshot_period
########################################
self.node.declare_parameter(
name='default_snapshot_period',
value=2.0, # DJS: py_trees.common.Duration.INFINITE.value,
descriptor=rcl_interfaces_msgs.ParameterDescriptor(
name="default_snapshot_period",
type=rcl_interfaces_msgs.ParameterType.PARAMETER_DOUBLE, # noqa
description="time between snapshots, set to math.inf to only publish tree state changes",
additional_constraints="",
read_only=False,
floating_point_range=[rcl_interfaces_msgs.FloatingPointRange(
from_value=0.0,
to_value=py_trees.common.Duration.INFINITE.value)]
)
)
########################################
# default_snapshot_blackboard_data
########################################
self.node.declare_parameter(
name='default_snapshot_blackboard_data',
value=True,
descriptor=rcl_interfaces_msgs.ParameterDescriptor(
name="default_snapshot_blackboard_data",
type=rcl_interfaces_msgs.ParameterType.PARAMETER_BOOL, # noqa
description="append blackboard data (tracking status, visited variables) to the default snapshot stream",
additional_constraints="",
read_only=False,
)
)
########################################
# default_snapshot_blackboard_activity
########################################
self.node.declare_parameter(
name='default_snapshot_blackboard_activity',
value=False,
descriptor=rcl_interfaces_msgs.ParameterDescriptor(
name="default_snapshot_blackboard_activity",
type=rcl_interfaces_msgs.ParameterType.PARAMETER_BOOL, # noqa
description="append the blackboard activity stream to the default snapshot stream",
additional_constraints="",
read_only=False,
)
)
########################################
# setup_timeout
########################################
self.node.declare_parameter(
name='setup_timeout',
value=timeout if timeout != py_trees.common.Duration.INFINITE else py_trees.common.Duration.INFINITE.value,
descriptor=rcl_interfaces_msgs.ParameterDescriptor(
name="setup_timeout",
type=rcl_interfaces_msgs.ParameterType.PARAMETER_DOUBLE, # noqa
description="timeout for ROS tree setup (node, pubs, subs, ...)",
additional_constraints="",
read_only=True,
floating_point_range=[rcl_interfaces_msgs.FloatingPointRange(
from_value=0.0,
to_value=py_trees.common.Duration.INFINITE.value)]
)
)
# Get the resulting timeout
setup_timeout = self.node.get_parameter("setup_timeout").value
# Ugly workaround to accomodate use of the enum (TODO: rewind this)
# Need to pass the enum for now (instead of just a float) in case
# there are behaviours out in the wild that apply logic around the
# use of the enum
if setup_timeout == py_trees.common.Duration.INFINITE.value:
setup_timeout = py_trees.common.Duration.INFINITE
########################################
# Behaviours
########################################
try:
super().setup(
timeout=setup_timeout,
visitor=visitor,
node=self.node,
**kwargs
)
except RuntimeError as e:
if str(e) == "tree setup interrupted or timed out":
raise exceptions.TimedOutError(str(e))
else:
raise
########################################
# Setup Handlers
########################################
# set a handler to publish future modifications whenever the tree is modified
# (e.g. pruned). The tree_update_handler method is in the base class, set this
# to the callback function here.
self.tree_update_handler = self._on_tree_update_handler
self.post_tick_handlers.append(self._snapshots_post_tick_handler)
def _set_parameters_callback(
self,
parameters: typing.List[rclpy.parameter.Parameter]
) -> rcl_interfaces_msgs.SetParametersResult:
"""
Callback that dynamically handles changes in parameters.
Args:
parameters: list of one or more declared parameters with updated values
Returns:
result of the set parameters requests
"""
for parameter in parameters:
if parameter.name == "default_snapshot_stream":
if self.default_snapshot_stream_topic_name in self.snapshot_streams:
self.snapshot_streams[self.default_snapshot_stream_topic_name].shutdown()
if self.snapshot_streams[self.default_snapshot_stream_topic_name].parameters.blackboard_activity:
self.blackboard_exchange.unregister_activity_stream_client()
del self.snapshot_streams[self.default_snapshot_stream_topic_name]
if parameter.value:
try:
parameters = SnapshotStream.Parameters(
snapshot_period=self.node.get_parameter("default_snapshot_period").value,
blackboard_data=self.node.get_parameter("default_snapshot_blackboard_data").value,
blackboard_activity=self.node.get_parameter("default_snapshot_blackboard_activity").value
)
except rclpy.exceptions.ParameterNotDeclaredException:
parameters = SnapshotStream.Parameters()
self.snapshot_streams[self.default_snapshot_stream_topic_name] = SnapshotStream(
node=self.node,
topic_name=self.default_snapshot_stream_topic_name,
parameters=parameters
)
if self.snapshot_streams[self.default_snapshot_stream_topic_name].parameters.blackboard_activity:
self.blackboard_exchange.register_activity_stream_client()
elif parameter.name == "default_snapshot_blackboard_data":
if self.default_snapshot_stream_topic_name in self.snapshot_streams:
self.snapshot_streams[self.default_snapshot_stream_topic_name].parameters.blackboard_data = parameter.value
elif parameter.name == "default_snapshot_blackboard_activity":
if self.default_snapshot_stream_topic_name in self.snapshot_streams:
if self.snapshot_streams[self.default_snapshot_stream_topic_name].parameters.blackboard_activity != parameter.value:
if parameter.value:
self.blackboard_exchange.register_activity_stream_client()
else:
self.blackboard_exchange.unregister_activity_stream_client()
self.snapshot_streams[self.default_snapshot_stream_topic_name].parameters.blackboard_activity = parameter.value
elif parameter.name == "default_snapshot_period":
if self.default_snapshot_stream_topic_name in self.snapshot_streams:
self.snapshot_streams[self.default_snapshot_stream_topic_name].parameters.snapshot_period = parameter.value
return rcl_interfaces_msgs.SetParametersResult(successful=True)
[docs] def tick_tock(
self,
period_ms,
number_of_iterations=py_trees.trees.CONTINUOUS_TICK_TOCK,
pre_tick_handler=None,
post_tick_handler=None):
"""
Tick continuously at the period specified.
This is a re-implementation of the
:meth:`~py_trees.trees.BehaviourTree.tick_tock`
tick_tock that takes advantage of the rclpy timers so callbacks are interleaved inbetween
rclpy callbacks (keeps everything synchronous so no need for locks).
Args:
period_ms (:obj:`float`): sleep this much between ticks (milliseconds)
number_of_iterations (:obj:`int`): number of iterations to tick-tock
pre_tick_handler (:obj:`func`): function to execute before ticking
post_tick_handler (:obj:`func`): function to execute after ticking
"""
self.timer = self.node.create_timer(
period_ms / 1000.0, # unit 'seconds'
functools.partial(
self._tick_tock_timer_callback,
number_of_iterations=number_of_iterations,
pre_tick_handler=pre_tick_handler,
post_tick_handler=post_tick_handler
)
)
self.tick_tock_count = 0
[docs] def shutdown(self):
"""
Cleanly shut down rclpy timers and nodes.
"""
# stop ticking if we're ticking
if self.node is not None:
if self.timer is not None:
self.timer.cancel()
self.node.destroy_timer(self.timer)
# call shutdown on each behaviour first, in case it has
# some esoteric shutdown steps
super().shutdown()
if self.node is not None:
# shutdown the node - this *should* automagically clean
# up any non-estoeric shutdown of ros communications
# inside behaviours
self.node.destroy_node()
def _tick_tock_timer_callback(
self,
number_of_iterations,
pre_tick_handler,
post_tick_handler):
"""
Tick tock callback passed to the timer to be periodically triggered.
Args:
number_of_iterations (:obj:`int`): number of iterations to tick-tock
pre_tick_handler (:obj:`func`): function to execute before ticking
post_tick_handler (:obj:`func`): function to execute after ticking
"""
if (number_of_iterations == py_trees.trees.CONTINUOUS_TICK_TOCK or
self.tick_tock_count < number_of_iterations):
self.tick(pre_tick_handler, post_tick_handler)
self.tick_tock_count += 1
else:
self.timer.cancel()
def _on_tree_update_handler(self):
"""
Whenever there has been a modification to the tree (insertion/pruning), publish
the snapshot.
"""
# only worth notifying once we've actually commenced
if self.statistics is not None:
rclpy_start_time = rclpy.clock.Clock().now()
self.statistics.stamp = rclpy_start_time.to_msg()
for unused_topic_name, snapshot_stream in self.snapshot_streams.items():
snapshot_stream.publish(
root=self.root,
changed=True,
statistics=self.statistics,
visited_behaviour_ids=self.snapshot_visitor.visited.keys(),
visited_blackboard_client_ids=self.snapshot_visitor.visited_blackboard_client_ids
)
def _statistics_pre_tick_handler(self, tree: py_trees.trees.BehaviourTree):
"""
Pre-tick handler that resets the statistics and starts the clock.
Args:
tree (:class:`~py_trees.trees.BehaviourTree`): the behaviour tree that has just been ticked
"""
if len(self.time_series) == 10:
self.time_series.popleft()
self.tick_interval_series.popleft()
rclpy_start_time = rclpy.clock.Clock().now()
self.time_series.append(conversions.rclpy_time_to_float(rclpy_start_time))
if len(self.time_series) == 1:
self.tick_interval_series.append(0.0)
else:
self.tick_interval_series.append(self.time_series[-1] - self.time_series[-2])
self.statistics = py_trees_msgs.Statistics()
self.statistics.count = self.count
self.statistics.stamp = rclpy_start_time.to_msg()
self.statistics.tick_interval = self.tick_interval_series[-1]
self.statistics.tick_interval_average = sum(self.tick_interval_series) / len(self.tick_interval_series)
if len(self.tick_interval_series) > 1:
self.statistics.tick_interval_variance = statistics.variance(
self.tick_interval_series,
self.statistics.tick_interval_average
)
else:
self.statistics.tick_interval_variance = 0.0
def _statistics_post_tick_handler(self, tree: py_trees.trees.BehaviourTree):
"""
Post-tick handler that completes the statistics generation.
Args:
tree (:class:`~py_trees.trees.BehaviourTree`): the behaviour tree that has just been ticked
"""
duration = conversions.rclpy_time_to_float(rclpy.clock.Clock().now()) - self.time_series[-1]
if len(self.tick_duration_series) == 10:
self.tick_duration_series.popleft()
self.tick_duration_series.append(duration)
self.statistics.tick_duration = duration
self.statistics.tick_duration_average = sum(self.tick_duration_series) / len(self.tick_duration_series)
if len(self.tick_duration_series) > 1:
self.statistics.tick_duration_variance = statistics.variance(
self.tick_duration_series,
self.statistics.tick_duration_average
)
else:
self.statistics.tick_duration_variance = 0.0
def _snapshots_post_tick_handler(self, tree: py_trees.trees.BehaviourTree):
"""
Post-tick handler that checks for changes in the tree/blackboard as a result
of it's last tick and publish updates on ROS topics.
Args:
tree (:class:`~py_trees.trees.BehaviourTree`): the behaviour tree that has just been ticked
"""
# checks
if self.node is None:
self.node.get_logger().error("call setup() on this tree to initialise the ros components")
return
if self.root.tip() is None:
self.node.get_logger().error("the root behaviour failed to return a tip [hint: tree is in an INVALID state, usually as a result of incorrectly coded behaviours]")
return
# publish the default snapshot stream (useful for logging)
for unused_topic_name, snapshot_stream in self.snapshot_streams.items():
snapshot_stream.publish(
root=self.root,
changed=self.snapshot_visitor.changed,
statistics=self.statistics,
visited_behaviour_ids=self.snapshot_visitor.visited.keys(),
visited_blackboard_client_ids=self.snapshot_visitor.visited_blackboard_client_ids
)
# every tick publish on watchers, clear activity stream (note: not expensive as watchers by default aren't connected)
self.blackboard_exchange.post_tick_handler(
visited_client_ids=self.snapshot_visitor.visited_blackboard_client_ids # .keys()
)
def _close_snapshot_stream(
self,
request: py_trees_srvs.CloseSnapshotStream.Request, # noqa
response: py_trees_srvs.CloseSnapshotStream.Response # noqa
) -> py_trees_srvs.CloseSnapshotStream.Response:
response.result = True
try:
self.snapshot_streams[request.topic_name].shutdown()
if self.snapshot_streams[request.topic_name].parameters.blackboard_activity:
self.blackboard_exchange.unregister_activity_stream_client()
del self.snapshot_streams[request.topic_name]
except KeyError:
response.result = False
return response
def _open_snapshot_stream(
self,
request: py_trees_srvs.OpenSnapshotStream.Request, # noqa
response: py_trees_srvs.OpenSnapshotStream.Response # noqa
) -> py_trees_srvs.OpenSnapshotStream.Response:
snapshot_stream = SnapshotStream(
node=self.node,
topic_name=request.topic_name,
parameters=SnapshotStream.Parameters(
blackboard_data=request.parameters.blackboard_data,
blackboard_activity=request.parameters.blackboard_activity,
snapshot_period=request.parameters.snapshot_period
)
)
if snapshot_stream.parameters.blackboard_activity:
self.blackboard_exchange.register_activity_stream_client()
self.snapshot_streams[snapshot_stream.topic_name] = snapshot_stream
response.topic_name = snapshot_stream.topic_name
return response
def _reconfigure_snapshot_stream(
self,
request: py_trees_srvs.ReconfigureSnapshotStream.Request, # noqa
response: py_trees_srvs.ReconfigureSnapshotStream.Response # noqa
) -> py_trees_srvs.ReconfigureSnapshotStream.Response:
response.result = True
try:
snapshot_stream = self.snapshot_streams[request.topic_name]
snapshot_stream.parameters.blackboard_data = request.parameters.blackboard_data
if (snapshot_stream.parameters.blackboard_activity != request.parameters.blackboard_activity):
if request.parameters.blackboard_activity:
self.blackboard_exchange.register_activity_stream_client()
else:
self.blackboard_exchange.unregister_activity_stream_client()
snapshot_stream.parameters.blackboard_activity = request.parameters.blackboard_activity
snapshot_stream.parameters.snapshot_period = request.parameters.snapshot_period
except KeyError:
response.result = False
return response
def _cleanup(self):
with self.lock:
# self.bag.close()
self.interrupt_tick_tocking = True
self._bag_closed = True
##############################################################################
# Tree Watcher
##############################################################################
[docs]class WatcherMode(enum.Enum):
"""An enumerator specifying the view mode for the watcher"""
SNAPSHOTS = "SNAPSHOTS"
"""Render ascii/unicode snapshots from a snapshot stream"""
DOT_GRAPH = "DOT_GRAPH"
"""Render with the dot graph representation of the static tree (using an application or text to console)."""
[docs]class Watcher(object):
"""
The tree watcher sits on the other side of a running
:class:`~py_trees_ros.trees.BehaviourTree` and is a useful mechanism for
quick introspection of it's current state.
Args:
topic_name: location of the snapshot stream (optional)
namespace_hint: refine search for snapshot stream services if there is more than one tree (optional)
parameters: snapshot stream configuration controlling both on-the-fly stream creation and display
statistics: display statistics
.. seealso:: :mod:`py_trees_ros.programs.tree_watcher`
"""
def __init__(
self,
topic_name: str=None,
namespace_hint: str=None,
parameters: SnapshotStream.Parameters=None,
statistics: bool=False,
mode: WatcherMode=WatcherMode.SNAPSHOTS
):
self.topic_name = topic_name
self.namespace_hint = namespace_hint
self.mode = mode
self.parameters = parameters if parameters is not None else SnapshotStream.Parameters()
self.statistics = statistics
self.subscriber = None
self.snapshot_visitor = py_trees.visitors.SnapshotVisitor()
self.done = False
self.xdot_process = None
self.rendered = None
self.services = {
'open': None,
'close': None
}
self.service_names = {
'open': None,
'close': None
}
self.service_type_strings = {
'open': 'py_trees_ros_interfaces/srv/OpenSnapshotStream',
'close': 'py_trees_ros_interfaces/srv/CloseSnapshotStream'
}
self.service_types = {
'open': py_trees_srvs.OpenSnapshotStream,
'close': py_trees_srvs.CloseSnapshotStream
}
[docs] def setup(self, timeout_sec: float):
"""
Creates the node and checks that all of the server-side services are available
for calling on to open a connection.
Args:
timeout_sec: time (s) to wait (use common.Duration.INFINITE to block indefinitely)
Raises:
:class:`~py_trees_ros.exceptions.NotFoundError`: if services/topics were not found
:class:`~py_trees_ros.exceptions.MultipleFoundError`: if multiple services were found
:class:`~py_trees_ros.exceptions.TimedOutError`: if service clients time out waiting for the server
"""
self.node = rclpy.create_node(
node_name=utilities.create_anonymous_node_name(node_name='tree_watcher'),
start_parameter_services=False
)
# open a snapshot stream
if self.topic_name is None:
# discover actual service names
for service_name in self.service_names.keys():
# can raise NotFoundError and MultipleFoundError
self.service_names[service_name] = utilities.find_service(
node=self.node,
service_type=self.service_type_strings[service_name],
namespace=self.namespace_hint,
timeout=timeout_sec
)
# create service clients
self.services["open"] = self.create_service_client(key="open")
self.services["close"] = self.create_service_client(key="close")
# request a stream
request = self.service_types["open"].Request()
request.parameters.blackboard_data = self.parameters.blackboard_data
request.parameters.blackboard_activity = self.parameters.blackboard_activity
request.parameters.snapshot_period = self.parameters.snapshot_period
future = self.services["open"].call_async(request)
rclpy.spin_until_future_complete(self.node, future)
response = future.result()
self.topic_name = response.topic_name
# connect to a snapshot stream
start_time = time.monotonic()
while True:
elapsed_time = time.monotonic() - start_time
if elapsed_time > timeout_sec:
raise exceptions.TimedOutError("timed out waiting for a snapshot stream publisher [{}]".format(self.topic_name))
if self.node.count_publishers(self.topic_name) > 0:
break
time.sleep(0.1)
self.subscriber = self.node.create_subscription(
msg_type=py_trees_msgs.BehaviourTree,
topic=self.topic_name,
callback=self.callback_snapshot,
qos_profile=utilities.qos_profile_latched()
)
def shutdown(self):
if self.services["close"] is not None:
request = self.service_types["close"].Request()
request.topic_name = self.topic_name
future = self.services["close"].call_async(request)
rclpy.spin_until_future_complete(self.node, future)
unused_response = future.result()
self.node.destroy_node()
[docs] def create_service_client(self, key: str):
"""
Convenience api for opening a service client and waiting for the service to appear.
Args:
key: one of 'open', 'close'.
Raises:
:class:`~py_trees_ros.exceptions.NotReadyError`: if setup() wasn't called to identify the relevant services to connect to.
:class:`~py_trees_ros.exceptions.TimedOutError`: if it times out waiting for the server
"""
if self.service_names[key] is None:
raise exceptions.NotReadyError(
"no known '{}' service known [did you call setup()?]".format(self.service_types[key])
)
client = self.node.create_client(
srv_type=self.service_types[key],
srv_name=self.service_names[key],
qos_profile=rclpy.qos.qos_profile_services_default
)
# hardcoding timeouts will get us into trouble
if not client.wait_for_service(timeout_sec=3.0):
raise exceptions.TimedOutError(
"timed out waiting for {}".format(self.service_names['close'])
)
return client
[docs] def callback_snapshot(self, msg):
"""
Formats the string message coming in.
Args
msg (:class:`py_trees_ros_interfaces.msg.BehaviourTree`):serialised snapshot
"""
####################
# Processing
####################
self.snapshot_visitor.previously_visited = self.snapshot_visitor.visited
self.snapshot_visitor.visited = {}
serialised_behaviours = {}
root_id = None
for serialised_behaviour in msg.behaviours:
if serialised_behaviour.parent_id == unique_identifier_msgs.UUID():
root_id = conversions.msg_to_uuid4(serialised_behaviour.own_id)
serialised_behaviours[
conversions.msg_to_uuid4(serialised_behaviour.own_id)
] = serialised_behaviour
def deserialise_tree_recursively(msg):
behaviour = conversions.msg_to_behaviour(msg)
for serialised_child_id in msg.child_ids:
child_id = conversions.msg_to_uuid4(serialised_child_id)
child = deserialise_tree_recursively(
serialised_behaviours[child_id]
)
# invasive hack to revert the dummy child we added in msg_to_behaviour
if isinstance(behaviour, py_trees.decorators.Decorator):
behaviour.children = [child]
behaviour.decorated = behaviour.children[0]
else:
behaviour.children.append(child)
child.parent = behaviour
# set the current child so tip() works properly everywhere
if behaviour.children:
if msg.current_child_id != unique_identifier_msgs.UUID():
current_child_id = conversions.msg_to_uuid4(msg.current_child_id)
for index, child in enumerate(behaviour.children):
if child.id == current_child_id:
# somewhat ugly not having a consistent api here
if isinstance(behaviour, py_trees.composites.Selector):
behaviour.current_child = child
elif isinstance(behaviour, py_trees.composites.Sequence):
behaviour.current_index = index
# else Parallel, nothing to do since it infers
# the current child from children's status on the fly
break
if msg.is_active:
self.snapshot_visitor.visited[behaviour.id] = behaviour.status
return behaviour
# we didn't set the tip in any behaviour, but nothing depends
# on that right now
root = deserialise_tree_recursively(serialised_behaviours[root_id])
####################
# Streaming
####################
if self.mode == WatcherMode.SNAPSHOTS:
if msg.changed:
colour = console.green
else:
colour = console.bold
####################
# Banner
####################
title = "Tick {}".format(msg.statistics.count)
print(colour + "\n" + 80 * "*" + console.reset)
print(colour + "* " + console.bold + title.center(80) + console.reset)
print(colour + 80 * "*" + "\n" + console.reset)
####################
# Tree Snapshot
####################
print(
py_trees.display.unicode_tree(
root=root,
visited=self.snapshot_visitor.visited,
previously_visited=self.snapshot_visitor.previously_visited
)
)
print(colour + "-" * 80 + console.reset)
####################
# Stream Variables
####################
if self.parameters.blackboard_data:
print("")
print(colour + "Blackboard Data" + console.reset)
indent = " " * 4
if not msg.blackboard_on_visited_path:
print(
console.cyan + indent + "-" + console.reset + " : " +
console.yellow + "-" + console.reset
)
else:
# could probably re-use the unicode_blackboard by passing a dict to it
# like we've done for the activity stream
max_length = 0
for variable in msg.blackboard_on_visited_path:
max_length = len(variable.key) if len(variable.key) > max_length else max_length
for variable in msg.blackboard_on_visited_path:
print(
console.cyan + indent +
'{0: <{1}}'.format(variable.key, max_length + 1) + console.reset + ": " +
console.yellow + '{0}'.format(variable.value) + console.reset
)
####################
# Stream Activity
####################
if self.parameters.blackboard_activity:
print("")
print(colour + "Blackboard Activity Stream" + console.reset)
if msg.blackboard_activity:
print(py_trees.display.unicode_blackboard_activity_stream(
msg.blackboard_activity,
indent=0,
show_title=False
))
else:
indent = " " * 4
print(
console.cyan + indent + "-" + console.reset + " : " +
console.yellow + "-" + console.reset
)
####################
# Stream Statistics
####################
if self.statistics:
print("")
print(colour + "Statistics" + console.reset)
print(
console.cyan + " Timestamp: " + console.yellow +
"{}".format(
conversions.rclpy_time_to_float(
rclpy.time.Time.from_msg(
msg.statistics.stamp
)
)
)
)
print(
console.cyan + " Duration : " + console.yellow +
"{:.3f}/{:.3f}/{:.3f} (ms) [time/avg/stddev]".format(
msg.statistics.tick_duration * 1000,
msg.statistics.tick_duration_average * 1000,
math.sqrt(msg.statistics.tick_duration_variance) * 1000
)
)
print(
console.cyan + " Interval : " + console.yellow +
"{:.3f}/{:.3f}/{:.3f} (s) [time/avg/stddev]".format(
msg.statistics.tick_interval,
msg.statistics.tick_interval_average,
math.sqrt(msg.statistics.tick_interval_variance)
)
)
print(console.reset)
####################
# Dot Graph
####################
elif self.mode == WatcherMode.DOT_GRAPH and not self.rendered:
self.rendered = True
directory_name = tempfile.mkdtemp()
py_trees.display.render_dot_tree(
root=root,
target_directory=directory_name,
with_blackboard_variables=self.parameters.blackboard_data
)
xdot_program = py_trees.utilities.which('xdot')
if not xdot_program:
print("")
console.logerror("No xdot viewer found [hint: sudo apt install xdot]")
print("")
print(py_trees.display.dot_tree(root=root).to_string())
self.done = True
self.xdot_process = None
return
filename = py_trees.utilities.get_valid_filename(root.name) + '.dot'
if xdot_program:
try:
self.xdot_process = subprocess.Popen(
[
xdot_program,
os.path.join(directory_name, filename)
]
)
except KeyboardInterrupt:
pass
self.done = True