跳到主要内容

驱动程序发送/接收功能报告

日期: 2026-03-24
状态: ✅ 所有驱动程序完成

审计结果

总结

  • 总驱动程序数: 10
  • 支持发送的: 10/10 ✅
  • 支持接收的: 10/10 ✅
  • 完全双向的: 10/10 ✅

L4 传输层

协议发送监听状态注释
UDP✅ 完成基于套接字,适合异步
TCP✅ 完成全双工,基于连接
QUIC✅ 完成异步(aioquic),需要证书
IWIP✅ 完成嵌入式轻量级 IP 栈集成路径
UIP✅ 完成Contiki-NG uIP 模拟集成路径

物理层

协议发送监听状态注释
UART✅ 完成带 STX/ETX 帧的串行
RS485✅ 完成半双工,硬件驱动程序
CAN✅ 完成CAN 总线基于 ID 寻址
LoRa✅ 完成无线,基于串行

应用层

协议发送监听状态注释
MQTT✅ 完成发布/订阅模型

实现细节

L4 传输(UDP 和 TCP)

两者都支持基于同步套接字的 I/O

# UDP 发送:单个数据报到远程主机
def send(payload, config):
sock.sendto(payload, (host, port))

# UDP 监听:连续监听,每个数据报调用回调
def listen(config, callback):
while True:
data, addr = sock.recvfrom(65535)
callback(data, addr)

# TCP 发送:连接、发送、断开
def send(payload, config):
sock.connect((host, port))
sock.sendall(payload)

# TCP 监听:接受连接,处理每连接
def listen(config, callback):
while True:
conn, addr = sock.accept()
data = conn.recv(65535)
callback(data, addr)

物理层(UART、RS485、CAN、LoRa)

# UART: STX(0x02) ... ETX(0x03) 帧
# 实现基于 pyserial 的串口监听
# 接收完整帧时触发回调

# RS485: 半双工 RS485 驱动程序
# 使用 hardware_drivers.RS485 进行底层控制
# 监听程序在循环中调用 driver.receive()

# CAN: CAN 总线,基于 ID 的路由
# 使用 hardware_drivers.CAN 进行总线访问
# 监听程序在配置的 CAN ID 上监控消息

# LoRa: 无线收发器
# 使用 hardware_drivers.LoRa 进行无线电控制
# 监听程序调用 driver.receive() 获取无线数据包

应用层(MQTT)

# MQTT 发布(发送)
def send(payload, config):
client.connect(host, port)
client.publish(topic, payload)
client.disconnect()

# MQTT 订阅(监听)
def listen(config, callback):
client.connect(host, port)
client.subscribe(topic)
# MQTT 库调用 on_message 回调
# 其调用您的 callback(data, addr)
client.loop_forever()

如何使用双向通信

1. 简单 UDP 回显服务器

from opensynaptic.core.transport_layer import get_transport_layer_manager

def handle_packet(data, addr):
print(f"收到来自 {addr}{len(data)} 字节")
# 处理或转发数据...

mgr = get_transport_layer_manager()
udp = mgr.get_adapter('udp')

# 在单独的线程中监听
import threading
listener_thread = threading.Thread(
target=udp.module.listen,
args=(config, handle_packet),
daemon=True
)
listener_thread.start()

# 在监听时发送数据
success = mgr.send('udp', packet, config)

2. UART 串关通信

from opensynaptic.core.physical_layer import get_physical_layer_manager

def on_serial_data(data, addr):
print(f"来自 {addr[0]} 的串行数据:{data.hex()}")

mgr = get_physical_layer_manager()
uart = mgr.get_adapter('uart')

# 监听线程
listener = threading.Thread(
target=uart.module.listen,
args=(config, on_serial_data),
daemon=True
)
listener.start()

# 通过 UART 发送
mgr.send('uart', packet, config)

3. MQTT 发布/订阅

from opensynaptic.services.transporters.drivers import mqtt

def on_mqtt_message(data, addr):
print(f"MQTT 消息:{data}")

# 订阅(阻塞)
import threading
threading.Thread(
target=mqtt.listen,
args=(config, on_mqtt_message),
daemon=True
).start()

# 发布
mqtt.send(packet, config)

修改的文件

文件更改
src/opensynaptic/core/transport_layer/protocols/udp.py为 UDP 服务器添加了 listen()
src/opensynaptic/core/transport_layer/protocols/tcp.py为 TCP 服务器添加了 listen()
src/opensynaptic/core/transport_layer/protocols/quic.py为 QUIC 添加了异步 listen()
src/opensynaptic/core/transport_layer/protocols/iwip.py添加了 listen() 存根
src/opensynaptic/core/transport_layer/protocols/uip.py添加了 listen() 存根
src/opensynaptic/core/physical_layer/protocols/uart.py添加了带 STX/ETX 帧检测的 listen()
src/opensynaptic/core/physical_layer/protocols/rs485.py为半双工串行添加了 listen()
src/opensynaptic/core/physical_layer/protocols/can.py为 CAN 总线添加了 listen()
src/opensynaptic/core/physical_layer/protocols/lora.py为无线添加了 listen()
src/opensynaptic/services/transporters/drivers/mqtt.py为 MQTT 订阅添加了 listen()

功能矩阵

所有 10 个驱动程序现在都完全支持双向通信。✅