[ROS2 知识] [Action综合] (3)python实现的服务-客户端
创始人
2025-05-28 01:56:17
0

一、说明

        action是 ROS 2 中的一种异步通信形式。动作客户端将目标请求发送到动作服务器。行动服务器将目标反馈和结果发送给行动客户端。本文将python实现action服务和客户端。

二、服务器代码编制

2.1 先决条件

        您将需要 action_tutorials_interfaces 包和 Fibonacci.action 接口,该接口在上一个教程创建操作中定义。

        让我们专注于编写一个动作服务器,使用我们在创建动作教程中创建的动作来计算斐波那契数列。

        到目前为止,您已经创建了包并使用 ros2 run 来运行您的节点。然而,为了在本教程中保持简单,我们将动作服务器的范围限定为单个文件。如果您想查看动作教程的完整包是什么样的,请查看 action_tutorials。

2.2 服务器接收并处理goal代码

        在您的主目录中打开一个新文件,我们将其命名为 fibonacci_action_server.py,并添加以下代码:

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionServer(Node):def __init__(self):super().__init__('fibonacci_action_server')self._action_server = ActionServer(self,Fibonacci,'fibonacci',self.execute_callback)def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')result = Fibonacci.Result()return result#这里goal_handle没用上,是留给后面feedback处理的。def main(args=None):rclpy.init(args=args)fibonacci_action_server = FibonacciActionServer()rclpy.spin(fibonacci_action_server)if __name__ == '__main__':main()

        第 8 行定义了一个 FibonacciActionServer 类,它是 Node 的子类。该类通过调用节点构造函数初始化,将我们的节点命名为 fibonacci_action_server:

                 super().__init__('fibonacci_action_server')

        在构造函数中,我们还实例化了一个新的动作服务器:

        self._action_server = ActionServer(self,Fibonacci,'fibonacci',self.execute_callback)

注意:这个定义有三个要素,“接口-名称-回调函数”。
 

参数解释:

  1. 将动作客户端添加到的 ROS 2 节点:self。

  2. 操作类型:Fibonacci(在第 5 行中导入)。这个是'Fibonacci'的接口数据结构。

  3. 动作名称:'fibonacci'。与topic意义相同,只要有人发fibonacci的action,那么本服务就响应。内部有若干个队列,topic的、action的,只要队列不空,且有同名接口数据就接收。

  4. 执行接受目标的回调函数:self.execute_callback。此回调必须返回操作类型的结果消息。

对比topic和action的消息处理

         我们还在我们的类中定义了一个 execute_callback 方法:

    def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')result = Fibonacci.Result()return result

        这是在目标被接受后将被调用以执行目标的方法。让我们尝试运行我们的动作服务器:

        打开终端,启动服务指令

    python3 fibonacci_action_server.py

        在另一个终端中,我们可以使用命令行界面发送一个目标:

ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

        在运行动作服务器的终端中,您应该会看到一条记录消息“Executing goal...”,然后是未设置目标状态的警告。默认情况下,如果未在执行回调中设置目标句柄状态,则它会采用中止状态。

        我们可以在目标句柄上使用 succeed() 方法来指示目标成功:

    def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')goal_handle.succeed()result = Fibonacci.Result()return result

        现在,如果您重新启动操作服务器并发送另一个目标,您应该会看到目标已完成且状态为 SUCCEEDED。

        现在让我们的目标执行实际计算并返回请求的斐波那契数列:

   def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')sequence = [0, 1]for i in range(1, goal_handle.request.order):sequence.append(sequence[i] + sequence[i-1])goal_handle.succeed()result = Fibonacci.Result()result.sequence = sequencereturn result

        计算完序列后,我们在返回之前将其分配给结果消息字段。再次重启动作服务器并发送另一个目标。您应该看到目标以正确的结果序列完成。

2.3 追加goal_handle发布反馈消息

        行动的好处之一是能够在目标执行期间向行动客户提供反馈。我们可以通过调用目标句柄的 publish_feedback() 方法让我们的动作服务器为动作客户端发布反馈。

        我们将替换序列变量,并使用反馈消息来存储序列。在 for 循环中每次更新反馈消息后,我们都会发布反馈消息并休眠以获得戏剧效果:

