跳至主要內容

DDS (Data Distribution Service) 学习笔记

apzs...大约 7 分钟

DDS (Data Distribution Service) 学习笔记

1. DDS 核心原理

DDS 的强大之处在于其“去中心化”和“自动发现”机制,即在局域网内无需配置目标 IP 即可互相通信 。这主要归功于其底层的 RTPS (Real-Time Publish-Subscribe) 协议,该协议分为两个主要阶段:

1.1 自动发现机制 (SPDP & SEDP)

  • 参与者发现阶段 (SPDP):底层依赖组播 (Multicast) 。当节点启动时,会自动向局域网内预先约定好的公共组播 IP(如 239.255.0.1)发送“心跳包”(Hello 消息) 。这使得同一网络下的 DDS 节点能瞬间互相认识并记录对方的真实单播 IP 。
  • 端点发现阶段 (SEDP):互相认识后,节点间建立单播连接,开始交换详细信息(如发送器/接收器的主题名称、数据类型、QoS 策略) 。匹配成功后,数据通道自动打通 。

1.2 组播 (Multicast) vs 广播 (Broadcast)

  • 广播:向全网所有设备发送数据,极度消耗带宽,且通常会被现代企业交换机在底层直接拦截 。
  • 组播:采用“频道订阅”逻辑 。DDS 节点通过 IGMP 协议向交换机申请加入特定的虚拟频道(如 239.255.0.1) 。交换机只会把数据包精准转发给加入了该频道的设备,避免了网络风暴 。

2. Python 接入与工程化实战

在 Python 中接入 DDS,推荐使用 Eclipse Cyclone DDS 结合现代化的包管理工具 uv

2.1 基于 uv 的现代工程化构建

使用 pyproject.toml 是目前最规范的做法,可实现优雅的依赖管理和环境隔离 。

  1. 初始化项目:执行 uv init dds_demo 生成标准项目骨架 。
  2. 安装依赖:执行 uv add cyclonedds,uv 会自动创建隐藏虚拟环境并更新 pyproject.toml
  3. 运行代码:直接使用 uv run python_script.py 即可执行 。

2.2 核心代码示例

在同一个项目目录下,创建以下文件以实现 DDS 通讯:

1. 定义数据模型 (data_model.py) DDS 需要统一的数据结构。Python API 允许使用 @dataclass 快速定义 IDL 模型 。

from dataclasses import dataclass
from cyclonedds.idl import IdlStruct

@dataclass
class ChatMessage(IdlStruct, typename="MyDomain.ChatMessage"):
    msg_id: int
    sender_name: str
    content: str

2. 单向通讯:发送端 (publisher.py)

import time
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from data_model import ChatMessage


def main():
    participant = DomainParticipant()
    topic = Topic(participant, 'ChatRoomTopic', ChatMessage)
    writer = DataWriter(participant, topic)

    print("启动 DDS Publisher,开始发送消息...")
    count = 0

    try:
        while True:
            count += 1
            msg = ChatMessage(
                msg_id=count,
                sender_name="PythonNode",
                content=f"基于 pyproject.toml 的 DDS 消息! 序号: {count}"
            )
            writer.write(msg)
            print(f"[发送] ID: {msg.msg_id} | 发送者: {msg.sender_name} | 内容: {msg.content}")
            time.sleep(1)

    except KeyboardInterrupt:
        print("\n检测到 Ctrl+C,停止发送。")


if __name__ == '__main__':
    main()

3. 单向通讯:接收端 (subscriber.py)

import time
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.sub import DataReader
from data_model import ChatMessage


def main():
    participant = DomainParticipant()
    topic = Topic(participant, 'ChatRoomTopic', ChatMessage)
    reader = DataReader(participant, topic)

    print("启动 DDS Subscriber,等待接收消息...")

    try:
        while True:
            for msg in reader.take_iter():
                print(f"[接收] ID: {msg.msg_id} | 发送者: {msg.sender_name} | 内容: {msg.content}")
            time.sleep(0.1)

    except KeyboardInterrupt:
        print("\n检测到 Ctrl+C,停止接收。")


if __name__ == '__main__':
    main()

4. 双向通讯实战 (chat_node.py) DDS 天然支持多向/双向通讯。只需在同一个程序中,同时实例化 DataWriter (发送器) 和 DataReader (接收器) 即可 。

多线程模型:推荐开启两个线程,一个主线程负责监听终端输入并发送,一个后台线程循环阻塞读取网络消息,即可实现类似聊天室的双向无缝通讯 。

import time
import threading
import sys
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from data_model import ChatMessage


def receive_messages(reader, my_name):
    """
    后台线程:负责持续监听并打印接收到的消息
    """
    try:
        while True:
            # take_iter 会获取所有新到达的数据
            for msg in reader.take_iter():
                # 过滤掉自己发送的消息,只看别人发的
                if msg.sender_name != my_name:
                    # 使用 \r 清除当前输入行,打印消息后重新显示输入提示符,防止控制台字符错乱
                    sys.stdout.write(f"\r\033[K[收到 - {msg.sender_name}]: {msg.content}\n")
                    sys.stdout.write(f"{my_name} > ")
                    sys.stdout.flush()

            time.sleep(0.1)  # 避免占用过高CPU
    except Exception:
        pass


