Source code for mltk.core.preprocess.utils.tf_dataset

"""Utilities for processing `Tensorflow Datasets <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_"""
import os
from typing import Callable, Tuple, List, Dict, Union
import wave
import tensorflow as tf
import numpy as np

from mltk.core import get_mltk_logger
from mltk.utils.path import create_tempdir
from mltk.utils.process_pool import (ProcessPool, calculate_n_jobs, get_cpu_count)


from .list_directory import list_dataset_directory
from .audio import read_audio_file
from .image import read_image_file


[docs]def load_audio_directory( directory:str, classes:List[str], unknown_class_percentage:float=1.0, silence_class_percentage:float=1.0, class_counts:Dict[str,int] = None, onehot_encode=False, shuffle:bool=False, seed=None, split:Tuple[float,float]=None, max_samples_per_class:int=-1, sample_rate_hz:int=None, return_audio_data=False, return_audio_sample_rate=False, white_list_formats:List[str]=None, follow_links=False, shuffle_index_directory:str=None, list_valid_filenames_in_directory_function:Callable=None, process_samples_function:Callable[[str,Dict[str,str]],None]=None ) -> Tuple[tf.data.Dataset, tf.data.Dataset]: """Load a directory of audio samples and return a tuple of `Tensorflow Datasets <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ (samples, label_ids) The given audio directory should have the structure:: <class1>/sample1.wav <class1>/sample2.wav ... <class1>/optional sub directory/sample9.wav <class2>/sample1.wav <class2>/sample2.wav ... <class3>/sample1.wav <class3>/sample2.wav Where each <class> is found in the given ``classes`` argument. .. seealso:: See the `Tensor Dataset API <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ for more details of how to use the returned datasets Args: directory: Directory path to audio dataset classes: List of class labels to include in the returned dataset * If ``_unknown_`` is added as an entry to the ``classes``, then this API will automatically add an 'unknown' class to the generated batches. Unused classes in the dataset directory will be randomly selected and used as an 'unknown' class. Use the ``unknown_class_percentage`` setting to control the size of this class. * If ``_silence_`` is added as an entry to the ``classes``, then this API will automatically add 'silence' samples with all zeros. Use the ``silence_class_percentage`` setting to control the size of this class. unknown_class_percentage: If an ``_unknown_`` class is added to the class list, then 'unknown' class samples will automatically be added to batches. This specifies the percentage of of samples to generate relative the smallest number of samples of other classes. For instance, if another class has 1000 samples and unknown_class_percentage=0.8, then the number of 'unknown' class samples generated will be 800. Set this parameter to None to disable this feature silence_class_percentage: If a ``_silence_`` class is added to the class list, then 'silence' class samples will automatically be added to batches. This specifies the percentage of of samples to generate relative the smallest number of samples of other classes. For instance, if another class has 1000 samples and silence_class_percentage=0.8, then the number of 'silence' class samples generated will be 800. Set this parameter to None to disable this feature class_counts: Dictionary which will be populated with the sample counts for each class onehot_encode: If true then the audio labels are onehot-encoded If false, then only the class id (corresponding to it index in the ``classes`` argument) is returned shuffle: If true, then shuffle the dataset seed: The seed to use for shuffling the dataset split: A tuple indicating the (start,stop) percentage of the dataset to return, e.g. (.75, 1.0) -> return last 25% of dataset If omitted then return the entire dataset max_samples_per_class: Maximum number of samples to return per class, this can be useful for debug to limit the dataset size sample_rate_hz: Sample rate to convert audio samples, if omitted then return native sample rate return_audio_data: If true then the audio file data is returned, if false then only the audio file path is returned return_audio_sample_rate: If true and return_audio_data is True, then the audio file data and corresponding sample rate is returned, if false then only the audio file data is return white_list_formats: List of file extensions to include in the search. If omitted then only return ``.wav`` files follow_links: If true then follow symbolic links when recursively searching the given dataset directory shuffle_index_directory: Path to directory to hold generated index of the dataset If omitted, then an index is generated at <directory>/.index list_valid_filenames_in_directory_function: This is a custom dataset processing callback. It should return a list of valid file names for the given class. It has the following function signature: .. highlight:: python .. code-block:: python def list_valid_filenames_in_directory( base_directory:str, search_class:str, white_list_formats:List[str], split:Tuple[float,float], follow_links:bool, shuffle_index_directory:str ) -> Tuple[str, List[str]] ... return search_class, filenames process_samples_function: This allows for processing the samples BEFORE they're returned by this API. This allows for adding/removing samples. It has the following function signature: .. highlight:: python .. code-block:: python def process_samples( directory:str, # The provided directory to this API sample_paths:Dict[str,str], # A dictionary: <class name>, [<sample paths relative to directory>] split:Tuple[float,float], follow_links:bool, white_list_formats:List[str], shuffle:bool, seed:int, **kwargs ) ... Returns: Returns a tuple of two tf.data.Dataset, (samples, label_ids) """ white_list_formats = white_list_formats or ('.wav',) sample_paths, sample_labels = list_dataset_directory( directory=directory, classes=classes, max_samples_per_class=max_samples_per_class, list_valid_filenames_in_directory_function=list_valid_filenames_in_directory_function, shuffle_index_directory=shuffle_index_directory, unknown_class_percentage=unknown_class_percentage, unknown_class_label='_unknown_', empty_class_percentage=silence_class_percentage, empty_class_label = '_silence_', split=split, shuffle=shuffle, seed=seed, white_list_formats=white_list_formats, return_absolute_paths=True, follow_links=follow_links, class_counts=class_counts, process_samples_function=process_samples_function ) _update_silence_samples(sample_paths, sample_rate_hz) label_ds = tf.data.Dataset.from_tensor_slices(np.array(sample_labels, dtype=np.int32)) path_ds = tf.data.Dataset.from_tensor_slices(sample_paths) if return_audio_data: feature_ds = path_ds.map( lambda x: read_audio_file(x, return_numpy=False, return_sample_rate=return_audio_sample_rate) ) else: feature_ds = path_ds if onehot_encode: label_ds = label_ds.map( lambda x: tf.one_hot(x, depth=len(classes), dtype=tf.int32), ) return feature_ds, label_ds
[docs]def load_image_directory( directory:str, classes:List[str], unknown_class_percentage:float=1.0, class_counts:Dict[str,int] = None, onehot_encode=False, shuffle:bool=False, seed=None, split:Tuple[float,float]=None, max_samples_per_class:int=-1, return_image_data = False, white_list_formats:List[str]=None, follow_links=False, shuffle_index_directory:str=None, list_valid_filenames_in_directory_function=None, process_samples_function=None, ) -> Tuple[tf.data.Dataset, tf.data.Dataset]: """Load a directory of images samples and return a tuple of `Tensorflow Datasets <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ (samples, label_ids) The given images directory should have the structure:: <class1>/sample1.png <class1>/sample2.jpg ... <class1>/optional sub directory/sample9.png <class2>/sample1.jpg <class2>/sample2.jpg ... <class3>/sample1.jpg <class3>/sample2.jpg Where each <class> is found in the given ``classes`` argument. .. seealso:: See the `Tensor Dataset API <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ for more details of how to use the returned datasets Args: directory: Directory path to images dataset classes: List of class labels to include in the returned dataset * If ``_unknown_`` is added as an entry to the ``classes``, then this API will automatically add an 'unknown' class to the generated batches. Unused classes in the dataset directory will be randomly selected and used as an 'unknown' class. Use the ``unknown_class_percentage`` setting to control the size of this class. unknown_class_percentage: If an ``_unknown_`` class is added to the class list, then 'unknown' class samples will automatically be added to batches. This specifies the percentage of of samples to generate relative the smallest number of samples of other classes. For instance, if another class has 1000 samples and unknown_class_percentage=0.8, then the number of 'unknown' class samples generated will be 800. Set this parameter to None to disable this feature class_counts: Dictionary which will be populated with the sample counts for each class onehot_encode: If true then the audio labels are onehot-encoded If false, then only the class id (corresponding to it index in the ``classes`` argument) is returned shuffle: If true, then shuffle the dataset seed: The seed to use for shuffling the dataset split: A tuple indicating the (start,stop) percentage of the dataset to return, e.g. (.75, 1.0) -> return last 25% of dataset If omitted then return the entire dataset max_samples_per_class: Maximum number of samples to return per class, this can be useful for debug to limit the dataset size return_image_data: If true then the image file data is returned, if false then only the image file path is returned white_list_formats: List of file extensions to include in the search. If omitted then only return ``.png``, ``.jpg`` files follow_links: If true then follow symbolic links when recursively searching the given dataset directory shuffle_index_directory: Path to directory to hold generated index of the dataset If omitted, then an index is generated at <directory>/.index list_valid_filenames_in_directory_function: This is a custom dataset processing callback. It should return a list of valid file names for the given class. It has the following function signature: .. highlight:: python .. code-block:: python def list_valid_filenames_in_directory( base_directory:str, search_class:str, white_list_formats:List[str], split:Tuple[float,float], follow_links:bool, shuffle_index_directory:str ) -> Tuple[str, List[str]] ... return search_class, filenames process_samples_function: This allows for processing the samples BEFORE they're returned by this API. This allows for adding/removing samples. It has the following function signature: .. highlight:: python .. code-block:: python def process_samples( directory:str, # The provided directory to this API sample_paths:Dict[str,str], # A dictionary: <class name>, [<sample paths relative to directory>] split:Tuple[float,float], follow_links:bool, white_list_formats:List[str], shuffle:bool, seed:int, **kwargs ) ... Returns: Returns a tuple of two tf.data.Dataset, (samples, label_ids) """ white_list_formats = white_list_formats or ('.png','.jpg') sample_paths, sample_labels = list_dataset_directory( directory=directory, classes=classes, max_samples_per_class=max_samples_per_class, list_valid_filenames_in_directory_function=list_valid_filenames_in_directory_function, shuffle_index_directory=shuffle_index_directory, unknown_class_percentage=unknown_class_percentage, unknown_class_label='_unknown_', split=split, shuffle=shuffle, seed=seed, white_list_formats=white_list_formats, return_absolute_paths=True, follow_links=follow_links, class_counts=class_counts, process_samples_function=process_samples_function ) label_ds = tf.data.Dataset.from_tensor_slices(np.array(sample_labels, dtype=np.int32)) path_ds = tf.data.Dataset.from_tensor_slices(sample_paths) if return_image_data: feature_ds = path_ds.map(lambda x: read_image_file(x, return_numpy=False, target_channels=0)) else: feature_ds = path_ds if onehot_encode: label_ds = label_ds.map( lambda x: tf.one_hot(x, depth=len(classes), dtype=tf.int32), ) return feature_ds, label_ds
[docs]def parallel_process( dataset:tf.data.Dataset, callback:Callable, dtype:Union[np.dtype, Tuple[np.dtype]]=np.float32, n_jobs:Union[int,float]=4, job_batch_size:int=None, pool:ProcessPool=None, name='ParallelProcess', env:Dict[str,str]=None, disable_gpu_in_subprocesses=True, ) -> Tuple[tf.data.Dataset, ProcessPool]: """Parallel process the dataset This will invoke the given ``callback`` across the available CPUs in the system which can greatly improve throughput. .. note:: This uses the `tf.numpy_function <https://www.tensorflow.org/api_docs/python/tf/numpy_function>`_ API which can slow processing in some instances. Args: dataset: The Tensorflow dataset to parallel process callback: The callback to invoke in parallel processes This callback must be at the root of the python module (i.e. it cannot be nested or a class method) dtype: The data type that the ``callback`` returns, this can also be a list of dtypes if the callback returns multiple np.ndarrays n_jobs: The number of jobs (i.e. CPU cores) to use for processing This can either be an integer or a float between (0,1.0] job_batch_size: This size of the batches to use for processing If omitted, then use the calculated ``n_jobs`` pool: An existing processing pool. If omitted then create a new pool name: The prefix to use in the model graph env: Optional OS environment variables to export in the parallel subprocesses disable_gpu_in_subprocesses: By default the GPU is disabled in the parallel subprocesses Returns: (tf.data.Dataset, ProcessPool), a tuple of the updated dataset with the parallel processing and the associated process pool """ n_jobs = calculate_n_jobs(n_jobs) job_batch_size = job_batch_size or n_jobs process_pool = pool or ProcessPool( entry_point=callback, n_jobs=n_jobs, name=name, env=env, disable_gpu_in_subprocesses=disable_gpu_in_subprocesses, logger=get_mltk_logger() ) def _np_parallel_process(*args): pool_batch = process_pool.create_batch(len(args[0])) if len(args) == 1: for x in args[0]: process_pool(x, pool_batch=pool_batch) else: for i in range(pool_batch.size): batch_x = [] for a in args: batch_x.append(a[i]) process_pool(*batch_x, pool_batch=pool_batch) results = pool_batch.wait() if isinstance(dtype, (list,tuple)): retval = [] r0 = results[0] for i, t in enumerate(dtype): r0_i = r0[i] r0_i_shape = (pool_batch.size,) + r0_i.shape retval.append(np.empty(r0_i_shape, dtype=t)) for batch_i, r in enumerate(results): for r_i, x in enumerate(r): retval[r_i][batch_i] = x return retval else: return np.array(results, dtype=dtype) @tf.function#(autograph=False) def _tf_parallel_process(*args): return tf.numpy_function( _np_parallel_process, args, dtype ) ds = dataset.batch(job_batch_size) ds = ds.map(_tf_parallel_process) ds = ds.unbatch() return ds, process_pool
[docs]def enable_numpy_behavior() -> bool: """Enable NumPy behavior on Tensors. NOTE: This requires Tensorflow 2.5+ Enabling NumPy behavior has three effects: - It adds to tf.Tensor some common NumPy methods such as T, reshape and ravel. - It changes dtype promotion in tf.Tensor operators to be compatible with NumPy. For example, tf.ones([], tf.int32) + tf.ones([], tf.float32) used to throw a "dtype incompatible" error, but after this it will return a float64 tensor (obeying NumPy's promotion rules). - It enhances tf.Tensor's indexing capability to be on par with NumPy's. Refer to the `Tensorflow docs <https://tensorflow.org/versions/r2.5/api_docs/python/tf/experimental/numpy/experimental_enable_numpy_behavior>`_ for more details. Returns: True if the numpy behavior was enabled, False else """ enabled_numpy_behavior = globals().get('enabled_numpy_behavior', None) if enabled_numpy_behavior is None: try: enabled_numpy_behavior = True from tensorflow.python.ops.numpy_ops import np_config np_config.enable_numpy_behavior() except: try: tf.experimental.numpy.experimental_enable_numpy_behavior() except: enabled_numpy_behavior = False globals()['enabled_numpy_behavior'] = enabled_numpy_behavior return enabled_numpy_behavior
def _update_silence_samples(sample_paths:List[str], sample_rate_hz:int): """This will create a "silence" audio sample and add it to the dataset where necessary""" if sample_rate_hz == -1 or not sample_rate_hz: sample_rate_hz = 16000 silence_wav_path = create_tempdir('temp') + f'/silence_{sample_rate_hz}hz.wav' if not os.path.exists(silence_wav_path): with wave.open(silence_wav_path, mode='wb') as wav: nchannels = 1 nframes = sample_rate_hz * 1 comptype = 'NONE' compname = 'not compressed' wav.setparams((nchannels, 2, sample_rate_hz, nframes, comptype, compname)) wav.writeframes(bytearray([0] * nframes * 2)) for i, p in enumerate(sample_paths): if p is None: sample_paths[i] = silence_wav_path