Source code for py_trees_ros.blackboard

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
#   https://raw.github.com/splintered-reality/py_trees_ros/license/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
This module provides the :class:`py_trees_ros.blackboard.Exchange` class,
a ROS wrapper around a
:ref:`Blackboard <py_trees:blackboards-section>` that permits
introspection of either the entire board or a window onto a smaller part
of the board over a ROS API via the :ref:`py-trees-blackboard-watcher`
command line utility.
"""

##############################################################################
# Imports
##############################################################################

import copy
import operator
import pickle
import py_trees
import py_trees.console as console
import py_trees_ros_interfaces.srv as py_trees_srvs
import rclpy.executors
import rclpy.expand_topic_name
import rclpy.node
import std_msgs.msg as std_msgs
import time

from . import exceptions
from . import utilities

##############################################################################
# SubView of a Blackboard
##############################################################################


[docs]class BlackboardView(object): """ Utility class that enables tracking and publishing of relevant parts of the blackboard for a user. This is used by the :class:`~py_trees_ros.blackboard.Exchange` operator. Args: topic_name (:obj:`str`): name of the topic for the publisher attrs (:obj:`???`): """ def __init__(self, node, topic_name, attrs): self.blackboard = py_trees.blackboard.Blackboard() self.topic_name = topic_name self.attrs = attrs self.dict = {} self.cached_dict = {} self.publisher = node.create_publisher(std_msgs.String, topic_name) def _update_sub_blackboard(self): if not self.attrs: self.dict = copy.copy(self.blackboard.__dict__) else: for attr in self.attrs: if '/' in attr: check_attr = operator.attrgetter(".".join(attr.split('/'))) else: check_attr = operator.attrgetter(attr) try: value = check_attr(self.blackboard) self.dict[attr] = value except AttributeError: pass def _is_changed(self): self._update_sub_blackboard() current_pickle = pickle.dumps(self.dict, -1) blackboard_changed = current_pickle != self.cached_dict self.cached_dict = current_pickle return blackboard_changed def __str__(self): s = "" max_length = 0 for k in self.dict.keys(): max_length = len(k) if len(k) > max_length else max_length keys = sorted(self.dict) for key in keys: value = self.dict[key] if value is None: value_string = "-" s += console.cyan + " " + '{0: <{1}}'.format(key, max_length + 1) + console.reset + ": " + console.yellow + "%s" % (value_string) + console.reset + "\n" else: lines = ("%s" % value).split('\n') if len(lines) > 1: s += console.cyan + " " + '{0: <{1}}'.format(key, max_length + 1) + console.reset + ":\n" for line in lines: s += console.yellow + " %s" % line + console.reset + "\n" else: s += console.cyan + " " + '{0: <{1}}'.format(key, max_length + 1) + console.reset + ": " + console.yellow + "%s" % (value) + console.reset + "\n" return s.rstrip() # get rid of the trailing newline...print will take care of adding a new line
############################################################################## # Exchange ##############################################################################
[docs]class Exchange(object): """ Establishes ros communications around a :class:`~py_trees.blackboard.Blackboard` that enable users to introspect or watch relevant parts of the blackboard. ROS Publishers: * **~/blackboard** (:class:`std_msgs.msg.String`) * streams (string form) the contents of the entire blackboard as it updates ROS Services: * **~/get_blackboard_variables** (:class:`py_trees_msgs.srv.GetBlackboardVariables`) * list all the blackboard variable names (not values) * **~/open_blackboard_watcher** (:class:`py_trees_msgs.srv.OpenBlackboardWatcher`) * request a publisher to stream a part of the blackboard contents * **~/close_blackboard_watcher** (:class:`py_trees_msgs.srv.CloseBlackboardWatcher`) * close a previously opened watcher Watching could be more simply enabled by just providing a *get* style service on a particular variable(s), however that would introduce an asynchronous interrupt on the library's usage and subsequently, locking. Instead, a watcher service requests for a stream, i.e. a publisher to be created for private use that streams only a subsection of the blackboard to the user. Additional services are provided for closing the stream and listing the variables on the blackboard. .. seealso:: :class:`~py_trees_ros.trees.BehaviourTree` (in which it is used) and :ref:`py-trees-blackboard-watcher` (working with the watchers). """ _counter = 0 """Incremental counter guaranteeing unique watcher names""" def __init__(self): self.node = None self.blackboard = py_trees.blackboard.Blackboard() self.cached_blackboard_dict = {} self.views = [] self.publisher = None self.services = {}
[docs] def setup(self, node: rclpy.node.Node): """ This is where the ros initialisation of publishers and services happens. It is kept outside of the constructor for the same reasons that the familiar py_trees :meth:`~py_trees.trees.BehaviourTree.setup` method has - to enable construction of behaviours and trees offline (away from their execution environment) so that dot graphs and other visualisations of the tree can be created. Args: node (:class:`~rclpy.node.Node`): node to hook ros communications on Examples: It is expected that users will use this in their own customised tree custodian: .. code-block:: python class MyTreeManager(py_trees.trees.BehaviourTree): def __init__(self): pass def setup(self, timeout): super(MyTreeManager, self).setup(timeout) self.exchange = py_trees_ros.blackboard.Exchange() self.exchange.setup(timeout) .. seealso:: This method is called in the way illustrated above in :class:`~py_trees_ros.trees.BehaviourTree`. """ self.node = node self.publisher = self.node.create_publisher(std_msgs.String, '~/exchange/blackboard') for name in ["get_blackboard_variables", "open_blackboard_watcher", "close_blackboard_watcher"]: camel_case_name = ''.join(x.capitalize() for x in name.split('_')) self.services[name] = self.node.create_service( srv_type=getattr(py_trees_srvs, camel_case_name), srv_name='~/exchange/' + name, callback=getattr(self, "_{}_service".format(name)) )
def _get_nested_keys(self): variables = [] def inner(v, k): for attr in dir(type(v)): if not isinstance(v, (bool, list, str, int, float)): if not attr.startswith("_"): value = getattr(v, attr) if not callable(value): if not attr.isupper(): variables.append(k + "/" + attr) inner(value, k + "/" + attr) for key in sorted(self.blackboard.__dict__): variables.append(key) inner(self.blackboard.__dict__[key], key) return variables def _close_watcher(self, req): for i, view in enumerate(self.views): if view.topic_name == req.topic_name: self.node.destroy_publisher(view.publisher) del self.views[i] return True return False def _is_changed(self): current_pickle = pickle.dumps(self.blackboard.__dict__, -1) blackboard_changed = current_pickle != self.cached_blackboard_dict self.cached_blackboard_dict = current_pickle return blackboard_changed
[docs] def publish_blackboard(self, unused_tree=None): """ Lazy string publishing of the blackboard (the contents of the blackboard can be of many and varied types, so string form is the only way to transmit this across a ros message) on the **~blackboard** topic. Typically you would call this from a tree custodian (e.g. :class:`py_trees_ros.trees.BehaviourTree`) after each and every tick. .. note:: Lazy: it will only do the relevant string processing if there are subscribers present. Args: unused_tree (:obj:`any`): if used as a post_tick_handler, needs the argument, but nonetheless, gets unused """ if self.publisher is None: self.node.get_logger().error("Blackboard Exchange: no publishers [hint: call setup() on the exchange]") return # publish blackboard # the blackboard watcher doesn't actually use the lazy publisher, but # if latching and string formatting in ROS gets up to speed, it will be a # more useful thing to have around - easy to detect and rostopic echo the contents if self.node.count_subscribers("~/blackboard") > 0: if self._is_changed(): msg = std_msgs.String() msg.data = "{0}".format(self.blackboard) self.publisher.publish(msg) # publish watchers if len(self.views) > 0: for view in self.views: if self.node.count_publishers(view.topic_name) > 0: if view._is_changed(): msg = std_msgs.String() msg.data = "{0}".format(view) view.publisher.publish(msg)
def _close_blackboard_watcher_service(self, request, response): response.result = self._close_watcher(request) return response def _get_blackboard_variables_service(self, unused_request, response): response.variables = self._get_nested_keys() return response def _open_blackboard_watcher_service(self, request, response): response.topic = rclpy.expand_topic_name.expand_topic_name( topic_name="~/blackboard/watcher_" + str(Exchange._counter), node_name=self.node.get_name(), node_namespace=self.node.get_namespace()) Exchange._counter += 1 view = BlackboardView(self.node, response.topic, request.variables) self.views.append(view) return response
############################################################################## # Blackboard Watcher ##############################################################################
[docs]class BlackboardWatcher(object): """ The blackboard watcher sits on the other side of the exchange and is a useful mechanism from which to pull all or part of the blackboard contents. This is extremely useful for logging, but also for introspecting in the runtime for various uses (e.g. reporting out on the current state to a fleet server). .. seealso:: :ref:`py-trees-blackboard-watcher` """ def __init__(self, callback=lambda *args, **kwargs: None, # a noop namespace_hint=None): """ Args: callback: any method that accepts a string as an input argument namespace_hint (:obj:`str`): (optionally) used to locate the blackboard if there exists more than one """ self.namespace_hint = namespace_hint self.service_names = { 'list': None, 'open': None, 'close': None } self.service_type_strings = { 'list': 'py_trees_ros_interfaces/GetBlackboardVariables', 'open': 'py_trees_ros_interfaces/OpenBlackboardWatcher', 'close': 'py_trees_ros_interfaces/CloseBlackboardWatcher' } self.service_types = { 'list': py_trees_srvs.GetBlackboardVariables, 'open': py_trees_srvs.OpenBlackboardWatcher, 'close': py_trees_srvs.CloseBlackboardWatcher } self.watcher_topic_name = None self.watcher_subscriber = None self.callback = callback
[docs] def setup(self, timeout_sec): """ Args: timeout_sec (:obj:`float`): time (s) to wait (use common.Duration.INFINITE to block indefinitely) Raises: :class:`~py_trees_ros.exceptions.NotFoundError`: if no services were found :class:`~py_trees_ros.exceptions.MultipleFoundError`: if multiple services were found """ self.node = rclpy.create_node( node_name=utilities.create_anonymous_node_name(node_name='watcher'), start_parameter_services=False ) # Note: this assumes that the services are not dynamically available (i.e. # go up and down frequently) scanned_time = 0.0 scanning_period = 0.1 for service_name in self.service_names.keys(): while True: try: self.service_names[service_name] = utilities.find_service( self.node, self.service_type_strings[service_name], self.namespace_hint ) break except exceptions.NotFoundError: time.sleep(scanning_period) scanned_time += scanning_period if scanned_time > timeout_sec: raise exceptions.NotFoundError("Timed out scanning for blackboard services") else: continue except exceptions.MultipleFoundError as e: raise e
[docs] def request_list_variables(self): """ Request of the blackboard a list of it's variables. Returns: :class:`~rclpy.task.Future` """ request, client = self._create_service_client('list') future = client.call_async(request) return future
[docs] def open_connection(self, variables, callback=None, executor=None): """ First calls the exchange to get details on the connect the watcher is required to open and then initiates that tunnel. Note that this does some spinning waiting for the service call, so if the executor must be shared with other nodes (e.g. for testing) make sure you pass in your own executor. Args: variables (:obj:`[str]`): list of variables to listen for callback (func, optional): method with a std_msgs/String argument, defaults to :meth:`blackboard_contents_callback()` in this class executor (:class:`~rclpy.executors.Executor`) used to spin the node looking for service callbacks Raises: :class:`~py_trees_ros.exceptions.NotReadyError`: if setup not executed or it hitherto failed :class:`~py_trees_ros.exceptions.ServiceError`: if the service failed to respond :class:`~py_trees_ros.exceptions.TimedOutError`: if the services could not be reached """ if callback is None: callback = self.blackboard_contents_callback request, client = self._create_service_client('open') # convenience, just in case someone wrote the list of variables for argparse like a python list instead # of a space separated list of variables, i.e. # py_trees-blackboard-watcher [count, dude] → ['[count,', 'dude]'] → ['count', 'dude'] request.variables = [variable.strip(',[]') for variable in variables] future = client.call_async(request) if executor is None: executor = rclpy.executors.SingleThreadedExecutor() executor.add_node(self.node) executor.spin_until_future_complete(future) response = future.result() if response is None: raise exceptions.ServiceError( "service call failed [{}]".format(future.exception()) ) self.watcher_topic_name = response.topic self.watcher_subscriber = self.node.create_subscription( msg_type=std_msgs.String, topic=self.watcher_topic_name, callback=callback )
[docs] def close_connection(self): """ Raises: :class:`~py_trees_ros.exceptions.NotReadyError`: if setup not executed or it hitherto failed :class:`~py_trees_ros.exceptions.TimedOutError`: if the services could not be reached """ if not self.watcher_topic_name: return # Nothing to do request, client = self._create_service_client('close') request.topic_name = self.watcher_topic_name future = client.call_async(request) rclpy.spin_until_future_complete(self.node, future) unused_response = future.result() self.node.destroy_subscription(self.watcher_subscriber) self.watcher_topic_name = None self.watcher_subscriber = None
def _create_service_client(self, key): if self.service_names[key] is None: raise exceptions.NotReadyError( "no known '{}' service known [did you call setup()?]".format(self.service_types[key]) ) client = self.node.create_client( srv_type=self.service_types[key], srv_name=self.service_names[key] ) # hardcoding timeouts will get us into trouble if not client.wait_for_service(timeout_sec=3.0): raise exceptions.TimedOutError( "timed out waiting for {}".format(self.service_names['close']) ) return (self.service_types[key].Request(), client) def blackboard_contents_callback(self, msg): self.callback(msg.data)
[docs] def shutdown(self): """ Perform any ros-specific shutdown. """ if self.node: self.node.destroy_node()