from typing import List, Union, Tuple
import os
from collections import OrderedDict, defaultdict
import json
import functools
import io
import csv
from prettytable import PrettyTable
from mltk.utils.string_formatting import format_units as format_units_func
from .tflite_model import (TfliteLayer, TfliteModel, TfliteOpCode)
class ProfilingLayerResult(defaultdict):
    """Profiling results for an individual layer of a model

    This is a ``defaultdict`` whose missing keys evaluate to ``0``.
    The basic statistics (see ``BASIC_KEYS``) are always stored by the constructor
    and are also exposed as read-only properties.
    Any additional keyword arguments given to the constructor are stored
    as extra, custom statistics which are only reported in the "full" summary.
    """

    # Keys always populated by the constructor.
    # Any other key is treated as a custom statistic.
    BASIC_KEYS = ['tflite_layer', 'ops', 'macs', 'cpu_cycles', 'accelerator_cycles', 'time', 'energy', 'error_msg']

    def __init__(
        self,
        tflite_layer:TfliteLayer,
        ops:int = 0,
        macs:int = 0,
        cpu_cycles:int = 0,
        accelerator_cycles:int = 0,
        time:float = 0.0,
        energy:float = 0.0,
        error_msg:str=None,
        **kwargs
    ):
        """Create the results for one layer

        Arguments
            tflite_layer: The TF-Lite layer these results belong to
            ops: Number of operations required by the layer
            macs: Number of multiply-accumulate operations required by the layer
            cpu_cycles: Number of CPU clock cycles required by the layer
            accelerator_cycles: Number of accelerator clock cycles required by the layer
            time: Time in seconds required by the layer
            energy: Energy in Joules required by the layer
            error_msg: Error message if the layer was not supported by the accelerator, else None
            kwargs: Additional custom statistics, stored as-is under their keyword name
        """
        # The custom statistics are inserted first, followed by the basic statistics
        defaultdict.__init__(self, lambda: 0, **kwargs)
        self['tflite_layer'] = tflite_layer
        self['ops'] = ops
        self['macs'] = macs
        self['cpu_cycles'] = cpu_cycles
        self['accelerator_cycles'] = accelerator_cycles
        self['time'] = time
        self['energy'] = energy
        self['error_msg'] = error_msg

    @property
    def is_accelerated(self) -> bool:
        """Return true if this layer was executed on the accelerator"""
        return self.accelerator_cycles > 0

    @property
    def is_unsupported(self) -> bool:
        """Return true if this layer should have been accelerated but exceeds the limits of the accelerator"""
        return self.error_msg is not None

    @property
    def error_msg(self) -> str:
        """Error message generated by accelerator if layer was not supported"""
        return self['error_msg']

    @property
    def tflite_layer(self) -> TfliteLayer:
        """Associated TF-Lite layer"""
        return self['tflite_layer']

    @property
    def index(self) -> int:
        """Index of this layer in the model"""
        return self.tflite_layer.index

    @property
    def name(self) -> str:
        """Name of current layer as: Op<index>-<OpCodeStr>"""
        return self.tflite_layer.name

    @property
    def opcode_str(self) -> str:
        """OpCode as a string"""
        return self.tflite_layer.opcode_str

    @property
    def opcode(self) -> TfliteOpCode:
        """OpCode"""
        return self.tflite_layer.opcode

    @property
    def macs(self) -> int:
        """Number of Multiply-Accumulate operations required by this layer"""
        return self['macs']

    @property
    def ops(self) -> int:
        """Number of operations required by this layer"""
        return self['ops']

    @property
    def accelerator_cycles(self) -> int:
        """Number of accelerator clock cycles required by this layer"""
        return self['accelerator_cycles']

    @property
    def cpu_cycles(self) -> int:
        """Number of CPU clock cycles required by this layer"""
        return self['cpu_cycles']

    @property
    def time(self) -> float:
        """Time in seconds required by this layer"""
        return self['time']

    @property
    def energy(self) -> float:
        """Energy in Joules required by this layer

        The energy is relative to the 'baseline' energy (i.e. energy used while the device was idling)
        """
        return self['energy']

    @property
    def options_str(self) -> str:
        """Layer configuration options as a string"""
        return f'{self.tflite_layer.options}'

    @property
    def input_shape_str(self) -> str:
        """Layer input shape(s) as a string"""
        return ','.join(f'{x.shape}' for x in self.tflite_layer.inputs)

    @property
    def input_dtype_str(self) -> str:
        """Layer input data type(s) as a string"""
        return ','.join(x.dtype_str for x in self.tflite_layer.inputs)

    @property
    def output_shape_str(self) -> str:
        """Layer output shape(s) as a string"""
        return ','.join(f'{x.shape}' for x in self.tflite_layer.outputs)

    @property
    def output_dtype_str(self) -> str:
        """Layer output data type(s) as a string"""
        return ','.join(f'{x.dtype_str}' for x in self.tflite_layer.outputs)

    def get_summary(
        self,
        include_labels=False,
        format_units=False,
        excluded_columns:List[str]=None,
        full_summary=False
    ) -> dict:
        """Return a summary of the layer profiling results as a dictionary

        Arguments
            include_labels: If true then each value is a tuple: (<printable label>, <value>),
                else each value is just the <value>
            format_units: Format number values to a string with associated units
            excluded_columns: Keys to omit from the summary. Keys that are not in the summary are ignored
            full_summary: Also include the custom statistics (i.e. keys not in BASIC_KEYS)

        Returns
            Ordered dictionary of the summary fields
        """
        excluded_columns = excluded_columns or []
        if format_units:
            unit_formatter = functools.partial(format_units_func, add_space=False, precision=1)
        else:
            unit_formatter = lambda x: x

        summary = OrderedDict()
        summary['index'] = ('Index', self.index)
        summary['opcode'] = ('OpCode', self.opcode_str)
        summary['ops'] = ('# Ops', unit_formatter(self.ops))
        summary['macs'] = ('# MACs', unit_formatter(self.macs))
        summary['accelerator_cycles'] = ('Acc Cycles', unit_formatter(self.accelerator_cycles))

        if full_summary:
            # Custom statistics are reported as-is (no unit formatting),
            # placed directly after the accelerator cycles
            for key, value in self.items():
                if key not in self.BASIC_KEYS:
                    label = key.replace('_', ' ').title()
                    summary[key] = (label, value)

        summary['cpu_cycles'] = ('CPU Cycles', unit_formatter(self.cpu_cycles))
        summary['energy'] = ('Energy (J)', unit_formatter(self.energy))
        summary['time'] = ('Time (s)', unit_formatter(self.time))
        summary['input_shape'] = ('Input Shape', self.input_shape_str)
        summary['output_shape'] = ('Output Shape', self.output_shape_str)
        summary['options'] = ('Options', self.options_str)
        summary['supported'] = ('Supported?', not self.is_unsupported)
        # When formatting for display, show an empty string instead of None
        summary['error_msg'] = ('Error Msg', self.error_msg if self.error_msg or not format_units else '')

        # Tolerate excluded columns this layer does not have
        # (e.g. a custom statistic that is only reported by some layers)
        for col in excluded_columns:
            summary.pop(col, None)

        if not include_labels:
            summary = OrderedDict((key, entry[1]) for key, entry in summary.items())

        return summary
