action是 ROS 2 中的一种异步通信形式。动作客户端将目标请求发送到动作服务器。行动服务器将目标反馈和结果发送给行动客户端。本文将python实现action服务和客户端。
您将需要 action_tutorials_interfaces 包和 Fibonacci.action 接口,该接口在上一个教程创建操作中定义。
让我们专注于编写一个动作服务器,使用我们在创建动作教程中创建的动作来计算斐波那契数列。
到目前为止,您已经创建了包并使用 ros2 run 来运行您的节点。然而,为了在本教程中保持简单,我们将动作服务器的范围限定为单个文件。如果您想查看动作教程的完整包是什么样的,请查看 action_tutorials。
在您的主目录中打开一个新文件,我们将其命名为 fibonacci_action_server.py,并添加以下代码:
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionServer(Node):def __init__(self):super().__init__('fibonacci_action_server')self._action_server = ActionServer(self,Fibonacci,'fibonacci',self.execute_callback)def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')result = Fibonacci.Result()return result#这里goal_handle没用上,是留给后面feedback处理的。def main(args=None):rclpy.init(args=args)fibonacci_action_server = FibonacciActionServer()rclpy.spin(fibonacci_action_server)if __name__ == '__main__':main()
第 8 行定义了一个 FibonacciActionServer 类,它是 Node 的子类。该类通过调用节点构造函数初始化,将我们的节点命名为 fibonacci_action_server:
super().__init__('fibonacci_action_server')
在构造函数中,我们还实例化了一个新的动作服务器:
self._action_server = ActionServer(self,Fibonacci,'fibonacci',self.execute_callback)
注意:这个定义有三个要素,“接口-名称-回调函数”。
参数解释:
将动作客户端添加到的 ROS 2 节点:self。
操作类型:Fibonacci(在第 5 行中导入)。这个是'Fibonacci'的接口数据结构。
动作名称:'fibonacci'。与topic意义相同,只要有人发fibonacci的action,那么本服务就响应。内部有若干个队列,topic的、action的,只要队列不空,且有同名接口数据就接收。
执行接受目标的回调函数:self.execute_callback。此回调必须返回操作类型的结果消息。
我们还在我们的类中定义了一个 execute_callback 方法:
def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')result = Fibonacci.Result()return result
这是在目标被接受后将被调用以执行目标的方法。让我们尝试运行我们的动作服务器:
打开终端,启动服务指令
python3 fibonacci_action_server.py
在另一个终端中,我们可以使用命令行界面发送一个目标:
ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
在运行动作服务器的终端中,您应该会看到一条记录消息“Executing goal...”,然后是未设置目标状态的警告。默认情况下,如果未在执行回调中设置目标句柄状态,则它会采用中止状态。
我们可以在目标句柄上使用 succeed() 方法来指示目标成功:
def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')goal_handle.succeed()result = Fibonacci.Result()return result
现在,如果您重新启动操作服务器并发送另一个目标,您应该会看到目标已完成且状态为 SUCCEEDED。
现在让我们的目标执行实际计算并返回请求的斐波那契数列:
def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')sequence = [0, 1]for i in range(1, goal_handle.request.order):sequence.append(sequence[i] + sequence[i-1])goal_handle.succeed()result = Fibonacci.Result()result.sequence = sequencereturn result
计算完序列后,我们在返回之前将其分配给结果消息字段。再次重启动作服务器并发送另一个目标。您应该看到目标以正确的结果序列完成。
行动的好处之一是能够在目标执行期间向行动客户提供反馈。我们可以通过调用目标句柄的 publish_feedback() 方法让我们的动作服务器为动作客户端发布反馈。
我们将替换序列变量,并使用反馈消息来存储序列。在 for 循环中每次更新反馈消息后,我们都会发布反馈消息并休眠以获得戏剧效果:
import timeimport rclpy
from rclpy.action import ActionServer
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionServer(Node):def __init__(self):super().__init__('fibonacci_action_server')self._action_server = ActionServer(self,Fibonacci,'fibonacci',self.execute_callback)def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')feedback_msg = Fibonacci.Feedback()feedback_msg.partial_sequence = [0, 1]for i in range(1, goal_handle.request.order):feedback_msg.partial_sequence.append(feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))goal_handle.publish_feedback(feedback_msg)time.sleep(1)goal_handle.succeed()result = Fibonacci.Result()result.sequence = feedback_msg.partial_sequencereturn resultdef main(args=None):rclpy.init(args=args)fibonacci_action_server = FibonacciActionServer()rclpy.spin(fibonacci_action_server)if __name__ == '__main__':main()
重新启动操作服务器后,我们可以使用带有 --feedback 选项的命令行工具确认反馈现在已发布:
ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
我们还将动作客户端的范围限定为单个文件。打开一个新文件,我们将其命名为 fibonacci_action_client.py,并添加以下样板代码:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionClient(Node):def __init__(self):super().__init__('fibonacci_action_client')self._action_client = ActionClient(self, Fibonacci, 'fibonacci')def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()return self._action_client.send_goal_async(goal_msg)def main(args=None):rclpy.init(args=args)action_client = FibonacciActionClient()future = action_client.send_goal(10)rclpy.spin_until_future_complete(action_client, future)if __name__ == '__main__':main()
我们定义了一个 FibonacciActionClient 类,它是 Node 的子类。该类通过调用节点构造函数初始化,将我们的节点命名为 fibonacci_action_client:
super().__init__('fibonacci_action_client')
同样在类构造函数中,我们使用上一个创建动作教程中的自定义动作定义创建了一个动作客户端:
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
我们通过传递三个参数来创建一个 ActionClient:
我们的动作客户端将能够与相同动作名称和类型的动作服务器通信。我们还在 FibonacciActionClient 类中定义了一个方法 send_goal:
def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()return self._action_client.send_goal_async(goal_msg)
此方法等待动作服务器可用,然后将目标发送到服务器。它返回一个我们可以稍后等待的未来。
在类定义之后,我们定义了一个函数 main() 来初始化 ROS 2 并创建我们的 FibonacciActionClient 节点的实例。然后它发送一个目标并等待该目标完成。
最后,我们在 Python 程序的入口点调用 main()。
让我们先运行之前构建的动作服务器来测试我们的动作客户端:
python3 fibonacci_action_server.py
您应该看到操作服务器在成功执行目标时打印的消息:
[INFO] [fibonacci_action_server]: Executing goal... [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3]) [INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5]) # etc.
动作客户端应该启动,然后快速完成。此时,我们有一个功能正常的动作客户端,但我们看不到任何结果或得到任何反馈。
所以我们可以发送一个目标,但是我们怎么知道它什么时候完成呢?我们可以通过几个步骤获得结果信息。首先,我们需要为发送的目标获取目标句柄。然后,我们可以使用目标句柄来请求结果。
下面是这个例子的完整代码:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionClient(Node):def __init__(self):super().__init__('fibonacci_action_client')self._action_client = ActionClient(self, Fibonacci, 'fibonacci')def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()self._send_goal_future = self._action_client.send_goal_async(goal_msg)self._send_goal_future.add_done_callback(self.goal_response_callback)def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)def get_result_callback(self, future):result = future.result().resultself.get_logger().info('Result: {0}'.format(result.sequence))rclpy.shutdown()def main(args=None):rclpy.init(args=args)action_client = FibonacciActionClient()action_client.send_goal(10)rclpy.spin(action_client)if __name__ == '__main__':main()
ActionClient.send_goal_async() 方法将未来返回到目标句柄。首先,我们为未来完成时注册一个回调:
self._send_goal_future.add_done_callback(self.goal_response_callback)
请注意,当操作服务器接受或拒绝目标请求时,未来完成。让我们更详细地了解一下 goal_response_callback。我们可以检查目标是否被拒绝并提前返回,因为我们知道不会有结果:
def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')
现在我们有了一个目标句柄,我们可以使用它通过 get_result_async() 方法请求结果。与发送目标类似,我们将获得一个在结果准备就绪时完成的未来。让我们注册一个回调,就像我们为目标响应所做的那样:
self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)
在回调中,我们记录结果序列并关闭 ROS 2 以干净退出:
def get_result_callback(self, future):result = future.result().resultself.get_logger().info('Result: {0}'.format(result.sequence))rclpy.shutdown()
使用在单独终端中运行的动作服务器,继续尝试运行我们的 Fibonacci 动作客户端!
python3 fibonacci_action_client.py
您应该看到已接受的目标和最终结果的记录消息。
我们的行动客户端可以发送目标。好的!但如果我们能从动作服务器获得一些关于我们发送的目标的反馈,那就太好了。
下面是这个例子的完整代码:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionClient(Node):def __init__(self):super().__init__('fibonacci_action_client')self._action_client = ActionClient(self, Fibonacci, 'fibonacci')def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)self._send_goal_future.add_done_callback(self.goal_response_callback)def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)def get_result_callback(self, future):result = future.result().resultself.get_logger().info('Result: {0}'.format(result.sequence))rclpy.shutdown()def feedback_callback(self, feedback_msg):feedback = feedback_msg.feedbackself.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))def main(args=None):rclpy.init(args=args)action_client = FibonacciActionClient()action_client.send_goal(10)rclpy.spin(action_client)if __name__ == '__main__':main()
下面是反馈消息的回调函数:
def feedback_callback(self, feedback_msg):feedback = feedback_msg.feedbackself.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
在回调中,我们获取消息的反馈部分并将 partial_sequence 字段打印到屏幕上。 我们需要向动作客户端注册回调。这是通过在我们发送目标时额外将回调传递给操作客户端来实现的:
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
我们都准备好了。如果我们运行我们的动作客户端,您应该会看到反馈被打印到屏幕上。
在本教程中,您逐行将 Python 动作服务器和动作客户端码放在一起,并配置它们以交换目标、反馈和结果。然而整个action服务是一整套协议,需要边实践,边琢磨,才能grasp。
上一篇:QSS查询手册