Source code for py_trees_ros.blackboard

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# License: BSD
# Documentation

This module provides the :class:`py_trees_ros.blackboard.Exchange` class,
a ROS wrapper around a
:ref:`Blackboard <py_trees:blackboards-section>` that permits
introspection of either the entire board or a window onto a smaller part
of the board over a ROS API via the :ref:`py-trees-blackboard-watcher`
command line utility.

# Imports

import copy
import operator
import pickle
import py_trees
import py_trees.console as console
import py_trees_ros_interfaces.srv as py_trees_srvs
import rclpy.expand_topic_name
import rclpy.node
import std_msgs.msg as std_msgs

from typing import Any

from . import exceptions
from . import utilities

# SubView of a Blackboard

[docs]class BlackboardView(object): """ Utility class that enables tracking and publishing of relevant parts of the blackboard for a user. This is used by the :class:`~py_trees_ros.blackboard.Exchange` operator. Args: node: an rclpy node for communications handling topic_name: name of the topic for the publisher attrs (:obj:`???`): """ def __init__( self, node: rclpy.node.Node, topic_name: str, attrs ): self.blackboard = py_trees.blackboard.Blackboard() self.topic_name = topic_name self.attrs = attrs self.dict = {} self.cached_dict = {} self.node = node self.publisher = self.node.create_publisher(std_msgs.String, topic_name)
[docs] def shutdown(self): """ Shutdown the temporarily created publisher. """ self.node.destroy_publisher(self.publisher)
def _update_sub_blackboard(self): if not self.attrs: self.dict = copy.copy(self.blackboard.__dict__) else: for attr in self.attrs: if '/' in attr: check_attr = operator.attrgetter(".".join(attr.split('/'))) else: check_attr = operator.attrgetter(attr) try: value = check_attr(self.blackboard) self.dict[attr] = value except AttributeError: pass
[docs] def is_changed(self): """ Checks the current blackboard against a cached version. This then caches the current blackboard. .. warning:: Since this caches the current blackboard, it can't be used multiple times in succession. Returns: :class:`bool` """ self._update_sub_blackboard() current_pickle = pickle.dumps(self.dict, -1) blackboard_changed = current_pickle != self.cached_dict self.cached_dict = current_pickle return blackboard_changed
def __str__(self): """ A formatted string representation of the view, taking care to trim the blackboard contents according to the view that was requested. Returns: :class:`str` """ s = "" max_length = 0 for k in self.dict.keys(): max_length = len(k) if len(k) > max_length else max_length keys = sorted(self.dict) for key in keys: value = self.dict[key] if value is None: value_string = "-" s += console.cyan + " " + '{0: <{1}}'.format(key, max_length + 1) + console.reset + ": " + console.yellow + "%s" % (value_string) + console.reset + "\n" else: lines = ("%s" % value).split('\n') if len(lines) > 1: s += console.cyan + " " + '{0: <{1}}'.format(key, max_length + 1) + console.reset + ":\n" for line in lines: s += console.yellow + " %s" % line + console.reset + "\n" else: s += console.cyan + " " + '{0: <{1}}'.format(key, max_length + 1) + console.reset + ": " + console.yellow + "%s" % (value) + console.reset + "\n" return s.rstrip() # get rid of the trailing newline...print will take care of adding a new line
############################################################################## # Exchange ##############################################################################
[docs]class Exchange(object): """ Establishes ros communications around a :class:`~py_trees.blackboard.Blackboard` that enable users to introspect or watch relevant parts of the blackboard. ROS Publishers: * **~/blackboard** (:class:`std_msgs.msg.String`) * streams (string form) the contents of the entire blackboard as it updates ROS Services: * **~/get_blackboard_variables** (:class:`py_trees_msgs.srv.GetBlackboardVariables`) * list all the blackboard variable names (not values) * **~/open_blackboard_watcher** (:class:`py_trees_msgs.srv.OpenBlackboardWatcher`) * request a publisher to stream a part of the blackboard contents * **~/close_blackboard_watcher** (:class:`py_trees_msgs.srv.CloseBlackboardWatcher`) * close a previously opened watcher Watching could be more simply enabled by just providing a *get* style service on a particular variable(s), however that would introduce an asynchronous interrupt on the library's usage and subsequently, locking. Instead, a watcher service requests for a stream, i.e. a publisher to be created for private use that streams only a subsection of the blackboard to the user. Additional services are provided for closing the stream and listing the variables on the blackboard. .. seealso:: :class:`~py_trees_ros.trees.BehaviourTree` (in which it is used) and :ref:`py-trees-blackboard-watcher` (working with the watchers). """ _counter = 0 """Incremental counter guaranteeing unique watcher names""" def __init__(self): self.node = None self.blackboard = py_trees.blackboard.Blackboard() self.cached_blackboard_dict = {} self.views = [] self.publisher = None = {}
[docs] def setup(self, node: rclpy.node.Node): """ This is where the ros initialisation of publishers and services happens. It is kept outside of the constructor for the same reasons that the familiar py_trees :meth:`~py_trees.trees.BehaviourTree.setup` method has - to enable construction of behaviours and trees offline (away from their execution environment) so that dot graphs and other visualisations of the tree can be created. Args: node (:class:`~rclpy.node.Node`): node to hook ros communications on Examples: It is expected that users will use this in their own customised tree custodian: .. code-block:: python class MyTreeManager(py_trees.trees.BehaviourTree): def __init__(self): pass def setup(self, timeout): super(MyTreeManager, self).setup(timeout) = py_trees_ros.blackboard.Exchange() .. seealso:: This class is used as illustrated above in :class:`~py_trees_ros.trees.BehaviourTree`. """ self.node = node self.publisher = self.node.create_publisher( msg_type=std_msgs.String, topic='~/exchange/blackboard', qos_profile=rclpy.qos.qos_profile_system_default ) for name in ["get_blackboard_variables", "open_blackboard_watcher", "close_blackboard_watcher"]: camel_case_name = ''.join(x.capitalize() for x in name.split('_'))[name] = self.node.create_service( srv_type=getattr(py_trees_srvs, camel_case_name), srv_name='~/exchange/' + name, callback=getattr(self, "_{}_service".format(name)), qos_profile=rclpy.qos.qos_profile_services_default )
def _get_nested_keys(self): variables = [] def inner(v, k): for attr in dir(type(v)): if not isinstance(v, (bool, list, str, int, float)): if not attr.startswith("_"): value = getattr(v, attr) if not callable(value): if not attr.isupper(): variables.append(k + "/" + attr) inner(value, k + "/" + attr) for key in sorted(self.blackboard.__dict__): variables.append(key) inner(self.blackboard.__dict__[key], key) return variables def _is_changed(self): current_pickle = pickle.dumps(self.blackboard.__dict__, -1) blackboard_changed = current_pickle != self.cached_blackboard_dict self.cached_blackboard_dict = current_pickle return blackboard_changed
[docs] def publish_blackboard(self, unused_tree=None): """ Lazy string publishing of the blackboard (the contents of the blackboard can be of many and varied types, so string form is the only way to transmit this across a ros message) on the **~blackboard** topic. Typically you would call this from a tree custodian (e.g. :class:`py_trees_ros.trees.BehaviourTree`) after each and every tick. .. note:: Lazy: it will only do the relevant string processing if there are subscribers present. Args: unused_tree (:obj:`any`): if used as a post_tick_handler, needs the argument, but nonetheless, gets unused """ if self.publisher is None: self.node.get_logger().error("Blackboard Exchange: no publishers [hint: call setup() on the exchange]") return # publish blackboard # the blackboard watcher doesn't actually use the lazy publisher, but # if latching and string formatting in ROS gets up to speed, it will be a # more useful thing to have around - easy to detect and rostopic echo the contents if self.node.count_subscribers(self.publisher.topic) > 0: if self._is_changed(): msg = std_msgs.String() = "{0}".format(self.blackboard) self.publisher.publish(msg) # publish watchers # print("DJS: publish_blackboard()") # print(" DJS: # views = %s" % len(self.views)) if len(self.views) > 0: for view in self.views: # print(" DJS: view publisher topic: %s" % (view.publisher.topic)) # print(" DJS: # view publishers = %s" % self.node.count_publishers(view.topic_name)) # print(" DJS: # view subscribers = %s" % self.node.count_subscribers(view.topic_name)) if self.node.count_subscribers(view.topic_name) > 0: if view.is_changed(): # print(" DJS: # printing on: %s" % view.topic_name) msg = std_msgs.String() = "{0}".format(view) view.publisher.publish(msg)
def _close_blackboard_watcher_service(self, request, response): response.result = False for view in self.views: # print(" DJS: close watcher? [%s][%s]" % (view.topic_name, request.topic_name)) if view.topic_name == request.topic_name: view.shutdown() # that node.destroy_publisher call makes havoc response.result = True break self.views[:] = [view for view in self.views if view.topic_name != request.topic_name] # print("DJS: close result: %s" % response.result) return response def _get_blackboard_variables_service(self, unused_request, response): response.variables = self._get_nested_keys() return response def _open_blackboard_watcher_service(self, request, response): response.topic = rclpy.expand_topic_name.expand_topic_name( topic_name="~/exchange/_watcher_" + str(Exchange._counter), node_name=self.node.get_name(), node_namespace=self.node.get_namespace()) Exchange._counter += 1 view = BlackboardView(self.node, response.topic, request.variables) self.views.append(view) return response
############################################################################## # Blackboard Watcher ##############################################################################
[docs]class BlackboardWatcher(object): """ The blackboard watcher sits on the other side of the exchange and is a useful mechanism from which to pull all or part of the blackboard contents. This is useful for live introspection, or logging of the blackboard contents. .. seealso:: :ref:`py-trees-blackboard-watcher` """ def __init__( self, namespace_hint: str=None ): """ Args: namespace_hint: (optionally) used to locate the blackboard if there exists more than one """ self.namespace_hint = namespace_hint self.service_names = { 'list': None, 'open': None, 'close': None } self.service_type_strings = { 'list': 'py_trees_ros_interfaces/srv/GetBlackboardVariables', 'open': 'py_trees_ros_interfaces/srv/OpenBlackboardWatcher', 'close': 'py_trees_ros_interfaces/srv/CloseBlackboardWatcher' } self.service_types = { 'list': py_trees_srvs.GetBlackboardVariables, 'open': py_trees_srvs.OpenBlackboardWatcher, 'close': py_trees_srvs.CloseBlackboardWatcher }
[docs] def setup(self, timeout_sec: float): """ Creates the node and checks that all of the server-side services are available for calling on to open a connection. Args: timeout_sec: time (s) to wait (use common.Duration.INFINITE to block indefinitely) Raises: :class:`~py_trees_ros.exceptions.NotFoundError`: if no services were found :class:`~py_trees_ros.exceptions.MultipleFoundError`: if multiple services were found """ self.node = rclpy.create_node( node_name=utilities.create_anonymous_node_name(node_name='watcher'), start_parameter_services=False ) for service_name in self.service_names.keys(): # can raise NotFoundError and MultipleFoundError self.service_names[service_name] = utilities.find_service( node=self.node, service_type=self.service_type_strings[service_name], namespace=self.namespace_hint, timeout=timeout_sec )
[docs] def create_service_client(self, key: str): """ Convenience api for opening a service client and waiting for the service to appear. Args: key: one of 'open', 'close'. Raises: :class:`~py_trees_ros.exceptions.NotReadyError`: if setup() wasn't called to identify the relevant services to connect to. :class:`~py_trees_ros.exceptions.TimedOutError`: if it times out waiting for the server """ if self.service_names[key] is None: raise exceptions.NotReadyError( "no known '{}' service known [did you call setup()?]".format(self.service_types[key]) ) client = self.node.create_client( srv_type=self.service_types[key], srv_name=self.service_names[key], qos_profile=rclpy.qos.qos_profile_services_default ) # hardcoding timeouts will get us into trouble if not client.wait_for_service(timeout_sec=3.0): raise exceptions.TimedOutError( "timed out waiting for {}".format(self.service_names['close']) ) return (self.service_types[key].Request(), client)
[docs] def echo_blackboard_contents(self, msg: std_msgs.String): """ Very simple formatter of incoming messages. Args: msg (:class:`std_msgs.String`): incoming blackboard message as a string. """ # print("DJS: echo_blackboard_contents") print("{}".format(
[docs] def shutdown(self): """ Perform any ros-specific shutdown. """ if self.node: self.node.destroy_node()