SoFunction
Updated on 2024-11-16

Python serial communication (pyserial) process analysis

The pyserial module encapsulates access to the serial port and is compatible with various platforms.

mounting

pip insatll pyserial

initialization

Simple Initialization Example

import serial
ser = ('com1', 9600, timeout=1)

All parameters

ser = (
port=None,       # number of device, numbering starts at
# zero. if everything fails, the user
# can specify a device string, note
# that this isn't portable anymore
# if no port is specified an unconfigured
# an closed serial port object is created
baudrate=9600,     # baud rate
bytesize=EIGHTBITS,   # number of databits
parity=PARITY_NONE,   # enable parity checking
stopbits=STOPBITS_ONE, # number of stopbits
timeout=None,      # set a timeout value, None for waiting forever
xonxoff=0,       # enable software flow control
rtscts=0,        # enable RTS/CTS flow control
interCharTimeout=None  # Inter-character timeout, None to disable
)

Initialization for different platforms

ser=("/dev/ttyUSB0",9600,timeout=0.5) # Use USB to connect to the serial port
ser=("/dev/ttyAMA0",9600,timeout=0.5) # Use the Raspberry Pi's GPIO port to connect to the serial port
ser=(1,9600,timeout=0.5)#winsows system use com1 port to connect serial port
ser=("com1",9600,timeout=0.5)#winsows system use com1 port to connect serial port
ser=("/dev/ttyS1",9600,timeout=0.5)#Linuxsystem utilizationcom1Port Connection Serial Port

Classes (additionally initialized methods)

class ()
{
  def __init__(port=None, baudrate=9600, bytesize=EIGHTBITS,parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=None, xonxoff=False, rtscts=False, writeTimeout=None, dsrdtr=False, interCharTimeout=None)
}

ser object properties

name:device name
port:Read or Write Ports
baudrate:baud
bytesize:byte size
parity:check digit
stopbits:stop bit
timeout:Read Timeout Setting
writeTimeout:write timeout
xonxoff:software flow control
rtscts:hardware flow control
dsrdtr:hardware flow control
interCharTimeout:Character Interval Timeout

Common methods of the ser object

():Check if the port is open。
() :Open the port‘。
():Shut down the port。
():Read byte data from port。default (setting)1bytes。
ser.read_all():Receive all data from the port。
("hello"):Write data to port。
():Read a line of data。
():Read multiple rows of data。
in_waiting():Returns the number of bytes in the receive cache。
flush():Wait for all data to be written。
flushInput():Discard all data in the receive cache。
flushOutput():Terminate the current write operation,and discard the data in the send cache。

Package Reference

import serial
import .list_ports

class Communication():

  # Initialization
  def __init__(self,com,bps,timeout):
     = com
     = bps
     =timeout
    global Ret
    try:
      # Open the serial port and get the serial port object
       self.main_engine= (,,timeout=)
      # Determine if opening was successful
       if (self.main_engine.is_open):
        Ret = True
    except Exception as e:
      print("--- anomalies ---:", e)

  # Printing device basic information
  def Print_Name(self):
    print(self.main_engine.name) # Device name
    print(self.main_engine.port)# Read or write ports
    print(self.main_engine.baudrate)# baud rate
    print(self.main_engine.bytesize)# Byte size
    print(self.main_engine.parity)# check digits
    print(self.main_engine.stopbits)# stop bits
    print(self.main_engine.timeout)#Read timeout settings
    print(self.main_engine.writeTimeout)#Write timeout
    print(self.main_engine.xonxoff)# Software flow control
    print(self.main_engine.rtscts)# Software flow control
    print(self.main_engine.dsrdtr)# Hardware flow control
    print(self.main_engine.interCharTimeout)# Character interval timeout

  # Open the serial port
  def Open_Engine(self):
    self.main_engine.open()

  # Close the serial port
  def Close_Engine(self):
    self.main_engine.close()
    print(self.main_engine.is_open) # Check if the serial port is open

  # Print a list of available serial ports
  @staticmethod
  def Print_Used_Com():
    port_list = list(.list_ports.comports())
    print(port_list)





  # Receive data of a specified size
  # Reads SIZE bytes from the serial port. If a timeout is specified, fewer bytes may be returned after the timeout; if no timeout is specified, it waits until the specified number of bytes has been collected.
  def Read_Size(self,size):
    return self.main_engine.read(size=size)

  # Receive a line of data
  # When using readline() you should be careful: you should specify a timeout when opening the serial port, otherwise it will wait if the serial port does not receive a new line.
  # If there is no timeout, readline will report an exception.
  def Read_Line(self):
    return self.main_engine.readline()

  # of data sent
  def Send_data(self,data):
    self.main_engine.write(data)

  #More Examples
  # self.main_engine.write(chr(0x06).encode("utf-8")) # hexadecimal sends a data
  # print(self.main_engine.read().hex()) # # Hexadecimal reading reads one byte
  # print(self.main_engine.read()) # read a byte
  # print(self.main_engine.read(10).decode("gbk")) # read ten bytes
  # print(self.main_engine.readline().decode("gbk"))#read a line
  # print(self.main_engine.readlines()) # read multiple lines, return list, must match timeout to use
  # print(self.main_engine.in_waiting)# Get the number of bytes left in the input buffer
  # print(self.main_engine.out_waiting)# Get the number of bytes in the output buffer
  # print(self.main_engine.readall())# Read all characters.

  #Receive data
  # An integer takes up two bytes
  # A character takes up one byte

  def Recive_data(self,way):
    # Loop to receive data, this is a dead loop, can be realized in threads.
    print("Start receiving data:")
    while True:
      try:
        # Receive it byte by byte
        if self.main_engine.in_waiting:
          if(way == 0):
            for i in range(self.main_engine.in_waiting):
              print("Receive ascii data:"+str(self.Read_Size(1)))
              data1 = self.Read_Size(1).hex()# to hexadecimal
              data2 = int(data1,16)# Convert to decimal
              if (data2 == "exit"): # Exit sign
                break
              else:
                 print("Data received hex:"+data1+" Data received in decimal: "+str(data2))
          if(way == 1):
            # Overall reception
            # data = self.main_engine.read(self.main_engine.in_waiting).decode("utf-8")#Mode 1
            data = self.main_engine.read_all()# Mode II
            if (data == "exit"): # Exit sign
              break
            else:
               print("Receive ascii data:", data)
      except Exception as e:
        print("Exception reported:",e)


Communication.Print_Used_Com()
Ret =False # Create a success flag or not

Engine1 = Communication("com12",115200,0.5)
if (Ret):
  Engine1.Recive_data(0)
while(1)
  {
   //Send test
   uint8_t a = 61;
   delayms(300);
   printf("%c", a);
}
Start receiving data:
reception (of transmitted signal)asciidigital:b'='
收到digital十六进制:3d 收到digital十进制:61

This is the whole content of this article, I hope it will help you to learn more.