Source code for py_trees_ros.programs.blackboard_watcher

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
#   https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
.. argparse::
   :module: py_trees_ros.programs.blackboard_watcher
   :func: command_line_argument_parser
   :prog: py-trees-blackboard-watcher

Example interaction with the services of an :class:`py_trees_ros.blackboard.Exchange`:

.. image:: images/blackboard-watcher.gif
"""

##############################################################################
# Imports
##############################################################################

import argparse
import functools
import sys

import rclpy
import std_msgs.msg as std_msgs

import py_trees.console as console
import py_trees_ros

##############################################################################
# Classes
##############################################################################


def description(formatted_for_sphinx):
    short = "Open up a window onto the blackboard!\n"
    long = ("\nIntrospect on the entire blackboard or a part thereof and receive a stream of\n"
            "updates whenever values change.\n"
            )
    examples = {
        "--list": "list all keys on the blackboard",
        "": "stream all variables",
        "--visited --activity": "stream only visited variables and access details",
        "odometry": "stream a single variable",
        "odometry.pose.pose.position": "stream only a single field within a variable"
    }
    script_name = "py-trees-blackboard-watcher"

    if formatted_for_sphinx:
        # for sphinx documentation (doesn't like raw text)
        s = short
        s += long
        s += "\n"
        s += "**Examples:**\n\n"
        s += ".. code-block:: bash\n"
        s += "    \n"
        for command, comment in examples.items():
            s += "    # {}\n".format(comment)
            s += "    $ " + script_name + " {}\n".format(command)
        s += "\n"
    else:
        banner_line = console.green + "*" * 79 + "\n" + console.reset
        s = "\n"
        s += banner_line
        s += console.bold_white + "Blackboard Watcher".center(79) + "\n" + console.reset
        s += banner_line
        s += "\n"
        s += short
        s += long
        s += "\n"
        s += console.bold + "Examples" + console.reset + "\n\n"
        for command, comment in examples.items():
            s += "    # {}\n".format(comment)
            s += "    $ " + console.cyan + script_name + console.yellow + " {}\n".format(command) + console.reset
        s += "\n\n"
        s += banner_line
    return s


def epilog(formatted_for_sphinx):
    if formatted_for_sphinx:
        return None
    else:
        return console.cyan + "And his noodly appendage reached forth to tickle the blessed...\n" + console.reset


def command_line_argument_parser(formatted_for_sphinx=True):
    # formatted_for_sphinx is an ugly hack to make sure sphinx does not pick up the colour codes.
    # works only by assuming that the only callee who calls it without setting the arg is sphinx's argparse
    parser = argparse.ArgumentParser(description=description(formatted_for_sphinx),
                                     epilog=epilog(formatted_for_sphinx),
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     )
    parser.add_argument('-l', '--list', action='store_true', default=None, help='list the blackboard variable names')
    parser.add_argument('-a', '--activity', action='store_true', help='include the logged activity stream for recent changes')
    parser.add_argument('-v', '--visited', action='store_true', help="filter selected keys from those associated with behaviours on the most recent tick's visited path")
    parser.add_argument('-n', '--namespace', nargs='?', default=None, help='namespace of blackboard services (if there should be more than one blackboard)')
    parser.add_argument('variables', nargs=argparse.REMAINDER, default=list(), help='space separated list of blackboard variable names (may be nested) to watch')
    return parser


def pretty_print_variables(variables):
    s = "\n"
    s += console.bold + console.cyan + "Blackboard Variables:" + console.reset + console.yellow + "\n"
    for variable in variables:
        variable = variable.split('.')
        if len(variable) > 1:
            sep = "."
        else:
            sep = ""
        s += "    " * len(variable) + sep + variable[-1] + "\n"
    s += console.reset
    print("{}".format(s))


##############################################################################
# Main
##############################################################################


[docs]def main(command_line_args=sys.argv[1:]): """ Entry point for the blackboard watcher script. """ # command_line_args = rclpy.utilities.remove_ros_args(command_line_args)[1:] parser = command_line_argument_parser(formatted_for_sphinx=False) args = parser.parse_args(command_line_args) rclpy.init(args=None) blackboard_watcher = py_trees_ros.blackboard.BlackboardWatcher( namespace_hint=args.namespace ) subscription = None #################### # Setup #################### try: blackboard_watcher.setup(timeout_sec=2.0) # setup discovery fails except py_trees_ros.exceptions.NotFoundError as e: print(console.red + "\nERROR: {}\n".format(str(e)) + console.reset) sys.exit(1) # setup discovery finds duplicates except py_trees_ros.exceptions.MultipleFoundError as e: print(console.red + "\nERROR: {}\n".format(str(e)) + console.reset) if args.namespace is None: print(console.red + "\nERROR: select one with the --namespace argument\n" + console.reset) else: print(console.red + "\nERROR: but none matching the requested '{}'\n".format(args.namespace) + console.reset) sys.exit(1) #################### # Execute #################### result = 0 try: if args.list: request, client = blackboard_watcher.create_service_client('list') future = client.call_async(request) rclpy.spin_until_future_complete(blackboard_watcher.node, future) if future.result() is None: raise py_trees_ros.exceptions.ServiceError( "service call failed [{}]".format(future.exception()) ) pretty_print_variables(future.result().variables) else: # request connection request, client = blackboard_watcher.create_service_client('open') request.variables = [variable.strip(',[]') for variable in args.variables] request.filter_on_visited_path = args.visited request.with_activity_stream = args.activity future = client.call_async(request) rclpy.spin_until_future_complete(blackboard_watcher.node, future) response = future.result() blackboard_watcher.node.destroy_client(client) # connect watcher_topic_name = response.topic blackboard_watcher.node.get_logger().info( "creating subscription [{}]".format(watcher_topic_name) ) subscription = blackboard_watcher.node.create_subscription( msg_type=std_msgs.String, topic=watcher_topic_name, callback=blackboard_watcher.echo_blackboard_contents, qos_profile=py_trees_ros.utilities.qos_profile_unlatched() ) # stream try: rclpy.spin(blackboard_watcher.node) except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): pass finally: # no pre-shutdown hooks from fumble # https://github.com/ros2/rclpy/issues/1077 # instead, letting the blackboard clean up it's own blackboard views # when the subscriber count goes to zero # https://github.com/splintered-reality/py_trees_ros/issues/185 pass # close connection # request, client = blackboard_watcher.create_service_client('close') # request.topic_name = watcher_topic_name # future = client.call_async(request) # rclpy.spin_until_future_complete(blackboard_watcher.node, future) # if future.result() is None: # raise py_trees_ros.exceptions.ServiceError( # "service call to close connection failed [{}]".format(future.exception()) # ) # connection problems except (py_trees_ros.exceptions.NotReadyError, py_trees_ros.exceptions.ServiceError, py_trees_ros.exceptions.TimedOutError) as e: print(console.red + "\nERROR: {}".format(str(e)) + console.reset) result = 1 finally: if subscription is not None: blackboard_watcher.node.destroy_subscription(subscription) blackboard_watcher.shutdown() rclpy.try_shutdown() sys.exit(result)