mltk.utils.uart_stream.UartStream

class UartStream[source]

Allows for streaming binary data between a Python script and embedded device via UART

Features:

  • Asynchronous reception of binary data

  • Data flow control

  • C++ library (see <mltk repo>/cpp/shared/uart_stream)

  • Send/receive “commands”

See the source code on Github: mltk/utils/uart_stream.py

See the corresponding example C++ app on Github: cpp/shared/uart_stream/examples/data_test

Parameters:
  • port (str) – Name of serial COM port, if starts with “regex:” then try to find a matching port by listing all ports

  • baud (int) – Baud rate

  • rx_buffer_length (int) – Size of the RX buffer in bytes

  • open_synchronize_timeout (bool) – Number of seconds to wait for the link to synchronize with the device

Properties

baud

Serial BAUD rate

is_open

Return if the serial connection is opened

is_synchronized

Return if the data link is synchronized

port

Serial port

rx_bytes_available

The number of received bytes in the RX buffer, ready to read by the application

tx_bytes_available

The maximum of bytes that may be immediately written by the application

Methods

__init__

close

Close the serial COM port

flush_input

Drop any data in the RX buffer

list_ports

Return a list of COM ports

open

Open the a connection to the serial port

read

Read binary data

read_all

Read binary data

read_command

Read a command sent from the other side

resolve_port

List the COM ports and try to find the given port in the list

synchronize

Synchronize UART link with other size

synchronize_blocking

Synchronize UART link with other size

wait

Wait for a data link event

write

Write binary data

write_all

Write binary data

write_command

Send a command to the other side

__init__(port='regex:JLink CDC UART Port', baud=115200, rx_buffer_length=4096, open_synchronize_timeout=60.0)[source]
Parameters:
  • port (str) –

  • baud (int) –

  • rx_buffer_length (int) –

  • open_synchronize_timeout (bool) –

property baud: int

Serial BAUD rate

Return type:

int

property port: str

Serial port

Return type:

str

property is_open: bool

Return if the serial connection is opened

Return type:

bool

property rx_bytes_available: int

The number of received bytes in the RX buffer, ready to read by the application

Return type:

int

property tx_bytes_available: int

The maximum of bytes that may be immediately written by the application

Return type:

int

property is_synchronized: bool

Return if the data link is synchronized

Return type:

bool

static list_ports()[source]

Return a list of COM ports

Return type:

List[Dict[str, str]]

static resolve_port(port)[source]

List the COM ports and try to find the given port in the list

Return type:

str

Parameters:

port (str) –

open(timeout=None)[source]

Open the a connection to the serial port

Parameters:

timeout (float) –

close()[source]

Close the serial COM port

synchronize(timeout=None, ack_read_data=True)[source]

Synchronize UART link with other size

Before data transfer may behind, both sides need to “synchronize”. This should be periodically called until it returns true indicating that the link is sync’d.

NOTE: This is non-blocking

Return type:

bool

Parameters:

timeout (float) –

synchronize_blocking(timeout=None)[source]

Synchronize UART link with other size

Before data transfer may behind, both sides need to “synchronize”. This will block until the link is synchronized.

Parameters:

timeout (float) – Maximum amount of time to wait for synchronization

Return type:

bool

Returns:

Status of the synchronization

read(max_size=None)[source]

Read binary data

Read any data that is immediately available in the RX buffer.

Parameters:

max_size (int) – The maximum amount of data to read. If None, the read rx_bytes_available

Return type:

bytes

Returns:

Read binary data, up to min(rx_bytes_available, max_size)

read_all(length=None, timeout=None)[source]

Read binary data

Read the specified amount of binary, blocking until all the data is read.

Parameters:
  • length (int) – The amount of data to read

  • timeout (float) – The maximum time to wait for the data, if None then block forever until all data is read

Return type:

bytes

Returns:

Read binary data which may be less than length if the timeout is exceeded

flush_input(timeout=0.1)[source]

Drop any data in the RX buffer

This will block for up to timeout while the data is read and dropped

Parameters:

timeout (float) –

write(data)[source]

Write binary data

Write any data that is immediately available to be sent to the other side. This will only write up to min(tx_bytes_available, len(data))

Parameters:

data (bytes) – The binary data to write

Return type:

int

Returns:

The number of bytes written. -1 if the link is not synchronized

write_all(data, timeout=None)[source]

Write binary data

Write all the given binary data, blocking until all data is written or the given timeout is exceeded.

Parameters:
  • data (bytes) – The binary data to write

  • timeout (float) – The maximum amount of time to wait. If None then block forever until all the data is written

Return type:

int

Returns:

The number of bytes written

write_command(code, payload=None)[source]

Send a command to the other side

A “command” consists of an unsigned, 8-bit code and an optional, 6-byte payload. While the command is guaranteed to be sent the other side. Reception at the other side’s application-level is not guaranteed.

NOTE: Unread commands on the other side will be silently dropped

Return type:

bool

Parameters:
  • code (int) –

  • payload (bytes) –

read_command()[source]

Read a command sent from the other side

A “command” consists of an unsigned, 8-bit code and an optional, 6-byte payload. If the other side sends commands, then the application should periodically call this API to receive the command.

NOTE: Unread commands will be silently dropped

If a command is available, then the command.code will be > 0

Return type:

Command

wait(timeout=None, synchronize=True)[source]

Wait for a data link event

This blocks on a Threading.Condition until an event occurs on the data link.

If synchronize=True, then this ensure the data link is synchronized

Return type:

bool

Parameters:

timeout (float) –