from typing import List, Tuple
import logging
import os
import json
import numpy as np
import tensorflow as tf
import sklearn
import matplotlib.pyplot as plt
from mltk.utils import gpu
from mltk.utils.python import prepend_exception_msg
from .model import (
MltkModel,
KerasModel,
TrainMixin,
DatasetMixin,
EvaluateAutoEncoderMixin,
load_tflite_or_keras_model
)
from .utils import get_mltk_logger
from .summarize_model import summarize_model
from .evaluation_results import EvaluationResults
class AutoEncoderEvaluationResults(EvaluationResults):
    """Auto-encoder evaluation results

    .. seealso::
       - :py:func:`~evaluate_autoencoder`
       - :py:func:`mltk.core.evaluate_model`
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, model_type='auto_encoder', **kwargs)

    @property
    def classes(self) -> List[str]:
        """List of class labels used by evaluated model"""
        return self['classes']

    @property
    def overall_accuracy(self) -> float:
        """The overall, model accuracy"""
        return self['overall_accuracy']

    @property
    def overall_precision(self) -> List[float]:
        """The overall, model precision at various thresholds"""
        return self['overall_precision']

    @property
    def overall_recall(self) -> List[float]:
        """The overall, model recall at various thresholds"""
        return self['overall_recall']

    @property
    def overall_pr_accuracy(self) -> float:
        """The overall, precision vs recall"""
        return self['overall_pr_accuracy']

    @property
    def overall_tpr(self) -> List[float]:
        """The overall, true positive rate at various thresholds"""
        return self['overall_tpr']

    @property
    def overall_fpr(self) -> List[float]:
        """The overall, false positive rate at various thresholds"""
        return self['overall_fpr']

    @property
    def overall_roc_auc(self) -> List[float]:
        """The overall, area under curve of the receiver operating characteristic"""
        return self['overall_roc_auc']

    @property
    def overall_thresholds(self) -> List[float]:
        """List of thresholds used to calculate the overall stats"""
        return self['overall_thresholds']

    @property
    def class_stats(self) -> dict:
        """Dictionary of per class statistics"""
        return self['class_stats']

    def calculate(
        self,
        y: np.ndarray,
        y_pred: np.ndarray,
        all_scores: np.ndarray,
        thresholds: List[float] = None
    ):
        """Calculate the evaluation results

        Given the list of expected values and corresponding predicted values with scores,
        calculate the evaluation metrics.

        Args:
            y: 1D array of expected class ids
            y_pred: 1D array of scoring results, e.g. y_pred[i] = scoring_function(x[i], y[i])
            all_scores: 2D [n_samples, n_classes] of scores comparing the input vs auto-encoder generated out for each class type (normal, and all abnormal cases)
            thresholds: Optional, list of thresholds to use for calculating the TPR, FPR and AUC
        """
        if thresholds is None:
            # Default: 101 evenly-spaced thresholds spanning [min(y_pred), max(y_pred)]
            lo = np.amin(y_pred)
            hi = np.amax(y_pred)
            thresholds = list(lo + np.arange(0.0, 1.01, .01) * (hi - lo))

        self['all_scores'] = all_scores
        self['thresholds'] = thresholds
        self['overall_accuracy'] = calculate_overall_accuracy(y_pred, y)
        self['overall_precision'], self['overall_recall'], self['overall_pr_accuracy'] = \
            calculate_overall_pr_accuracy(thresholds, y_pred, y)
        self['overall_tpr'], self['overall_fpr'], self['overall_roc_auc'] = \
            calculate_overall_roc_auc(thresholds, y_pred, y)
        self['class_stats'] = calculate_class_stats(all_scores, self['classes'])

    def generate_summary(self) -> str:
        """Generate and return a summary of the results as a string"""
        base = super().generate_summary(include_all=False)
        return f'{base}\n{summarize_results(self)}'

    def generate_plots(
        self,
        show=True,
        output_dir:str=None,
        logger: logging.Logger=None
    ):
        """Generate plots of the evaluation results

        Args:
            show: Display the generated plots
            output_dir: Generate the plots at the specified directory. If omitted, generated in the model's logging directory
            logger: Optional logger
        """
        plot_results(self, show=show, output_dir=output_dir, logger=logger)
def evaluate_autoencoder(
    mltk_model:MltkModel,
    tflite:bool=False,
    weights:str=None,
    max_samples_per_class:int=-1,
    classes:List[str]=None,
    dump: bool=False,
    verbose: bool=None,
    show: bool=False,
    callbacks:list=None,
    update_archive:bool=True
) -> AutoEncoderEvaluationResults:
    """Evaluate a trained auto-encoder model

    Args:
        mltk_model: MltkModel instance
        tflite: If true then evaluate the .tflite (i.e. quantized) model, otherwise evaluate the keras model
        weights: Optional weights to load before evaluating (only valid for a keras model)
        max_samples_per_class: Maximum number of samples per class to evaluate. This is useful for large datasets
        classes: Specific classes to evaluate, if omitted, use the one defined in the given MltkModel, i.e. model specification
        dump: If true, dump the model output of each sample with a side-by-side comparison to the input sample
        verbose: Enable verbose log messages
        show: Show the evaluation results diagrams
        callbacks: Optional callbacks to invoke while evaluating
        update_archive: Update the model archive with the eval results

    Returns:
        :py:class:`AutoEncoderEvaluationResults` (a dictionary subclass) containing the evaluation results
    """
    # The model must mix in the training, auto-encoder evaluation,
    # and dataset APIs used throughout this function
    if not isinstance(mltk_model, TrainMixin):
        raise Exception('MltkModel must inherit TrainMixin')
    if not isinstance(mltk_model, EvaluateAutoEncoderMixin):
        raise Exception('MltkModel must inherit EvaluateAutoEncoderMixin')
    if not isinstance(mltk_model, DatasetMixin):
        raise Exception('MltkModel must inherit a DatasetMixin')

    # Keep .tflite and keras (.h5) evaluation artifacts in separate log subdirectories
    subdir = 'eval/tflite' if tflite else 'eval/h5'
    eval_dir = mltk_model.create_log_dir(subdir, delete_existing=True)
    dump_dir = mltk_model.create_log_dir(f'{subdir}/dumps')
    logger = mltk_model.create_logger('eval', parent=get_mltk_logger())

    gpu.initialize(logger=logger)

    # Only attempt the archive update later if the archive file is actually writable
    if update_archive:
        update_archive = mltk_model.check_archive_file_is_writable()

    # scoring_function(orig, decoded) -> float compares an input sample
    # against its auto-encoder reconstruction (see the per-sample loop below)
    scoring_function = mltk_model.get_scoring_function()
    classes = classes or mltk_model.eval_classes

    # Build the MLTK model's corresponding as a Keras model or .tflite
    try:
        built_model = load_tflite_or_keras_model(
            mltk_model,
            model_type='tflite' if tflite else 'h5',
            weights=weights
        )
    except Exception as e:
        prepend_exception_msg(e, 'Failed to build model')
        raise

    # Best-effort: log a summary of the model being evaluated
    try:
        summary = summarize_model(
            mltk_model,
            built_model=built_model
        )
        logger.info(summary)
    except Exception as e:
        logger.debug(f'Failed to generate model summary, err: {e}', exc_info=e)
        logger.warning(f'Failed to generate model summary, err: {e}')

    logger.info('Evaluating auto-encoder model ...')

    # Score every sample of every class:
    # all_scores[i] is a 1D float32 array of per-sample scores for classes[i]
    # (index 0 is treated as the "normal" class further below)
    all_scores = []
    for class_label in classes:
        logger.info(f'Loading dataset for class: {class_label}')
        try:
            mltk_model.load_dataset(
                subset='evaluation',
                max_samples_per_class=max_samples_per_class,
                classes=[class_label],
                logger=logger,
                test=mltk_model.test_mode_enabled
            )
        except Exception as e:
            prepend_exception_msg(e, 'Failed to load model evaluation dataset' )
            raise

        # Materialize the evaluation samples into a numpy array
        eval_data = _retrieve_data(mltk_model.x)

        logger.info(f'Generating model predictions for {class_label} class ...')
        if isinstance(built_model, KerasModel):
            y_pred = built_model.predict(
                x = eval_data,
                callbacks=callbacks,
                verbose=1 if verbose else 0,
            )
        else:
            y_pred = built_model.predict(x = eval_data, y_dtype=np.float32)

        # loop over all original images and their corresponding reconstructions
        class_scores = np.empty((len(eval_data),), dtype=np.float32)
        dump_count = 0
        for i, (orig, decoded) in enumerate(zip(eval_data, y_pred)):
            try:
                class_scores[i] = scoring_function(orig, decoded)
            except Exception as e:
                prepend_exception_msg(e, 'Error executing scoring function')
                raise
            if dump and dump_count < 200: # Don't dump more than 200 samples
                dump_count += 1
                dump_path = f'{dump_dir}/{class_label}/{i}.png'
                _save_decoded_image(dump_path, orig, decoded, class_scores[i])

        all_scores.append(class_scores)
        mltk_model.unload_dataset()

    if dump:
        logger.info(f'Decoded comparisons available at {dump_dir}')

    # Build a binary (normal=0 vs abnormal=1) label/prediction set.
    # NOTE(review): y_pred/y_true are re-assigned on every loop iteration, so
    # only the LAST abnormal class contributes to the overall stats computed
    # below -- confirm this is intended when more than 2 classes are evaluated.
    normal_pred = all_scores[0]
    for i in range(1, len(all_scores)):
        abnormal_scores = all_scores[i]
        y_pred = np.append(normal_pred, abnormal_scores)
        y_true = np.append(np.zeros_like(normal_pred), np.ones_like(abnormal_scores))

    results = AutoEncoderEvaluationResults(
        name= mltk_model.name,
        classes=classes,
    )
    results.calculate(
        y = y_true,
        y_pred = y_pred,
        all_scores = all_scores
    )

    summarized_results = results.generate_summary()

    # Persist the raw results as JSON (numpy arrays are encoded as lists)
    eval_results_path = f'{eval_dir}/eval-results.json'
    with open(eval_results_path, 'w') as f:
        json.dump(results, f, default=_encode_ndarray)
    logger.debug(f'Generated {eval_results_path}')

    # Persist the human-readable summary
    summary_path = f'{eval_dir}/summary.txt'
    with open(summary_path, 'w') as f:
        f.write(summarized_results)
    logger.debug(f'Generated {summary_path}')

    # Generate the ROC / precision-recall / histogram plots into eval_dir
    results.generate_plots(
        logger=logger,
        output_dir=eval_dir,
        show=show
    )

    # Best-effort: copy the eval artifacts into the model archive
    if update_archive:
        try:
            logger.info(f'Updating {mltk_model.archive_path}')
            mltk_model.add_archive_dir(subdir)
        except Exception as e:
            logger.warning(f'Failed to add eval results to model archive, err: {e}', exc_info=e)

    logger.close() # close the eval logger

    if show:
        plt.show(block=True)

    return results
def summarize_results(results: AutoEncoderEvaluationResults) -> str:
    """Return a human-readable, multi-line summary of the evaluation results"""
    parts = [
        'Overall accuracy: {:.3f}%\n'.format(results['overall_accuracy'] * 100),
        'Precision/recall accuracy: {:.3f}%\n'.format(results['overall_pr_accuracy'] * 100),
        'Overall ROC AUC: {:.3f}%\n'.format(results['overall_roc_auc'] * 100),
    ]
    class_stats = results['class_stats']
    # Only list per-class AUCs when there is more than one abnormal class
    if len(class_stats) > 1:
        parts.append('Individual class ROC AUC:\n')
        for class_label, stats in class_stats.items():
            parts.append(' {}: {:.3f}%\n'.format(class_label, stats['auc'] * 100))
    return ''.join(parts).strip()
def plot_results(results:AutoEncoderEvaluationResults, show=False, output_dir:str=None, logger: logging.Logger=None):
    """Use Matplotlib to generate all plots of the evaluation results"""
    common = dict(output_dir=output_dir, show=show, logger=logger)
    plot_overall_roc(results, **common)
    plot_overall_precision_vs_recall(results, **common)
    plot_histogram(results, **common)
    plot_class_roc(results, **common)
    # Block so the user can inspect the figures before they are closed
    if show:
        plt.show(block=True)
def calculate_overall_accuracy(y_pred, y_true) -> float:
    """Return the best classification accuracy over a sweep of thresholds.

    y_pred contains the outputs of the network for the validation data;
    y_true are the correct answers (0.0 for normal, 1.0 for anomaly).
    """
    lo = np.amin(y_pred)
    hi = np.amax(y_pred)
    # 100 evenly-spaced candidate thresholds across the prediction range
    thresholds = lo + np.arange(0.0, 1.0, .01) * (hi - lo)
    n_samples = len(y_pred)
    best_accuracy = 0.0
    for threshold in thresholds:
        binarized = (y_pred > threshold).astype(int)
        fraction_correct = np.sum(binarized == y_true) / n_samples
        if fraction_correct > best_accuracy:
            best_accuracy = fraction_correct
    return best_accuracy
def calculate_overall_pr_accuracy(thresholds, y_pred, y_true) -> Tuple[List[float], List[float], float]:
    """Compute precision/recall at each threshold and the best (P+R)/2 accuracy.

    y_pred contains the outputs of the network for the validation data;
    y_true are the correct answers (0.0 for normal, 1.0 for anomaly).
    Assumes y_pred is ordered with all normal samples first, then anomalies.
    """
    best = 0
    n_normal = np.sum(y_true == 0)
    precision = [0.0] * len(thresholds)
    recall = [0.0] * len(thresholds)

    for idx, threshold in enumerate(thresholds):
        # Binarize the predictions at this threshold
        binarized = (y_pred > threshold).astype(int)
        # Confusion-matrix terms (normals occupy the first n_normal entries)
        false_positive = np.sum(binarized[0:n_normal] == 1)
        true_positive = np.sum(binarized[n_normal:] == 1)
        false_negative = np.sum(binarized[n_normal:] == 0)
        # max(..., 1e-9) guards against division by zero
        precision[idx] = true_positive / max(true_positive + false_positive, 1e-9)
        recall[idx] = true_positive / max(true_positive + false_negative, 1e-9)
        # Track the best balanced accuracy seen so far
        candidate = (precision[idx] + recall[idx]) / 2
        if candidate > best:
            best = candidate

    return precision, recall, best
def calculate_overall_roc_auc(thresholds, y_pred, y_true) -> Tuple[List[float], List[float], float]:
    """Compute the TPR/FPR curve over the given thresholds and its AUC.

    y_pred contains the outputs of the network for the validation data;
    y_true are the correct answers (0.0 for normal, 1.0 for anomaly).
    Assumes y_pred is ordered with all normal samples first, then anomalies.
    """
    n_thresholds = len(thresholds)
    n_normal = np.sum(y_true == 0)
    n_abnormal = float(len(y_true) - n_normal)
    tpr = [0.0] * n_thresholds
    fpr = [0.0] * n_thresholds

    # Index 0 is reserved for the forced (1, 1) boundary point below
    for idx in range(1, n_thresholds):
        binarized = (y_pred > thresholds[idx]).astype(int)
        tpr[idx] = np.sum(binarized[n_normal:] == 1) / n_abnormal
        fpr[idx] = np.sum(binarized[0:n_normal] == 1) / float(n_normal)

    # Force the curve to start at (FPR, TPR) = (1, 1)
    fpr[0] = 1
    tpr[0] = 1

    # Trapezoidal integration of TPR over FPR
    roc_auc = 0
    for idx in range(n_thresholds - 1):
        roc_auc += .5 * (tpr[idx] + tpr[idx + 1]) * (fpr[idx] - fpr[idx + 1])

    return tpr, fpr, roc_auc
def calculate_class_stats(all_scores, classes) -> dict:
    """Calculate ROC and precision/recall statistics for each abnormal class.

    all_scores[0] holds the "normal" class scores; each subsequent entry holds
    one abnormal class's scores. With more than 2 classes an aggregate
    'all' entry (normal vs everything abnormal) is also produced.
    """
    from sklearn.metrics import (precision_recall_curve, roc_curve, auc) # pylint: disable=import-outside-toplevel

    def _curve_stats(y_true, y_pred) -> dict:
        # ROC curve, its AUC, and the precision/recall curve for one pairing
        fpr, tpr, thr = roc_curve(y_true, y_pred)
        precision, recall, _ = precision_recall_curve(y_true, y_pred)
        return {
            'fpr': fpr,
            'tpr': tpr,
            'thr': thr,
            'auc': auc(fpr, tpr),
            'precision': precision,
            'recall': recall
        }

    stats = {}
    normal_pred = all_scores[0]
    total_scores = len(normal_pred)

    # One stat entry per abnormal class: normal (label 0) vs that class (label 1)
    for i in range(1, len(all_scores)):
        abnormal_scores = all_scores[i]
        total_scores += len(abnormal_scores)
        y_pred = np.append(normal_pred, abnormal_scores)
        y_true = np.append(np.zeros_like(normal_pred), np.ones_like(abnormal_scores))
        stats[classes[i]] = _curve_stats(y_true, y_pred)

    # If more than 2 classes were provided then generate a stat for:
    # normal + <all other classes>
    if len(classes) > 2:
        y_pred = np.empty((total_scores,), dtype=np.float32)
        y_true = np.empty((total_scores,), dtype=np.int32)
        offset = 0
        for i, class_scores in enumerate(all_scores):
            n_samples = len(class_scores)
            y_pred[offset : offset + n_samples] = class_scores
            y_true[offset : offset + n_samples] = 0 if i == 0 else 1
            offset += n_samples
        stats['all'] = _curve_stats(y_true, y_pred)

    return stats
def plot_overall_roc(results, output_dir:str, show:bool, logger: logging.Logger):
    """Generate a plot of the overall ROC curve with its AUC"""
    name = results['name']
    roc_auc = results['overall_roc_auc']
    title = f'Overall ROC: {name}'

    fig = plt.figure(title)
    plt.plot(results['overall_fpr'], results['overall_tpr'], label=f"auc: {roc_auc:0.3f}")
    # NOTE: the x-axis is zoomed into the low false-positive-rate region
    plt.xlim([0.0, 0.1])
    plt.ylim([0.00, 1.01])
    plt.legend(loc="lower right")
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.grid(which='major')

    if output_dir:
        output_path = output_dir + f'/{name}-overall_roc.png'
        plt.savefig(output_path)
        logger.debug(f'Generated {output_path}')

    if show:
        plt.show(block=False)
    else:
        # Release the figure when it is not being displayed
        fig.clear()
        plt.close(fig)
def plot_overall_precision_vs_recall(results: dict, output_dir:str, show, logger: logging.Logger):
    """Generate a plot of the overall precision vs recall curve.

    Args:
        results: Evaluation results containing 'name', 'overall_precision' and 'overall_recall'
        output_dir: If given, the plot is saved as <output_dir>/<name>-overall_precision_vs_recall.png
        show: If true, display the plot (non-blocking); otherwise the figure is closed
        logger: Logger used to report the generated file path
    """
    name = results['name']
    precision = results['overall_precision']
    recall = results['overall_recall']
    title = f'Precision vs Recall: {name}'
    fig = plt.figure(title)
    plt.plot(recall, precision)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.01])
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title(title)
    plt.grid()
    if output_dir:
        output_path = output_dir + f'/{name}-overall_precision_vs_recall.png'
        plt.savefig(output_path)
        # BUGFIX: previously logged output_dir instead of the generated file path
        logger.debug(f'Generated {output_path}')
    if show:
        plt.show(block=False)
    else:
        # Release the figure when it is not being displayed
        fig.clear()
        plt.close(fig)
def plot_histogram(results: dict, output_dir:str, show, logger: logging.Logger):
    """Generate a histogram diagram of the per-class evaluation scores.

    The top subplot traces each class's per-sample scores; the bottom subplot
    overlays normalized histograms of those scores over a shared range.

    Args:
        results: Evaluation results containing 'name', 'all_scores' and 'classes'
        output_dir: If given, the plot is saved as <output_dir>/<name>-histogram.png
        show: If true, display the plot (non-blocking); otherwise the figure is closed
        logger: Logger used to report the generated file path
    """
    name = results['name']
    all_scores = results['all_scores']
    classes = results['classes']

    # Find the global score range so every class's histogram shares the same bins
    min_pred = 1e10
    max_pred = -1e10
    for scores in all_scores:
        if min(scores) < min_pred:
            min_pred = min(scores)
        if max(scores) > max_pred:
            max_pred = max(scores)

    fig, ax = plt.subplots(2,1,figsize=(10,5))
    plt.subplots_adjust(hspace=.4)

    ax[0].set_title('Loss')
    for i, class_scores in enumerate(all_scores):
        ax[0].plot(class_scores, label=classes[i])
    ax[0].set_xlabel('Sample index')
    ax[0].set_ylabel('Predicted value')
    ax[0].legend()
    ax[0].grid()

    ax[1].set_title('Histogram')
    kwargs = dict(histtype='stepfilled', alpha=0.5, density=True, range=[min_pred, max_pred], bins=100)
    for i, class_scores in enumerate(all_scores):
        ax[1].hist(class_scores,**kwargs, label=classes[i])
    ax[1].set_xlabel('Predicted value')
    ax[1].set_ylabel('Probability')
    ax[1].legend()

    if output_dir:
        output_path = output_dir + f'/{name}-histogram.png'
        plt.savefig(output_path)
        # BUGFIX: previously logged output_dir instead of the generated file path
        logger.debug(f'Generated {output_path}')
    if show:
        plt.show(block=False)
    else:
        # Release the figure when it is not being displayed
        fig.clear()
        plt.close(fig)
def plot_class_roc(results:dict, output_dir:str, show, logger: logging.Logger):
    """Generate per-class ROC and precision-vs-recall plots"""
    name = results['name']
    classes = results['classes']
    class_stats = results['class_stats']
    multi_class = len(classes) > 2

    fig, ax = plt.subplots(2,1,figsize=(10,10))

    # Top subplot: ROC curve for each class
    ax[0].set_title(f'ROC: {name}')
    for class_label, stat in class_stats.items():
        class_auc = stat['auc']
        if multi_class:
            label = f'AUC {class_label}: {class_auc:0.4f}'
        else:
            label = f'AUC: {class_auc:0.4f}'
        ax[0].plot(stat['fpr'], stat['tpr'], label=label)
    ax[0].set_xlim([0.0, 1.0])
    ax[0].set_ylim([0.0, 1.01])
    ax[0].set_xlabel('False Positive Rate')
    ax[0].set_ylabel('True Positive Rate')
    ax[0].legend(loc="lower right")
    ax[0].grid()

    # Bottom subplot: precision vs recall for each class
    ax[1].set_title('Precision vs Recall')
    for class_label, stat in class_stats.items():
        ax[1].plot(stat['recall'], stat['precision'], label=class_label)
    ax[1].set_xlim([0.0, 1.0])
    ax[1].set_ylim([0.0, 1.01])
    ax[1].set_xlabel('Recall')
    ax[1].set_ylabel('Precision')
    # Only label individual curves when there is more than one abnormal class
    if multi_class:
        ax[1].legend()
    ax[1].grid()

    if output_dir:
        output_path = output_dir + f'/{name}-class_roc.png'
        plt.savefig(output_path)
        logger.debug(f'Generated {output_path}')

    if show:
        plt.show(block=False)
    else:
        # Release the figure when it is not being displayed
        fig.clear()
        plt.close(fig)
def _retrieve_data(x):
if isinstance(x, np.ndarray):
return x
if isinstance(x, tf.Tensor):
return x.numpy()
data = []
if hasattr(x, 'max_samples') and x.max_samples > 0:
max_samples = x.max_samples
elif hasattr(x, 'samples') and x.samples > 0:
max_samples = x.samples
else:
max_samples = 10000
for batch_x, _ in x:
if len(data) >= max_samples:
break
for sample in batch_x:
data.append(sample)
if len(data) >= max_samples:
break
try:
x.reset()
except:
pass
return np.array(data)
def _save_decoded_image(out_path, orig, decoded, score):
    """Save a side-by-side comparison of an input sample and its reconstruction.

    1D samples are saved as a matplotlib line plot with the error shaded;
    2D/3D samples (images) are saved as a heat-mapped image with the score
    overlaid.

    Args:
        out_path: Path of the .png file to generate (directories are created)
        orig: The original input sample
        decoded: The auto-encoder's reconstruction of the sample
        score: The scoring-function result for this sample

    Raises:
        RuntimeError: if OpenCV is not installed or the sample shape is unsupported
    """
    # pylint: disable=no-member
    # BUGFIX: narrowed both bare `except:` clauses to ImportError so that
    # unrelated errors (including KeyboardInterrupt) are not swallowed
    try:
        from cv2 import cv2  # some opencv-python builds expose the module this way
    except ImportError:
        try:
            import cv2
        except ImportError:
            raise RuntimeError('Failed import cv2 Python package, try running: pip install opencv-python OR pip install silabs-mltk[full]')

    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    shape = orig.shape
    if len(shape) == 1:
        # Time-series sample: plot input vs reconstruction and shade the error
        plt.plot(orig, 'b')
        plt.plot(decoded, 'r')
        plt.fill_between(np.arange(shape[0]), decoded, orig, color='lightcoral')
        plt.legend(labels=["Input", "Reconstruction", "Error"])
        plt.suptitle('Score: {:1.7f}'.format(abs(score)))
        plt.savefig(out_path)
        plt.clf()
        plt.close()
    elif len(shape) == 2 or len(shape) == 3:
        # Image sample: normalize both to 0-255 for a comparable color map
        img1 = sklearn.preprocessing.minmax_scale(orig.ravel(), feature_range=(0,255)).reshape(shape)
        img2 = sklearn.preprocessing.minmax_scale(decoded.ravel(), feature_range=(0,255)).reshape(shape)
        # stack the original and reconstructed image side-by-side
        output = np.hstack([img1, img2])
        outputs = cv2.applyColorMap(output.astype(np.uint8), cv2.COLORMAP_HOT)
        # Scale the combined image to a fixed 200px width
        scale_factor = 200 / outputs.shape[1]
        width = int(outputs.shape[1] * scale_factor)
        height = int(outputs.shape[0] * scale_factor)
        outputs = cv2.resize(outputs, (width, height), interpolation = cv2.INTER_AREA)
        # Overlay the score in the top-left corner
        outputs = cv2.putText(outputs,
            text='Score: {:1.7f}'.format(abs(score)),
            org=(1, 12),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=.5,
            color=(0,255,0))
        cv2.imwrite(out_path, outputs)
    else:
        raise RuntimeError('Data shape not supported')
def _encode_ndarray(obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
raise TypeError(repr(object) + " is not JSON serialized")