Source code for py_trees_ros.action_clients

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
#   https://raw.github.com/splintered-reality/py_trees_ros/license/LICENSE
#
##############################################################################
# Documentation
##############################################################################
from asyncio.tasks import wait_for

"""
Behaviours for ROS actions.
"""

##############################################################################
# Imports
##############################################################################

import typing
import uuid

import action_msgs.msg as action_msgs  # GoalStatus
import py_trees
import rclpy.action

from . import exceptions

##############################################################################
# Behaviours
##############################################################################


[docs]class FromBlackboard(py_trees.behaviour.Behaviour): """ An action client interface that draws goals from the blackboard. The lifecycle of this behaviour works as follows: * :meth:`initialise`: check blackboard for a goal and send * :meth:`update`: if a goal was sent, monitor progress * :meth:`terminate`: if interrupted while running, send a cancel request As a consequence, the status of this behaviour can be interpreted as follows: * :data:`~py_trees.common.Status.FAILURE`: no goal was found to send, it was rejected or it failed while executing * :data:`~py_trees.common.Status.RUNNING`: a goal was sent and is still executing on the server * :data:`~py_trees.common.Status.SUCCESS`: sent goal has completed with success To block on the arrival of a goal on the blackboard, use with the :class:`py_trees.behaviours.WaitForBlackboardVariable` behaviour. e.g. .. code-block:: python sequence = py_trees.composites.Sequence(name="Sequence", memory=True) wait_for_goal = py_trees.behaviours.WaitForBlackboardVariable( name="WaitForGoal", variable_name="/my_goal" ) action_client = py_trees_ros.aciton_clients.FromBlackboard( action_type=py_trees_actions.Dock, action_name="dock", name="ActionClient" ) sequence.add_children([wait_for_goal, action_client]) To customise a more interesting feedback message, pass in a method to the constructor, for example: .. code-block:: python action_client = py_trees_ros.action_clients.FromBlackboard( action_type=py_trees_actions.Dock, action_name="dock", name="ActionClient", generate_message=lambda msg: "{:.2f}%%".format(msg.feedback.percentage_completed) ) Args: name: name of the behaviour action_type: spec type for the action (e.g. move_base_msgs.action.MoveBase) action_name: where you can find the action topics & services (e.g. "bob/move_base") key: name of the key on the blackboard generate_feedback_message: formatter for feedback messages, takes action_type.Feedback messages and returns strings (default: None) wait_for_server_timeout_sec: use negative values for a blocking but periodic check (default: -3.0) .. note:: The default setting for timeouts (a negative value) will suit most use cases. With this setting the behaviour will periodically check and issue a warning if the server can't be found. Actually aborting the setup can usually be left up to the behaviour tree manager. """ def __init__(self, name: str, action_type: typing.Any, action_name: str, key: str, generate_feedback_message: typing.Callable[[typing.Any], str]=None, wait_for_server_timeout_sec: float=-3.0 ): super().__init__(name) self.action_type = action_type self.action_name = action_name self.wait_for_server_timeout_sec = wait_for_server_timeout_sec self.blackboard = self.attach_blackboard_client(name=self.name) self.blackboard.register_key( key="goal", access=py_trees.common.Access.READ, # make sure to namespace it if not already remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key) ) self.generate_feedback_message = generate_feedback_message self.node = None self.action_client = None self.status_strings = { action_msgs.GoalStatus.STATUS_UNKNOWN : "STATUS_UNKNOWN", # noqa action_msgs.GoalStatus.STATUS_ACCEPTED : "STATUS_ACCEPTED", # noqa action_msgs.GoalStatus.STATUS_EXECUTING: "STATUS_EXECUTING", # noqa action_msgs.GoalStatus.STATUS_CANCELING: "STATUS_CANCELING", # noqa action_msgs.GoalStatus.STATUS_SUCCEEDED: "STATUS_SUCCEEDED", # noqa action_msgs.GoalStatus.STATUS_CANCELED : "STATUS_CANCELED", # noqa action_msgs.GoalStatus.STATUS_ABORTED : "STATUS_ABORTED" # noqa }
[docs] def setup(self, **kwargs): """ Setup the action client services and subscribers. Args: **kwargs (:obj:`dict`): distribute arguments to this behaviour and in turn, all of it's children Raises: :class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs :class:`~py_trees_ros.exceptions.TimedOutError`: if the action server could not be found """ self.logger.debug("{}.setup()".format(self.qualified_name)) try: self.node = kwargs['node'] except KeyError as e: error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(self.qualified_name) raise KeyError(error_message) from e # 'direct cause' traceability self.action_client = rclpy.action.ActionClient( node=self.node, action_type=self.action_type, action_name=self.action_name ) result = None if self.wait_for_server_timeout_sec > 0.0: result = self.action_client.wait_for_server(timeout_sec=self.wait_for_server_timeout_sec) else: iterations = 0 period_sec = -1.0*self.wait_for_server_timeout_sec while not result: iterations += 1 result = self.action_client.wait_for_server(timeout_sec=period_sec) if not result: self.node.get_logger().warning( "waiting for action server ... [{}s][{}][{}]".format( iterations * period_sec, self.action_name, self.qualified_name ) ) if not result: self.feedback_message = "timed out waiting for the server [{}]".format(self.action_name) self.node.get_logger().error("{}[{}]".format(self.feedback_message, self.qualified_name)) raise exceptions.TimedOutError(self.feedback_message) else: self.feedback_message = "... connected to action server [{}]".format(self.action_name) self.node.get_logger().info("{}[{}]".format(self.feedback_message, self.qualified_name))
[docs] def initialise(self): """ Reset the internal variables and kick off a new goal request. """ self.logger.debug("{}.initialise()".format(self.qualified_name)) # initialise some temporary variables self.goal_handle = None self.send_goal_future = None self.get_result_future = None self.result_message = None self.result_status = None self.result_status_string = None try: self.send_goal_request(self.blackboard.goal) self.feedback_message = "sent goal request" except KeyError: pass # self.send_goal_future will be None, check on that
[docs] def update(self): """ Check only to see whether the underlying action server has succeeded, is running, or has cancelled/aborted for some reason and map these to the usual behaviour return states. Returns: :class:`py_trees.common.Status` """ self.logger.debug("{}.update()".format(self.qualified_name)) if self.send_goal_future is None: self.feedback_message = "no goal to send" return py_trees.common.Status.FAILURE if self.goal_handle is not None and not self.goal_handle.accepted: # goal was rejected self.feedback_message = "goal rejected" return py_trees.common.Status.FAILURE if self.result_status is None: return py_trees.common.Status.RUNNING elif not self.get_result_future.done(): # should never get here self.node.get_logger().warn("got result, but future not yet done [{}]".format(self.qualified_name)) return py_trees.common.Status.RUNNING else: self.node.get_logger().debug("goal result [{}]".format(self.qualified_name)) self.node.get_logger().debug(" status: {}".format(self.result_status_string)) self.node.get_logger().debug(" message: {}".format(self.result_message)) if self.result_status == action_msgs.GoalStatus.STATUS_SUCCEEDED: # noqa self.feedback_message = "successfully completed" return py_trees.common.Status.SUCCESS else: self.feedback_message = "failed" return py_trees.common.Status.FAILURE
[docs] def terminate(self, new_status: py_trees.common.Status): """ If running and the current goal has not already succeeded, cancel it. Args: new_status: the behaviour is transitioning to this new status """ self.logger.debug( "{}.terminate({})".format( self.qualified_name, "{}->{}".format(self.status, new_status) if self.status != new_status else "{}".format(new_status) ) ) if ( self.status == py_trees.common.Status.RUNNING and new_status == py_trees.common.Status.INVALID ): self.send_cancel_request()
[docs] def shutdown(self): """ Clean up the action client when shutting down. """ self.action_client.destroy()
######################################## # Action Client Methods ########################################
[docs] def feedback_callback(self, msg: typing.Any): """ Default generator for feedback messages from the action server. This will update the behaviour's feedback message with a stringified version of the incoming feedback message. Args: msg: incoming feedback message (e.g. move_base_msgs.action.MoveBaseFeedback) """ if self.generate_feedback_message is not None: self.feedback_message = "feedback: {}".format(self.generate_feedback_message(msg)) self.node.get_logger().debug( '{} [{}]'.format( self.feedback_message, self.qualified_name ) )
[docs] def send_goal_request(self, goal: typing.Any): """ Send the goal, get a future back and start lining up the chain of callbacks that will lead to a result. """ self.feedback_message = "sending goal ..." self.node.get_logger().debug("{} [{}]".format( self.feedback_message, self.qualified_name )) self.send_goal_future = self.action_client.send_goal_async( goal, feedback_callback=self.feedback_callback, # A random uuid is always generated, since we're not sending more than one # at a time, we don't need to generate and track them here # goal_uuid=unique_identifier_msgs.UUID(uuid=list(uuid.uuid4().bytes)) ) self.send_goal_future.add_done_callback(self.goal_response_callback)
[docs] def goal_response_callback(self, future: rclpy.task.Future): """ Handle goal response, proceed to listen for the result if accepted. Args: future: incoming goal request result delivered from the action server """ if future.result() is None: self.feedback_message = "goal request failed :[ [{}]\n{!r}".format(self.qualified_name, future.exception()) self.node.get_logger().debug('... {}'.format(self.feedback_message)) return self.goal_handle = future.result() if not self.goal_handle.accepted: self.feedback_message = "goal rejected :( [{}]".format(self.qualified_name) self.node.get_logger().debug('... {}'.format(self.feedback_message)) return else: self.feedback_message = "goal accepted :) [{}]".format(self.qualified_name) self.node.get_logger().debug("... {}".format(self.feedback_message)) self.node.get_logger().debug(" {!s}".format(future.result())) self.get_result_future = self.goal_handle.get_result_async() self.get_result_future.add_done_callback(self.get_result_callback)
[docs] def send_cancel_request(self): """ Send a cancel request to the server. This is triggered when the behaviour's status switches from :attr:`~py_trees.common.Status.RUNNING` to :attr:`~py_trees.common.Status.INVALID` (typically a result of a priority interrupt). """ self.feedback_message = "cancelling goal ... [{}]".format(self.qualified_name) self.node.get_logger().debug(self.feedback_message) if self.goal_handle is not None: future = self.goal_handle.cancel_goal_async() future.add_done_callback(self.cancel_response_callback)
[docs] def cancel_response_callback(self, future: rclpy.task.Future): """ Immediate callback for the result of a cancel request. This will set the behaviour's feedback message accordingly. Args: future: incoming cancellation result delivered from the action server """ cancel_response = future.result() if len(cancel_response.goals_canceling) > 0: self.feedback_message = "goal successfully cancelled [{}]".format(self.qualified_name) else: self.feedback_message = "goal failed to cancel [{}]".format(self.qualified_name) self.node.get_logger().debug('... {}'.format(self.feedback_message))
[docs] def get_result_callback(self, future: rclpy.task.Future): """ Immediate callback for the result, saves data into local variables so that the update method can react accordingly. Args: future: incoming goal result delivered from the action server """ self.result_message = future.result() self.result_status = future.result().status self.result_status_string = self.status_strings[self.result_status]
[docs]class FromConstant(FromBlackboard): """ Convenience version of the action client that only ever sends the same goal. .. see-also: :class:`py_trees_ros.action_clients.FromBlackboard` Args: name: name of the behaviour action_type: spec type for the action (e.g. move_base_msgs.action.MoveBase) action_name: where you can find the action topics & services (e.g. "bob/move_base") action_goal: the goal to send generate_feedback_message: formatter for feedback messages, takes action_type.Feedback messages and returns strings (default: None) wait_for_server_timeout_sec: use negative values for a blocking but periodic check (default: -3.0) .. note:: The default setting for timeouts (a negative value) will suit most use cases. With this setting the behaviour will periodically check and issue a warning if the server can't be found. Actually aborting the setup can usually be left up to the behaviour tree manager. """ def __init__(self, name: str, action_type: typing.Any, action_name: str, action_goal: typing.Any, generate_feedback_message: typing.Callable[[typing.Any], str]=None, wait_for_server_timeout_sec: float=-3.0 ): unique_id = uuid.uuid4() key = "/goal_" + str(unique_id) super().__init__( action_type=action_type, action_name=action_name, key=key, name=name, generate_feedback_message=generate_feedback_message, wait_for_server_timeout_sec=wait_for_server_timeout_sec ) # parent already instantiated a blackboard client self.blackboard.register_key( key=key, access=py_trees.common.Access.WRITE, ) self.blackboard.set(name=key, value=action_goal)