#!/usr/bin/env python
#
# Copyright (c) 2019, Pycom Limited.
#
# This software is licensed under the GNU GPL version 3 or any
# later version, with permitted additional terms. For more information
# see the Pycom Licence v1.0 document supplied with this file, or
# available at https://www.pycom.io/opensource/licensing
#
# system packages
# import random
import struct
import socket
import time
# custom packages
from . import functions
from . import const as Const
from .common import Request, CommonModbusFunctions
from .common import ModbusException
from .modbus import Modbus
# typing not natively supported on MicroPython
from .typing import Optional, Tuple, Union
[docs]class ModbusTCP(Modbus):
"""Modbus TCP client class"""
def __init__(self):
super().__init__(
# set itf to TCPServer object, addr_list to None
TCPServer(),
None
)
[docs] def bind(self,
local_ip: str,
local_port: int = 502,
max_connections: int = 10) -> None:
"""
Bind IP and port for incomming requests
:param local_ip: IP of this device listening for requests
:type local_ip: str
:param local_port: Port of this device
:type local_port: int
:param max_connections: Number of maximum connections
:type max_connections: int
"""
self._itf.bind(local_ip, local_port, max_connections)
[docs] def get_bound_status(self) -> bool:
"""
Get the IP and port binding status.
:returns: The bound status, True if already bound, False otherwise.
:rtype: bool
"""
try:
return self._itf.get_is_bound()
except Exception:
return False
[docs]class TCP(CommonModbusFunctions):
"""
TCP class handling socket connections and parsing the Modbus data
:param slave_ip: IP of this device listening for requests
:type slave_ip: str
:param slave_port: Port of this device
:type slave_port: int
:param timeout: Socket timeout in seconds
:type timeout: float
"""
def __init__(self,
slave_ip: str,
slave_port: int = 502,
timeout: float = 5.0):
self._sock = socket.socket()
self.trans_id_ctr = 0
# print(socket.getaddrinfo(slave_ip, slave_port))
# [(2, 1, 0, '192.168.178.47', ('192.168.178.47', 502))]
self._sock.connect(socket.getaddrinfo(slave_ip, slave_port)[0][-1])
self._sock.settimeout(timeout)
[docs] def _create_mbap_hdr(self,
slave_addr: int,
modbus_pdu: bytes) -> Tuple[bytes, int]:
"""
Create a Modbus header.
:param slave_addr: The slave identifier
:type slave_addr: int
:param modbus_pdu: The modbus Protocol Data Unit
:type modbus_pdu: bytes
:returns: Modbus header and unique transaction ID
:rtype: Tuple[bytes, int]
"""
# only available on WiPy
# trans_id = machine.rng() & 0xFFFF
# use builtin function to generate random 24 bit integer
# trans_id = random.getrandbits(24) & 0xFFFF
# use incrementing counter as it's faster
trans_id = self.trans_id_ctr
self.trans_id_ctr += 1
mbap_hdr = struct.pack(
'>HHHB', trans_id, 0, len(modbus_pdu) + 1, slave_addr)
return mbap_hdr, trans_id
[docs] def _validate_resp_hdr(self,
response: bytearray,
trans_id: int,
slave_addr: int,
function_code: int,
count: bool = False) -> bytes:
"""
Validate the response header.
:param response: The response
:type response: bytearray
:param trans_id: The transaction identifier
:type trans_id: int
:param slave_addr: The slave identifier
:type slave_addr: int
:param function_code: The function code
:type function_code: int
:param count: The count
:type count: bool
:returns: Modbus response content
:rtype: bytes
"""
rec_tid, rec_pid, rec_len, rec_uid, rec_fc = struct.unpack(
'>HHHBB', response[:Const.MBAP_HDR_LENGTH + 1])
if (trans_id != rec_tid):
raise ValueError('wrong transaction ID')
if (rec_pid != 0):
raise ValueError('invalid protocol ID')
if (slave_addr != rec_uid):
raise ValueError('wrong slave ID')
if (rec_fc == (function_code + Const.ERROR_BIAS)):
raise ValueError('slave returned exception code: {:d}'.
format(rec_fc))
hdr_length = (Const.MBAP_HDR_LENGTH + 2) if count else \
(Const.MBAP_HDR_LENGTH + 1)
return response[hdr_length:]
[docs] def _send_receive(self,
slave_addr: int,
modbus_pdu: bytes,
count: bool) -> bytes:
"""
Send a modbus message and receive the reponse.
:param slave_addr: The slave identifier
:type slave_addr: int
:param modbus_pdu: The modbus PDU
:type modbus_pdu: bytes
:param count: The count
:type count: bool
:returns: Modbus data
:rtype: bytes
"""
mbap_hdr, trans_id = self._create_mbap_hdr(slave_addr=slave_addr,
modbus_pdu=modbus_pdu)
self._sock.send(mbap_hdr + modbus_pdu)
response = self._sock.recv(256)
modbus_data = self._validate_resp_hdr(response=response,
trans_id=trans_id,
slave_addr=slave_addr,
function_code=modbus_pdu[0],
count=count)
return modbus_data
[docs]class TCPServer(object):
"""Modbus TCP host class"""
def __init__(self):
self._sock = None
self._client_sock = None
self._is_bound = False
@property
def is_bound(self) -> bool:
"""
Get the IP and port binding status
:returns: True if bound to IP and port, False otherwise
:rtype: bool
"""
return self._is_bound
[docs] def get_is_bound(self) -> bool:
"""
Get the IP and port binding status, legacy support.
:returns: True if bound to IP and port, False otherwise
:rtype: bool
"""
return self._is_bound
[docs] def bind(self,
local_ip: str,
local_port: int = 502,
max_connections: int = 10):
"""
Bind IP and port for incomming requests
:param local_ip: IP of this device listening for requests
:type local_ip: str
:param local_port: Port of this device
:type local_port: int
:param max_connections: Number of maximum connections
:type max_connections: int
"""
if self._client_sock:
self._client_sock.close()
if self._sock:
self._sock.close()
self._sock = socket.socket()
# print(socket.getaddrinfo(local_ip, local_port))
# [(2, 1, 0, '192.168.178.47', ('192.168.178.47', 502))]
self._sock.bind(socket.getaddrinfo(local_ip, local_port)[0][-1])
self._sock.listen(max_connections)
self._is_bound = True
[docs] def _send(self, modbus_pdu: bytes, slave_addr: int) -> None:
"""
Send Modbus Protocol Data Unit to slave
:param modbus_pdu: The Modbus Protocol Data Unit
:type modbus_pdu: bytes
:param slave_addr: The slave address
:type slave_addr: int
"""
size = len(modbus_pdu)
fmt = 'B' * size
adu = struct.pack('>HHHB' + fmt, self._req_tid, 0, size + 1, slave_addr, *modbus_pdu)
self._client_sock.send(adu)
[docs] def send_response(self,
slave_addr: int,
function_code: int,
request_register_addr: int,
request_register_qty: int,
request_data: list,
values: Optional[list] = None,
signed: bool = True) -> None:
"""
Send a response to a client.
:param slave_addr: The slave address
:type slave_addr: int
:param function_code: The function code
:type function_code: int
:param request_register_addr: The request register address
:type request_register_addr: int
:param request_register_qty: The request register qty
:type request_register_qty: int
:param request_data: The request data
:type request_data: list
:param values: The values
:type values: Optional[list]
:param signed: Indicates if signed
:type signed: bool
"""
modbus_pdu = functions.response(function_code,
request_register_addr,
request_register_qty,
request_data,
values,
signed)
self._send(modbus_pdu, slave_addr)
[docs] def send_exception_response(self,
slave_addr: int,
function_code: int,
exception_code: int) -> None:
"""
Send an exception response to a client.
:param slave_addr: The slave address
:type slave_addr: int
:param function_code: The function code
:type function_code: int
:param exception_code: The exception code
:type exception_code: int
"""
modbus_pdu = functions.exception_response(function_code,
exception_code)
self._send(modbus_pdu, slave_addr)
[docs] def _accept_request(self,
accept_timeout: float,
unit_addr_list: list) -> Union[Request, None]:
"""
Accept, read and decode a socket based request
:param accept_timeout: The socket accept timeout
:type accept_timeout: float
:param unit_addr_list: The unit address list
:type unit_addr_list: list
"""
self._sock.settimeout(accept_timeout)
new_client_sock = None
try:
new_client_sock, client_address = self._sock.accept()
except OSError as e:
if e.args[0] != 11: # 11 = timeout expired
raise e
if new_client_sock is not None:
if self._client_sock is not None:
self._client_sock.close()
self._client_sock = new_client_sock
# recv() timeout, setting to 0 might lead to the following error
# "Modbus request error: [Errno 11] EAGAIN"
# This is a socket timeout error
self._client_sock.settimeout(0.5)
if self._client_sock is not None:
try:
req = self._client_sock.recv(128)
if len(req) == 0:
return None
req_header_no_uid = req[:Const.MBAP_HDR_LENGTH - 1]
self._req_tid, req_pid, req_len = struct.unpack('>HHH', req_header_no_uid)
req_uid_and_pdu = req[Const.MBAP_HDR_LENGTH - 1:Const.MBAP_HDR_LENGTH + req_len - 1]
except OSError:
# MicroPython raises an OSError instead of socket.timeout
# print("Socket OSError aka TimeoutError: {}".format(e))
return None
except Exception:
# print("Modbus request error:", e)
self._client_sock.close()
self._client_sock = None
return None
if (req_pid != 0):
# print("Modbus request error: PID not 0")
self._client_sock.close()
self._client_sock = None
return None
if ((unit_addr_list is not None) and (req_uid_and_pdu[0] not in unit_addr_list)):
return None
try:
return Request(self, req_uid_and_pdu)
except ModbusException as e:
self.send_exception_response(req[0],
e.function_code,
e.exception_code)
return None
[docs] def get_request(self,
unit_addr_list: Optional[list] = None,
timeout: int = None) -> Union[Request, None]:
"""
Check for request within the specified timeout
:param unit_addr_list: The unit address list
:type unit_addr_list: Optional[list]
:param timeout: The timeout
:type timeout: int
:returns: A request object or None.
:rtype: Union[Request, None]
:raises Exception: If no socket is configured and bound
"""
if self._sock is None:
raise Exception('Modbus TCP server not bound')
if timeout > 0:
start_ms = time.ticks_ms()
elapsed = 0
while True:
if self._client_sock is None:
accept_timeout = None if timeout is None else (timeout - elapsed) / 1000
else:
accept_timeout = 0
req = self._accept_request(accept_timeout, unit_addr_list)
if req:
return req
elapsed = time.ticks_diff(start_ms, time.ticks_ms())
if elapsed > timeout:
return None
else:
return self._accept_request(0, unit_addr_list)