import timeimport rclpy
from rclpy.action import ActionServer
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionServer(Node):def __init__(self):super().__init__('fibonacci_action_server')self._action_server = ActionServer(self,Fibonacci,'fibonacci',self.execute_callback)def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')feedback_msg = Fibonacci.Feedback()feedback_msg.partial_sequence = [0, 1]for i in range(1, goal_handle.request.order):feedback_msg.partial_sequence.append(feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))goal_handle.publish_feedback(feedback_msg)time.sleep(1)goal_handle.succeed()result = Fibonacci.Result()result.sequence = feedback_msg.partial_sequencereturn resultdef main(args=None):rclpy.init(args=args)fibonacci_action_server = FibonacciActionServer()rclpy.spin(fibonacci_action_server)if __name__ == '__main__':main()

        重新启动操作服务器后,我们可以使用带有 --feedback 选项的命令行工具确认反馈现在已发布:

ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

三、客户端代码编制

3.1 客户端发射goal请求

        我们还将动作客户端的范围限定为单个文件。打开一个新文件,我们将其命名为 fibonacci_action_client.py,并添加以下样板代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionClient(Node):def __init__(self):super().__init__('fibonacci_action_client')self._action_client = ActionClient(self, Fibonacci, 'fibonacci')def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()return self._action_client.send_goal_async(goal_msg)def main(args=None):rclpy.init(args=args)action_client = FibonacciActionClient()future = action_client.send_goal(10)rclpy.spin_until_future_complete(action_client, future)if __name__ == '__main__':main()

        我们定义了一个 FibonacciActionClient 类,它是 Node 的子类。该类通过调用节点构造函数初始化,将我们的节点命名为 fibonacci_action_client:

   super().__init__('fibonacci_action_client')

        同样在类构造函数中,我们使用上一个创建动作教程中的自定义动作定义创建了一个动作客户端:

  self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

        我们通过传递三个参数来创建一个 ActionClient:

  • 将动作客户端添加到的 ROS 2 节点:self
  • 动作类型:Fibonacci
  • 动作名称:'fibonacci'

我们的动作客户端将能够与相同动作名称和类型的动作服务器通信。我们还在 FibonacciActionClient 类中定义了一个方法 send_goal:

    def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()return self._action_client.send_goal_async(goal_msg)

        此方法等待动作服务器可用,然后将目标发送到服务器。它返回一个我们可以稍后等待的未来。

        在类定义之后,我们定义了一个函数 main() 来初始化 ROS 2 并创建我们的 FibonacciActionClient 节点的实例。然后它发送一个目标并等待该目标完成。

        最后,我们在 Python 程序的入口点调用 main()。

        让我们先运行之前构建的动作服务器来测试我们的动作客户端:

python3 fibonacci_action_server.py

        您应该看到操作服务器在成功执行目标时打印的消息:

[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.

        动作客户端应该启动,然后快速完成。此时,我们有一个功能正常的动作客户端,但我们看不到任何结果或得到任何反馈。

3.2 获取结果

        所以我们可以发送一个目标,但是我们怎么知道它什么时候完成呢?我们可以通过几个步骤获得结果信息。首先,我们需要为发送的目标获取目标句柄。然后,我们可以使用目标句柄来请求结果。

        下面是这个例子的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionClient(Node):def __init__(self):super().__init__('fibonacci_action_client')self._action_client = ActionClient(self, Fibonacci, 'fibonacci')def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()self._send_goal_future = self._action_client.send_goal_async(goal_msg)self._send_goal_future.add_done_callback(self.goal_response_callback)def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)def get_result_callback(self, future):result = future.result().resultself.get_logger().info('Result: {0}'.format(result.sequence))rclpy.shutdown()def main(args=None):rclpy.init(args=args)action_client = FibonacciActionClient()action_client.send_goal(10)rclpy.spin(action_client)if __name__ == '__main__':main()

        ActionClient.send_goal_async() 方法将未来返回到目标句柄。首先,我们为未来完成时注册一个回调:

   self._send_goal_future.add_done_callback(self.goal_response_callback)

        请注意,当操作服务器接受或拒绝目标请求时,未来完成。让我们更详细地了解一下 goal_response_callback。我们可以检查目标是否被拒绝并提前返回,因为我们知道不会有结果:

  def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')

        现在我们有了一个目标句柄,我们可以使用它通过 get_result_async() 方法请求结果。与发送目标类似,我们将获得一个在结果准备就绪时完成的未来。让我们注册一个回调,就像我们为目标响应所做的那样:

        self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)

        在回调中,我们记录结果序列并关闭 ROS 2 以干净退出:

    def get_result_callback(self, future):result = future.result().resultself.get_logger().info('Result: {0}'.format(result.sequence))rclpy.shutdown()

        使用在单独终端中运行的动作服务器,继续尝试运行我们的 Fibonacci 动作客户端!

    python3 fibonacci_action_client.py

        您应该看到已接受的目标和最终结果的记录消息。

3.3 获取反馈

        我们的行动客户端可以发送目标。好的!但如果我们能从动作服务器获得一些关于我们发送的目标的反馈,那就太好了。

