[docs]classJlinkStream:"""This allows for transferring binary data between a Python script and a JLink-enabled embedded device via the debug interface See the source code on Github: `mltk/utils/jlink_stream <https://github.com/siliconlabs/mltk/blob/master/mltk/utils/jlink_stream>`_ """
[docs]def__init__(self,options:JlinkStreamOptions=None):# import here to avoid circular importfrom.device_interfaceimportDeviceInterface# #pylint: disable=import-outside-toplevelifoptionsisNone:options=self.default_optionsself._ifc=DeviceInterface(options)self._is_connected=Event()self._data_available=Event()self._stream_lock=RLock()self._streams={}self.polling_period=options.polling_period
@propertydefdefault_options(self)->JlinkStreamOptions:"""Default configuration options"""returnJlinkStreamOptions()@propertydefis_connected(self)->bool:"""Return if the device is connected"""returnself._is_connected.is_set()
[docs]defconnect(self,threaded=True,reset_device=False):"""Open a connection to an embedded device via J-Link NOTE: The embedded device must be actively running the JLink library """ifself.is_connected:raiseException('Already connected')self._ifc.connect(reset_device=reset_device)self._is_connected.set()self._data_available.clear()ifthreaded:t=Thread(target=self._processing_thread_loop,name='Jlink Stream Data Loop',daemon=True)t.start()
[docs]defdisconnect(self):"""Close the connection to the embedded device"""ifnotself.is_connected:returnself._is_connected.clear()self._data_available.set()withself._stream_lock:for_,streaminself._streams.items():try:stream.close()except:passself._streams={}try:self._ifc.disconnect()except:pass
[docs]defopen(self,name:str,mode='r')->JLinkDataStream:"""Open a data stream to the embedded device"""withself._stream_lock:ifnameinself._streams:raiseException(f'Stream with name: {name} already opened')stream=self._ifc.open(name=name,mode=mode)# pylint: disable=protected-accessstream._set_notify_event(self._data_available)withself._stream_lock:self._streams[name]=streamself._data_available.set()returnstream
[docs]defclose(self,name:str):"""Close a device data stream"""withself._stream_lock:ifnameinself._streams:raiseException(f'Stream with name: {name} not found')stream=self._streams[name]delself._streams[name]stream.close()
[docs]defread(self,name:str,max_size:int=None,timeout:float=None)->bytes:"""Read data from a data stream opened for reading"""withself._stream_lock:ifnotnameinself._streams:raiseException(f'Stream with name: {name} not found')stream=self._streams[name]returnstream.read(max_size,timeout)
[docs]defwrite(self,name:str,data:bytes,timeout:float=None)->int:"""Write data to a data stream opened from writing"""withself._stream_lock:ifnotnameinself._streams:raiseException(f'Stream with name: {name} not found')stream=self._streams[name]returnstream.write(data,timeout)
[docs]defprocess(self):"""Process the device data streams This is periodically called in a separated thread if 'threaded=True' in the connect() API. Otherwise, this should be periodically called. """ifnotself.is_connected:raiseException('Not connected')# Read the buffer mask to immediately see if any more data is beingbuffer_status_mask=self._ifc.buffer_status_maskifbuffer_status_mask==0:# Wait for data to be available or for the polling period to expireself._data_available.wait(timeout=self.polling_period)ifnotself.is_connected:return# Clear the flag as we're now going to process all the streamsself._data_available.clear()ifbuffer_status_mask==0:# Read the data available mask from the devicebuffer_status_mask=self._ifc.buffer_status_mask# Acquire the streams lockwithself._stream_lock:closed_streams=[]# Process each opened streamforname,streaminself._streams.items():# If the stream is closed then add it to the listifnotstream.is_opened:closed_streams.append(name)continue# Process the streamtry:# pylint: disable=protected-accessstream._process(buffer_status_mask)except:traceback.print_exc()# Remove any closed streamsforstreaminclosed_streams:delself._streams[stream]
def_processing_thread_loop(self):"""Data stream processing loop This is periodically called in the Python thread. It polls the embedded device's data streams """# While the interface is activewhileself.is_connected:self.process()def__enter__(self):self.connect()returnselfdef__exit__(self,dtype,value,tb):self.disconnect()
Important: We use cookies only for functional and traffic analytics.
We DO NOT use cookies for any marketing purposes. By using our site you acknowledge you have read and understood our Cookie Policy.