Source code for mltk.utils.serial_reader

from typing import List, Dict, Tuple, Union, Callable, TextIO
import sys
import time
import re
import threading
import queue
from dataclasses import dataclass

import serial
from .python import as_list

[docs]class SerialReader: """Utility for reading data from a serial COM port NOTE: For the regex args below, if a string is provided then it will automatically be converted to an ``re.Pattern`` with the ``flags re.MULTILINE|re.DOTALL|re.IGNORECASE`` Args: port: Name of serial COM port, if starts with "regex:" then try to find a matching port by listing all ports baud: Baud rate outfile: File-like object to write received serial data, use sys.stdout if omitted mode: outfile mode, if mode='r' then write as ASCII (ignore char > 127 and \\r), if mode='rb' then write as binary start_regex: Regex or list of Regex to use match against received serial data before writing to the captured_data buffer during read() stop_regex: Regex or list of Regex to use match against received serial data to stop writing to the captured_data buffer and read() returns fail_regex: Regex or list of Regex to use match against received serial data to abort read() callback_regex: (Regex,Callback) or list of (Regex,Callback) which will invoke the given callback with the re.Match for each regex See the source code on Github: `mltk/utils/ <>`_ """
[docs] def __init__( self, port: str, baud=115200, outfile:TextIO=None, mode = 'r', start_regex: Union[str, re.Pattern, List[re.Pattern]] = None, stop_regex: Union[str, re.Pattern, List[re.Pattern]] = None, fail_regex: Union[str, re.Pattern, List[re.Pattern]] = None, callback_regex: List[Tuple[re.Pattern,Callable[[re.Match],None]]] = None, ignore_chars:List[int]=None ): self.port = port self.baud = baud self.mode = mode self.outfile:TextIO = outfile or sys.stdout self.start_regex = start_regex self.stop_regex = stop_regex self.fail_regex = fail_regex self.ignore_chars = ignore_chars or [13] + list(range(127,256)) # Non-ASCII and \r if callback_regex and not isinstance(callback_regex, list): raise ValueError('callback_regex must be a list of tuples') self.callback_regex = callback_regex self._handle : serial.Serial = None self._started = False self._stopped = False self._failed = False self._start_regex:List[re.Pattern] = None self._stop_regex:List[re.Pattern] = None self._fail_regex:List[re.Pattern] = None self._callback_regex:List[_CallbackRegexContext] = None self._captured_data = '' self._error_message = '' self._rx_thread_active = threading.Event() self._rx_thread:threading.Thread = None self._rx_queue = queue.Queue()
@property def is_open(self) -> bool: """Return if the serial connection is opened""" return self._handle is not None and self._handle.is_open @property def started(self) -> bool: """Return if the start_regex condition has been found""" return self._started @property def stopped(self) -> bool: """Return if the stop_regex condition has been found""" return self._stopped @property def failed(self) -> bool: """Return if the fail_regex condition has been found""" return self._failed @property def captured_data(self) -> str: """Data received by read() between the start_regex and stop_regex conditions""" return self._captured_data @property def error_message(self) -> str: """Data received after fail_regex was found""" return self._error_message
[docs] @staticmethod def list_ports() -> List[Dict[str, str]]: """Retrun a list of COM ports""" retval = [] for port, desc, hwid in sorted( retval.append(dict(port=port, desc=desc, hwid=hwid)) return retval
[docs] @staticmethod def resolve_port(port: str) -> str: """List the COM ports and try to find the given port in the list""" if not port: raise ValueError('Null port provided') ports = SerialReader.list_ports() port_re = None if port.startswith('regex:'): port_re = re.compile(port[len('regex:'):], re.IGNORECASE) for port_details in ports: if port_re is not None: if port_re.match(port_details['desc']): return port_details['port'] if port_re.match(port_details['hwid']): return port_details['port'] continue if port_details['port'].lower() == port.lower(): return port_details['port'] if ports: available_ports = 'Available COM ports:\n' + '\n'.join([x['port'] for x in ports]) else: available_ports = 'No serial COM ports available' raise RuntimeError( f'Serial COM port not found: {port}\n' \ f'{available_ports}\n\n' \ 'Is the development board on and properly enumerated?\n' \ 'Are any other programs connected to the board\'s COM port?' )
[docs] def open(self): """Open the a connection to the serial port""" port = SerialReader.resolve_port(self.port) if not port: raise ValueError('Invalid serial port') try: self._handle = serial.Serial( port=port, baudrate=self.baud, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS ) except Exception as e: raise RuntimeError( # pylint: disable=raise-missing-from f'Failed to open COM port: {port}\n' \ 'Ensure the development board is on and properly enumerated.\n' \ 'Also ensure no other serial terminals are connected to the COM port.\n' \ f'Error details: {e}' ) self.flush() def _map_pattern(s): if isinstance(s, str): return re.compile(s, flags=re.MULTILINE|re.DOTALL|re.IGNORECASE) if isinstance(s, re.Pattern): return s raise ValueError(f'Invalid regex: {s}, must be string or re.Pattern') self._start_regex = [_map_pattern(r) for r in as_list(self.start_regex)] self._stop_regex = [_map_pattern(r) for r in as_list(self.stop_regex)] self._fail_regex = [_map_pattern(r) for r in as_list(self.fail_regex)] self._callback_regex = [_CallbackRegexContext(_map_pattern(r), cb) for (r, cb) in as_list(self.callback_regex)] self._captured_data = '' self._started = False self._stopped = False self._failed = False if not self._start_regex and (self._stop_regex or self._fail_regex): self._started = True self._rx_thread_active.clear() self._rx_thread = threading.Thread( name='SerialRx', target=self._read_loop, daemon=True ) self._rx_thread.start()
[docs] def close(self): """Close the serial COM port""" if self._rx_thread is not None: self._rx_thread_active.set() self._rx_thread.join(5) self._rx_thread = None while not self._rx_queue.empty(): self._rx_queue.get() if self.is_open: self._handle.close() self._handle = None
[docs] def flush(self, timeout:float=None): """Flush any received data""" start_time = time.time() while True: if not self.is_open: raise RuntimeError('Connection not opened') self._handle.reset_input_buffer() self._handle.reset_output_buffer() self._captured_data = '' while not self._rx_queue.empty(): self._rx_queue.get() if not timeout or (time.time() - start_time) > timeout: break
[docs] def read( self, timeout:float=None, activity_timeout:float=None, abort_event:threading.Event=None ) -> bool: """Read data for the given timeout or until stop_regex or fail_regex have been found in the received data. The captured_data property will contain the received data between the start_regex and stop_regex conditions. The error_message property will contain any data received after the fail_regex conditions. NOTE: The outfile will contain ALL received data, regardless of the start_regex/stop_regex conditions Args: timeout: Maximum time in seconds to receive serial data. If None then read until stop_regex/fail_regex found. activity_timeout: Maximum amount of time to wait without any new data received from the serial point Returns: True if the stop_regex or fail_regex were found in the received data. False on on timeout. """ if not self.is_open: raise RuntimeError('Connection not opened') # Wait forever if not timeout is given timeout = timeout or 1e9 abort_event = abort_event or threading.Event() previous_activity_timestamp = time.time() start_time = time.time() saved_terminators = None if hasattr(self.outfile, 'set_terminator'): saved_terminators = self.outfile.set_terminator('') try: while (time.time() - start_time) < timeout: if abort_event.is_set(): return False if self._buffer_data(): previous_activity_timestamp = time.time() if self._check_for_fail_condition(): return True if not self._wait_for_start_condition(): continue if self._check_for_stop_condition(): return True if activity_timeout and (time.time() - previous_activity_timestamp) > activity_timeout: break time.sleep(0.250) finally: if saved_terminators: self.outfile.set_terminator(saved_terminators) return False
[docs] def write( self, data:str, check_fail_condition=True, check_start_condition=True, check_stop_condition=True, delay_per_char:float=None ): """Write the given amount of data""" if not self.is_open: raise RuntimeError('Connection not opened') if check_fail_condition and self._check_for_fail_condition(): return -1 if check_start_condition and not self._wait_for_start_condition(): return -1 if check_stop_condition and self._check_for_stop_condition(): return -1 data = data.encode('utf-8') if delay_per_char: for c in data: self._handle.write(bytes([c])) self._handle.flush() time.sleep(delay_per_char) else: self._handle.write(data) self._handle.flush()
def _buffer_data(self) -> bool: """Receive data from the COM port and write the the outfile and _captured_data buffer """ if not self.is_open: raise RuntimeError('Connection not opened') new_data = '' while not self._rx_queue.empty(): data = self._rx_queue.get() if self.mode == 'rb': new_data = data elif self.mode == 'r': for d in data: if d in self.ignore_chars: continue new_data += chr(d) if new_data and self.outfile is not None: self.outfile.write(new_data) self.outfile.flush() if self._callback_regex or self._start_regex or self._stop_regex or self._fail_regex: self._captured_data += new_data for ctx in self._callback_regex: data_subset = self._captured_data[ctx.offset:] match = if match: ctx.offset += match.end() ctx.callback(match) return len(new_data) > 0 def _read_loop(self): """Thread loop to read the COM port""" while True: if self._handle.in_waiting > 0: data = self._rx_queue.put(data) if self._rx_thread_active.wait(0.005): break def _wait_for_start_condition(self): """Determine if a start condition is found in the _captured_data buffer If so, reset the _captured_data buffer and set _started=True. This immediately returns if the start condition was previously found. """ if not self._start_regex or self._started: return True found = False for regex in self._start_regex: match = if match is not None: found = True break if not found: return False self._started = True self._captured_data = self._captured_data[match.end(0)+1:] return True def _check_for_stop_condition(self): """Check if the stop_regex is found in the _captured_data buffer""" if not self._stop_regex: return False found = False for regex in self._stop_regex: match = if match is not None: found = True break if not found: return False self._stopped = True self._captured_data = self._captured_data[:match.start(0)] return True def _check_for_fail_condition(self): """Check if a fail_regex is found in the _captured_data buffer""" if not self._fail_regex: return False found = False for regex in self._fail_regex: match = if match is not None: found = True break if not found: return False self._failed = True start_index = match.start(0) self._error_message = self._captured_data[start_index:] return True def __enter__ (self): return self def __exit__ (self, *args, **kwargs): self.close()
@dataclass class _CallbackRegexContext: regex:re.Pattern callback:Callable[[re.Match],None] offset:int=0