Source code for py_trees_ros.utilities

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# License: BSD
# Documentation

Assorted utility functions.

# Imports

import os
import pathlib

import py_trees_ros_interfaces.msg as py_trees_msgs  # noqa
import py_trees_ros_interfaces.srv as py_trees_srvs  # noqa
import rclpy
import rclpy.node
import rclpy.qos
import time
import typing

from . import exceptions

# Methods

[docs]def find_service(node: rclpy.node.Node, service_type: str, namespace: str=None, timeout: float=0.5): """ Discover a service of the specified type and if necessary, under the specified namespace. Args: node (:class:`~rclpy.node.Node`): nodes have the discovery methods service_type (:obj:`str`): primary lookup hint namespace (:obj:`str`): secondary lookup hint timeout: immediately post node creation, can take time to discover the graph (sec) Returns: :obj:`str`: fully expanded service name Raises: :class:`~py_trees_ros.exceptions.NotFoundError`: if no services were found :class:`~py_trees_ros.exceptions.MultipleFoundError`: if multiple services were found """ # TODO: follow the pattern of ros2cli to create a node without the need to init # rcl (might get rid of the magic sleep this way). See: # # loop_period = 0.1 # seconds clock = rclpy.clock.Clock() start_time = service_names = [] while - start_time < rclpy.time.Duration(seconds=timeout): # Returns a list of the form: [('exchange/blackboard', ['std_msgs/String']) service_names_and_types = node.get_service_names_and_types() service_names = [name for name, types in service_names_and_types if service_type in types] if namespace is not None: service_names = [name for name in service_names if namespace in name] if service_names: break time.sleep(loop_period) if not service_names: raise exceptions.NotFoundError("service not found [type: {}]".format(service_type)) elif len(service_names) == 1: return service_names[0] else: raise exceptions.MultipleFoundError("multiple services found [type: {}]".format(service_type))
[docs]def find_topics( node: rclpy.node.Node, topic_type: str, namespace: str=None, timeout: float=0.5) -> typing.List[str]: """ Discover a topic of the specified type and if necessary, under the specified namespace. Args: node: nodes have the discovery methods topic_type: primary lookup hint namespace: secondary lookup hint timeout: check every 0.1s until this timeout is reached (can be None -> checks once) .. note: Immediately post node creation, it can take some time to discover the graph. Returns: list of fully expanded topic names (can be empty) """ # TODO: follow the pattern of ros2cli to create a node without the need to init # rcl (might get rid of the magic sleep this way). See: # # loop_period = 0.1 # seconds clock = rclpy.clock.Clock() start_time = topic_names = [] while True: # Returns a list of the form: [('exchange/blackboard', ['std_msgs/String']) topic_names_and_types = node.get_topic_names_and_types() topic_names = [name for name, types in topic_names_and_types if topic_type in types] if namespace is not None: topic_names = [name for name in topic_names if namespace in name] if topic_names: break if timeout is None or ( - start_time) > rclpy.time.Duration(seconds=timeout): break else: time.sleep(loop_period) return topic_names
[docs]def basename(name): """ Generate the basename from a ros name. Args: name (:obj:`str`): ros name Returns: :obj:`str`: name stripped up until the last slash or tilde character. Examples: .. code-block:: python basename("~dude") # 'dude' basename("/gang/dude") # 'dude' """ return name.rsplit('/', 1)[-1].rsplit('~', 1)[-1]
[docs]def get_py_trees_home(): """ Find the default home directory used for logging, bagging and other esoterica. """ # TODO: update with replacement for rospkg.get_ros_home() when it arrives home = os.path.join(str(pathlib.Path.home()), ".ros2", "py_trees") return home
[docs]def qos_profile_latched(): """ Convenience retrieval for a latched topic (publisher / subscriber) """ return rclpy.qos.QoSProfile( history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST, depth=1, durability=rclpy.qos.QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL, reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE )
[docs]def qos_profile_unlatched(): """ Default profile for an unlatched topic (in py_trees_ros). """ return rclpy.qos.QoSProfile( history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST, depth=1, durability=rclpy.qos.QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE, reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE )
[docs]def resolve_name(node, name): """ Convenience function for getting the resolved name (similar to 'publisher.resolved_name' in ROS1). Args: node (:class:`rclpy.node.Node`): the node, namespace it *should* be relevant to name (obj:`str`): topic or service name .. note:: This entirely depends on the user providing the relevant node, name pair. """ return rclpy.expand_topic_name.expand_topic_name( name, node.get_name(), node.get_namespace() )
[docs]def create_anonymous_node_name(node_name="node") -> str: """ Creates an anonoymous node name by adding a suffix created from a monotonic timestamp, sans the decimal. Returns: :obj:`str`: the unique, anonymous node name """ return node_name + "_" + str(time.monotonic()).replace('.', '')
############################################################################## # Convenience Classes ##############################################################################
[docs]class Publishers(object): """ Utility class that groups the publishers together in one convenient structure. Args: publisher_details (obj:`tuple`): list of (str, str, msgType, bool, int) tuples representing (unique_name, topic_name, publisher_type, latched) specifications for creating publishers Examples: Convert the incoming list of specifications into proper variables of this class. .. code-block:: python publishers = py_trees.utilities.Publishers( [ ('foo', '~/foo', std_msgs.String, True, 5), ('bar', '/foo/bar', std_msgs.String, False, 5), ('foobar', '/foo/bar', std_msgs.String, False, 5), ] ) """ def __init__(self, node, publisher_details, introspection_topic_name="publishers"): # TODO: check for the correct setting of publisher_details self.publisher_details_msg = [] for (name, topic_name, publisher_type, latched) in publisher_details: if latched: self.__dict__[name] = node.create_publisher( msg_type=publisher_type, topic=topic_name, qos_profile=qos_profile_latched() ) else: self.__dict__[name] = node.create_publisher( msg_type=publisher_type, topic=topic_name, qos_profile=qos_profile_unlatched() ) resolved_name = resolve_name(node, topic_name) message_type = publisher_type.__class__.__module__.split('.')[0] + "/" + publisher_type.__class__.__name__ self.publisher_details_msg.append( py_trees_msgs.PublisherDetails( topic_name=resolved_name, message_type=message_type, latched=latched ) ) self.introspection_service = node.create_service( py_trees_srvs.IntrospectPublishers, "~/introspection/" + introspection_topic_name, self.introspection_callback ) def introspection_callback(self, unused_request, response): response.publisher_details = self.publisher_details_msg return response
[docs]class Subscribers(object): """ Utility class that groups subscribers together in one convenient structure. Args: subscriber_details (obj:`tuple`): list of (str, str, msgType, bool, func) tuples representing (unique_name, topic_name, subscriber_type, latched, callback) specifications for creating subscribers Examples: Convert the incoming list of specifications into proper variables of this class. .. code-block:: python subscribers = py_trees.utilities.Subscribers( [ ('foo', '~/foo', std_msgs.String, True, foo), ('bar', '/foo/bar', std_msgs.String, False,, ('foobar', '/foo/bar', std_msgs.String, False,, ] ) """ def __init__(self, node, subscriber_details, introspection_topic_name="subscribers"): # TODO: check for the correct setting of subscriber_details self.subscriber_details_msg = [] for (name, topic_name, subscriber_type, latched, callback) in subscriber_details: if latched: self.__dict__[name] = node.create_subscription( msg_type=subscriber_type, topic=topic_name, callback=callback, qos_profile=qos_profile_latched() ) else: self.__dict__[name] = node.create_subscription( msg_type=subscriber_type, topic=topic_name, callback=callback, qos_profile=qos_profile_unlatched() ) resolved_name = resolve_name(node, topic_name) message_type = subscriber_type.__class__.__module__.split('.')[0] + "/" + subscriber_type.__class__.__name__ self.subscriber_details_msg.append( py_trees_msgs.SubscriberDetails( topic_name=resolved_name, message_type=message_type, latched=latched ) ) self.introspection_service = node.create_service( py_trees_srvs.IntrospectSubscribers, "~/introspection/" + introspection_topic_name, self.introspection_callback ) def introspection_callback(self, unused_request, response): response.subscriber_details = self.subscriber_details_msg return response
[docs]class Services(object): """ Utility class that groups services together in one convenient structure. Args: service_details (obj:`tuple`): list of (str, str, srvType, func) tuples representing (unique_name, topic_name, service_type, callback) specifications for creating services Examples: Convert the incoming list of specifications into proper variables of this class. .. code-block:: python services = py_trees.utilities.Services( [ ('open_foo', '~/get_foo', foo_interfaces.srv.OpenFoo, open_foo_callback), ('open_foo', '/foo/open', foo_interfaces.srv.OpenFoo, self.open_foo_callback), ('get_foo_bar', '/foo/bar', foo_interfaces.srv.GetBar,, ] ) """ def __init__(self, node, service_details, introspection_topic_name="services"): # TODO: check for the correct setting of subscriber_details self.service_details_msg = [] for (name, service_name, service_type, callback) in service_details: self.__dict__[name] = node.create_service( srv_type=service_type, srv_name=service_name, callback=callback, qos_profile=rclpy.qos.qos_profile_services_default ) resolved_name = resolve_name(node, service_name) service_type = service_type.__class__.__module__.split('.')[0] + "/" + service_type.__class__.__name__ self.service_details_msg.append( py_trees_msgs.ServiceDetails( service_name=resolved_name, service_type=service_type, ) ) self.introspection_service = node.create_service( py_trees_srvs.IntrospectServices, "~/introspection/" + introspection_topic_name, self.introspection_callback ) def introspection_callback(self, unused_request, response): response.subscriber_details = self.subscriber_details_msg return response