class ProfilingModelResults:
    """Results from profiling model for specific accelerator

    This holds the profiling results of each model layer and
    provides the model-wide totals as well as helpers to render the
    results as a dictionary, JSON, CSV or printable string.
    """

    def __init__(
        self,
        model:TfliteModel,
        accelerator:str=None,
        platform:str=None,
        cpu_clock_rate:int = 0,
        runtime_memory_bytes:int = 0,
        layers: List[ProfilingLayerResult] = None,
        is_simulated = True,
    ):
        """Create the model profiling results

        Arguments
            model: The profiled TF-Lite model
            accelerator: Name of accelerator used for profiling
            platform: Platform the model was profiled on
            cpu_clock_rate: CPU clock rate in hertz
            runtime_memory_bytes: SRAM in bytes required by the ML library
            layers: Profiling results of each model layer
            is_simulated: True if the results were generated by the simulator
        """
        self._model:TfliteModel = model
        self._model_name:str = None
        self._accelerator:str = accelerator
        self._platform:str = platform
        self._cpu_clock_rate:int = cpu_clock_rate
        self._runtime_memory_bytes:int = runtime_memory_bytes
        self._layers: List[ProfilingLayerResult] = layers or []
        self._is_simulated = is_simulated

    @property
    def name(self) -> str:
        """Name of the profiled model"""
        return self._model_name

    @property
    def tflite_model(self) -> TfliteModel:
        """Associated TfliteModel"""
        return self._model

    @property
    def accelerator(self) -> str:
        """Name of accelerator used for profiling"""
        return self._accelerator

    @property
    def platform(self) -> str:
        """Platform model was profiled on"""
        return self._platform

    @property
    def is_simulated(self) -> bool:
        """True if the simulator was used to generate the results, else False if an embedded device was used"""
        return self._is_simulated

    @property
    def cpu_clock_rate(self) -> int:
        """Clock rate in hertz"""
        return self._cpu_clock_rate

    @property
    def runtime_memory_bytes(self) -> int:
        """Total SRAM in bytes required by ML library

        NOTE: This only includes the ML run-time memory, it does NOT
        include the memory required by the user application or external
        pre-processing libraries (e.g. DSP)
        """
        return self._runtime_memory_bytes

    @property
    def flatbuffer_size(self) -> int:
        """Total size in bytes required by ML model

        This is the size of the .tflite flatbuffer file
        """
        return self._model.flatbuffer_size

    @property
    def layers(self) -> List[ProfilingLayerResult]:
        """Profiling details of each model layer"""
        return self._layers

    @property
    def n_layers(self) -> int:
        """Number of layers in model"""
        return len(self._layers)

    @property
    def input_shape_str(self) -> str:
        """Model input shape(s) as a string"""
        return ','.join(f'{x.shape}' for x in self._model.inputs)

    @property
    def input_dtype_str(self) -> str:
        """Model input data type(s) as a string"""
        return ','.join(x.dtype_str for x in self._model.inputs)

    @property
    def output_shape_str(self) -> str:
        """Model output shape(s) as a string"""
        return ','.join(f'{x.shape}' for x in self._model.outputs)

    @property
    def output_dtype_str(self) -> str:
        """Model output data type(s) as a string"""
        return ','.join(f'{x.dtype_str}' for x in self._model.outputs)

    @property
    def ops(self) -> int:
        """The total number of ops to execute one model inference"""
        return sum(x.ops for x in self.layers)

    @property
    def macs(self) -> int:
        """The total number of multiply-accumulate operations to execute one model inference"""
        return sum(x.macs for x in self.layers)

    @property
    def accelerator_cycles(self) -> int:
        """The total number of accelerator cycles to execute one model inference"""
        return sum(x.accelerator_cycles for x in self.layers)

    @property
    def cpu_cycles(self) -> int:
        """The total number of CPU cycles to execute one model inference"""
        return sum(x.cpu_cycles for x in self.layers)

    @property
    def time(self) -> float:
        """The total time in seconds required to execute one model inference"""
        return sum(x.time for x in self.layers)

    @property
    def energy(self) -> float:
        """The total energy required to execute one model inference"""
        return sum(x.energy for x in self.layers)

    @property
    def cpu_utilization(self) -> float:
        """Fraction of the CPU used to execute the model, in the range 0.0 - 1.0

        This is the used CPU cycles divided by the cycles available
        during the inference time (clock rate * time), capped at 1.0.
        0.0 is returned if the clock rate or time is zero.
        """
        all_cycles = self._cpu_clock_rate * self.time
        if all_cycles == 0:
            return 0.0
        return min(self.cpu_cycles / all_cycles, 1.0)

    @property
    def n_unsupported_layers(self) -> int:
        """The number of layers not supported by the accelerator"""
        return len(self.unsupported_layers)

    @property
    def unsupported_layers(self) -> List[ProfilingLayerResult]:
        """Return layers not supported by accelerator"""
        return [x for x in self.layers if x.is_unsupported]

    def stat_total(self, name:str) -> Union[float,int]:
        """Return the total sum of the all the layers for the given stat"""
        return sum(x[name] for x in self.layers)

    def get_summary(
        self,
        include_labels=False,
        format_units=False,
        exclude_null=True,
        full_summary=False
    ) -> dict:
        """Return a summary of the profiling results as a dictionary

        Arguments
            include_labels: If true then each value is a tuple: (<printable label>, <value>),
                else each value is just the <value>
            format_units: Format number values to a string with associated units, e.g. 0.0234 -> 23.4m
            exclude_null: Exclude optional fields whose total is zero (e.g. don't include energy if no energy numbers were provided)
            full_summary: Also include the totals of the custom numeric layer stats.
                NOTE: In this case, any numeric stat missing from a layer is added to that layer with a value of 0

        Returns
            Ordered dictionary of the summary fields
        """
        if format_units:
            unit_formatter = functools.partial(format_units_func, add_space=False, precision=1)
        else:
            unit_formatter = lambda x: x
        include_null = not exclude_null

        # Each total iterates over all the layers, so only calculate them once
        total_ops = self.ops
        total_macs = self.macs
        total_accelerator_cycles = self.accelerator_cycles
        total_cpu_cycles = self.cpu_cycles
        total_time = self.time
        total_energy = self.energy

        summary = OrderedDict([
            ('name', ('Name', self.name)),
            ('accelerator', ('Accelerator', self.accelerator or 'None')),
            ('input_shape', ('Input Shape', self.input_shape_str)),
            ('input_dtype', ('Input Data Type', self.input_dtype_str)),
            ('output_shape', ('Output Shape', self.output_shape_str)),
            ('output_dtype', ('Output Data Type', self.output_dtype_str)),
            ('tflite_size', ('Flash, Model File Size (bytes)', unit_formatter(self.flatbuffer_size))),
            ('runtime_memory_size', ('RAM, Runtime Memory Size (bytes)', unit_formatter(self.runtime_memory_bytes))),
            ('ops', ('Operation Count', unit_formatter(total_ops))),
            ('macs', ('Multiply-Accumulate Count', unit_formatter(total_macs))),
            ('n_layers', ('Layer Count', self.n_layers)),
            ('n_unsupported_layers', ('Unsupported Layer Count', self.n_unsupported_layers)),
        ])

        if total_accelerator_cycles > 0 or include_null:
            summary['accelerator_cycles'] = ('Accelerator Cycle Count', unit_formatter(total_accelerator_cycles))

        if total_cpu_cycles > 0 or include_null:
            summary['cpu_cycles'] = ('CPU Cycle Count', unit_formatter(total_cpu_cycles))
            cpu_percentage = self.cpu_utilization*100
            if format_units:
                cpu_percentage = f'{cpu_percentage:.1f}'
            summary['cpu_utilization'] = ('CPU Utilization (%)', cpu_percentage)

        if self.cpu_clock_rate > 0 or include_null:
            summary['cpu_clock_rate'] = ('Clock Rate (hz)', unit_formatter(self.cpu_clock_rate))

        if total_time > 0 or include_null:
            summary['time'] = ('Time (s)', unit_formatter(total_time))

        if total_energy > 0 or include_null:
            summary['energy'] = ('Energy (J)', unit_formatter(total_energy))
            j_per_op = 0 if total_ops == 0 else total_energy/total_ops
            j_per_mac = 0 if total_macs == 0 else total_energy/total_macs
            summary['j_per_op'] = ('J/Op', unit_formatter(j_per_op))
            summary['j_per_mac'] = ('J/MAC', unit_formatter(j_per_mac))

        if total_time > 0 or include_null:
            op_per_s = 0 if total_time == 0 else total_ops/total_time
            mac_per_s = 0 if total_time == 0 else total_macs/total_time
            inf_per_sec = 0 if total_time == 0 else 1/total_time
            summary['op_per_s'] = ('Ops/s', unit_formatter(op_per_s))
            summary['mac_per_s'] = ('MACs/s', unit_formatter(mac_per_s))
            if format_units:
                inf_per_sec = f'{inf_per_sec:.1f}'
            summary['inf_per_sec'] = ('Inference/s', inf_per_sec)

        if full_summary:
            # Collect the keys of all the numeric (non-bool) layer stats,
            # in the order they are first encountered
            numeric_keys = OrderedDict()
            for layer in self.layers:
                for key in layer:
                    value = layer[key]
                    if isinstance(value, (int, float)) and not isinstance(value, bool):
                        numeric_keys.setdefault(key, None)

            # Ensure every layer has an entry for every numeric stat
            for layer in self.layers:
                for key in numeric_keys:
                    if key not in layer:
                        layer[key] = 0

            # The basic stats are already in the summary, only add the custom stats
            for name in numeric_keys:
                if name in ProfilingLayerResult.BASIC_KEYS:
                    continue
                total = self.stat_total(name)
                if total == 0 and exclude_null:
                    continue
                summary[name] = (name.replace('_', ' ').title(), unit_formatter(total))

        if not include_labels:
            summary = OrderedDict((key, entry[1]) for key, entry in summary.items())

        return summary

    def generate_report(
        self,
        output_dir:str,
        format_units:bool,
        full_summary=False
    ):
        """Generate a profiling report in the given directory

        The directory is created if necessary and two files are written to it:
        ``profiling_report-<accelerator>-<simulated|embedded>.txt`` and
        ``profiling_report-<accelerator>-<simulated|embedded>.json``

        Arguments
            output_dir: Directory where the report files are generated
            format_units: Format number values to a string with associated units
            full_summary: Include all profiled stats in the report
        """
        os.makedirs(output_dir, exist_ok=True)

        acc = self.accelerator or 'none'
        simulated = 'simulated' if self.is_simulated else 'embedded'
        base_path = f'{output_dir}/profiling_report-{acc.lower()}-{simulated}'

        with open(f'{base_path}.txt', 'w') as f:
            f.write(self.to_string(format_units=format_units, full_summary=full_summary))

        with open(f'{base_path}.json', 'w') as f:
            f.write(self.to_json(format_units=format_units, full_summary=full_summary))

    def to_dict(
        self,
        format_units=False,
        exclude_null=True,
        full_summary=False
    ) -> dict:
        """Return profiling results as dictionary

        The returned dictionary has the keys: ``summary``, ``summary_labels``, ``layers``, ``layer_labels``.
        Every entry of ``layers`` has the same keys, in the same order, as ``layer_labels``.
        A layer that does not report a given field has None for that field.

        Arguments
            format_units: Format number values to a string with associated units, e.g. 0.0234 -> 23.4m
            exclude_null: Exclude columns whose total is zero (e.g. don't include energy if no energy numbers were provided)
            full_summary: Return all profiled stats. If this is false, then only the basic stats are returned
        """
        labeled_summary = self.get_summary(
            include_labels=True,
            format_units=format_units,
            exclude_null=exclude_null,
            full_summary=full_summary
        )
        summary_labels = {}
        summary = OrderedDict()
        for key, (label, value) in labeled_summary.items():
            summary_labels[key] = label
            summary[key] = value

        # Determine which layer columns have no data and should be dropped
        excluded_columns = []
        if exclude_null:
            if self.accelerator_cycles == 0:
                excluded_columns.append('accelerator_cycles')
            if self.cpu_cycles == 0:
                excluded_columns.append('cpu_cycles')
            if self.energy == 0:
                excluded_columns.append('energy')
            if self.time == 0:
                excluded_columns.append('time')
            if self.n_unsupported_layers == 0:
                excluded_columns.append('error_msg')
                excluded_columns.append('supported')

        # The layer labels are the union of the fields of all the layers,
        # in the order the fields are first encountered
        layer_labels = {}
        for layer in self.layers:
            labeled_layer_summary = layer.get_summary(
                include_labels=True,
                excluded_columns=excluded_columns,
                full_summary=full_summary
            )
            for key, entry in labeled_layer_summary.items():
                if key not in layer_labels:
                    layer_labels[key] = entry[0]

        layers = []
        for layer in self.layers:
            layer_summary = layer.get_summary(
                include_labels=False,
                format_units=format_units,
                excluded_columns=excluded_columns,
                full_summary=full_summary,
            )
            # Layers may report their fields in different orders (or not at all),
            # so re-key each layer in the label order to keep the
            # values aligned with the labels when rendered as table rows
            layers.append(OrderedDict(
                (key, layer_summary.get(key)) for key in layer_labels
            ))

        return dict(
            summary=summary,
            summary_labels=summary_labels,
            layers=layers,
            layer_labels=layer_labels
        )

    def to_json(
        self,
        indent=2,
        format_units=False,
        exclude_null=True,
        full_summary=False
    ) -> str:
        """Return profiling results as JSON string

        JSON Format:

        .. code-block:: text

            {
                "summary": { key/value summary of profiling },
                "summary_labels": { key/value of printable labels for each summary field }
                "layers": [ {<model layer results>}, ... ]
                "layer_labels": { key/value of printable labels for each layer field }
            }

        Where the "summary" member contains:

        .. code-block:: text

            "summary": {
                "name"                : "<Name of model>",
                "accelerator"         : "<Accelerator used>",
                "input_shape"         : "<Model input shapes>",
                "input_dtype"         : "<Model input data types>",
                "output_shape"        : "<Model output shapes>",
                "output_dtype"        : "<Model output data types>",
                "tflite_size"         : <.tflite file size>,
                "runtime_memory_size" : <Estimated TFLM arena size>,
                "ops"                 : <Total # operations>,
                "macs"                : <Total # multiply-accumulate ops>,
                "n_layers"            : <# of layers in model>,
                "n_unsupported_layers": <# layers unsupported by accelerator>,
                "accelerator_cycles"  : <Total # accelerator cycles>,
                "cpu_cycles"          : <Total estimated CPU cycles>,
                "cpu_utilization"     : <Percentage of CPU required to run an inference>,
                "cpu_clock_rate"      : <CPU clock rate hz>,
                "time"                : <Total estimated inference time>,
                "energy"              : <Total estimated energy in Joules>,
                "j_per_op"            : <Joules per operation>,
                "j_per_mac"           : <Joules per multiply-accumulate>,
                "op_per_s"            : <Operations per second>,
                "mac_per_s"           : <Multiply-accumulates per second>,
                "inf_per_sec"         : <Inference per second>
            }

        Where the "layers" member contains:

        .. code-block:: text

            "layers": [ {
                "index"               : <layer index>,
                "opcode"              : "<kernel opcode>",
                "ops"                 : <# operations>,
                "macs"                : <# of multiply-accumulate operations>,
                "accelerator_cycles"  : <# accelerator cycles>,
                "cpu_cycles"          : <estimated CPU cycles>,
                "energy"              : <estimated energy in Joules>,
                "time"                : <estimated layer execution time>,
                "input_shape"         : "<layer input shapes>",
                "output_shape"        : "<layer output shapes>",
                "options"             : "<layer options>",
                "supported"           : <true/false>,
                "error_msg"           : "<error msg if not supported by accelerator>"
              },
              ...
            ]

        Arguments
            indent: Amount of indentation to use in JSON formatting
            format_units: Format number values to a string with associated units, e.g. 0.0234 -> 23.4m
            exclude_null: Exclude columns whose total is zero (e.g. don't include energy if no energy numbers were provided)
            full_summary: Return all profiled stats. If this is false, then only the basic stats are returned

        Returns
            JSON formatted string
        """
        results = self.to_dict(
            format_units=format_units,
            exclude_null=exclude_null,
            full_summary=full_summary
        )
        return json.dumps(results, indent=indent)

    def to_csv(
        self,
        format_units=False,
        exclude_null=True,
        full_summary=False,
        include_header=True,
        dialect:Union[str,csv.Dialect]='excel',
    ) -> Tuple[str,str]:
        """Return profiling results as CSV string

        This returns a tuple of two CSV formatted strings.
        The first string contains the profiling summary.
        The second string contains the profiling results for the individual layers.

        Arguments:
            format_units: Format number values to a string with associated units, e.g. 0.0234 -> 23.4m
            exclude_null: Exclude columns whose total is zero (e.g. don't include energy if no energy numbers were provided)
            full_summary: Return all profiled stats. If this is false, then only the basic stats are returned
            include_header: Include the header row, which contains the key of each field.
                If false then only the results are returned with no header in the first row.
            dialect: CSV dialect Default is excel. See https://docs.python.org/3/library/csv.html for more details.
        """
        results = self.to_dict(
            format_units=format_units,
            exclude_null=exclude_null,
            full_summary=full_summary
        )
        summary = results['summary']
        layers = results['layers']
        # The header row contains the field keys (not the printable labels)
        summary_keys = list(results['summary_labels'])
        layer_keys = list(results['layer_labels'])

        summary_buf = io.StringIO()
        summary_writer = csv.writer(summary_buf, dialect=dialect)
        if include_header:
            summary_writer.writerow(summary_keys)
        summary_writer.writerow([summary[key] for key in summary_keys])

        layers_buf = io.StringIO()
        layers_writer = csv.writer(layers_buf, dialect=dialect)
        if include_header:
            layers_writer.writerow(layer_keys)
        for layer in layers:
            # Build each row from the header keys so the values always line up with the header
            layers_writer.writerow([layer.get(key) for key in layer_keys])

        return summary_buf.getvalue(), layers_buf.getvalue()

    def to_string(
        self,
        format_units=True,
        exclude_null=True,
        full_summary=False
    ) -> str:
        """Return the profiling results as a string

        The string contains the summary fields, one per line,
        followed by a table with one row per model layer.

        Arguments
            format_units: Format number values to a string with associated units, e.g. 0.0234 -> 23.4m
            exclude_null: Exclude columns whose total is zero (e.g. don't include energy if no energy numbers were provided)
            full_summary: Return all profiled stats. If this is false, then only the basic stats are returned
        """
        results = self.to_dict(
            format_units,
            exclude_null=exclude_null,
            full_summary=full_summary
        )
        summary = results['summary']
        summary_labels = results['summary_labels']
        layers = results['layers']
        layer_labels = results['layer_labels']

        parts = ['Profiling Summary\n']
        parts.extend(f'{summary_labels[key]}: {summary[key]}\n' for key in summary)
        parts.append('\nModel Layers\n')

        table = PrettyTable(list(layer_labels.values()))
        for layer in layers:
            # Build each row from the label keys so the values always line up with the column headers
            table.add_row([layer.get(key) for key in layer_labels])
        table.align = 'l'
        parts.append(table.get_string())

        return ''.join(parts)

    def __str__(self):
        return self.to_string()