API

Modbus Constants

umodbus.const.ACKNOWLEDGE

Response is returned to prevent a timeout error from occurring in the client

umodbus.const.CRC16_TABLE = (0, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920, 51009, 1280, 50625, 50305, 1088, 52225, 3264, 3456, 52545, 3840, 53185, 52865, 3648, 2560, 51905, 52097, 2880, 51457, 2496, 2176, 51265, 55297, 6336, 6528, 55617, 6912, 56257, 55937, 6720, 7680, 57025, 57217, 8000, 56577, 7616, 7296, 56385, 5120, 54465, 54657, 5440, 55041, 6080, 5760, 54849, 53761, 4800, 4992, 54081, 4352, 53697, 53377, 4160, 61441, 12480, 12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361, 14144, 62721, 13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089, 64001, 15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777, 10560, 60161, 11200, 10880, 59969, 60929, 11968, 12160, 61249, 11520, 60865, 60545, 11328, 58369, 9408, 9600, 58689, 9984, 59329, 59009, 9792, 8704, 58049, 58241, 9024, 57601, 8640, 8320, 57409, 40961, 24768, 24960, 41281, 25344, 41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728, 42049, 27648, 44225, 44417, 27968, 44801, 28608, 28288, 44609, 43521, 27328, 27520, 43841, 26880, 43457, 43137, 26688, 30720, 47297, 47489, 31040, 47873, 31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257, 31808, 46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953, 29504, 45313, 29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440, 21120, 37441, 38401, 22208, 22400, 38721, 21760, 38337, 38017, 21568, 39937, 23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040, 39617, 39809, 23360, 39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457, 19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985, 34177, 17728, 34561, 18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640, 33217, 32897, 16448)

CRC16 lookup table
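The table entries are consistent with the CRC-16/MODBUS parameters (reflected polynomial 0xA001, initial value 0xFFFF). The following is a minimal sketch of how such a table can be generated and used; it is not the library's implementation, and build_crc16_table and crc16 are hypothetical helper names.

```python
def build_crc16_table(poly: int = 0xA001) -> tuple:
    """Generate the 256-entry lookup table for the reflected polynomial."""
    table = []
    for value in range(256):
        crc = value
        for _ in range(8):
            # Shift right; XOR with the polynomial when the dropped bit was 1
            crc = (crc >> 1) ^ poly if crc & 1 else crc >> 1
        table.append(crc)
    return tuple(table)


def crc16(data: bytes, table: tuple) -> int:
    """Compute the CRC-16/MODBUS checksum of data with the lookup table."""
    crc = 0xFFFF  # initial value used by Modbus RTU
    for byte in data:
        crc = (crc >> 8) ^ table[(crc ^ byte) & 0xFF]
    return crc


TABLE = build_crc16_table()
```

The first generated entries (0, 49345, 49537, 320) match the start of CRC16_TABLE above.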

umodbus.const.CRC_LENGTH

CRC length

umodbus.const.DEVICE_FAILED_TO_RESPOND

No response was obtained from the target device

umodbus.const.DIAGNOSTICS

Provides a series of tests for checking the communication system (serial only)

umodbus.const.ERROR_BIAS

Error code offset

umodbus.const.ERROR_RESP_LEN

Error response length

umodbus.const.FIXED_RESP_LEN

Fixed response length

umodbus.const.GET_COM_EVENT_COUNTER

Get a status word and an event count from the remote device's communication event counter

umodbus.const.GET_COM_EVENT_LOG

Get a status word, event count, message count, and a field of event bytes

umodbus.const.ILLEGAL_DATA_ADDRESS

Data address received in query is not an allowable address for the server

umodbus.const.ILLEGAL_DATA_VALUE

Value contained in the query data field is not an allowable value for the server

umodbus.const.ILLEGAL_FUNCTION

Function code received in query is not an allowable action for the server

umodbus.const.MASK_WRITE_REGISTER

Modify the contents of a specified holding register using a combination of an AND mask, an OR mask, and the register's current contents
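For MASK_WRITE_REGISTER, the Modbus specification defines the new register value as (current AND and_mask) OR (or_mask AND NOT and_mask). A minimal sketch of that rule follows; mask_write is a hypothetical helper name and is not part of umodbus.

```python
def mask_write(current: int, and_mask: int, or_mask: int) -> int:
    """Apply the Mask Write Register rule to a 16-bit register value."""
    # Bits set in and_mask keep the current value; cleared bits take or_mask
    return ((current & and_mask) | (or_mask & ~and_mask)) & 0xFFFF


# Example values taken from the Modbus specification
result = mask_write(current=0x12, and_mask=0xF2, or_mask=0x25)
```

With these values the result is 0x17.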

umodbus.const.MBAP_HDR_LENGTH

Modbus Application Protocol (MBAP) header length

umodbus.const.MEMORY_PARITY_ERROR

Server attempted to read a record file but detected a parity error in the memory

umodbus.const.READ_COILS

Read contiguous status of coils

umodbus.const.READ_DEVICE_IDENTIFICATION

Encapsulated Interface Transport

umodbus.const.READ_DISCRETE_INPUTS

Read contiguous status of discrete inputs

umodbus.const.READ_EXCEPTION_STATUS

Read the contents of eight Exception Status outputs

umodbus.const.READ_FIFO_QUEUE

Read the contents of a First-In-First-Out (FIFO) queue of registers

umodbus.const.READ_FILE_RECORD

Perform a file record read

umodbus.const.READ_HOLDING_REGISTERS

Read the contents of a contiguous block of holding registers

umodbus.const.READ_INPUT_REGISTER

Read contiguous input registers

umodbus.const.REPORT_SERVER_ID

Read the description of the type, the current status, and other information

umodbus.const.RESPONSE_HDR_LENGTH

Response header length

umodbus.const.SERVER_DEVICE_BUSY

Server is engaged in processing a long duration program command

umodbus.const.WRITE_FILE_RECORD

Perform a file record write

umodbus.const.WRITE_MULTIPLE_COILS

Force each coil in a sequence of coils to either ON or OFF

umodbus.const.WRITE_MULTIPLE_REGISTERS

Write a block of contiguous holding registers

umodbus.const.WRITE_SINGLE_COIL

Write a single coil output status to ON or OFF

umodbus.const.WRITE_SINGLE_REGISTER

Write a single holding register

Common module

class umodbus.common.CommonModbusFunctions

Bases: object

Common Modbus functions

read_coils(slave_addr: int, starting_addr: int, coil_qty: int) → List[bool]

Read coils (COILS).

Parameters
  • slave_addr (int) – The slave address

  • starting_addr (int) – The coil starting address

  • coil_qty (int) – The amount of coils to read

Returns

State of read coils as list

Return type

List[bool]

read_discrete_inputs(slave_addr: int, starting_addr: int, input_qty: int) → List[bool]

Read discrete inputs (ISTS).

Parameters
  • slave_addr (int) – The slave address

  • starting_addr (int) – The discrete input starting address

  • input_qty (int) – The amount of discrete inputs to read

Returns

State of read discrete inputs as list

Return type

List[bool]

read_holding_registers(slave_addr: int, starting_addr: int, register_qty: int, signed: bool = True) → Tuple[int, …]

Read holding registers (HREGS).

Parameters
  • slave_addr (int) – The slave address

  • starting_addr (int) – The holding register starting address

  • register_qty (int) – The amount of holding registers to read

  • signed (bool) – Indicates if signed

Returns

State of read holding register as tuple

Return type

Tuple[int, …]

read_input_registers(slave_addr: int, starting_addr: int, register_qty: int, signed: bool = True) → Tuple[int, …]

Read input registers (IREGS).

Parameters
  • slave_addr (int) – The slave address

  • starting_addr (int) – The input register starting address

  • register_qty (int) – The amount of input registers to read

  • signed (bool) – Indicates if signed

Returns

State of read input register as tuple

Return type

Tuple[int, …]

write_multiple_coils(slave_addr: int, starting_address: int, output_values: List[Union[int, bool]]) → bool

Update multiple coils.

Parameters
  • slave_addr (int) – The slave address

  • starting_address (int) – The address of the first coil

  • output_values (List[Union[int, bool]]) – The output values

Returns

Result of operation

Return type

bool

write_multiple_registers(slave_addr: int, starting_address: int, register_values: List[int], signed: bool = True) → bool

Update multiple registers.

Parameters
  • slave_addr (int) – The slave address

  • starting_address (int) – The starting address

  • register_values (List[int]) – The register values

  • signed (bool) – Indicates if signed

Returns

Result of operation

Return type

bool

write_single_coil(slave_addr: int, output_address: int, output_value: Union[int, bool]) → bool

Update a single coil.

Parameters
  • slave_addr (int) – The slave address

  • output_address (int) – The output address

  • output_value (Union[int, bool]) – The output value

Returns

Result of operation

Return type

bool

write_single_register(slave_addr: int, register_address: int, register_value: int, signed: bool = True) → bool

Update a single register.

Parameters
  • slave_addr (int) – The slave address

  • register_address (int) – The register address

  • register_value (int) – The register value

  • signed (bool) – Indicates if signed

Returns

Result of operation

Return type

bool

exception umodbus.common.ModbusException(function_code: int, exception_code: int)

Bases: Exception

Exception for signaling modbus errors

class umodbus.common.Request(interface, data: bytearray)

Bases: object

Deconstruct request data received via TCP or Serial

send_exception(exception_code: int) → None

Send an exception response.

Parameters

exception_code (int) – The exception code

send_response(values: Optional[list] = None, signed: bool = True) → None

Send a response via the configured interface.

Parameters
  • values (Optional[list]) – The values

  • signed (bool) – Indicates if signed values are used

Common functions

umodbus.functions.bin_to_float(binary: str) → float

Convert binary string to floating point value

Parameters

binary (str) – The binary string

Returns

Converted floating point value

Return type

float

umodbus.functions.bytes_to_bool(byte_list: bytes, bit_qty: Optional[int] = 1) → List[bool]

Convert bytes to list of boolean values

Parameters
  • byte_list (bytes) – The byte list

  • bit_qty (Optional[int]) – Amount of bits received

Returns

Boolean representation

Return type

List[bool]
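The Modbus specification packs coil and discrete input states into bytes with the first state in the least significant bit of the first byte. The following sketch unpacks bits under that assumption; unpack_bits is a hypothetical name, and the ordering returned by bytes_to_bool itself is not stated in this reference.

```python
from typing import List


def unpack_bits(byte_list: bytes, bit_qty: int) -> List[bool]:
    """Return the first bit_qty bits, least significant bit of each byte first."""
    return [
        bool((byte_list[index // 8] >> (index % 8)) & 1)
        for index in range(bit_qty)
    ]


# Ten coil states packed into two bytes (0xCD, 0x01)
states = unpack_bits(b"\xcd\x01", 10)
```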

umodbus.functions.exception_response(function_code: int, exception_code: int) → bytes

Create Modbus exception response

Parameters
  • function_code (int) – The function code

  • exception_code (int) – The exception code

Returns

Packed Modbus message

Return type

bytes
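In the Modbus specification, an exception response consists of the request's function code with the most significant bit set (an offset of 0x80), followed by the one-byte exception code. A minimal sketch under that assumption; build_exception_pdu is a hypothetical name, and the offset value is taken from the specification rather than from umodbus.const.ERROR_BIAS.

```python
import struct

ERROR_OFFSET = 0x80  # assumption: offset defined by the Modbus specification


def build_exception_pdu(function_code: int, exception_code: int) -> bytes:
    """Pack a two-byte Modbus exception response PDU."""
    return struct.pack("BB", ERROR_OFFSET + function_code, exception_code)


# Illegal data address (0x02) in reply to a read holding registers request (0x03)
pdu = build_exception_pdu(0x03, 0x02)
```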

umodbus.functions.float_to_bin(num: float) → bin

Convert floating point value to binary

See IEEE 754

Parameters

num (float) – The number

Returns

Binary representation

Return type

bin
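The conversion between a float and its IEEE 754 single-precision bit pattern can be sketched with the standard struct module. This is an illustration only; float_to_bits and bits_to_float are hypothetical names, and the 32-bit big-endian layout is an assumption about what float_to_bin and bin_to_float operate on.

```python
import struct


def float_to_bits(num: float) -> str:
    """Return the 32-bit IEEE 754 pattern of num as a string of 0s and 1s."""
    raw = struct.unpack(">I", struct.pack(">f", num))[0]
    return format(raw, "032b")


def bits_to_float(binary: str) -> float:
    """Interpret a 32-character binary string as an IEEE 754 single float."""
    return struct.unpack(">f", struct.pack(">I", int(binary, 2)))[0]


bits = float_to_bits(1.0)
```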

umodbus.functions.int_to_bin(num: int) → str

Convert integer to binary

Parameters

num (int) – The number

Returns

Binary representation of given input

Return type

str

umodbus.functions.read_coils(starting_address: int, quantity: int) → bytes

Create Modbus Protocol Data Unit for reading coils.

Parameters
  • starting_address (int) – The starting address

  • quantity (int) – Quantity of coils

Returns

Packed Modbus message

Return type

bytes
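In the Modbus specification, the four read requests share one PDU layout: a one-byte function code followed by the starting address and the quantity as big-endian 16-bit values. A minimal sketch of that layout; build_read_pdu is a hypothetical name and the function code 0x01 (read coils) comes from the specification.

```python
import struct


def build_read_pdu(function_code: int, starting_address: int, quantity: int) -> bytes:
    """Pack a Modbus read request PDU (function code, address, quantity)."""
    return struct.pack(">BHH", function_code, starting_address, quantity)


# Read 19 coils starting at address 0x0013
pdu = build_read_pdu(0x01, 0x0013, 19)
```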

umodbus.functions.read_discrete_inputs(starting_address: int, quantity: int) → bytes

Create Modbus Protocol Data Unit for reading discrete inputs.

Parameters
  • starting_address (int) – The starting address

  • quantity (int) – Quantity of discrete inputs

Returns

Packed Modbus message

Return type

bytes

umodbus.functions.read_holding_registers(starting_address: int, quantity: int) → bytes

Create Modbus Protocol Data Unit for reading holding registers.

Parameters
  • starting_address (int) – The starting address

  • quantity (int) – Quantity of holding registers

Returns

Packed Modbus message

Return type

bytes

umodbus.functions.read_input_registers(starting_address: int, quantity: int) → bytes

Create Modbus Protocol Data Unit for reading input registers.

Parameters
  • starting_address (int) – The starting address

  • quantity (int) – Quantity of input registers

Returns

Packed Modbus message

Return type

bytes

umodbus.functions.response(function_code: int, request_register_addr: int, request_register_qty: int, request_data: list, value_list: Optional[list] = None, signed: bool = True) → bytes

Construct a Modbus response Protocol Data Unit

Parameters
  • function_code (int) – The function code

  • request_register_addr (int) – The request register address

  • request_register_qty (int) – The request register qty

  • request_data (list) – The request data

  • value_list (Optional[list]) – The values

  • signed (bool) – Indicates if signed

Returns

Protocol data unit

Return type

bytes

umodbus.functions.to_short(byte_array: bytes, signed: bool = True) → bytes

Convert bytes to tuple of integer values

Parameters
  • byte_array (bytes) – The byte array

  • signed (bool) – Indicates if signed

Returns

Integer representation

Return type

bytes
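Register data travels as big-endian 16-bit words, so converting bytes to integers amounts to unpacking one signed or unsigned short per two bytes. A minimal sketch with the standard struct module; unpack_registers is a hypothetical name and an even-length input is assumed.

```python
import struct


def unpack_registers(byte_array: bytes, signed: bool = True) -> tuple:
    """Unpack big-endian 16-bit values; byte_array must have an even length."""
    code = "h" if signed else "H"
    return struct.unpack(">" + code * (len(byte_array) // 2), byte_array)


signed_values = unpack_registers(b"\xff\xfe\x00\x01", signed=True)
unsigned_values = unpack_registers(b"\xff\xfe\x00\x01", signed=False)
```

The same two bytes 0xFFFE decode to -2 when signed and to 65534 when unsigned.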

umodbus.functions.validate_resp_data(data: bytes, function_code: int, address: int, value: Optional[int] = None, quantity: Optional[int] = None, signed: bool = True) → bool

Validate the response data.

Parameters
  • data (bytes) – The data

  • function_code (int) – The function code

  • address (int) – The address

  • value (int) – The value

  • quantity (int) – The quantity

  • signed (bool) – Indicates if signed

Returns

True if valid, False otherwise

Return type

bool

umodbus.functions.write_multiple_coils(starting_address: int, value_list: List[Union[int, bool]]) → bytes

Create Modbus message to update multiple coils

Parameters
  • starting_address (int) – The starting address

  • value_list (List[Union[int, bool]]) – The list of output values

Returns

Packed Modbus message

Return type

bytes
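Per the Modbus specification, a write multiple coils request carries function code 0x0F, the starting address, the coil quantity, a byte count, and the coil states packed least significant bit first. A minimal sketch of that packing; build_write_multiple_coils_pdu is a hypothetical name, not the library's function.

```python
import struct
from typing import List, Union


def build_write_multiple_coils_pdu(
    starting_address: int, value_list: List[Union[int, bool]]
) -> bytes:
    """Pack coil states into a write multiple coils request PDU."""
    quantity = len(value_list)
    data = bytearray((quantity + 7) // 8)  # one byte per started group of 8 coils
    for index, value in enumerate(value_list):
        if value:
            data[index // 8] |= 1 << (index % 8)
    header = struct.pack(">BHHB", 0x0F, starting_address, quantity, len(data))
    return header + bytes(data)


pdu = build_write_multiple_coils_pdu(0x0013, [1, 0, 1, 1, 0, 0, 1, 1, 1, 0])
```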

umodbus.functions.write_multiple_registers(starting_address: int, register_values: List[int], signed: bool = True) → bytes

Create Modbus message to update multiple registers

Parameters
  • starting_address (int) – The starting address

  • register_values (List[int]) – The list of register values

  • signed (bool) – Flag whether data is signed or not

Returns

Packed Modbus message

Return type

bytes
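Per the Modbus specification, a write multiple registers request carries function code 0x10, the starting address, the register quantity, a byte count, and each value as a big-endian 16-bit word. A minimal sketch; build_write_multiple_registers_pdu is a hypothetical name, not the library's function.

```python
import struct
from typing import List


def build_write_multiple_registers_pdu(
    starting_address: int, register_values: List[int], signed: bool = True
) -> bytes:
    """Pack register values into a write multiple registers request PDU."""
    quantity = len(register_values)
    code = "h" if signed else "H"
    fmt = ">BHHB" + code * quantity
    return struct.pack(
        fmt, 0x10, starting_address, quantity, quantity * 2, *register_values
    )


pdu = build_write_multiple_registers_pdu(0x0001, [0x000A, 0x0102])
```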

umodbus.functions.write_single_coil(output_address: int, output_value: Union[int, bool]) → bytes

Create Modbus message to update single coil

Parameters
  • output_address (int) – The output address

  • output_value (Union[int, bool]) – The output value

Returns

Packed Modbus message

Return type

bytes
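In the Modbus specification, a write single coil request uses function code 0x05 and encodes ON as 0xFF00 and OFF as 0x0000. A minimal sketch of that encoding; build_write_single_coil_pdu is a hypothetical name, not the library's function.

```python
import struct
from typing import Union


def build_write_single_coil_pdu(
    output_address: int, output_value: Union[int, bool]
) -> bytes:
    """Pack a write single coil request PDU (ON = 0xFF00, OFF = 0x0000)."""
    encoded = 0xFF00 if output_value else 0x0000
    return struct.pack(">BHH", 0x05, output_address, encoded)


pdu_on = build_write_single_coil_pdu(0x00AC, True)
pdu_off = build_write_single_coil_pdu(0x00AC, 0)
```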

umodbus.functions.write_single_register(register_address: int, register_value: int, signed: bool = True) → bytes

Create Modbus message to write a single register

Parameters
  • register_address (int) – The register address

  • register_value (int) – The register value

  • signed (bool) – Flag whether data is signed or not

Returns

Packed Modbus message

Return type

bytes

Modbus client module

Modbus register abstraction class

Used to add, remove, set and get values or states of a register or coil. Additional helper properties and functions like getters for changed registers are available as well.

This class is inherited by the Modbus client implementations umodbus.serial.ModbusRTU and umodbus.tcp.ModbusTCP

class umodbus.modbus.Modbus(itf, addr_list: List[int])

Bases: object

Modbus register abstraction

Parameters
  • itf (Callable) – Abstraction interface

  • addr_list (List[int]) – List of addresses

_check_valid_register(reg_type: str) → bool

Check register type to be a valid modbus register

Parameters

reg_type (str) – The register type

Returns

Flag whether register type is valid

Return type

bool

_create_response(request: Request, reg_type: str) → Union[List[bool], List[int]]

Create a response.

Parameters
  • request (Request) – The request

  • reg_type (str) – The register type

Returns

Values of this register

Return type

Union[List[bool], List[int]]

_get_reg_in_dict(reg_type: str, address: int) → Union[bool, int, List[bool], List[int]]

Get the register value from the dictionary of registers.

Parameters
  • reg_type (str) – The register type

  • address (int) – The address (ID) of the register

Raises

KeyError – No register at specified address found

Returns

Register value

Return type

Union[bool, int, List[bool], List[int]]

_get_regs_of_dict(reg_type: str) → dict_keys

Get all configured registers of specified register type.

Parameters

reg_type (str) – The register type

Raises

KeyError – No register at specified address found

Returns

The configured registers of the specified register type.

Return type

dict_keys

_process_read_access(request: Request, reg_type: str) → None

Process read access to register

Parameters
  • request (Request) – The request

  • reg_type (str) – The register type

_process_write_access(request: Request, reg_type: str) → None

Process write access to register

Parameters
  • request (Request) – The request

  • reg_type (str) – The register type

_remove_changed_register(reg_type: str, address: int, timestamp: int) → bool

Remove the register from the dictionary of changed registers.

Parameters
  • reg_type (str) – The register type

  • address (int) – The address (ID) of the register

  • timestamp (int) – The timestamp of the change in milliseconds

Raises

KeyError – No register at specified address found

Returns

Result of removing register from dict

Return type

bool

_remove_reg_from_dict(reg_type: str, address: int) → Union[None, bool, int, List[bool], List[int]]

Remove the register from the dictionary of registers.

Parameters
  • reg_type (str) – The register type

  • address (int) – The address (ID) of the register

Raises

KeyError – No register at specified address found

Returns

Register value, None if register did not exist in dict

Return type

Union[None, bool, int, List[bool], List[int]]

_set_changed_register(reg_type: str, address: int, value: Union[bool, int, List[bool], List[int]]) → None

Set the register value in the dictionary of changed registers.

Parameters
  • reg_type (str) – The register type

  • address (int) – The address (ID) of the register

  • value (Union[bool, int, List[bool], List[int]]) – The value

Raises

KeyError – Register can not be changed externally

_set_reg_in_dict(reg_type: str, address: int, value: Union[bool, int, List[bool], List[int]], on_set_cb=None, on_get_cb: Callable[[str, int, Union[List[bool], List[int]]], None] = None) → None

Set the register value in the dictionary of registers.

Parameters
  • reg_type (str) – The register type

  • address (int) – The address (ID) of the register

  • value (Union[bool, int, List[bool], List[int]]) – The value(s) of the register(s)

  • on_set_cb – Callback on setting the register

  • on_get_cb (Callable[ [str, int, Union[List[bool], List[int]]], None ]) – Callback on getting the register

Raises

KeyError – No register at specified address found

_set_single_reg_in_dict(reg_type: str, address: int, value: Union[bool, int], on_set_cb=None, on_get_cb: Callable[[str, int, Union[List[bool], List[int]]], None] = None) → None

Set a register value in the dictionary of registers.

Parameters
  • reg_type (str) – The register type

  • address (int) – The address (ID) of the register

  • value (Union[bool, int]) – The value of the register

  • on_set_cb – Callback on setting the register

  • on_get_cb (Callable[ [str, int, Union[List[bool], List[int]]], None ]) – Callback on getting the register

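add_coil(address: int, value: Union[bool, List[bool]] = False, on_set_cb: Callable[[str, int, Union[List[bool], List[int]]], None] = None, on_get_cb: Callable[[str, int, Union[List[bool], List[int]]], None] = None) → None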

Add a coil to the modbus register dictionary.

Parameters
  • address (int) – The address (ID) of the register

  • value (Union[bool, List[bool]], optional) – The default value

  • on_set_cb (Callable[ [str, int, Union[List[bool], List[int]]], None ]) – Callback on setting the coil

  • on_get_cb (Callable[ [str, int, Union[List[bool], List[int]]], None ]) – Callback on getting the coil

add_hreg(address: int, value: Union[int, List[int]] = 0, on_set_cb: Callable[[str, int, List[int]], None] = None, on_get_cb: Callable[[str, int, List[int]], None] = None) → None

Add a holding register to the modbus register dictionary.

Parameters
  • address (int) – The address (ID) of the register

  • value (Union[int, List[int]], optional) – The default value

  • on_set_cb (Callable[[str, int, List[int]], None]) – Callback on setting the holding register

  • on_get_cb (Callable[[str, int, List[int]], None]) – Callback on getting the holding register

add_ireg(address: int, value: Union[int, List[int]] = 0, on_get_cb: Callable[[str, int, Union[List[bool], List[int]]], None] = None) → None

Add an input register to the modbus register dictionary.

Parameters
  • address (int) – The address (ID) of the register

  • value (Union[int, List[int]], optional) – The default value

  • on_get_cb (Callable[ [str, int, Union[List[bool], List[int]]], None ]) – Callback on getting the input register

add_ist(address: int, value: Union[bool, List[bool]] = False, on_get_cb: Callable[[str, int, Union[List[bool], List[int]]], None] = None) → None

Add a discrete input register to the modbus register dictionary.

Parameters
  • address (int) – The address (ID) of the register

  • value (bool or list of bool, optional) – The default value

  • on_get_cb (Callable[ [str, int, Union[List[bool], List[int]]], None ]) – Callback on getting the discrete input register

property changed_coils: dict

Get the changed coil registers.

Returns

The changed coil registers.

Return type

dict

property changed_hregs: dict

Get the changed holding registers.

Returns

The changed holding registers.

Return type

dict

property changed_registers: dict

Get the changed registers.

Returns

The changed registers.

Return type

dict

property coils: dict_keys

Get the configured coils.

Returns

The dictionary keys.

Return type

dict_keys

get_coil(address: int) → Union[bool, List[bool]]

Get the coil value.

Parameters

address (int) – The address (ID) of the register

Returns

Coil value

Return type

Union[bool, List[bool]]

get_hreg(address: int) → Union[int, List[int]]

Get the holding register value.

Parameters

address (int) – The address (ID) of the register

Returns

Holding register value

Return type

Union[int, List[int]]

get_ireg(address: int) → Union[int, List[int]]

Get the input register value.

Parameters

address (int) – The address (ID) of the register

Returns

Input register value

Return type

Union[int, List[int]]

get_ist(address: int) → Union[bool, List[bool]]

Get the discrete input register value.

Parameters

address (int) – The address (ID) of the register

Returns

Discrete input register value

Return type

Union[bool, List[bool]]

property hregs: dict_keys

Get the configured holding registers.

Returns

The dictionary keys.

Return type

dict_keys

property iregs: dict_keys

Get the configured input registers.

Returns

The dictionary keys.

Return type

dict_keys

property ists: dict_keys

Get the configured discrete input registers.

Returns

The dictionary keys.

Return type

dict_keys

process() → bool

Process the Modbus requests.

Returns

Result of processing, True on success, False otherwise

Return type

bool

remove_coil(address: int) → Union[None, bool, List[bool]]

Remove a coil from the modbus register dictionary.

Parameters

address (int) – The address (ID) of the register

Returns

Register value, None if register did not exist in dict

Return type

Union[None, bool, List[bool]]

remove_hreg(address: int) → Union[None, int, List[int]]

Remove a holding register from the modbus register dictionary.

Parameters

address (int) – The address (ID) of the register

Returns

Register value, None if register did not exist in dict

Return type

Union[None, int, List[int]]

remove_ireg(address: int) → Union[None, int, List[int]]

Remove an input register from the modbus register dictionary.

Parameters

address (int) – The address (ID) of the register

Returns

Register value, None if register did not exist in dict

Return type

Union[None, int, List[int]]

remove_ist(address: int) → Union[None, bool, List[bool]]

Remove a discrete input register from the modbus register dictionary.

Parameters

address (int) – The address (ID) of the register

Returns

Register value, None if register did not exist in dict

Return type

Union[None, bool, List[bool]]

set_coil(address: int, value: Union[bool, List[bool]] = False) → None

Set the coil value.

Parameters
  • address (int) – The address (ID) of the register

  • value (Union[bool, List[bool]], optional) – The value to set

set_hreg(address: int, value: Union[int, List[int]] = 0) → None

Set the holding register value.

Parameters
  • address (int) – The address (ID) of the register

  • value (Union[int, List[int]], optional) – The value to set

set_ireg(address: int, value: Union[int, List[int]] = 0) → None

Set the input register value.

Parameters
  • address (int) – The address (ID) of the register

  • value (Union[int, List[int]], optional) – The value to set

set_ist(address: int, value: bool = False) → None

Set the discrete input register value.

Parameters
  • address (int) – The address (ID) of the register

  • value (Union[bool, List[bool]], optional) – The value to set

setup_registers(registers: dict = {}, use_default_vals: Optional[bool] = False) → None

Setup all registers of the client

Parameters
  • registers (dict) – The registers

  • use_default_vals (Optional[bool]) – Flag to use dummy default values

Serial

class umodbus.serial.ModbusRTU(addr: int, baudrate: int = 9600, data_bits: int = 8, stop_bits: int = 1, parity: Optional[int] = None, pins: List[Union[int, Pin], Union[int, Pin]] = None, ctrl_pin: Optional[int] = None, uart_id: int = 1)

Bases: Modbus

Modbus RTU client class

Parameters
  • addr (int) – The address of this device on the bus

  • baudrate (int) – The baudrate, default 9600

  • data_bits (int) – The data bits, default 8

  • stop_bits (int) – The stop bits, default 1

  • parity (Optional[int]) – The parity, default None

  • pins (List[Union[int, Pin], Union[int, Pin]]) – The pins as list [TX, RX]

  • ctrl_pin (int) – The control pin

  • uart_id (int) – The ID of the used UART

class umodbus.serial.Serial(uart_id: int = 1, baudrate: int = 9600, data_bits: int = 8, stop_bits: int = 1, parity=None, pins=None, ctrl_pin: Optional[int] = None)

Bases: CommonModbusFunctions

_calculate_crc16(data: bytearray) → bytes

Calculates the CRC16.

Parameters

data (bytearray) – The data

Returns

The CRC16 checksum.

Return type

bytes

_exit_read(response: bytearray) → bool

Return on modbus read error

Parameters

response (bytearray) – The response

Returns

State of basic read response evaluation, True if entire response has been read

Return type

bool

_send(modbus_pdu: bytes, slave_addr: int) → None

Send Modbus frame via UART

If a flow control pin has been set up, it will be controlled accordingly

Parameters
  • modbus_pdu (bytes) – The modbus Protocol Data Unit

  • slave_addr (int) – The slave address
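A Modbus RTU frame, as defined by the Modbus serial line specification, is the slave address, the PDU, and a CRC16 appended low byte first. The following sketch assembles such a frame with a bitwise CRC; crc16_modbus and build_rtu_frame are hypothetical names, and UART transmission and flow control pin handling are left out.

```python
def crc16_modbus(data: bytes) -> int:
    """Bitwise CRC-16/MODBUS (reflected polynomial 0xA001, initial 0xFFFF)."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
    return crc


def build_rtu_frame(slave_addr: int, modbus_pdu: bytes) -> bytes:
    """Prefix the PDU with the slave address and append the CRC, low byte first."""
    body = bytes([slave_addr]) + modbus_pdu
    return body + crc16_modbus(body).to_bytes(2, "little")


# Read ten holding registers from address 0 of slave 1
frame = build_rtu_frame(1, b"\x03\x00\x00\x00\x0a")
```

A receiver can check integrity by running the CRC over the whole frame including the checksum bytes; an undamaged frame yields 0.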

_send_receive(modbus_pdu: bytes, slave_addr: int, count: bool) → bytes

Send a modbus message and receive the response.

Parameters
  • modbus_pdu (bytes) – The modbus Protocol Data Unit

  • slave_addr (int) – The slave address

  • count (bool) – The count

Returns

Validated response content

Return type

bytes

_uart_read() → bytearray

Read incoming slave response from UART

Returns

Read content

Return type

bytearray

_uart_read_frame(timeout: Optional[int] = None) → bytearray

Read a Modbus frame

Parameters

timeout (Optional[int]) – The timeout

Returns

Received message

Return type

bytearray

_validate_resp_hdr(response: bytearray, slave_addr: int, function_code: int, count: bool) → bytes

Validate the response header.

Parameters
  • response (bytearray) – The response

  • slave_addr (int) – The slave address

  • function_code (int) – The function code

  • count (bool) – The count

Returns

Modbus response content

Return type

bytes

get_request(unit_addr_list: Optional[list], timeout: Optional[int] = None) → Union[Request, None]

Check for request within the specified timeout

Parameters
  • unit_addr_list (Optional[list]) – The unit address list

  • timeout (Optional[int]) – The timeout

Returns

A request object or None.

Return type

Union[Request, None]

send_exception_response(slave_addr: int, function_code: int, exception_code: int) → None

Send an exception response to a client.

Parameters
  • slave_addr (int) – The slave address

  • function_code (int) – The function code

  • exception_code (int) – The exception code

send_response(slave_addr: int, function_code: int, request_register_addr: int, request_register_qty: int, request_data: list, values: Optional[list] = None, signed: bool = True) → None

Send a response to a client.

Parameters
  • slave_addr (int) – The slave address

  • function_code (int) – The function code

  • request_register_addr (int) – The request register address

  • request_register_qty (int) – The request register qty

  • request_data (list) – The request data

  • values (Optional[list]) – The values

  • signed (bool) – Indicates if signed

TCP

class umodbus.tcp.ModbusTCP

Bases: Modbus

Modbus TCP client class

bind(local_ip: str, local_port: int = 502, max_connections: int = 10) → None

Bind IP and port for incoming requests

Parameters
  • local_ip (str) – IP of this device listening for requests

  • local_port (int) – Port of this device

  • max_connections (int) – Number of maximum connections

get_bound_status() → bool

Get the IP and port binding status.

Returns

The bound status, True if already bound, False otherwise.

Return type

bool

class umodbus.tcp.TCP(slave_ip: str, slave_port: int = 502, timeout: float = 5.0)

Bases: CommonModbusFunctions

TCP class handling socket connections and parsing the Modbus data

Parameters
  • slave_ip (str) – IP of this device listening for requests

  • slave_port (int) – Port of this device

  • timeout (float) – Socket timeout in seconds

_create_mbap_hdr(slave_addr: int, modbus_pdu: bytes) → Tuple[bytes, int]

Create a Modbus header.

Parameters
  • slave_addr (int) – The slave identifier

  • modbus_pdu (bytes) – The modbus Protocol Data Unit

Returns

Modbus header and unique transaction ID

Return type

Tuple[bytes, int]
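The MBAP header defined by the Modbus TCP specification is seven bytes long: transaction identifier, protocol identifier (0 for Modbus), length of the remaining bytes (unit identifier plus PDU), and the unit identifier. A minimal sketch of that layout; build_mbap_header is a hypothetical name, and the transaction identifier is passed in here rather than generated.

```python
import struct


def build_mbap_header(trans_id: int, slave_addr: int, modbus_pdu: bytes) -> bytes:
    """Pack the 7-byte MBAP header that precedes a PDU on Modbus TCP."""
    protocol_id = 0  # always 0 for Modbus
    length = len(modbus_pdu) + 1  # unit identifier byte plus the PDU
    return struct.pack(">HHHB", trans_id, protocol_id, length, slave_addr)


# Header for a read holding registers PDU sent to unit 0x11
header = build_mbap_header(1, 0x11, b"\x03\x00\x6b\x00\x03")
```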

_send_receive(slave_addr: int, modbus_pdu: bytes, count: bool) → bytes

Send a modbus message and receive the response.

Parameters
  • slave_addr (int) – The slave identifier

  • modbus_pdu (bytes) – The modbus PDU

  • count (bool) – The count

Returns

Modbus data

Return type

bytes

_validate_resp_hdr(response: bytearray, trans_id: int, slave_addr: int, function_code: int, count: bool = False) → bytes

Validate the response header.

Parameters
  • response (bytearray) – The response

  • trans_id (int) – The transaction identifier

  • slave_addr (int) – The slave identifier

  • function_code (int) – The function code

  • count (bool) – The count

Returns

Modbus response content

Return type

bytes

class umodbus.tcp.TCPServer

Bases: object

Modbus TCP host class

_accept_request(accept_timeout: float, unit_addr_list: list) → None

Accept, read and decode a socket based request

Parameters
  • accept_timeout (float) – The socket accept timeout

  • unit_addr_list (list) – The unit address list

_send(modbus_pdu: bytes, slave_addr: int) → None

Send Modbus Protocol Data Unit to slave

Parameters
  • modbus_pdu (bytes) – The Modbus Protocol Data Unit

  • slave_addr (int) – The slave address

bind(local_ip: str, local_port: int = 502, max_connections: int = 10)

Bind IP and port for incoming requests

Parameters
  • local_ip (str) – IP of this device listening for requests

  • local_port (int) – Port of this device

  • max_connections (int) – Number of maximum connections

get_is_bound() → bool

Get the IP and port binding status, legacy support.

Returns

True if bound to IP and port, False otherwise

Return type

bool

get_request(unit_addr_list: Optional[list] = None, timeout: Optional[int] = None) → Union[Request, None]

Check for request within the specified timeout

Parameters
  • unit_addr_list (Optional[list]) – The unit address list

  • timeout (int) – The timeout

Returns

A request object or None.

Return type

Union[Request, None]

Raises

Exception – If no socket is configured and bound

property is_bound: bool

Get the IP and port binding status

Returns

True if bound to IP and port, False otherwise

Return type

bool

send_exception_response(slave_addr: int, function_code: int, exception_code: int) → None

Send an exception response to a client.

Parameters
  • slave_addr (int) – The slave address

  • function_code (int) – The function code

  • exception_code (int) – The exception code

send_response(slave_addr: int, function_code: int, request_register_addr: int, request_register_qty: int, request_data: list, values: Optional[list] = None, signed: bool = True) → None

Send a response to a client.

Parameters
  • slave_addr (int) – The slave address

  • function_code (int) – The function code

  • request_register_addr (int) – The request register address

  • request_register_qty (int) – The request register qty

  • request_data (list) – The request data

  • values (Optional[list]) – The values

  • signed (bool) – Indicates if signed