1. 主页 > 大智慧

Python工业控制实战:3步解决传感器串口数据采集难题

<场景设定>
某智能工厂新部署的温湿度传感器频繁出现数据中断,运维工程师小王需要快速实现:

  1. 通过485串口实时采集传感器数据
  2. 异常数据自动重发采集指令
  3. 数据格式转换与本地存储

一、环境搭建避坑指南

python复制
# 特别注意Windows/Linux环境差异
!pip install pyserial  # 安装核心库

import serial
from serial.tools import list_ports

# 自动检测可用串口(解决设备识别问题)
def find_serial():
    ports = list_ports.comports()
    for port in ports:
        if "USB-SERIAL" in port.description:  # 关键过滤条件
            return port.device
    raise Exception("未检测到有效串口设备")

二、工业级数据采集方案

步骤1:稳定连接配置

python复制
ser = serial.Serial(
    port=find_serial(),
    baudrate=9600,       # 典型工业设备速率
    bytesize=serial.EIGHTBITS,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    timeout=1            # 超时设置避免死锁
)

步骤2:异常处理读写

python复制
def safe_read(ser):
    try:
        if ser.in_waiting > 0:
            data = ser.read_all()
            return data.decode('utf-8').strip()
        return None
    except serial.SerialException as e:
        print(f"采集异常: {str(e)}")
        ser.close()
        ser.open()  # 自动重连机制

步骤3:数据帧解析

python复制
# 处理典型工业格式:STX+数据+ETX
def parse_sensor_data(raw):
    if not raw.startswith(b'\x02') or not raw.endswith(b'\x03'):
        return None
    
    values = raw[1:-1].decode().split(',')
    return {
        'temp': float(values[0]), 
        'humidity': float(values[1]),
        'timestamp': datetime.now().isoformat()
    }

三、生产环境验证方案

python复制
# 压力测试脚本(连续运行24小时)
with open('sensor_log.csv', 'a') as f:
    while True:
        ser.write(b'\x01\x03\x00\x00\x00\x02\xC4\x0B')  # 标准MODBUS指令
        raw = safe_read(ser)
        if parsed := parse_sensor_data(raw):
            f.write(f"{parsed['temp']},{parsed['humidity']}\n")
        time.sleep(1)

<工业场景常见问题>

  1. ??权限问题??:Linux环境下添加用户到dialout组
  2. ??数据错位??:增加CRC校验模块
  3. ??响应延迟??:动态调整timeout参数
  4. ??多设备冲突??:采用RS485转USB集线器

<代码模板>

python复制
# 工业级串口通信模板(含异常重试机制)
class IndustrialSerial:
    def __init__(self, max_retry=3):
        self.max_retry = max_retry
        
    def execute_command(self, cmd):
        for _ in range(self.max_retry):
            try:
                self.ser.write(cmd)
                return self._parse_response()
            except Exception as e:
                print(f"第{_+1}次重试,错误:{str(e)}")
                self._reconnect()
        raise TimeoutError("超过最大重试次数")

这种写法特点:

  1. 基于真实工业场景设计代码结构
  2. 重点解决数据采集稳定性问题
  3. 包含可直接复用的生产级代码模板
  4. 突出工业环境特有的异常处理逻辑
  5. 符合MES/SCADA系统集成需求

本文由嘻道妙招独家原创,未经允许,严禁转载