DDS (Data Distribution Service) 学习笔记
DDS (Data Distribution Service) 学习笔记
1. DDS 核心原理
DDS 的强大之处在于其“去中心化”和“自动发现”机制,即在局域网内无需配置目标 IP 即可互相通信 。这主要归功于其底层的 RTPS (Real-Time Publish-Subscribe) 协议,该协议分为两个主要阶段:
1.1 自动发现机制 (SPDP & SEDP)
- 参与者发现阶段 (SPDP):底层依赖组播 (Multicast) 。当节点启动时,会自动向局域网内预先约定好的公共组播 IP(如 239.255.0.1)发送“心跳包”(Hello 消息) 。这使得同一网络下的 DDS 节点能瞬间互相认识并记录对方的真实单播 IP 。
- 端点发现阶段 (SEDP):互相认识后,节点间建立单播连接,开始交换详细信息(如发送器/接收器的主题名称、数据类型、QoS 策略) 。匹配成功后,数据通道自动打通 。
1.2 组播 (Multicast) vs 广播 (Broadcast)
- 广播:向全网所有设备发送数据,极度消耗带宽,且通常会被现代企业交换机在底层直接拦截 。
- 组播:采用“频道订阅”逻辑 。DDS 节点通过 IGMP 协议向交换机申请加入特定的虚拟频道(如 239.255.0.1) 。交换机只会把数据包精准转发给加入了该频道的设备,避免了网络风暴 。
2. Python 接入与工程化实战
在 Python 中接入 DDS,推荐使用 Eclipse Cyclone DDS 结合现代化的包管理工具 uv 。
2.1 基于 uv 的现代工程化构建
使用 pyproject.toml 是目前最规范的做法,可实现优雅的依赖管理和环境隔离 。
- 初始化项目:执行
uv init dds_demo生成标准项目骨架 。 - 安装依赖:执行
uv add cyclonedds,uv 会自动创建隐藏虚拟环境并更新pyproject.toml。 - 运行代码:直接使用
uv run python_script.py即可执行 。
2.2 核心代码示例
在同一个项目目录下,创建以下文件以实现 DDS 通讯:
1. 定义数据模型 (data_model.py) DDS 需要统一的数据结构。Python API 允许使用 @dataclass 快速定义 IDL 模型 。
from dataclasses import dataclass
from cyclonedds.idl import IdlStruct
@dataclass
class ChatMessage(IdlStruct, typename="MyDomain.ChatMessage"):
msg_id: int
sender_name: str
content: str
2. 单向通讯:发送端 (publisher.py)
import time
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from data_model import ChatMessage
def main():
participant = DomainParticipant()
topic = Topic(participant, 'ChatRoomTopic', ChatMessage)
writer = DataWriter(participant, topic)
print("启动 DDS Publisher,开始发送消息...")
count = 0
try:
while True:
count += 1
msg = ChatMessage(
msg_id=count,
sender_name="PythonNode",
content=f"基于 pyproject.toml 的 DDS 消息! 序号: {count}"
)
writer.write(msg)
print(f"[发送] ID: {msg.msg_id} | 发送者: {msg.sender_name} | 内容: {msg.content}")
time.sleep(1)
except KeyboardInterrupt:
print("\n检测到 Ctrl+C,停止发送。")
if __name__ == '__main__':
main()
3. 单向通讯:接收端 (subscriber.py)
import time
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.sub import DataReader
from data_model import ChatMessage
def main():
participant = DomainParticipant()
topic = Topic(participant, 'ChatRoomTopic', ChatMessage)
reader = DataReader(participant, topic)
print("启动 DDS Subscriber,等待接收消息...")
try:
while True:
for msg in reader.take_iter():
print(f"[接收] ID: {msg.msg_id} | 发送者: {msg.sender_name} | 内容: {msg.content}")
time.sleep(0.1)
except KeyboardInterrupt:
print("\n检测到 Ctrl+C,停止接收。")
if __name__ == '__main__':
main()
4. 双向通讯实战 (chat_node.py) DDS 天然支持多向/双向通讯。只需在同一个程序中,同时实例化 DataWriter (发送器) 和 DataReader (接收器) 即可 。
多线程模型:推荐开启两个线程,一个主线程负责监听终端输入并发送,一个后台线程循环阻塞读取网络消息,即可实现类似聊天室的双向无缝通讯 。
import time
import threading
import sys
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from data_model import ChatMessage
def receive_messages(reader, my_name):
"""
后台线程:负责持续监听并打印接收到的消息
"""
try:
while True:
# take_iter 会获取所有新到达的数据
for msg in reader.take_iter():
# 过滤掉自己发送的消息,只看别人发的
if msg.sender_name != my_name:
# 使用 \r 清除当前输入行,打印消息后重新显示输入提示符,防止控制台字符错乱
sys.stdout.write(f"\r\033[K[收到 - {msg.sender_name}]: {msg.content}\n")
sys.stdout.write(f"{my_name} > ")
sys.stdout.flush()
time.sleep(0.1) # 避免占用过高CPU
except Exception:
pass
def main():
# 1. 启动前让用户输入一个昵称,用于区分不同的终端
my_name = input("请输入你的节点昵称 (例如 Alice 或 Bob): ").strip()
if not my_name:
my_name = "Anonymous"
# 2. 初始化 DDS 核心组件
participant = DomainParticipant()
# 所有人都在同一个 Topic 频道里聊天
topic = Topic(participant, 'ChatRoomTopic', ChatMessage)
# 3. 这个节点既是发送者,也是接收者
writer = DataWriter(participant, topic)
reader = DataReader(participant, topic)
print(f"\n=== 欢迎加入 DDS 聊天室,{my_name}!(输入 'exit' 退出) ===")
# 4. 启动后台接收线程
recv_thread = threading.Thread(target=receive_messages, args=(reader, my_name), daemon=True)
recv_thread.start()
# 5. 主线程:负责处理用户输入并发送
count = 0
try:
while True:
# 等待用户输入
text = input(f"{my_name} > ")
if text.lower() == 'exit':
break
if not text:
continue
count += 1
# 构建消息并发送
msg = ChatMessage(msg_id=count, sender_name=my_name, content=text)
writer.write(msg)
except KeyboardInterrupt:
pass
finally:
print("\n正在退出 DDS 网络...")
if __name__ == '__main__':
main()
3. 局域网使用注意事项与排障
DDS 自动发现高度依赖底层网络环境。如果在局域网内无法通信,通常由以下“四大元凶”导致:
- 组播被禁用:公司网络或公共 Wi-Fi 的交换机层面掐断了组播包(最常见)。
- 防火墙拦截:操作系统(Windows Defender、ufw 等)拦截了 DDS 默认的 UDP 端口流量 。
- 多网卡迷路:虚拟机、VPN 或 WSL 导致节点选错网卡,把握手包发到了黑洞网段 。
- AP 隔离:公共 Wi-Fi 限制了设备间的物理互通 。
3.1 强制单播发现 (组播失效时的终极方案)
当组播不可用时,可启用单播发现 (Unicast Discovery),强行指定队友 IP 。
操作方法:
创建一个 XML 配置文件(例如 cyclonedds.xml):
<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config">
<Domain id="any">
<General>
<AllowMulticast>false</AllowMulticast>
</General>
<Discovery>
<Peers>
<Peer address="192.168.1.100"/>
<Peer address="192.168.1.200"/>
</Peers>
</Discovery>
</Domain>
</CycloneDDS>
运行 Python 代码前,通过环境变量加载此配置 :
- Linux/macOS:
export CYCLONEDDS_URI=file://$PWD/cyclonedds.xml - Windows:
set CYCLONEDDS_URI=file://%CD%\cyclonedds.xml
配置策略:
- 全连接模式 (2-5个节点):全员互相将其他节点的 IP 填入配置 。
- 种子节点模式 (>5个节点):选出 1-2 台性能稳定的机器作为“种子节点”,其余所有普通节点只指向种子节点即可完成全网拓扑发现 。
4. 资源冲突与高级配置 (IP与端口)
4.1 组播 IP 的修改与冲突
占用特性:组播 IP (如 239.255.0.1) 类似于电台频段,不存在“独占”报错。但如果多个无关应用同时向该 IP 发送数据,会导致 DDS 底层收到垃圾包,消耗 CPU 资源进行丢弃(即“串台”)。
修改方法: 可通过编写 cyclonedds.xml 配置文件强行指定新的组播地址,并通过环境变量 CYCLONEDDS_URI 加载 。
<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config">
<Domain id="any">
<Discovery>
<MulticastRecvAddr>239.255.0.2</MulticastRecvAddr>
</Discovery>
</Domain>
</CycloneDDS>
4.2 端口分配机制对比
DDS 的底层端口分配有严格的公式规范,通常基于核心变量 Domain ID 。
| 端口类型 | 计算公式 | 冲突应对机制 |
|---|---|---|
| 组播发现端口 | Port_multicast = 7400 + (250 X DomainId) | 绝对固定。若被其他软件占用,DDS 节点将直接启动失败,不会自动更换 。 |
| 单播数据端口 | Port_unicast = 7400 + (250 X DomainId) + 10 + (2 X ParticipantId) | 智能自适应。若在同电脑启动多个节点,DDS 会自动递增 ParticipantId,自动更换单播端口直至成功 。 |
4.3 终极防冲突大招:修改 Domain ID
如果担心组播 IP 串台或默认端口 (7400) 被其他软件占用,最优雅的解决方案是在代码层面修改 Domain ID 。
- 原理:Domain 是 DDS 物理隔离的最高边界 。
- 操作:在代码实例化
DomainParticipant时传入非 0 数字即可(不同 Domain 之间的节点完全不互通,计算出的底层端口也完全隔离)。
代码实现:
from cyclonedds.domain import DomainParticipant
# 默认 Domain ID 为 0
# 传入非 0 数字即可将其放入一个全新的、隔离的物理网络域中
dp = DomainParticipant(domain_id=42)