python 自动识别并连接串口的实现

这个属于我项目中一个函数,跟大家分享一下我的思路及最终实现

在编写串口通信工具中,需要实现一个函数,自动找到对应com 口,并且连接该com口,保证后续通信正常
作为初始化过程的一部分。

思路

在win 设备管理器中,经常会出现多个com 口,但并不是每个com 口都是目标设备所链接的。
尝试打开每个com 口,输入enter 按键, 正确的com 口,会有ack log 返回,表明通信 正常

否则,没有任何log 返回,则判断为非目标设备所连接的com 口。

实现

尝试去打开所有com 口,然后发送enter, 如果在一段时间内有返回值,检查com 口收到的字节数,如果非零,则表明找到了对应的com 口。

完整测试代码如下:

import serial
import serial.tools.list_ports
import threading
import binascii
import time
from datetime import datetime

# default value
baunRate = 115200
is_read = False
is_write = False
write_buff = []
sys_buff = []
mSerial = None
callback = None
is_opened = 0
is_registed = 0

class SerialPort:

  def __init__(self,port,buand):

    self.port = serial.Serial(port,buand)
    self.port.close()
    if not self.port.isOpen():
      self.port.open()

    #the index of data_bytes for read operation,私有属性
    #only used in read lines
    self.__read_ptr = 0
    self.__read_head = 0
    #store all read bytes
    # used in read date, read lines
    self.__data_bytes = bytearray()

  def port_open(self):
    if not self.port.isOpen():
      self.port.open()

  def port_close(self):
    self.port.close()

  def send(self):
    global is_write
    global write_buff

    while is_write:
      if len(write_buff):
        msg = write_buff.pop(0)
        msg = msg+"\n"
        cmd = msg.encode()
        try:

          self.port.write(cmd)
        except:
          write_buff.clear()
          is_write = False
    write_buff.clear()

  def read_data(self):
    global is_read
    global is_opened
    byte_cnt = 0
    while is_read:
      try:
        count = self.port.inWaiting()
        if count > 0:
          rec_str = self.port.read(count)
          self.__data_bytes = self.__data_bytes+rec_str
        #print("receive:",rec_str.decode())
          #print(rec_str)
          byte_cnt += count
          if not is_opened:
            is_opened = 1
      #print("累计收到:",byte_cnt)
      #time.sleep(0.5)
        self.read_lines()
      except:
        deinit()

  #将当前所有的数据都读出,读取位置不变,每次读取指针依次移动,不漏数据, 读取行为一直在进行
  def read_lines(self):
    #reset
    line_cnt = 0
    data_len = len(self.__data_bytes)
    #print ("")
    #print ("begin: prt=:", self.__read_ptr, " head =", self.__read_head,"current len =",data_len)
    if self.__read_ptr >=data_len:
      return
    #get all lines in current data_bytes
    while(self.__read_ptr < data_len-1):
      if(self.__data_bytes[self.__read_ptr+1] == 0x0a and self.__data_bytes[self.__read_ptr] == 0x0d):
        tmp = bytearray()
        tmp = self.__data_bytes[self.__read_head:self.__read_ptr]

        try:
          line = tmp.decode()
        except:
          self.__read_head = self.__read_ptr + 2
          self.__read_ptr = self.__read_head
          continue
        iprint(line)
        line_cnt += 1
        self.__read_head = self.__read_ptr + 2
        self.__read_ptr = self.__read_head
      else:
        self.__read_ptr = self.__read_ptr + 1

def auto_open_serial():
  global baunRate
  global mSerial
  global callback
  global is_registed
  global is_opened
  #reset
  deinit()
  # 列出所有当前的com口
  port_list = list(serial.tools.list_ports.comports())
  port_list_name = []
  #get all com
  if len(port_list) <= 0:
    iprint("the serial port can't find!")
    return False
  else:
    for itms in port_list:
      port_list_name.append(itms.device)
  #try open
  #print(port_list_name)
  for i in port_list_name:
    try:
      mSerial = SerialPort(i,baunRate)
      iprint("try open %s"%i)
      start_task()
      send("")
      #return True
      time.sleep(1)
      if is_opened:
        iprint("connect %s successfully"%i)
        return True
      else:
        deinit()
        if i == port_list_name[len(port_list_name)-1]:
          iprint("uart don't open")
          break
        continue
    except:
      iprint(" uart don't open")
  deinit()
  return False

def deinit():
  global mSerial
  global is_write
  global is_read
  global write_buff
  global is_opened

  if mSerial:
    mSerial.port_close()

  is_opened = 0
  is_read = False
  is_write = False
  write_buff = []

  mSerial = None
  time.sleep(1)

def init():
  global mSerial
  global callback
  global is_registed
  global is_opened
  global is_read
  global is_write
  #retry
  retry_time = 0
  while not auto_open_serial():
    if not is_opened:
      iprint("wait for uart connect, retry %s"%str(retry_time))
    else:
      return True
    retry_time += 1
    time.sleep(2)
    if retry_time == 10:
      iprint(" open uart fail")
      return False

def send(msg):
  global mSerial
  global is_write
  global write_buff
  if is_write:
    write_buff.append(msg)

def start_task():
  global mSerial
  global is_write
  global is_read

  if mSerial:
    is_write = True
    t1 = threading.Thread (target=mSerial.send)
    t1.setDaemon (False)
    t1.start ()

    is_read = True
    t2 = threading.Thread (target=mSerial.read_data)
    t2.setDaemon (False)
    t2.start ()

def iprint(msg):
  global callback
  global is_registed

  msg = "[Uart] "+str(msg)
  if is_registed:
    callback.append(msg)
  else:
    print(msg)

