Source code for py_trees_ros.transforms

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
#   https://raw.githubusercontent.com/splintered-reality/py_trees_ros/devel/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
Various behaviours that enable common interactions with ROS transforms.
"""

##############################################################################
# Imports
##############################################################################

import py_trees
import rclpy.qos
import tf2_ros

import py_trees.console as console

import geometry_msgs.msg as geometry_msgs

##############################################################################
# Behaviours
##############################################################################


[docs]class FromBlackboard(py_trees.behaviour.Behaviour): """ Broadcast a transform from the blackboard using the transform broadcaster mechanisms. If no it fails to find a geometry_msgs.Transform object on the blackboard, or the blackboard variable is None, this behaviour will update with status :attr:`~py_trees.common.Status.FAILURE`. Args: variable_name: name of the transform variable on the blackboard target_frame: name of the frame to transform into source_frame: name of the input frame static: designate whether it is a static transform or otherwise qos_profile: qos profile for the non-static broadcaster static_qos_profile: qos profile for the static broadcaster (default: use tf2_ros' defaults) name: name of the behaviour """ def __init__( self, name: str, variable_name: str, target_frame: str, source_frame: str, static: bool, qos_profile: rclpy.qos.QoSProfile, static_qos_profile: rclpy.qos.QoSProfile=None, ): super().__init__(name=name) self.blackboard = self.attach_blackboard_client(name) self.variable_name = variable_name self.target_frame = target_frame self.source_frame = source_frame self.static = static self.qos_profile = qos_profile self.static_qos_profile = static_qos_profile self.blackboard.register_key( key=self.variable_name, access=py_trees.common.Access.READ )
[docs] def setup(self, **kwargs): """ Initialises the transform broadcaster. Args: **kwargs (:obj:`dict`): distribute arguments to this behaviour and in turn, all of it's children Raises: KeyError: if a ros2 node isn't passed under the key 'node' in kwargs """ try: self.node = kwargs['node'] except KeyError as e: error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(self.name, self.__class__.__name__) raise KeyError(error_message) from e # 'direct cause' traceability if self.static: self.broadcaster = tf2_ros.StaticTransformBroadcaster( node=self.node, qos=self.static_qos_profile ) else: self.broadcaster = tf2_ros.TransformBroadcaster( node=self.node, qos=self.qos_profile )
[docs] def update(self): """ Retrieves the transform from the blackboard, stamps it and subsequently broadcasts it. Raises: TypeError: if the blackboard variable is of the wrong type. """ try: transform = self.blackboard.get(self.variable_name) except KeyError: self.feedback_message = "no transform to broadcast" return py_trees.common.Status.FAILURE if transform is None: self.feedback_message = "no transform to broadcast" return py_trees.common.Status.FAILURE if type(transform) != geometry_msgs.Transform: raise TypeError("'{}' is not of type geometry_msgs/Transform".format(self.variable_name)) self.feedback_message = "transform sent" transform_stamped = geometry_msgs.TransformStamped() transform_stamped.header.stamp = rclpy.clock.Clock().now().to_msg() transform_stamped.header.frame_id = self.source_frame transform_stamped.child_frame_id = self.target_frame transform_stamped.transform = transform # print(console.green + "Send: {}".format(transform_stamped) + console.reset) self.broadcaster.sendTransform(transform_stamped) return py_trees.common.Status.SUCCESS
[docs]class ToBlackboard(py_trees.behaviour.Behaviour): """ Blocking behaviour that looks for a transform and writes it to a variable on the blackboard. If it fails to find a transform immediately, it will update with status :attr:`~py_trees.common.Status.RUNNING` and write 'None' to the blackboard. .. tip:: To ensure consistent decision making, use this behaviour up-front in your tree's tick to record a transform that can be locked in for the remainder of the tree tick. **Usage Patterns** * clearing_policy == :attr:`~py_trees.common.ClearingPolicy.ON_INTIALISE` Use if you have subsequent behaviours that need to make decisions on whether the transform was received or not. * clearing_policy == :attr:`~py_trees.common.ClearingPolicy.NEVER` Never clear the result. Useful for static transforms or if you are doing your own lookup on the timestamps for any relevant decision making. Args: variable_name: name of the key to write to on the blackboard target_frame: name of the frame to transform into source_frame: name of the input frame qos_profile: qos profile for the non-static subscriber static_qos_profile: qos profile for the static subscriber (default: use tf2_ros' defaults) name: name of the behaviour Raises: TypeError: if the clearing policy is neither :attr:`~py_trees.common.ClearingPolicy.ON_INITIALISE` or :attr:`~py_trees.common.ClearingPolicy.NEVER` """ def __init__( self, name: str, variable_name, target_frame: str, source_frame: str, qos_profile: rclpy.qos.QoSProfile, static_qos_profile: rclpy.qos.QoSProfile=None, clearing_policy: py_trees.common.ClearingPolicy=py_trees.common.ClearingPolicy.ON_INITIALISE, ): super().__init__(name=name) self.variable_name = variable_name self.blackboard = self.attach_blackboard_client(name) self.blackboard.register_key( key=self.variable_name, access=py_trees.common.Access.WRITE ) self.target_frame = target_frame self.source_frame = source_frame self.qos_profile = qos_profile self.static_qos_profile = static_qos_profile self.clearing_policy = clearing_policy if self.clearing_policy == py_trees.common.ClearingPolicy.ON_SUCCESS: raise TypeError("ON_SUCCESS is not a valid policy for transforms.ToBlackboard") self.buffer = tf2_ros.Buffer() # initialise the blackboard self.blackboard.set(self.variable_name, None)
[docs] def setup(self, **kwargs): """ Initialises the transform listener. Args: **kwargs (:obj:`dict`): distribute arguments to this behaviour and in turn, all of it's children Raises: KeyError: if a ros2 node isn't passed under the key 'node' in kwargs """ try: self.node = kwargs['node'] except KeyError as e: error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(self.name, self.__class__.__name__) raise KeyError(error_message) from e # 'direct cause' traceability self.listener = tf2_ros.TransformListener( buffer=self.buffer, node=self.node, # spin_thread=False, qos=self.qos_profile, static_qos=self.static_qos_profile )
[docs] def initialise(self): """ Clear the blackboard variable (set to 'None') if using the :attr:`~py_trees.common.ClearingPolicy.ON_INTIALISE` policy. """ if self.clearing_policy == py_trees.common.ClearingPolicy.ON_INITIALISE: self.blackboard.set(self.variable_name, None)
[docs] def update(self): """ Checks for the latest transform and posts it to the blackboard if available. """ class get_latest(object): def __init__(self): self.nanoseconds = 0 if self.buffer.can_transform( target_frame=self.target_frame, source_frame=self.source_frame, time=get_latest(), # timeout=rclpy.duration.Duration(seconds=5) # don't block ): stamped_transform = self.buffer.lookup_transform( target_frame=self.target_frame, source_frame=self.source_frame, time=get_latest(), # timeout=rclpy.duration.Duration(seconds=5) # don't block ) self.blackboard.set(self.variable_name, stamped_transform) self.feedback_message = "transform saved to {}".format(self.variable_name) return py_trees.common.Status.SUCCESS else: self.feedback_message = "waiting for transform".format( self.target_frame, self.source_frame ) return py_trees.common.Status.RUNNING