Source code for mltk.core.tflite_micro.tflite_micro


import os
import sysconfig
import logging
import importlib
import copy
import threading
import inspect
import functools
from typing import Union, List, Dict, Tuple, Callable
import numpy as np

from mltk.core.tflite_model import TfliteModel, TfliteLayer
from mltk.core.utils import get_mltk_logger
from mltk.utils.python import (as_list, get_case_insensitive, import_module_at_path, append_exception_msg)
from mltk.utils.path import (fullpath, get_user_setting)
from .tflite_micro_accelerator import (TfliteMicroAccelerator, PlaceholderTfliteMicroAccelerator)
from .tflite_micro_model import TfliteMicroModel, TfliteMicroModelDetails



class TfliteMicro:
    """This class wraps the TF-Lite Micro C++ library

    This class allows for loading a .tflite model file into the
    TF-Lite Micro (TFLM) C++ library and running inference using either
    the TFLM reference kernels or hardware accelerated kernels.
    """

    _model_lock = threading.Lock()
    _wrapper = None
    _logger:logging.Logger = None
    _logged_errors:List[str] = []
    _accelerators:Dict[str,TfliteMicroAccelerator] = {}
    _accelerator_paths:List[str] = []

    @staticmethod
    def git_hash() -> str:
        """Return the GIT hash of the MLTK repo used to compile the wrapper library"""
        wrapper = TfliteMicro._load_wrapper()
        return wrapper.git_hash()

    @staticmethod
    def api_version() -> int:
        """Return the TFLM API version number.
        This is used to ensure accelerator wrappers are compatible with this TFLM wrapper"""
        wrapper = TfliteMicro._load_wrapper()
        return wrapper.api_version()

    @staticmethod
    def set_log_level(level: str) -> str:
        """Set the C++ wrapper logging level

        NOTE: This sets the level in the C++ wrapper, NOT the Python logger.
        Increasing the logging level can help with throughput,
        as each log generated by the wrapper needs to be forwarded to the Python logger.

        Returns:
            The previous log level
        """
        wrapper = TfliteMicro._load_wrapper()
        prev_level = wrapper.get_log_level()
        if not wrapper.set_log_level(level):
            raise RuntimeError(f'Failed to set MLTK log level to {level}')
        return prev_level

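    # Usage sketch ('warn' is a hypothetical level name; only levels the
    # C++ wrapper actually supports will be accepted by set_log_level()):
    #
    #   prev_level = TfliteMicro.set_log_level('warn')  # reduce log forwarding overhead
    #   try:
    #       ...  # run inference
    #   finally:
    #       TfliteMicro.set_log_level(prev_level)  # restore the previous level
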
    @staticmethod
    def get_log_level() -> str:
        """Return the C++ wrapper's logging level

        NOTE: This returns the C++ wrapper's logging level, NOT the Python logger's.
        """
        wrapper = TfliteMicro._load_wrapper()
        return wrapper.get_log_level()

    @staticmethod
    def set_logger(logger: logging.Logger):
        """Set the wrapper's Python logger

        This logger will be invoked by the C++ wrapper's logging callback.
        """
        TfliteMicro._logger = logger

    @staticmethod
    def get_logger() -> logging.Logger:
        """Return the wrapper's Python logger"""
        # Just use the MLTK logger if no logger has been specified
        if TfliteMicro._logger is None:
            TfliteMicro._logger = get_mltk_logger()
        return TfliteMicro._logger

    @staticmethod
    def normalize_accelerator_name(accelerator:str) -> str:
        """Given a case-insensitive accelerator name,
        normalize the name to the format used by the C++ library

        Returns:
            Normalized name of the accelerator, or None if the accelerator is unknown
        """
        TfliteMicro._load_wrapper()
        if accelerator is None:
            return None
        return get_case_insensitive(accelerator, TfliteMicro._accelerators)

    @staticmethod
    def get_supported_accelerators() -> List[str]:
        """Return a list of supported accelerators by name"""
        TfliteMicro._load_wrapper()
        return list(TfliteMicro._accelerators)

    @staticmethod
    def accelerator_is_supported(accelerator:str) -> bool:
        """Return whether the given accelerator is supported"""
        TfliteMicro._load_wrapper()
        return get_case_insensitive(accelerator, TfliteMicro._accelerators) is not None

    @staticmethod
    def load_tflite_model(
        model: Union[str, TfliteModel],
        accelerator:str=None,
        enable_profiler=False,
        enable_recorder=False,
        enable_tensor_recorder=False,
        force_buffer_overlap=False,
        runtime_buffer_size:int=None,
        **kwargs
    ) -> TfliteMicroModel:
        """Load the TF-Lite Micro interpreter with the given .tflite model

        NOTE:
        - Only 1 model may be loaded at a time
        - You must call unload_model() when the model is no longer needed
        """
        wrapper = TfliteMicro._load_wrapper()

        if accelerator is not None:
            tflm_accelerator = TfliteMicro.get_accelerator(accelerator)
            if hasattr(tflm_accelerator, 'init_variant'):
                tflm_accelerator.init_variant()
        else:
            tflm_accelerator = None

        TfliteMicro._model_lock.acquire()

        try:
            tflite_model = _load_tflite_model(model)
            runtime_buffer_sizes = _retrieve_runtime_buffer_sizes(
                tflite_model,
                runtime_buffer_size=runtime_buffer_size,
                accelerator=accelerator
            )
            tflm_model = TfliteMicroModel(
                tflm_wrapper=wrapper,
                tflm_accelerator=tflm_accelerator,
                flatbuffer_data=tflite_model.flatbuffer_data,
                enable_profiler=enable_profiler,
                enable_recorder=enable_recorder,
                enable_tensor_recorder=enable_tensor_recorder,
                force_buffer_overlap=force_buffer_overlap,
                runtime_buffer_sizes=runtime_buffer_sizes,
            )
        except:
            # Release the model lock if an exception occurred while loading it
            TfliteMicro._model_lock.release()
            raise

        return tflm_model

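    # Usage sketch ('my_model.tflite' is a hypothetical path; assumes
    # TfliteMicroModel exposes an output() accessor mirroring input()):
    #
    #   tflm_model = TfliteMicro.load_tflite_model('my_model.tflite')
    #   try:
    #       tflm_model.input(value=input_samples)  # input_samples: np.ndarray
    #       tflm_model.invoke()
    #       output = tflm_model.output()
    #   finally:
    #       TfliteMicro.unload_model(tflm_model)  # releases the model lock
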
    @staticmethod
    def unload_model(model: TfliteMicroModel):
        """Unload a previously loaded model"""
        accelerator = model.accelerator
        if accelerator is not None:
            if hasattr(accelerator, 'deinit_variant'):
                accelerator.deinit_variant()

        # pylint: disable=protected-access
        if model._model_wrapper:
            model._model_wrapper.unload()

        del model
        TfliteMicro._model_lock.release()

    @staticmethod
    def profile_model(
        model: Union[str, TfliteModel],
        accelerator:str=None,
        return_estimates=False,
        disable_simulator_backend=False,
        runtime_buffer_size=-1, # If runtime_buffer_size is not given, determine the optimal memory size
        input_data: Union[np.ndarray,List[np.ndarray]]=None,
        **kwargs
    ): # -> ProfilingModelResults
        """Profile the given model in the simulator
        and optionally determine metric estimates
        """
        from mltk.core.profiling_results import ProfilingModelResults, ProfilingLayerResult

        tflite_model = _load_tflite_model(model)
        tflm_model = TfliteMicro.load_tflite_model(
            model=tflite_model,
            accelerator=accelerator,
            enable_profiler=True,
            enable_recorder=True,
            runtime_buffer_size=runtime_buffer_size
        )
        try:
            reenable_simulator_backend = False
            disable_calculate_accelerator_cycles_only = False
            tflm_accelerator = tflm_model.accelerator
            if disable_simulator_backend and \
                tflm_accelerator is not None and \
                hasattr(tflm_accelerator, 'set_simulator_backend_enabled'):
                reenable_simulator_backend = True
                tflm_accelerator.set_simulator_backend_enabled(False)

            if hasattr(tflm_accelerator, 'set_calculate_accelerator_cycles_only_enabled'):
                # For profiling, we only need the accelerator cycles.
                # The simulator does not need to actually calculate valid output data.
                # This greatly improves simulation latency
                disable_calculate_accelerator_cycles_only = True
                tflm_accelerator.set_calculate_accelerator_cycles_only_enabled(True)

            tflm_model_details = tflm_model.details

            if input_data is not None:
                if isinstance(input_data, list):
                    for i, v in enumerate(input_data):
                        tflm_model.input(index=i, value=v)
                else:
                    tflm_model.input(value=input_data)
            else:
                for i in range(tflm_model.input_size):
                    input_tensor = tflm_model.input(i)
                    empty_tensor = np.zeros_like(input_tensor)
                    tflm_model.input(i, value=empty_tensor)

            tflm_model.invoke()

            tflm_results = tflm_model.get_profiling_results()
            recorded_data = tflm_model.get_recorded_data()

            if reenable_simulator_backend:
                tflm_accelerator.set_simulator_backend_enabled(True)
            if disable_calculate_accelerator_cycles_only:
                tflm_accelerator.set_calculate_accelerator_cycles_only_enabled(False)

            recorded_layers = recorded_data['layers']

            layer_results = []
            for layer_index, tflm_layer_result in enumerate(tflm_results):
                tflite_layer = tflite_model.layers[layer_index]
                layer_err = tflm_model.get_layer_error(layer_index)
                layer_err_msg = None if layer_err is None else layer_err.msg
                del tflm_layer_result['name']
                layer_result = ProfilingLayerResult(
                    tflite_layer=tflite_layer,
                    error_msg=layer_err_msg,
                    **tflm_layer_result
                )
                layer_recorded_data = recorded_layers[layer_index] if layer_index < len(recorded_layers) else {}

                # Move any non-scalar recorded values into the layer's metadata.
                # Restart the iteration after each removal since the dictionary
                # cannot be modified while it is being iterated
                updated = True
                while updated:
                    updated = False
                    for key, value in layer_recorded_data.items():
                        if not isinstance(value, (int,float,str)):
                            tflite_layer.metadata[key] = value
                            layer_recorded_data.pop(key)
                            updated = True
                            break

                layer_result.update(layer_recorded_data)
                layer_results.append(layer_result)

        finally:
            TfliteMicro.unload_model(tflm_model)

        model_details = tflm_model.details
        _add_memory_plan(
            tflite_model=tflite_model,
            recorded_data=recorded_data,
            model_details=model_details
        )

        results = ProfilingModelResults(
            model=tflite_model,
            accelerator=accelerator,
            runtime_memory_bytes=tflm_model_details.runtime_memory_size,
            layers=layer_results,
            model_details=model_details
        )

        # If we want to return estimates for metrics like:
        # CPU cycles and energy
        if return_estimates:
            # If accelerator=none
            # then just use the MVP accelerator's 'none' (i.e. CMSIS-only) estimators
            if tflm_accelerator is None and 'mvp' in TfliteMicro._accelerators:
                tflm_accelerator = TfliteMicro._accelerators['mvp']

            if tflm_accelerator is not None:
                tflm_accelerator.estimate_profiling_results(
                    results=results,
                    **kwargs
                )

        return results

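    # Usage sketch ('my_model.tflite' is a hypothetical path; 'mvp' is the
    # accelerator bundled with the MLTK under accelerators/mvp):
    #
    #   results = TfliteMicro.profile_model(
    #       'my_model.tflite',
    #       accelerator='mvp',
    #       return_estimates=True,
    #   )
    #   print(results)
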
    @staticmethod
    def record_model(
        model: Union[str, TfliteModel],
        input_data: Union[np.ndarray,List[np.ndarray]]=None,
        accelerator:str=None,
        enable_accelerator_recorder=False,
        disable_simulator_backend=False,
        enable_tensor_recorder=True,
        return_model_details=False,
        update_input_model=False,
        layer_callback:Callable[[TfliteLayer], bool]=None,
    ) -> Union[List[TfliteLayer], Tuple[List[TfliteLayer],TfliteMicroModelDetails]]:
        """Run one inference and record each model layer's input/output tensors

        Args:
            model: Path to .tflite model file or TfliteModel instance
            input_data: Model input data as a numpy array, or a list of numpy arrays, one for each model input
            accelerator: Optional accelerator to use for inference
            enable_accelerator_recorder: If enabled, record the data/instructions generated by the hardware accelerator.
                The recorded data will be stored in each layer's metadata property, e.g.: ``layer.metadata['accelerator_data']``.
                Each layer's recorded data is a dictionary with entries specific to the hardware accelerator.
            disable_simulator_backend: Disable the simulator backend while running the accelerator recorder.
                This can greatly improve execution time, however, the generated data output (i.e. output tensors) is invalid
            enable_tensor_recorder: Record the input/output tensors of each layer
            return_model_details: Also return the recorded model's TfliteMicroModelDetails
            update_input_model: If true, update the layers of the given TfliteModel instance in-place;
                otherwise record deep copies of the layers
            layer_callback: Optional callback invoked after each layer executes with a copy of that layer
                holding its recorded outputs; its boolean return value is passed back to the TFLM wrapper

        Returns:
            If return_model_details=False, then return a list of TfliteLayers
            with the tensor data updated with the recorded values from the previous inference.
            If return_model_details=True, then return a tuple(list(TfliteLayer), TfliteMicroModelDetails)
        """
        if update_input_model and not isinstance(model, TfliteModel):
            raise ValueError('Input model must be a TfliteModel instance to use update_input_model=True')

        tflite_model = _load_tflite_model(model)
        tflm_model = TfliteMicro.load_tflite_model(
            model=tflite_model,
            accelerator=accelerator,
            enable_recorder=True,
            enable_tensor_recorder=enable_tensor_recorder,
            enable_profiler=False,
            runtime_buffer_size=16*1024*1024, # 16MB
        )

        disable_program_recorder = False
        if enable_accelerator_recorder:
            if tflm_model.accelerator is None:
                raise ValueError('Must provide accelerator when using enable_accelerator_recorder')
            tflm_model.accelerator.set_program_recorder_enabled(True)
            disable_program_recorder = True

        reenable_simulator_backend = False
        reenable_simulator_accelerator_cycles = False
        if disable_simulator_backend:
            if tflm_model.accelerator is None:
                raise ValueError('Must provide accelerator when using disable_simulator_backend')
            if hasattr(tflm_model.accelerator, 'set_simulator_backend_enabled'):
                reenable_simulator_backend = True
                tflm_model.accelerator.set_simulator_backend_enabled(False)
            if hasattr(tflm_model.accelerator, 'set_calculate_accelerator_cycles_only_enabled'):
                reenable_simulator_accelerator_cycles = True
                tflm_model.accelerator.set_calculate_accelerator_cycles_only_enabled(True)

        if layer_callback:
            tflm_model.set_layer_callback(functools.partial(
                _layer_callback_handler,
                tflite_model=tflite_model,
                callback=layer_callback
            ))

        try:
            if input_data is not None:
                if isinstance(input_data, list):
                    for i, v in enumerate(input_data):
                        tflm_model.input(index=i, value=v)
                else:
                    tflm_model.input(value=input_data)
            else:
                for i, inp in enumerate(tflite_model.inputs):
                    d = np.zeros_like(inp.data)
                    tflm_model.input(index=i, value=d)

            tflm_model.invoke()

            recorded_data = tflm_model.get_recorded_data()

            if reenable_simulator_backend:
                tflm_model.accelerator.set_simulator_backend_enabled(True)
            if reenable_simulator_accelerator_cycles:
                tflm_model.accelerator.set_calculate_accelerator_cycles_only_enabled(False)
            if disable_program_recorder:
                tflm_model.accelerator.set_program_recorder_enabled(False)

            retval = []
            for layer_index, recorded_layer_data in enumerate(recorded_data.get('layers', [])):
                # pylint: disable=protected-access
                tf_layer = tflite_model.layers[layer_index] if update_input_model \
                    else copy.deepcopy(tflite_model.layers[layer_index])
                retval.append(tf_layer)

                layer_err = tflm_model.get_layer_error(layer_index)
                tf_layer.metadata['error_msg'] = None if layer_err is None else layer_err.msg

                for input_index, input_bytes in enumerate(recorded_layer_data.get('inputs', [])):
                    if input_index >= tf_layer.n_inputs:
                        break
                    input_tensor = tf_layer.inputs[input_index]
                    if input_tensor is None:
                        continue
                    input_buf = np.frombuffer(input_bytes, dtype=input_tensor.dtype)
                    if input_tensor.shape.flat_size > 0:
                        tf_layer.inputs[input_index]._data = np.reshape(input_buf, newshape=input_tensor.shape)
                    else:
                        tf_layer.inputs[input_index]._data = input_buf

                for output_index, output_bytes in enumerate(recorded_layer_data.get('outputs', [])):
                    output_tensor = tf_layer.outputs[output_index]
                    output_buf = np.frombuffer(output_bytes, dtype=output_tensor.dtype)
                    if output_tensor.shape.flat_size > 0:
                        tf_layer.outputs[output_index]._data = np.reshape(output_buf, newshape=output_tensor.shape)
                    else:
                        tf_layer.outputs[output_index]._data = output_buf

                for key, value in recorded_layer_data.items():
                    if key not in ('inputs', 'outputs'):
                        tf_layer.metadata[key] = value

            if return_model_details:
                model_details = tflm_model.details
                _add_memory_plan(
                    tflite_model=tflite_model,
                    recorded_data=recorded_data,
                    model_details=model_details
                )

        finally:
            TfliteMicro.unload_model(tflm_model)

        if return_model_details:
            return retval, model_details

        return retval

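    # Usage sketch ('my_model.tflite' is a hypothetical path):
    #
    #   layers = TfliteMicro.record_model('my_model.tflite')
    #   for layer in layers:
    #       # Each returned TfliteLayer holds the tensor data recorded
    #       # during the inference
    #       print(layer.name, layer.metadata.get('error_msg'))
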
    @staticmethod
    def add_accelerator_path(path:str):
        """Add an accelerator search path"""
        TfliteMicro._accelerator_paths.append(path)

    @staticmethod
    def register_accelerator(accelerator:TfliteMicroAccelerator):
        """Register a TFLM accelerator instance"""
        try:
            acc_api_version = accelerator.api_version
        except Exception as e:
            # pylint:disable=raise-missing-from
            raise RuntimeError(
                f'Failed to load accelerator: {accelerator.name}, ' + \
                f'failed to retrieve api version from wrapper, err: {e}')

        tflm_api_version = TfliteMicro.api_version()
        if tflm_api_version != acc_api_version:
            raise RuntimeError(
                f'Accelerator: {accelerator.name} not compatible, ' + \
                f'accelerator API version ({acc_api_version}) != TFLM wrapper version ({tflm_api_version})'
            )

        for variant in accelerator.variants:
            if TfliteMicro.accelerator_is_supported(variant):
                raise RuntimeError(f'Accelerator "{variant}" has already been registered')
            acc = copy.deepcopy(accelerator)
            acc.active_variant = variant
            TfliteMicro._accelerators[variant] = acc

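    # Usage sketch (MyAccelerator is a hypothetical TfliteMicroAccelerator
    # subclass whose wrapper was built against the same TFLM API version):
    #
    #   TfliteMicro.register_accelerator(MyAccelerator())
    #   assert TfliteMicro.accelerator_is_supported('my_accelerator')
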
    @staticmethod
    def get_accelerator(name:str) -> TfliteMicroAccelerator:
        """Return an instance of the specified accelerator wrapper"""
        TfliteMicro._load_wrapper()
        norm_accelerator = TfliteMicro.normalize_accelerator_name(name)
        if norm_accelerator is None:
            raise ValueError(
                f'Unknown accelerator: {name}. '
                f'Known accelerators are: {", ".join(TfliteMicro.get_supported_accelerators())}'
            )
        return TfliteMicro._accelerators[norm_accelerator]

    @staticmethod
    def _load_wrapper():
        """Load the TFLM C++ wrapper and return a reference to the loaded module"""
        if TfliteMicro._wrapper is not None:
            return TfliteMicro._wrapper

        # Add this wrapper directory to the env PATH
        # This way, the wrapper DLL can find additional DLLs as necessary
        wrapper_dir = os.path.dirname(os.path.abspath(__file__))
        os.environ['PATH'] = wrapper_dir + os.pathsep + os.environ['PATH']
        if hasattr(os, 'add_dll_directory'):
            os.add_dll_directory(wrapper_dir)

        # Import the TFLM C++ python wrapper
        # For more details, see:
        # <mltk root>/cpp/tflite_micro_wrapper
        try:
            TfliteMicro._wrapper = importlib.import_module('mltk.core.tflite_micro._tflite_micro_wrapper')
        except (ImportError, ModuleNotFoundError) as e:
            append_exception_msg(e,
                'Failed to import the tflite_micro_wrapper C++ shared library.\n'
                'If you built the MLTK from source then this could mean you need to re-build the mltk package (e.g. "pip install -e .").\n'
                'If you\'re running from a pre-built MLTK package (e.g. "pip install silabs-mltk"),\n'
                f'ensure that the _tflite_micro_wrapper file exists at {wrapper_dir}.\n'
                'If the file does not exist, try re-installing, e.g.: pip install silabs-mltk --force-reinstall\n\n'
            )
            raise

        # Initialize the wrapper
        TfliteMicro._wrapper.init()

        # Set the callback that will be invoked by the C++ library
        # to forward log messages to the Python logger
        TfliteMicro._wrapper.set_logger_callback(TfliteMicro._wrapper_logger_callback)

        TfliteMicro._load_accelerators()

        return TfliteMicro._wrapper

    @staticmethod
    def _load_accelerators():
        """Load all the TFLM accelerators found in the search paths"""
        curdir = os.path.dirname(os.path.abspath(__file__))

        search_paths = []
        search_paths.extend(TfliteMicro._accelerator_paths)
        search_paths.extend(as_list(get_user_setting('accelerator_paths')))
        search_paths.append(f'{curdir}/accelerators/mvp')

        # Check if any "<accelerator name>_mltk_accelerator.pth" files
        # are found in the Python Libs directory
        python_libs_dir = sysconfig.get_path('purelib')
        if os.path.exists(python_libs_dir):
            for fn in os.listdir(python_libs_dir):
                if not fn.endswith('_mltk_accelerator.pth'):
                    continue
                pth_path = f'{python_libs_dir}/{fn}'
                with open(pth_path, 'r') as f:
                    accelerator_package_base_dir = f.readline().strip()
                accelerator_name = fn[:-len('_mltk_accelerator.pth')]
                accelerator_dir = f'{accelerator_package_base_dir}/{accelerator_name}'

                # If the accelerator directory exists,
                # then add its path to the accelerator search path
                if os.path.exists(accelerator_dir):
                    search_paths.append(accelerator_dir)
                elif os.path.exists(f'{accelerator_dir}_wrapper'):
                    search_paths.append(f'{accelerator_dir}_wrapper')

        for search_path in search_paths:
            search_path = fullpath(search_path)
            init_py_path = f'{search_path}/__init__.py'
            if not os.path.exists(init_py_path):
                continue
            TfliteMicro._load_accelerator(search_path)

        TfliteMicro.register_accelerator(PlaceholderTfliteMicroAccelerator('cmsis'))

    @staticmethod
    def _load_accelerator(accelerator_dir:str) -> bool:
        """Attempt to load an accelerator Python module in the given directory"""
        logger = TfliteMicro.get_logger()
        try:
            accelerator_module = import_module_at_path(accelerator_dir)
        except Exception as e:
            logger.debug(f'Failed to import {accelerator_dir}, err: {e}', exc_info=e)
            return False

        tflm_accelerator = None
        for key in dir(accelerator_module):
            value = getattr(accelerator_module, key)
            if inspect.isclass(value) and issubclass(value, TfliteMicroAccelerator):
                # Create an accelerator instance
                try:
                    tflm_accelerator = value()
                    break
                except Exception as e:
                    logger.warning(f'Accelerator module: {accelerator_dir} failed to initialize, err: \n{e}')
                    return False

        if tflm_accelerator is None:
            logger.debug(f'Accelerator module: {accelerator_dir} does not contain a TfliteMicroAccelerator class definition')
            return False

        try:
            TfliteMicro.register_accelerator(tflm_accelerator)
        except Exception as e:
            logger.warning(f'Failed to register accelerator: {accelerator_dir}, err: {e}')
            return False

        return True

    @staticmethod
    def _clear_logged_errors():
        """Clear errors generated by the C++ wrapper. This is used internally by the wrapper"""
        TfliteMicro._load_wrapper()
        TfliteMicro._logged_errors.clear()

    @staticmethod
    def _get_logged_errors() -> List[str]:
        """Return errors generated by the C++ wrapper as a list. This is used internally by the wrapper"""
        TfliteMicro._load_wrapper()
        return TfliteMicro._logged_errors

    @staticmethod
    def _get_logged_errors_str() -> str:
        """Return errors generated by the C++ wrapper as a string. This is used internally by the wrapper"""
        return "\n".join(TfliteMicro._get_logged_errors())

    @staticmethod
    def _wrapper_logger_callback(msg: str):
        """This callback is invoked by the TFLM C++ wrapper
        when it internally issues a log message"""
        l = TfliteMicro.get_logger()
        if l is None:
            return
        errs = TfliteMicro._logged_errors

        # Each message is prefixed with its log level, e.g. 'D ', 'I ', 'W ', 'E '
        level = msg[:2].strip()
        msg = msg[2:].strip()
        if level == 'D':
            l.debug(msg)
        elif level == 'I':
            l.info(msg)
        elif level == 'W':
            l.warning(msg)
            errs.append(msg)
        elif level == 'E':
            l.error(msg)
            errs.append(msg)

def _load_tflite_model(model:Union[str,TfliteModel]) -> TfliteModel:
    if isinstance(model, TfliteModel):
        return model
    elif isinstance(model, str):
        if not model.endswith('.tflite') or not os.path.exists(model):
            raise ValueError('Provided model must be a path to an existing .tflite file')
        return TfliteModel.load_flatbuffer_file(model)
    else:
        raise RuntimeError('Must provide a TfliteModel or path to a .tflite file')


def _add_memory_plan(
    tflite_model:TfliteModel,
    recorded_data:dict,
    model_details:TfliteMicroModelDetails
):
    from .tflite_micro_memory_plan import TfliteMicroMemoryPlan

    recorded_data_layers = recorded_data.get('layers', [])
    model_details._memory_plan = TfliteMicroMemoryPlan.create( # pylint: disable=protected-access
        memory_plan=recorded_data.get('memory_plan', []),
        tflite_model=tflite_model,
        total_persistent_runtime_size=recorded_data.get('total_persistent_runtime_size', 0),
        temp_runtime_sizes=list(x.get('temp_memory_used', 0) for x in recorded_data_layers),
        persistent_runtime_sizes=list(x.get('persistent_memory_used', 0) for x in recorded_data_layers)
    )


def _layer_callback_handler(
    tflite_model:TfliteModel,
    callback:Callable[[TfliteLayer],bool],
    index:int,
    outputs:List[bytes]
) -> bool:
    tf_layer = copy.deepcopy(tflite_model.layers[index])
    for output_index, output_bytes in enumerate(outputs):
        output_tensor = tf_layer.outputs[output_index]
        output_buf = np.frombuffer(output_bytes, dtype=output_tensor.dtype)
        if output_tensor.shape.flat_size > 0:
            tf_layer.outputs[output_index]._data = np.reshape(output_buf, newshape=output_tensor.shape) # pylint: disable=protected-access
        else:
            tf_layer.outputs[output_index]._data = output_buf # pylint: disable=protected-access

    return callback(tf_layer)


def _retrieve_runtime_buffer_sizes(
    tflite_model:TfliteModel,
    accelerator:str,
    runtime_buffer_size:int,
) -> List[int]:
    from mltk.core.tflite_model_parameters import TfliteModelParameters

    runtime_buffer_sizes = [0]
    try:
        memory_spec = TfliteModelParameters.load_from_tflite_model(
            tflite_model,
            tag=f'{accelerator}_memory_spec'
        )
    except:
        memory_spec = {}

    if memory_spec:
        runtime_buffer_sizes = memory_spec.get('sizes', [0])

    if runtime_buffer_size:
        runtime_buffer_sizes[0] = runtime_buffer_size

    return runtime_buffer_sizes
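

# A minimal end-to-end sketch of the API above, kept out of the library's
# import path as a private helper. 'my_model.tflite' is a hypothetical path,
# and the output() accessor is assumed to mirror input() on TfliteMicroModel.
def _example_usage(tflite_path:str='my_model.tflite'):
    tflite_model = TfliteModel.load_flatbuffer_file(tflite_path)

    # Profile one simulated inference and print the results report
    results = TfliteMicro.profile_model(tflite_model, return_estimates=True)
    print(results)

    # Manually drive a single inference with zeroed inputs
    tflm_model = TfliteMicro.load_tflite_model(tflite_model)
    try:
        for i in range(tflm_model.input_size):
            tflm_model.input(i, value=np.zeros_like(tflm_model.input(i)))
        tflm_model.invoke()
        print(tflm_model.output())
    finally:
        TfliteMicro.unload_model(tflm_model)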