def start_sys_cmd():
  global is_registed
  if is_registed:
    t3 = threading.Thread (target=process_receive_sys_cmd)
    t3.setDaemon (False)
    t3.start()

def process_receive_sys_cmd():
  global sys_buff
  global is_registed
  global callback
  #print("process_receive_sys_cmd")
  while is_registed:
    #print ("wait,process_receive_sys_cmd")
    if len(sys_buff):
      #print ("receive,process_receive_sys_cmd")
      line = sys_buff.pop(0)
      if "init" in line:
        if is_opened and is_read and is_write:
          iprint("already open uart")
          break
        iprint("start init")
        init()
    if is_opened:
      break
  iprint("Eixt uart sys thread")

def register_cback(list):
  global callback
  global is_registed

  callback = list
  is_registed = 1

def unregister_cback():
  global callback
  callback.clear()

if __name__ == '__main__':

  receive = []
  register_cback(receive)
  sys_buff.append("init")
  start_sys_cmd()

  def process_receive_msg():
    global receive
    while True:
      #print("wait")
      if len(receive):
        #print("receive")
        print(receive.pop(0))

  t = threading.Thread(target=process_receive_msg)
  t.setDaemon(False)
  t.start()

到此这篇关于python 自动识别并连接串口的实现的文章就介绍到这了,更多相关python 自动识别并连接串口内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python3简单实现串口通信的方法

    如下所示: import serial import sys import os import time import re def wait_for_cmd_OK():     while True:         line = ser.readline()         try:             print(line.decode('utf-8'),end='')         except:             pass         if ( re.search(b'

  • 使用python3实现操作串口详解

    通过引用serial模块包,来操作串口. 1.查看串口名称 在Linux和Windows中,串口的名字规则不太一样. 需要事先查看. Linux下的查看串口命令 root@D2:~# ls -l /dev/ttyS* crw-rw---- 1 root dialout 4, 64 Dec 26 06:53 /dev/ttyS0 crw-rw---- 1 root dialout 4, 65 Dec 26 06:41 /dev/ttyS1 crw--w---- 1 root tty     4,

  • 对python3 Serial 串口助手的接收读取数据方法详解

    其实网上已经有许多python语言书写的串口,但大部分都是python2写的,没有找到一个合适的python编写的串口助手,只能自己来写一个串口助手,由于我只需要串口能够接收读取数据就可以了,故而这个串口助手只实现了数据的接收读取. 创建串口助手首先需要创建一个类,重构类的实现过程如下: #coding=gb18030 import threading import time import serial class ComThread: def __init__(self, Port='COM3

  • 在windows下使用python进行串口通讯的方法

    Windows版本下的python并没有内置串口通讯的pyserial的库,所以需要自己下载.参照了网上的教程,有许多用的pip的安装方式,但是试了几个都没有用,所以想到用GitHub下载库文件,步骤分为: 1.在Github下载python-serial的库 https://github.com/pyserial/pyserial 2.下载完成后解压压缩包,找到serial文件夹,并找到python的安装位置(右击IDLE,然后查看python安装位置).我的地址为:C:\Users\NI Y

  • python实现串口通信的示例代码

    1 硬件设备 TTL串口摄像头(VC0706) USB转TTL烧录器 2 serial安装 第一次安装的是serial的包导包的时候发现下载错了,正确应该是pyserial.安装后直接import就可以了. 3 实现串口通信 3.1 发现端口 Windows下为COM(N, N=1.2...), Ubuntu下为'/dev/ttyS0'.Windows初学者,可以给您一下两种方式确定端口号. 方法一:输入在终端(cmd)中输入 python -m serial.tools.list_ports

  • Python 串口读写的实现方法

    1.安装pyserial https://pypi.python.org/pypi/pyserial Doc:http://pythonhosted.org/pyserial/ 使用Python Package Index (PyPi) pip install pyserial-3.1.1-py2.py3-none-any.whl 2. Demo import serial from time import sleep def recv(serial): while True: data = s

  • Python 读取串口数据,动态绘图的示例

    最近工作需要把单片机读取的传感器电压数据实时在PC上通过曲线显示出来,刚好在看python, 就试着用了python 与uart端口通讯,并且通过matplotlib.pyplot 模块实时绘制图形出来. 1. 废话少说,上图 因为没有UI,运行时需要在提示符下输入串口相关参数,com端口,波特率... 代码如下: #-*- coding: utf-8 -*- # 串口测试程序 import serial import matplotlib.pyplot as plt import numpy

  • Python使用pyserial进行串口通信的实例

    安装pyserial pip install pyserial 查看可用的端口 # coding:utf-8 import serial.tools.list_ports plist = list(serial.tools.list_ports.comports()) if len(plist) <= 0: print("没有发现端口!") else: plist_0 = list(plist[0]) serialName = plist_0[0] serialFd = seri

  • 使用Python串口实时显示数据并绘图的例子

    使用pyserial进行串口传输 一.安装pyserial以及基本用法 在cmd下输入命令pip install pyserial 注:升级pip后会出现 "'E:\Anaconda3\Scripts\pip-script.py' is not present."错误 使用 easy_install pip命令就能解决,换一条重新能执行安装的命令 常用方法: ser = serial.Serial(0) 是打开第一个串口 print ser.portstr 能看到第一个串口的标识,wi

  • Python操作串口的方法

    本文实例讲述了Python操作串口的方法.分享给大家供大家参考.具体如下: 首先需确保安装了serial模块,如果没安装的话就安装一下python-pyserial. 一个Python实现的串口Echo import serial import sys try: ser = serial.Serial('/dev/ttyUSB0', 9600) except Exception, e: print 'open serial failed.' exit(1) print 'A Serial Ech

随机推荐