#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
# https://raw.github.com/splintered-reality/py_trees_ros/license/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
Behaviours for ROS actions.
"""
##############################################################################
# Imports
##############################################################################
import action_msgs.msg as action_msgs # GoalStatus
import py_trees
import rclpy.action
from typing import Any, Callable
from . import exceptions
##############################################################################
# Behaviours
##############################################################################
[docs]class ActionClient(py_trees.behaviour.Behaviour):
"""
A generic action client interface. This simply sends a pre-configured
goal to the action client.
Args:
action_type: spec type for the action (e.g. move_base_msgs.action.MoveBase)
action_name: where you can find the action topics & services (e.g. "bob/move_base")
action_goal: pre-configured action goal (e.g. move_base_msgs.action.MoveBaseGoal())
name: name of the behaviour (default: lowercase class name)
generate_feedback_message: formatter for feedback messages, takes action_type.Feedback
messages and returns strings (default: None)
"""
def __init__(self,
action_type: Any,
action_name: str,
action_goal: Any,
name: str=py_trees.common.Name.AUTO_GENERATED,
generate_feedback_message: Callable[[Any], str]=None,
):
super().__init__(name)
self.action_type = action_type
self.action_name = action_name
self.action_goal = action_goal
self.generate_feedback_message = generate_feedback_message
self.node = None
self.action_client = None
self.status_strings = {
action_msgs.GoalStatus.STATUS_UNKNOWN : "STATUS_UNKNOWN", # noqa
action_msgs.GoalStatus.STATUS_ACCEPTED : "STATUS_ACCEPTED", # noqa
action_msgs.GoalStatus.STATUS_EXECUTING: "STATUS_EXECUTING", # noqa
action_msgs.GoalStatus.STATUS_CANCELING: "STATUS_CANCELING", # noqa
action_msgs.GoalStatus.STATUS_SUCCEEDED: "STATUS_SUCCEEDED", # noqa
action_msgs.GoalStatus.STATUS_CANCELED : "STATUS_CANCELED", # noqa
action_msgs.GoalStatus.STATUS_ABORTED : "STATUS_ABORTED" # noqa
}
[docs] def setup(self, **kwargs):
"""
Setup the action client services and subscribers.
Args:
**kwargs (:obj:`dict`): distribute arguments to this
behaviour and in turn, all of it's children
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
:class:`~py_trees_ros.exceptions.TimedOutError`: if the action server could not be found
"""
self.logger.debug("{}.setup()".format(self.qualified_name))
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = rclpy.action.ActionClient(
node=self.node,
action_type=self.action_type,
action_name=self.action_name
)
self.node.get_logger().info(
"waiting for server ... [{}][{}]".format(
self.action_name, self.qualified_name
)
)
result = self.action_client.wait_for_server(timeout_sec=2.0)
if not result:
self.feedback_message = "timed out waiting for the server [{}]".format(self.action_name)
self.node.get_logger().error("{}[{}]".format(self.feedback_message, self.qualified_name))
raise exceptions.TimedOutError(self.feedback_message)
else:
self.feedback_message = "... connected to action server [{}]".format(self.action_name)
self.node.get_logger().info("{}[{}]".format(self.feedback_message, self.qualified_name))
[docs] def initialise(self):
"""
Reset the internal variables and kick off a new goal request.
"""
self.logger.debug("{}.initialise()".format(self.qualified_name))
# initialise some temporary variables
self.goal_handle = None
self.send_goal_future = None
self.get_result_future = None
self.result_message = None
self.result_status = None
self.result_status_string = None
self.feedback_message = "sent goal request"
self.send_goal_request()
[docs] def update(self):
"""
Check only to see whether the underlying action server has
succeeded, is running, or has cancelled/aborted for some reason and
map these to the usual behaviour return states.
Returns:
:class:`py_trees.common.Status`
"""
self.logger.debug("{}.update()".format(self.qualified_name))
if self.goal_handle is not None and not self.goal_handle.accepted:
# goal was rejected
self.feedback_message = "goal rejected"
return py_trees.common.Status.FAILURE
if self.result_status is None:
return py_trees.common.Status.RUNNING
elif not self.get_result_future.done():
# should never get here
self.node.get_logger().warn("got result, but future not yet done [{}]".format(self.qualified_name))
return py_trees.common.Status.RUNNING
else:
self.node.get_logger().info("goal result [{}]".format(self.qualified_name))
self.node.get_logger().info(" status: {}".format(self.result_status_string))
self.node.get_logger().info(" message: {}".format(self.result_message))
if self.result_status == action_msgs.GoalStatus.STATUS_SUCCEEDED: # noqa
self.feedback_message = "successfully completed"
return py_trees.common.Status.SUCCESS
else:
self.feedback_message = "failed"
return py_trees.common.Status.FAILURE
[docs] def terminate(self, new_status: py_trees.common.Status):
"""
If running and the current goal has not already succeeded, cancel it.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug(
"{}.terminate({})".format(
self.qualified_name,
"{}->{}".format(self.status, new_status) if self.status != new_status else "{}".format(new_status)
)
)
if (
self.status == py_trees.common.Status.RUNNING
and new_status == py_trees.common.Status.INVALID
):
self.send_cancel_request()
[docs] def shutdown(self):
"""
Clean up the action client when shutting down.
"""
self.action_client.destroy()
########################################
# Action Client Methods
########################################
[docs] def feedback_callback(self, msg: Any):
"""
Default generator for feedback messages from the action server. This will
update the behaviour's feedback message with a stringified version of the
incoming feedback message.
Args:
msg: incoming feedback message (e.g. move_base_msgs.action.MoveBaseFeedback)
"""
if self.generate_feedback_message is not None:
self.feedback_message = "feedback: {}".format(self.generate_feedback_message(msg))
self.node.get_logger().info(
'{} [{}]'.format(
self.feedback_message,
self.qualified_name
)
)
[docs] def send_goal_request(self):
"""
Send the goal, get a future back and start lining up the
chain of callbacks that will lead to a result.
"""
self.feedback_message = "sending goal ..."
self.node.get_logger().info("{} [{}]".format(
self.feedback_message,
self.qualified_name
)
)
self.send_goal_future = self.action_client.send_goal_async(
self.action_goal,
feedback_callback=self.feedback_callback,
# A random uuid is always generated, since we're not sending more than one
# at a time, we don't need to generate and track them here
# goal_uuid=unique_identifier_msgs.UUID(uuid=list(uuid.uuid4().bytes))
)
self.send_goal_future.add_done_callback(self.goal_response_callback)
[docs] def goal_response_callback(self, future: rclpy.task.Future):
"""
Handle goal response, proceed to listen for the result if accepted.
Args:
future: incoming goal request result delivered from the action server
"""
if future.result() is None:
self.feedback_message = "goal request failed :[ [{}]\n{!r}".format(self.qualified_name, future.exception())
self.node.get_logger().info('... {}'.format(self.feedback_message))
return
self.goal_handle = future.result()
if not self.goal_handle.accepted:
self.feedback_message = "goal rejected :( [{}]".format(self.qualified_name)
self.node.get_logger().info('... {}'.format(self.feedback_message))
return
else:
self.feedback_message = "goal accepted :) [{}]".format(self.qualified_name)
self.node.get_logger().info("... {}".format(self.feedback_message))
self.node.get_logger().debug(" {!s}".format(future.result()))
self.get_result_future = self.goal_handle.get_result_async()
self.get_result_future.add_done_callback(self.get_result_callback)
[docs] def send_cancel_request(self):
"""
Send a cancel request to the server. This is triggered when the
behaviour's status switches from :attr:`~py_trees.common.Status.RUNNING` to
:attr:`~py_trees.common.Status.INVALID` (typically a result of a priority
interrupt).
"""
self.feedback_message = "cancelling goal ... [{}]".format(self.qualified_name)
self.node.get_logger().info(self.feedback_message)
if self.goal_handle is not None:
future = self.goal_handle.cancel_goal_async()
future.add_done_callback(self.cancel_response_callback)
[docs] def cancel_response_callback(self, future: rclpy.task.Future):
"""
Immediate callback for the result of a cancel request. This will
set the behaviour's feedback message accordingly.
Args:
future: incoming cancellation result delivered from the action server
"""
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.feedback_message = "goal successfully cancelled [{}]".format(self.qualified_name)
else:
self.feedback_message = "goal failed to cancel [{}]".format(self.qualified_name)
self.node.get_logger().info('... {}'.format(self.feedback_message))
[docs] def get_result_callback(self, future: rclpy.task.Future):
"""
Immediate callback for the result, saves data into local variables so that
the update method can react accordingly.
Args:
future: incoming goal result delivered from the action server
"""
self.result_message = future.result()
self.result_status = future.result().status
self.result_status_string = self.status_strings[self.result_status]