#!/usr/bin/env python
# Copyright (c) 2019, Pycom Limited.
# This software is licensed under the GNU GPL version 3 or any
# later version, with permitted additional terms. For more information
# see the Pycom Licence v1.0 document supplied with this file, or
# available at

# system packages
from machine import UART
from machine import Pin
import struct
import time
import machine

# custom packages
from . import const as Const
from . import functions
from .common import Request, CommonModbusFunctions
from .common import ModbusException
from .modbus import Modbus

# typing not natively supported on MicroPython
from .typing import List, Optional, Union

[docs]class ModbusRTU(Modbus): """ Modbus RTU client class :param addr: The address of this device on the bus :type addr: int :param baudrate: The baudrate, default 9600 :type baudrate: int :param data_bits: The data bits, default 8 :type data_bits: int :param stop_bits: The stop bits, default 1 :type stop_bits: int :param parity: The parity, default None :type parity: Optional[int] :param pins: The pins as list [TX, RX] :type pins: List[Union[int, Pin], Union[int, Pin]] :param ctrl_pin: The control pin :type ctrl_pin: int :param uart_id: The ID of the used UART :type uart_id: int """ def __init__(self, addr: int, baudrate: int = 9600, data_bits: int = 8, stop_bits: int = 1, parity: Optional[int] = None, pins: List[Union[int, Pin], Union[int, Pin]] = None, ctrl_pin: int = None, uart_id: int = 1): super().__init__( # set itf to Serial object, addr_list to [addr] Serial(uart_id=uart_id, baudrate=baudrate, data_bits=data_bits, stop_bits=stop_bits, parity=parity, pins=pins, ctrl_pin=ctrl_pin), [addr] )
[docs]class Serial(CommonModbusFunctions): def __init__(self, uart_id: int = 1, baudrate: int = 9600, data_bits: int = 8, stop_bits: int = 1, parity=None, pins: List[Union[int, Pin], Union[int, Pin]] = None, ctrl_pin: int = None): """ Setup Serial/RTU Modbus :param uart_id: The ID of the used UART :type uart_id: int :param baudrate: The baudrate, default 9600 :type baudrate: int :param data_bits: The data bits, default 8 :type data_bits: int :param stop_bits: The stop bits, default 1 :type stop_bits: int :param parity: The parity, default None :type parity: Optional[int] :param pins: The pins as list [TX, RX] :type pins: List[Union[int, Pin], Union[int, Pin]] :param ctrl_pin: The control pin :type ctrl_pin: int """ self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity, stop=stop_bits, # timeout_chars=2, # WiPy only # pins=pins # WiPy only tx=pins[0], rx=pins[1] ) if ctrl_pin is not None: self._ctrlPin = Pin(ctrl_pin, mode=Pin.OUT) else: self._ctrlPin = None self._t1char = (1000000 * (data_bits + stop_bits + 2)) // baudrate if baudrate <= 19200: # 4010us (approx. 4ms) @ 9600 baud self._t35chars = (3500000 * (data_bits + stop_bits + 2)) // baudrate else: self._t35chars = 1750 # 1750us (approx. 1.75ms)
[docs] def _calculate_crc16(self, data: bytearray) -> bytes: """ Calculates the CRC16. :param data: The data :type data: bytearray :returns: The crc 16. :rtype: bytes """ crc = 0xFFFF for char in data: crc = (crc >> 8) ^ Const.CRC16_TABLE[((crc) ^ char) & 0xFF] return struct.pack('<H', crc)
[docs] def _exit_read(self, response: bytearray) -> bool: """ Return on modbus read error :param response: The response :type response: bytearray :returns: State of basic read response evaluation :rtype: bool """ if response[1] >= Const.ERROR_BIAS: if len(response) < Const.ERROR_RESP_LEN: return False elif (Const.READ_COILS <= response[1] <= Const.READ_INPUT_REGISTER): expected_len = Const.RESPONSE_HDR_LENGTH + 1 + response[2] + Const.CRC_LENGTH if len(response) < expected_len: return False elif len(response) < Const.FIXED_RESP_LEN: return False return True
[docs] def _uart_read(self) -> bytearray: """ Read up to 40 bytes from UART :returns: Read content :rtype: bytearray """ response = bytearray() for x in range(1, 40): if self._uart.any(): # WiPy only # response.extend(self._uart.readall()) response.extend( # variable length function codes may require multiple reads if self._exit_read(response): break # wait for the maximum time between two frames time.sleep_us(self._t35chars) return response
[docs] def _uart_read_frame(self, timeout: Optional[int] = None) -> bytearray: """ Read a Modbus frame :param timeout: The timeout :type timeout: Optional[int] :returns: Received message :rtype: bytearray """ received_bytes = bytearray() # set timeout to at least twice the time between two frames in case the # timeout was set to zero or None if timeout == 0 or timeout is None: timeout = 2 * self._t35chars # in milliseconds start_us = time.ticks_us() # stay inside this while loop at least for the timeout time while (time.ticks_diff(time.ticks_us(), start_us) <= timeout): # check amount of available characters if self._uart.any(): # remember this time in microseconds last_byte_ts = time.ticks_us() # do not stop reading and appending the result to the buffer # until the time between two frames elapsed while time.ticks_diff(time.ticks_us(), last_byte_ts) <= self._t35chars: # WiPy only # r = self._uart.readall() r = # if something has been read after the first iteration of # this inner while loop (during self._t35chars time) if r is not None: # append the new read stuff to the buffer received_bytes.extend(r) # update the timestamp of the last byte being read last_byte_ts = time.ticks_us() # if something has been read before the overall timeout is reached if len(received_bytes) > 0: return received_bytes # return the result in case the overall timeout has been reached return received_bytes
[docs] def _send(self, modbus_pdu: bytes, slave_addr: int) -> None: """ Send Modbus frame via UART If a flow control pin has been setup, it will be controller accordingly :param modbus_pdu: The modbus Protocol Data Unit :type modbus_pdu: bytes :param slave_addr: The slave address :type slave_addr: int """ serial_pdu = bytearray() serial_pdu.append(slave_addr) serial_pdu.extend(modbus_pdu) crc = self._calculate_crc16(serial_pdu) serial_pdu.extend(crc) if self._ctrlPin: self._ctrlPin(1) time.sleep_us(1000) # wait until the control pin really changed send_start_time = time.ticks_us() self._uart.write(serial_pdu) if self._ctrlPin: total_frame_time_us = self._t1char * len(serial_pdu) while time.ticks_us() <= send_start_time + total_frame_time_us: machine.idle() self._ctrlPin(0)
[docs] def _send_receive(self, modbus_pdu: bytes, slave_addr: int, count: bool) -> bytes: """ Send a modbus message and receive the reponse. :param modbus_pdu: The modbus Protocol Data Unit :type modbus_pdu: bytes :param slave_addr: The slave address :type slave_addr: int :param count: The count :type count: bool :returns: Validated response content :rtype: bytes """ # flush the Rx FIFO self._send(modbus_pdu=modbus_pdu, slave_addr=slave_addr) return self._validate_resp_hdr(response=self._uart_read(), slave_addr=slave_addr, function_code=modbus_pdu[0], count=count)
[docs] def _validate_resp_hdr(self, response: bytearray, slave_addr: int, function_code: int, count: bool) -> bytes: """ Validate the response header. :param response: The response :type response: bytearray :param slave_addr: The slave address :type slave_addr: int :param function_code: The function code :type function_code: int :param count: The count :type count: bool :returns: Modbus response content :rtype: bytes """ if len(response) == 0: raise OSError('no data received from slave') resp_crc = response[-Const.CRC_LENGTH:] expected_crc = self._calculate_crc16( response[0:len(response) - Const.CRC_LENGTH] ) if ((resp_crc[0] is not expected_crc[0]) or (resp_crc[1] is not expected_crc[1])): raise OSError('invalid response CRC') if (response[0] != slave_addr): raise ValueError('wrong slave address') if (response[1] == (function_code + Const.ERROR_BIAS)): raise ValueError('slave returned exception code: {:d}'. format(response[2])) hdr_length = (Const.RESPONSE_HDR_LENGTH + 1) if count else \ Const.RESPONSE_HDR_LENGTH return response[hdr_length:len(response) - Const.CRC_LENGTH]
[docs] def send_response(self, slave_addr: int, function_code: int, request_register_addr: int, request_register_qty: int, request_data: list, values: Optional[list] = None, signed: bool = True) -> None: """ Send a response to a client. :param slave_addr: The slave address :type slave_addr: int :param function_code: The function code :type function_code: int :param request_register_addr: The request register address :type request_register_addr: int :param request_register_qty: The request register qty :type request_register_qty: int :param request_data: The request data :type request_data: list :param values: The values :type values: Optional[list] :param signed: Indicates if signed :type signed: bool """ modbus_pdu = functions.response( function_code=function_code, request_register_addr=request_register_addr, request_register_qty=request_register_qty, request_data=request_data, value_list=values, signed=signed ) self._send(modbus_pdu=modbus_pdu, slave_addr=slave_addr)
[docs] def send_exception_response(self, slave_addr: int, function_code: int, exception_code: int) -> None: """ Send an exception response to a client. :param slave_addr: The slave address :type slave_addr: int :param function_code: The function code :type function_code: int :param exception_code: The exception code :type exception_code: int """ modbus_pdu = functions.exception_response( function_code=function_code, exception_code=exception_code) self._send(modbus_pdu=modbus_pdu, slave_addr=slave_addr)
[docs] def get_request(self, unit_addr_list: List[int], timeout: Optional[int] = None) -> Union[Request, None]: """ Check for request within the specified timeout :param unit_addr_list: The unit address list :type unit_addr_list: Optional[list] :param timeout: The timeout :type timeout: Optional[int] :returns: A request object or None. :rtype: Union[Request, None] """ req = self._uart_read_frame(timeout=timeout) if len(req) < 8: return None if req[0] not in unit_addr_list: return None req_crc = req[-Const.CRC_LENGTH:] req_no_crc = req[:-Const.CRC_LENGTH] expected_crc = self._calculate_crc16(req_no_crc) if (req_crc[0] != expected_crc[0]) or (req_crc[1] != expected_crc[1]): return None try: request = Request(interface=self, data=req_no_crc) except ModbusException as e: self.send_exception_response( slave_addr=req[0], function_code=e.function_code, exception_code=e.exception_code) return None return request