下面是这个例子的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Nodefrom action_tutorials_interfaces.action import Fibonacciclass FibonacciActionClient(Node):def __init__(self):super().__init__('fibonacci_action_client')self._action_client = ActionClient(self, Fibonacci, 'fibonacci')def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)self._send_goal_future.add_done_callback(self.goal_response_callback)def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)def get_result_callback(self, future):result = future.result().resultself.get_logger().info('Result: {0}'.format(result.sequence))rclpy.shutdown()def feedback_callback(self, feedback_msg):feedback = feedback_msg.feedbackself.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))def main(args=None):rclpy.init(args=args)action_client = FibonacciActionClient()action_client.send_goal(10)rclpy.spin(action_client)if __name__ == '__main__':main()

        下面是反馈消息的回调函数:

   def feedback_callback(self, feedback_msg):feedback = feedback_msg.feedbackself.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

        在回调中,我们获取消息的反馈部分并将 partial_sequence 字段打印到屏幕上。 我们需要向动作客户端注册回调。这是通过在我们发送目标时额外将回调传递给操作客户端来实现的:

self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        我们都准备好了。如果我们运行我们的动作客户端,您应该会看到反馈被打印到屏幕上。

四、后记

        在本教程中,您逐行将 Python 动作服务器和动作客户端码放在一起,并配置它们以交换目标、反馈和结果。然而整个action服务是一整套协议,需要边实践,边琢磨,才能grasp。

相关内容

热门资讯

小白网络安全学习2 简要目录;DVWA 靶场;XSS平台搭建;xss检测和利用;...
最新或2023(历届)珠海市内...  办理对象  群众  办理条件  一、申请条件及所需资料:(以下有关证件由派出所审核原件后收取复印件...
Vue基础23之路由第二节 Vue基础23路由路由的query参数src/router/index.jsDetail.vueHo...
最新或2023(历届)清远户口...  办理对象  需户口迁移人员  办理条件  符合政策规定的户口迁移  所需材料  无  窗口办理流程...
最新或2023(历届)重庆户口...  办理程序、时限及相关迁移手续:  (一)、区内迁移的,居民持相关手续到入户地派出所办理入户,手续齐...
最新或2023(历届)佛山三水...  办理对象  拟迁入本区的非本市户籍人员  办理条件  市外户籍人员在我市购买商品房(不含二手房和自...
最新或2023(历届)汕尾户口...  办理对象  群众  办理条件  1、购房入户  2、夫妻迁移入户  3、子女投靠父母  所需材料 ...
基础数论算法刷题笔记 理论 最小公倍数、最大公约数 (a+b)%n = a%n+b%n (ab)...
LeetCode 112. 路... LeetCode 112. 路径总和        给你二叉树的根节点 root 和一个表示目标和的...
最新或2023(历届)新生婴儿...   新生儿落户口办理程序:  1、用医院开具的孩子出生证明到当地的妇幼保健站开具正式的出生医学证明。...
最新或2023(历届)新生儿户...   新生儿申报户口须知:  一、办理条件 新生婴儿可按随父随母自愿的原则申报户口。非婚生育的新生婴儿...
新生儿落户办理户口时间限制,新...   新生儿作为国家未来的主体,国家在一些政策上也是有规定的,那么新生儿落户政策有哪些呢?新生儿在落户...
最新或2023(历届)深圳户口...  第一项——调干条件 根据 人事局颁布相关政策,调干的方式及条件分别为:  一、单调 1、享受国务院...
最新或2023(历届)毕业生办... 市民您好!感谢您对公安工作的支持与理解。根据您提出的问题,依据我市现行的户口政策,现答复如下:需本人...
TDK| 电源——反激变压器设... 电源参数根据功率、输入输出的情况,我们选择反激电源拓扑。反激式变压器的优点有:1、 电...
[个人笔记] 投影矩阵的导数计... [个人笔记] 投影矩阵的导数计算(1) 问题(2ÿ...
19.特殊工具与技术 文章目录特殊工具与技术19.1控制内存分配19.1.1重载new和deleteoperator ne...
最新或2023(历届)最新户口... 户口迁移手续[网友提问]人在北京上班三年,现在想在北京定居落户。可是户口还在老家湖南。想把湖南户口迁...
揭秘:春秋战国时代的美食 春秋... 我们今天再来说说春秋战国的饮食。新春佳节,寒冬腊月,三五好友,围坐于火锅旁,天南海北的胡侃,便是人生...
宁都起义简介 起义的环境,过程... 宁都起义,又名26路军起义,1931年(民国二十年)12月,在第二次国内革命战争中,国民革命军第26...