Source code for mltk.utils.jlink_stream.jlink_command_stream

import struct
from .jlink_stream import JlinkStream, JlinkStreamOptions
from .data_stream import JLinkDataStream


[docs]class JlinkCommandStream: """Helper class for issuing a command/response to embedded device via J-Link"""
[docs] def __init__( self, command_stream='cmd', response_stream='res', options:JlinkStreamOptions = None, ): self._jlink_ifc = JlinkStream(options=options) self._command_stream_name = command_stream self._response_stream_name = response_stream self._command_stream:JLinkDataStream = None self._response_stream:JLinkDataStream = None
[docs] def open(self, reset_device=False): """Open the JLink connection and command streams""" self._jlink_ifc.connect(reset_device=reset_device) try: self._command_stream = self._jlink_ifc.open( name=self._command_stream_name, mode='w' ) self._response_stream = self._jlink_ifc.open( name=self._response_stream_name, mode='r' ) except: self._jlink_ifc.disconnect()
[docs] def close(self): """Close the JLink connection and streams""" try: self._jlink_ifc.disconnect() finally: self._command_stream = None self._response_stream = None
[docs] def issue(self, data:bytes, timeout:float=7.0, no_response=False) -> bytes: """Send a command to the device and receive the command response""" if self._command_stream is None: raise Exception('Not connected') cmd_length = len(data) cmd_data = bytearray(struct.pack('<L', cmd_length)) cmd_data.extend(data) self._command_stream.write(cmd_data, timeout=timeout, flush=True) if no_response: return None length_bytes = self._response_stream.read(4, timeout=timeout) if length_bytes is None or len(length_bytes) != 4: raise TimeoutError('Timed-out waiting for response') length = struct.unpack('<L', length_bytes)[0] return self._response_stream.read_all(length, timeout=timeout)
def __enter__(self): self.open() return self def __exit__(self, dtype, value, tb): self.close()