Source code for mltk.utils.jlink_stream.jlink_stream


import traceback
from threading import Thread, Event, RLock


from .data_stream import JLinkDataStream


[docs]class JlinkStreamOptions: """JLinkStream configuration options""" serial_number = None core = 'cortex-m4' interface = 'swd' clock = 12000 sram_address = 0x20000000 sram_size = -1 lib_path = None polling_period = 0.1
[docs]class JlinkStream: """This allows for transferring binary data between a Python script and a JLink-enabled embedded device via the debug interface See the source code on Github: `mltk/utils/jlink_stream <https://github.com/siliconlabs/mltk/blob/master/mltk/utils/jlink_stream>`_ """
[docs] def __init__(self, options:JlinkStreamOptions = None): # import here to avoid circular import from .device_interface import DeviceInterface # #pylint: disable=import-outside-toplevel if options is None: options = self.default_options self._ifc = DeviceInterface(options) self._is_connected = Event() self._data_available = Event() self._stream_lock = RLock() self._streams = {} self.polling_period = options.polling_period
@property def default_options(self) -> JlinkStreamOptions: """Default configuration options""" return JlinkStreamOptions() @property def is_connected(self) -> bool: """Return if the device is connected""" return self._is_connected.is_set()
[docs] def connect(self, threaded=True, reset_device=False): """Open a connection to an embedded device via J-Link NOTE: The embedded device must be actively running the JLink library """ if self.is_connected: raise Exception('Already connected') self._ifc.connect(reset_device=reset_device) self._is_connected.set() self._data_available.clear() if threaded: t = Thread( target=self._processing_thread_loop, name='Jlink Stream Data Loop', daemon=True ) t.start()
[docs] def disconnect(self): """Close the connection to the embedded device""" if not self.is_connected: return self._is_connected.clear() self._data_available.set() with self._stream_lock: for _, stream in self._streams.items(): try: stream.close() except: pass self._streams = {} try: self._ifc.disconnect() except: pass
[docs] def open(self, name:str, mode='r') -> JLinkDataStream: """Open a data stream to the embedded device""" with self._stream_lock: if name in self._streams: raise Exception(f'Stream with name: {name} already opened') stream = self._ifc.open(name=name, mode=mode) # pylint: disable=protected-access stream._set_notify_event(self._data_available) with self._stream_lock: self._streams[name] = stream self._data_available.set() return stream
[docs] def close(self, name:str): """Close a device data stream""" with self._stream_lock: if name in self._streams: raise Exception(f'Stream with name: {name} not found') stream = self._streams[name] del self._streams[name] stream.close()
[docs] def read(self, name:str, max_size:int=None, timeout:float=None) -> bytes: """Read data from a data stream opened for reading""" with self._stream_lock: if not name in self._streams: raise Exception(f'Stream with name: {name} not found') stream = self._streams[name] return stream.read(max_size, timeout)
[docs] def write(self, name:str, data:bytes, timeout:float=None) -> int: """Write data to a data stream opened from writing""" with self._stream_lock: if not name in self._streams: raise Exception(f'Stream with name: {name} not found') stream = self._streams[name] return stream.write(data, timeout)
[docs] def process(self): """Process the device data streams This is periodically called in a separated thread if 'threaded=True' in the connect() API. Otherwise, this should be periodically called. """ if not self.is_connected: raise Exception('Not connected') # Read the buffer mask to immediately see if any more data is being buffer_status_mask = self._ifc.buffer_status_mask if buffer_status_mask == 0: # Wait for data to be available or for the polling period to expire self._data_available.wait(timeout=self.polling_period) if not self.is_connected: return # Clear the flag as we're now going to process all the streams self._data_available.clear() if buffer_status_mask == 0: # Read the data available mask from the device buffer_status_mask = self._ifc.buffer_status_mask # Acquire the streams lock with self._stream_lock: closed_streams = [] # Process each opened stream for name, stream in self._streams.items(): # If the stream is closed then add it to the list if not stream.is_opened: closed_streams.append(name) continue # Process the stream try: # pylint: disable=protected-access stream._process(buffer_status_mask) except: traceback.print_exc() # Remove any closed streams for stream in closed_streams: del self._streams[stream]
def _processing_thread_loop(self): """Data stream processing loop This is periodically called in the Python thread. It polls the embedded device's data streams """ # While the interface is active while self.is_connected: self.process() def __enter__(self): self.connect() return self def __exit__(self, dtype, value, tb): self.disconnect()