#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Fakes of several MicroPython classes which are not available in UNIX"""
import _thread
try:
import ulogging as logging
from umodbus.typing import Optional, Union
except ImportError:
import logging
from typing import Optional, Union
from queue import Queue
import socket
import sys
import time
[docs]class MachineError(Exception):
"""Base class for exceptions in this module."""
pass
[docs]class UART(object):
"""
Fake Micropython UART class
See https://docs.micropython.org/en/latest/library/machine.UART.html
"""
def __init__(self,
uart_id: int,
baudrate: int = 9600,
bits: int = 8,
parity: int = None,
stop: int = 1,
tx: int = 1,
rx: int = 2,
timeout: int = 0) -> None:
self._uart_id = uart_id
if timeout == 0:
timeout = 5.0
self._timeout = timeout
self.logger = self._configure_logger()
# Standard loopback interface address (localhost)
# Port to listen on (non-privileged ports are > 1023)
if sys.implementation.name.lower() == 'micropython':
# on MicroPython connect to the (potentially) running TCP server
# see docker-compose-rtu-test.yaml at micropython-client-rtu
self._host = '172.25.0.2'
else:
# on non MicroPython system connect to localhost
self._host = '127.0.0.1'
self._port = 65433
self._sock = None
self._server_lock = _thread.allocate_lock()
self._send_queue = Queue(maxsize=10)
self._receive_queue = Queue(maxsize=10)
self._is_server = self.no_socket_server_exists(
host=self._host,
port=self._port)
self.logger.debug('Timeout is: {}'.format(self._timeout))
if self._is_server:
self._sock = socket.socket()
if sys.implementation.name.lower() == 'micropython':
# MicroPython requires a bytearray
# https://docs.micropython.org/en/v1.18/library/socket.html#socket.getaddrinfo
addr_info = socket.getaddrinfo(self._host, self._port)[0][-1]
else:
addr_info = ((self._host, self._port))
self._sock.bind(addr_info)
self._sock.listen(10)
# self._sock.settimeout(self._timeout)
self._sock.setblocking(False)
self.logger.debug('Bound to {} on {}'.
format(self._host, self._port))
# start the socket server thread as soon as init is done
self.serving = True
else:
self._sock = socket.socket()
self._sock.settimeout(self._timeout)
# self._sock.setblocking(False)
if sys.implementation.name.lower() == 'micropython':
# MicroPython requires a bytearray
# https://docs.micropython.org/en/v1.18/library/socket.html#socket.socket.connect
addr_info = socket.getaddrinfo(self._host, self._port)[0][-1]
else:
addr_info = ((self._host, self._port))
self._sock.connect(addr_info)
self.logger.debug('Connecting to {} on {}'.
format(self._host, self._port))
self.logger.debug('Will act as {}'.
format('server' if self._is_server else 'client'))
[docs] def no_socket_server_exists(self,
host: str,
port: int,
timeout: Optional[float] = 1.0) -> bool:
"""
Determines if no socket server exists.
:param host: The host IP
:type host: str
:param port: The port
:type port: int
:param timeout: The timeout
:type timeout: float
:returns: True if no socket server exists, False otherwise.
:rtype: bool
"""
become_server = True
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
if sys.implementation.name.lower() == 'micropython':
# MicroPython requires a bytearray
# https://docs.micropython.org/en/v1.18/library/socket.html#socket.socket.connect
addr_info = socket.getaddrinfo(host, port)[0][-1]
else:
addr_info = ((host, port))
sock.connect(addr_info)
except Exception:
self.logger.debug('Exception during potential server connect')
self.logger.debug('Failed to connect to {} on {}'.
format(host, port))
sock.close()
# assume no server is available
# this device shall thereby become a server itself
return become_server
self.logger.debug('Connection to server successful')
# send test message to server
sock.send(b'Ping. Hello UART server?') # not the best message
self.logger.debug('Sent ping to potential server')
# wait for specified timeout for a response from the server
try:
received_data = sock.recv(127)
if received_data:
self.logger.info('Received as response to ping: {}'.
format(received_data))
if received_data == b'pong':
# a potential server shall respond with b'pong'.
# The return value shall thereby by "False", as the
# check for no socket server existance failed
become_server = False
except OSError as e:
# 11 = timeout expired (MicroPython)
# 35 = Resource temporarily unavailable (Python OS X)
# "timed out" (Python Windows)
if e.args[0] in [11, 35, 'timed out']:
self.logger.debug('Timeout waiting for server response')
else:
raise e
sock.close()
return become_server
@property
def is_server(self) -> bool:
"""
Determines if this object acts as server aka UART host.
:returns: True if server, False otherwise.
:rtype: bool
"""
return self._is_server
@property
def serving(self) -> bool:
"""
Get the server status.
:returns: Flag whether socket server is running or not.
:rtype: bool
"""
return self._server_lock.locked()
@serving.setter
def serving(self, value: bool) -> None:
"""
Start or stop running the socket server.
:param value: The value
:type value: bool
"""
if value and (not self._server_lock.locked()):
# start socket server if not already running
self._server_lock.acquire()
# parameters of the _serve function
params = (
self._sock,
self._send_queue,
self._receive_queue,
self._server_lock,
self.logger
)
_thread.start_new_thread(self._serve, params)
self.logger.info('Socket {} started'.
format('server' if self._is_server else 'client'))
elif (value is False) and self._server_lock.locked():
# stop server if not already stopped
self._server_lock.release()
self.logger.info('Socket {} lock released'.
format('server' if self._is_server else 'client'))
[docs] def _serve(self,
sock: socket,
send_queue: Queue,
receive_queue: Queue,
lock: int,
logger: logging.Logger) -> None:
"""
Internal socket server function
:param sock: The socket
:type sock: socket
:param send_queue: The send queue
:type send_queue: Queue
:param receive_queue: The receive queue
:type receive_queue: Queue
:param lock: The lock flag
:type lock: int
:param logger: The logger
:type logger: logging.Logger
"""
if sock is None:
raise Exception('Server not bound')
else:
logger.info('Acting as server')
_client_sock = None
while lock.locked():
new_client_sock = None
try:
# either a connection is made from a client or
# an OSError is thrown after the configured timeout
new_client_sock, addr = sock.accept()
except OSError as e:
# 11 = timeout expired (MicroPython)
# 35 = Resource temporarily unavailable (Python OS X)
# "timed out" (Python Windows)
if e.args[0] not in [11, 35, 'timed out']:
logger.warning('OSError {}: {}'.
format(e, e.args[0]))
if new_client_sock is not None:
logger.debug('Connection from {}'.format(addr))
if _client_sock is not None:
logger.debug('Closed connection to last client')
_client_sock.close()
_client_sock = new_client_sock
_client_sock.settimeout(0.5)
if _client_sock is not None:
# get client message
try:
received_data = _client_sock.recv(127)
# log data and send response
if received_data:
logger.info('Received from client: {}'.
format(received_data))
if received_data == b'Ping. Hello UART server?':
_client_sock.send(b'pong')
logger.info('Responded with "pong"')
else:
receive_queue.put_nowait(received_data)
# grant host enough time to process received
# data, prepare a response for the client and put
# it into the send_queue
# Sleep time shall be less than client timeout
# specified during UART init
# Approx. process time is below 100ms
time.sleep(0.1)
except Exception as e:
logger.debug('Error during connection: {}'.format(e))
_client_sock.close()
_client_sock = None
if send_queue.qsize():
send_data = send_queue.get_nowait()
logger.info('Responding to client: {}'.
format(send_data))
_client_sock.send(send_data)
[docs] def deinit(self) -> None:
"""Turn off the UART bus"""
raise MachineError('Not yet implemented')
[docs] def any(self) -> int:
"""
Return number of characters available.
Mock does not return the actual number of characters but the elements
in the receive queue. Which is usually "0" or "1"
:returns: Number of characters available
:rtype: int
"""
available_data_amount = 0
if self._is_server:
available_data_amount = self._receive_queue.qsize()
else:
# patch available data, as reading the socket buffer would drain it
available_data_amount = 1
return available_data_amount
[docs] def read(self, nbytes: Optional[int] = None) -> Union[None, bytes]:
"""
Read characters
:param nbytes: The amount of bytes to read
:type nbytes: int
:returns: Bytes read, None on timeout
:rtype: Union[None, bytes]
"""
data = None
if self._is_server:
if self._receive_queue.qsize():
data = self._receive_queue.get_nowait()
else:
try:
data = self._sock.recv(127)
except OSError as e:
self.logger.warning('OSError {}: {}'.format(e, e.args[0]))
return data
[docs] def readinto(self,
buf: bytes,
nbytes: Optional[int] = None) -> Union[None, bytes]:
"""
Read bytes into the buffer.
If nbytes is specified then read at most that many bytes. Otherwise,
read at most len(buf) bytes
:param buf: The buffer
:type buf: bytes
:param nbytes: The nbytes
:type nbytes: Optional[int]
:returns: Number of bytes read and stored in buffer, None on timeout
:rtype: Union[None, bytes]
"""
raise MachineError('Not yet implemented')
[docs] def readline(self) -> Union[None, str]:
"""
Read a line, ending in a newline character
:returns: The line read, None on timeout.
:rtype: Union[None, str]
"""
raise MachineError('Not yet implemented')
[docs] def write(self, buf: bytes) -> Union[None, int]:
"""
Write the buffer of bytes to the bus
:param buf: The buffer
:type buf: bytes
:returns: Number of bytes written, None on timeout
:rtype: Union[None, int]
"""
if self._is_server:
self._send_queue.put_nowait(buf)
else:
self._sock.send(buf)
return len(buf)
[docs] def sendbreak(self) -> None:
"""Send a break condition on the bus"""
raise MachineError('Not yet implemented')
'''
# flush introduced in MicroPython v1.20.0
# use manual timing calculation for testing
def flush(self) -> None:
"""
Waits until all data has been sent
In case of a timeout, an exception is raised
Only available with newer versions than 1.19
"""
raise MachineError('Not yet implemented')
'''
[docs] def txdone(self) -> bool:
"""
Tells whether all data has been sent or no data transfer is happening
:returns: If data transmission is ngoing return False, True otherwise
:rtype: bool
"""
raise MachineError('Not yet implemented')
[docs]class Pin(object):
"""
Fake Micropython Pin class
See https://docs.micropython.org/en/latest/library/machine.Pin.html
"""
IN = 1
OUT = 2
def __init__(self, pin: int, mode: int):
self._pin = pin
self._mode = mode
self._value = False
[docs] def value(self, val: Optional[Union[int, bool]] = None) -> Optional[bool]:
"""
Set or get the value of the pin
:param val: The value
:type val: Optional[Union[int, bool]]
:returns: State of the pin if no value specifed, None otherwise
:rtype: Optional[bool]
"""
if val is not None and self._mode == Pin.OUT:
# set pin state
self._value = bool(val)
else:
# get pin state
return self._value
[docs] def on(self) -> None:
"""Set pin to "1" output level"""
if self._mode == Pin.OUT:
self.value(val=True)
[docs] def off(self) -> None:
"""Set pin to "0" output level"""
if self._mode == Pin.OUT:
self.value(val=False)