def main():
    # 1. 启动前让用户输入一个昵称,用于区分不同的终端
    my_name = input("请输入你的节点昵称 (例如 Alice 或 Bob): ").strip()
    if not my_name:
        my_name = "Anonymous"

    # 2. 初始化 DDS 核心组件
    participant = DomainParticipant()
    # 所有人都在同一个 Topic 频道里聊天
    topic = Topic(participant, 'ChatRoomTopic', ChatMessage)

    # 3. 这个节点既是发送者,也是接收者
    writer = DataWriter(participant, topic)
    reader = DataReader(participant, topic)

    print(f"\n=== 欢迎加入 DDS 聊天室,{my_name}!(输入 'exit' 退出) ===")

    # 4. 启动后台接收线程
    recv_thread = threading.Thread(target=receive_messages, args=(reader, my_name), daemon=True)
    recv_thread.start()

    # 5. 主线程:负责处理用户输入并发送
    count = 0
    try:
        while True:
            # 等待用户输入
            text = input(f"{my_name} > ")

            if text.lower() == 'exit':
                break
            if not text:
                continue

            count += 1
            # 构建消息并发送
            msg = ChatMessage(msg_id=count, sender_name=my_name, content=text)
            writer.write(msg)

    except KeyboardInterrupt:
        pass
    finally:
        print("\n正在退出 DDS 网络...")


if __name__ == '__main__':
    main()

3. 局域网使用注意事项与排障

DDS 自动发现高度依赖底层网络环境。如果在局域网内无法通信,通常由以下“四大元凶”导致:

  1. 组播被禁用:公司网络或公共 Wi-Fi 的交换机层面掐断了组播包(最常见)。
  2. 防火墙拦截:操作系统(Windows Defender、ufw 等)拦截了 DDS 默认的 UDP 端口流量 。
  3. 多网卡迷路:虚拟机、VPN 或 WSL 导致节点选错网卡,把握手包发到了黑洞网段 。
  4. AP 隔离:公共 Wi-Fi 限制了设备间的物理互通 。

3.1 强制单播发现 (组播失效时的终极方案)

当组播不可用时,可启用单播发现 (Unicast Discovery),强行指定队友 IP 。

操作方法:

创建一个 XML 配置文件(例如 cyclonedds.xml):

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config">
    <Domain id="any">
        <General>
            <AllowMulticast>false</AllowMulticast>
        </General>
        <Discovery>
            <Peers>
                <Peer address="192.168.1.100"/>
                <Peer address="192.168.1.200"/>
            </Peers>
        </Discovery>
    </Domain>
</CycloneDDS>

运行 Python 代码前,通过环境变量加载此配置 :

  • Linux/macOS: export CYCLONEDDS_URI=file://$PWD/cyclonedds.xml
  • Windows: set CYCLONEDDS_URI=file://%CD%\cyclonedds.xml

配置策略

  • 全连接模式 (2-5个节点):全员互相将其他节点的 IP 填入配置 。
  • 种子节点模式 (>5个节点):选出 1-2 台性能稳定的机器作为“种子节点”,其余所有普通节点只指向种子节点即可完成全网拓扑发现 。

4. 资源冲突与高级配置 (IP与端口)

4.1 组播 IP 的修改与冲突

占用特性:组播 IP (如 239.255.0.1) 类似于电台频段,不存在“独占”报错。但如果多个无关应用同时向该 IP 发送数据,会导致 DDS 底层收到垃圾包,消耗 CPU 资源进行丢弃(即“串台”)。

修改方法: 可通过编写 cyclonedds.xml 配置文件强行指定新的组播地址,并通过环境变量 CYCLONEDDS_URI 加载 。

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config">
    <Domain id="any">
        <Discovery>
            <MulticastRecvAddr>239.255.0.2</MulticastRecvAddr>
        </Discovery>
    </Domain>
</CycloneDDS>

4.2 端口分配机制对比

DDS 的底层端口分配有严格的公式规范,通常基于核心变量 Domain ID

端口类型计算公式冲突应对机制
组播发现端口Port_multicast = 7400 + (250 X DomainId)绝对固定。若被其他软件占用,DDS 节点将直接启动失败,不会自动更换 。
单播数据端口Port_unicast = 7400 + (250 X DomainId) + 10 + (2 X ParticipantId)智能自适应。若在同电脑启动多个节点,DDS 会自动递增 ParticipantId,自动更换单播端口直至成功 。

4.3 终极防冲突大招:修改 Domain ID

如果担心组播 IP 串台或默认端口 (7400) 被其他软件占用,最优雅的解决方案是在代码层面修改 Domain ID

  • 原理:Domain 是 DDS 物理隔离的最高边界 。
  • 操作:在代码实例化 DomainParticipant 时传入非 0 数字即可(不同 Domain 之间的节点完全不互通,计算出的底层端口也完全隔离)。

代码实现:

from cyclonedds.domain import DomainParticipant

# 默认 Domain ID 为 0
# 传入非 0 数字即可将其放入一个全新的、隔离的物理网络域中
dp = DomainParticipant(domain_id=42)
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.0.0-alpha.8