ROS2 Python 教学合并版:从环境搭建到 Topic 通信实战

张开发
2026/4/17 1:09:38 15 分钟阅读

分享文章

ROS2 Python 教学合并版:从环境搭建到 Topic 通信实战
搭建 ROS2 Humble WSL2 Ubuntu 22.04 开发环境。编写第一个 Python 节点理解rclpy、Node、spin。从普通 Python 类进化到正式 ROS2 节点类。学习多线程与回调避免耗时任务卡死节点。完成一个Topic 发布订阅与语音朗读项目。一、环境搭建ROS2 Humble WSL2 Ubuntu 22.041.1 安装思路推荐使用 Ubuntu 22.04 搭配 ROS2 Humble。Humble 是 Ubuntu 22.04 对应的 ROS2 LTS 版本资料多、插件丰富、稳定性适合入门和项目练习。可以使用鱼香 ROS 的一键安装脚本wgethttp://fishros.com/install-Ofishros.fishros这类脚本通常会完成换源、添加 ROS2 软件源、安装 ROS2 基础组件等步骤。国内网络环境下换成清华、华为等镜像源可以明显减少下载失败和超时。1.2 验证 ROS2 是否安装成功ros2--version如果能正常输出版本信息说明ros2命令已经进入当前终端环境。1.3 VS Code 开发环境推荐安装这些插件WSLRemote DevelopmentPythonROSWSL 插件负责让 VS Code 进入 Ubuntu 子系统Python 插件负责补全、跳转、调试ROS 插件可以辅助识别 ROS2 工作空间、包和消息类型。二、ROS2 第一个 Python 节点2.1 节点最小代码先从最小节点开始理解 ROS2 程序的骨架。importrclpyfromrclpy.nodeimportNodedefmain():rclpy.init()# 初始化 ROS2 通信环境nodeNode(py_node)# 创建节点节点名为 py_nodenode.get_logger().info(你好 ROS2)rclpy.spin(node)# 循环监听rclpy.shutdown()# 释放资源2.2 关键语句解释rclpy.init()像开机启动。它会初始化当前进程的 ROS2 通信环境包括上下文、网络发现和底层通信资源。Node(py_node)给程序一个节点身份。在 ROS2 网络里节点名就像身份证其他工具和节点可以通过这个名字识别它。node.get_logger().info()是 ROS2 推荐的日志输出方式。它比print更适合机器人系统因为日志会带时间戳、等级并且能被rqt_console等工具查看。rclpy.spin(node)是节点持续运行的关键。没有spinPython 脚本执行完就退出有了spin节点会不断检查是否有消息、定时器或其他回调需要执行。rclpy.shutdown()用于优雅退出通知 ROS2 回收资源。三、规范化开发工作空间与功能包ROS2 项目不建议把代码零散放在任意目录里而是通过工作空间和功能包管理。3.1 创建工作空间mkdir-p~/ros2_ws/srccd~/ros2_ws/srcros2_ws是工作空间src用于存放功能包源码。3.2 创建 Python 功能包ros2 pkg create --build-type ament_python--licenseApache-2.0 demo_python_pkg--build-type ament_python表示这是一个 Python 包。demo_python_pkg是包名。3.3 编译与运行前置环境回到工作空间根目录cd~/ros2_ws colcon buildsourceinstall/setup.bashcolcon build会检查依赖、执行安装脚本把 Python 源码安装到install目录并生成环境脚本。source install/setup.bash会更新当前终端的环境变量例如PATH和PYTHONPATH。如果忘了执行ros2 run可能会找不到刚刚编写的包或节点。四、从 Python 类进化到 ROS2 节点ROS2 Python 节点本质上通常是一个继承自Node的类。先理解普通 Python 类再接入 ROS2会更顺。4.1 阶段一普通 Python 类classPersonNode:def__init__(self,name_value:str,age_value:int)-None:self.namename_value self.ageage_valuedefeat(self,food_name:str):print(f{self.name},{self.age}岁,爱吃{food_name})__init__是构造函数。当执行node PersonNode(...)时Python 会自动调用它。它常用于初始化对象属性。self代表对象自己。没有self变量只是函数内部的临时变量有了self变量会跟着对象一起存在。4.2 阶段二多实例验证defmain():nodePersonNode(法外狂徒张三,18)node1PersonNode(法外狂徒小王,89)node.eat(鱼香肉丝)node1.eat(西红柿)同一个类可以创建多个对象每个对象都有自己的name和age。在机器人程序里这对应一种很常见的写法写一个MotorController类再创建left_motor、right_motor两个实例。4.3 阶段三继承与super()新建writer_node.pyfromdemo_python_pkg.person_nodeimportPersonNodeclassWriterNode(PersonNode):def__init__(self,name:str,age:int,book:str)-None:super().__init__(name,age)self.bookbookclass WriterNode(PersonNode)表示WriterNode继承PersonNode。子类会自动拥有父类的方法例如eat。super().__init__(name, age)用于调用父类构造函数。如果不调用父类中的name和age就不会被正确初始化。4.4 阶段四正式成为 ROS2 节点importrclpyfromrclpy.nodeimportNodeclassPersonNode(Node):def__init__(self,node_name:str,name_value:str,age_value:int)-None:super().__init__(node_name)self.namename_value self.ageage_valuedefeat(self,food_name:str):self.get_logger().info(f{self.name},{self.age}岁,爱吃{food_name})这里的变化有三个PersonNode继承了 ROS2 的Node类。super().__init__(node_name)把节点注册进 ROS2 通信系统。self.get_logger().info()替代了print输出变成 ROS2 日志。如果不调用super().__init__(node_name)这个对象就无法真正成为 ROS2 节点也无法调用self.get_logger()。4.5 在setup.py中注册运行入口console_scripts:[person_node demo_python_pkg.person_node:main,writer_node demo_python_pkg.writer_node:main,],这相当于给节点注册快捷方式。当执行ros2 run demo_python_pkg person_nodeROS2 会根据console_scripts找到demo_python_pkg.person_node里的main函数并执行。五、多线程下载器与回调机制在机器人程序里传感器读取、图片处理、网络请求、语音合成等任务可能很耗时。如果这些任务直接写在 ROS2 回调里节点就容易卡住。5.1 准备本地 HTTP 文件服务创建测试小说文件echo第一章 少年踏上修仙路novel1.txtecho第二章 学习修仙马上就上天novel2.txt启动本地 HTTP 服务python3-mhttp.server8000--bind0.0.0.05.2 多线程下载器importthreadingimportrequestsclassDownLoader:defdownload(self,url,callback_func):print(f线程{threading.get_ident()}开始下载)responserequests.get(url)response.encodingutf-8callback_func(url,response.text)defstart_download(self,url,callback_func):threadthreading.Thread(targetself.download,args(url,callback_func))thread.start()requests.get(url)是网络 IO 操作可能阻塞当前线程。放到子线程后主程序可以继续运行。threading.Thread(target..., args...)表示创建一个子线程target是子线程要执行的函数args是传给函数的参数。callback_func是回调函数。下载完成后下载器主动调用它把结果交回去。这是一种“干完活再通知你”的模式。5.3 为什么 ROS2 节点需要理解多线程ROS2 节点里的消息接收、定时器等逻辑都是回调。如果在某个回调里执行耗时任务例如几秒钟的图像处理节点可能在这段时间里无法处理新的消息。常见处理思路有两种把耗时任务交给独立线程或线程池。使用 ROS2 的多线程执行器例如MultiThreadedExecutor。5.4 常见问题为什么要设置response.encoding utf-8因为中文内容在网络响应里可能无法被自动识别为 UTF-8手动设置可以避免乱码。为什么start_download不直接写self.download(...)直接调用就是同步执行主程序仍然会等待下载完成。只有通过threading.Thread包装并启动下载才会在后台异步运行。threading.get_ident()是什么它会返回当前线程 ID常用于验证代码是否真的运行在子线程中。六、Topic 通信实战修仙小说自动朗读系统这一部分把前面的节点、队列、多线程和回调串起来完成一个 ROS2 Topic 项目发布者下载小说文本把每一行发布到/novel话题。订阅者订阅/novel话题把收到的文本交给语音引擎朗读。6.1 创建新的工作空间与功能包mkdir-p~/chapt3_ws/srccd~/chapt3_ws/src ros2 pkg create demo_python_topic --build-type ament_python--dependenciesrclpy example_interfaces--licenseApache-2.0rclpy是 Python 节点必需依赖。example_interfaces提供标准消息类型例如String。6.2 准备小说文件服务器在当前文件夹下新建novel1.txt第一章 少年踏上修仙路 第二章 学习修仙马上上天 第三章 女大三千位列仙班启动 HTTP 服务python3-mhttp.server8000--bind0.0.0.06.3 发布者第一版只下载不发布novel_pub_node.pyimportrclpyfromrclpy.nodeimportNodeimportrequestsclassNovelPubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f{node_name}启动)defdownload(self,url):responserequests.get(url)response.encodingutf-8self.get_logger().info(f下载完成{url}, 长度{len(response.text)})defmain():rclpy.init()nodeNovelPubNode(novel_pub)node.download(http://localhost:8000/novel1.txt)rclpy.spin(node)rclpy.shutdown()这一版先验证节点可以启动并且能够通过 HTTP 下载小说内容。6.4 发布者第二版发布 Topic目标把下载到的小说内容切分成行然后定时发布到novel话题。importrclpyfromrclpy.nodeimportNodeimportrequestsfromexample_interfaces.msgimportStringfromqueueimportQueueclassNovelPubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f{node_name}启动)self.novels_queue_Queue()self.nobel_pubblisherself.create_publisher(String,novel,10)self.create_timer(3.0,self.timer_callback)deftimer_callback(self):ifself.novels_queue_.qsize()0:lineself.novels_queue_.get()msgString()msg.dataline self.nobel_pubblisher.publish(msg)self.get_logger().info(f发布了{msg.data})self.novels_queue_.put(line)defdownload(self,url):responserequests.get(url)response.encodingutf-8forlineinresponse.text.splitlines():ifline.strip():self.novels_queue_.put(line)核心点create_publisher(String, novel, 10)创建发布者。String是消息类型发布和订阅两端必须一致。novel是话题名ROS2 中实际显示时通常会看到/novel。10是消息队列深度接收方处理慢时可以缓存最近的消息。create_timer(3.0, self.timer_callback)每 3 秒触发一次发布逻辑。不要用while True持续发布否则可能卡住节点其他功能。6.5 订阅者接收文本并语音朗读安装语音引擎sudoaptupdatesudoaptinstallespeak-ng-ypip3installespeakngnovel_sub_node.pyimportrclpyfromrclpy.nodeimportNodefromexample_interfaces.msgimportStringfromqueueimportQueueimportthreadingimporttimeimportespeakngclassNovelSubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f{node_name}启动)self.novels_queue_Queue()self.nobels_subscriber_self.create_subscription(String,novel,self.novel_callback,10)self.speech_thread_threading.Thread(targetself.speake_thread)self.speech_thread_.start()defnovel_callback(self,msg):self.novels_queue_.put(msg.data)self.get_logger().info(f收到新章节{msg.data})defspeake_thread(self):speakerespeakng.Speaker()speaker.voicezhwhilerclpy.ok():ifself.novels_queue_.qsize()0:textself.novels_queue_.get()self.get_logger().info(f正在朗读{text})speaker.say(text)speaker.wait()else:time.sleep(1)defmain():rclpy.init()nodeNovelSubNode(novel_sub)rclpy.spin(node)rclpy.shutdown()订阅者里有两个关键设计create_subscription(String, novel, self.novel_callback, 10)订阅发布者的同名话题。语音朗读放进独立线程避免speaker.wait()卡住 ROS2 主线程。rclpy.ok()可以感知节点是否还在运行。当按下CtrlC后后台线程能跟着退出。6.6 注册 Topic 节点入口修改setup.pyconsole_scripts:[novel_pub_node demo_python_topic.novel_pub_node:main,novel_sub_node demo_python_topic.novel_sub_node:main,],如果你还在同一个教学包里整理全部案例可以把入口统一写成console_scripts:[person_node demo_python_pkg.person_node:main,writer_node demo_python_pkg.writer_node:main,learn_thread demo_python_pkg.learn_thread:main,novel_pub_node demo_python_topic.novel_pub_node:main,novel_sub_node demo_python_topic.novel_sub_node:main,],实际项目里入口属于哪个包就应写在那个包自己的setup.py中。6.7 编译与联调在工作空间根目录执行colcon buildsourceinstall/setup.bash终端 1 启动发布者ros2 run demo_python_topic novel_pub_node终端 2 启动订阅者sourceinstall/setup.bash ros2 run demo_python_topic novel_sub_node如果发布者和订阅者都正常运行订阅者会收到发布者发来的小说文本并调用语音引擎朗读。6.8 Topic 调试命令查看当前 ROS2 网络中的话题ros2 topic list直接查看/novel话题内容ros2 topicecho/novel查看/novel发布频率ros2 topic hz /novel查看发布者节点信息ros2nodeinfo /novel_pub排查顺序建议先用ros2 topic list看有没有/novel。再用ros2 topic echo /novel看有没有数据。如果 echo 有数据发布者没大问题重点查订阅者。如果 echo 没数据重点查发布者、下载逻辑、定时器和source install/setup.bash。七、核心知识总结7.1 ROS2 节点生命周期一个最小 Python 节点通常包含rclpy.init()nodeNode(node_name)rclpy.spin(node)rclpy.shutdown()类形式节点通常写成classMyNode(Node):def__init__(self):super().__init__(my_node)7.2 工作空间运行规律每次修改代码后通常需要colcon buildsourceinstall/setup.bash新开终端后也要重新执行source install/setup.bash。7.3 Topic 发布订阅规律发布者负责publisherself.create_publisher(String,topic_name,10)publisher.publish(msg)订阅者负责subscriberself.create_subscription(String,topic_name,callback,10)两端的消息类型和话题名必须一致。7.4 为什么使用Queue下载、发布、朗读的速度可能完全不同。Queue可以把这些环节解耦下载快了先把数据放进队列。发布按定时器慢慢取。朗读更慢也可以按自己的节奏消费。这样各模块互不硬等节点更稳定。7.5 避坑清单忘记source install/setup.bashros2 run找不到包或入口。没有rclpy.spin(node)节点启动后立刻结束。没有super().__init__(node_name)类无法真正注册为 ROS2 节点。发布和订阅话题名不一致订阅者收不到消息。发布和订阅消息类型不一致通信失败或行为异常。在回调里执行耗时任务节点卡死、消息堆积。中文 HTTP 文本没设置 UTF-8下载内容乱码。八、学习路线建议建议按以下顺序练习只运行第一个py_node确认 ROS2 基础环境可用。创建demo_python_pkg练习person_node和writer_node。用learn_thread.py单独理解线程和回调。创建demo_python_topic先写发布者再写订阅者。用ros2 topic echo /novel验证数据流。最后接入espeakng完成自动朗读系统。学完这条线后你已经掌握了 ROS2 Python 入门最重要的几块拼图节点、包、构建、入口、面向对象、线程、队列、发布者、订阅者和调试命令。

更多文章