Source code for mltk.core.preprocess.utils.tf_dataset
"""Utilities for processing `Tensorflow Datasets <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_"""importosfromtypingimportCallable,Tuple,List,Dict,Unionimportwaveimporttensorflowastfimportnumpyasnpfrommltk.coreimportget_mltk_loggerfrommltk.utils.pathimportcreate_tempdirfrommltk.utils.process_poolimport(ProcessPool,calculate_n_jobs,get_cpu_count)from.list_directoryimportlist_dataset_directoryfrom.audioimportread_audio_filefrom.imageimportread_image_file
def load_audio_directory(
    directory:str,
    classes:List[str],
    unknown_class_percentage:float=1.0,
    silence_class_percentage:float=1.0,
    class_counts:Dict[str,int]=None,
    onehot_encode=False,
    shuffle:bool=False,
    seed=None,
    split:Tuple[float,float]=None,
    max_samples_per_class:int=-1,
    sample_rate_hz:int=None,
    return_audio_data=False,
    return_audio_sample_rate=False,
    white_list_formats:List[str]=None,
    follow_links=False,
    shuffle_index_directory:str=None,
    list_valid_filenames_in_directory_function:Callable=None,
    process_samples_function:Callable[[str,Dict[str,str]],None]=None
) -> Tuple[tf.data.Dataset, tf.data.Dataset]:
    """Load a directory of audio samples and return a tuple of
    `Tensorflow Datasets <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ (samples, label_ids)

    The given audio directory should have the structure::

        <class1>/sample1.wav
        <class1>/sample2.wav
        ...
        <class1>/optional sub directory/sample9.wav
        <class2>/sample1.wav
        <class2>/sample2.wav
        ...
        <class3>/sample1.wav
        <class3>/sample2.wav

    Where each <class> is found in the given ``classes`` argument.

    .. seealso::
       See the `Tensorflow Dataset API <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_
       for more details on how to use the returned datasets

    Args:
        directory: Directory path to audio dataset
        classes: List of class labels to include in the returned dataset

            * If ``_unknown_`` is added as an entry to the ``classes``, then this API will automatically
              add an 'unknown' class to the generated batches. Unused classes in the dataset directory
              will be randomly selected and used as an 'unknown' class.
              Use the ``unknown_class_percentage`` setting to control the size of this class.
            * If ``_silence_`` is added as an entry to the ``classes``, then this API will automatically
              add 'silence' samples with all zeros.
              Use the ``silence_class_percentage`` setting to control the size of this class.

        unknown_class_percentage: If an ``_unknown_`` class is added to the class list, then 'unknown' class
            samples will automatically be added to batches. This specifies the percentage of samples to generate
            relative to the smallest number of samples of the other classes. For instance, if another class has
            1000 samples and unknown_class_percentage=0.8, then 800 'unknown' class samples will be generated.
            Set this parameter to None to disable this feature.
        silence_class_percentage: If a ``_silence_`` class is added to the class list, then 'silence' class
            samples will automatically be added to batches. This specifies the percentage of samples to generate
            relative to the smallest number of samples of the other classes. For instance, if another class has
            1000 samples and silence_class_percentage=0.8, then 800 'silence' class samples will be generated.
            Set this parameter to None to disable this feature.
        class_counts: Dictionary which will be populated with the sample counts for each class
        onehot_encode: If true then the audio labels are onehot-encoded.
            If false, then only the class id (corresponding to its index in the ``classes`` argument) is returned.
        shuffle: If true, then shuffle the dataset
        seed: The seed to use for shuffling the dataset
        split: A tuple indicating the (start, stop) percentage of the dataset to return,
            e.g. (.75, 1.0) -> return last 25% of dataset.
            If omitted then return the entire dataset.
        max_samples_per_class: Maximum number of samples to return per class,
            this can be useful for debugging to limit the dataset size
        sample_rate_hz: Sample rate to convert audio samples to, if omitted then return the native sample rate
        return_audio_data: If true then the audio file data is returned,
            if false then only the audio file path is returned
        return_audio_sample_rate: If true and return_audio_data is true, then the audio file data and the
            corresponding sample rate are returned, if false then only the audio file data is returned
        white_list_formats: List of file extensions to include in the search.
            If omitted then only return ``.wav`` files
        follow_links: If true then follow symbolic links when recursively searching the given dataset directory
        shuffle_index_directory: Path to directory to hold the generated index of the dataset.
            If omitted, then an index is generated at <directory>/.index
        list_valid_filenames_in_directory_function: This is a custom dataset processing callback.
            It should return a list of valid file names for the given class.
            It has the following function signature:

            .. highlight:: python
            .. code-block:: python

                def list_valid_filenames_in_directory(
                    base_directory:str,
                    search_class:str,
                    white_list_formats:List[str],
                    split:Tuple[float,float],
                    follow_links:bool,
                    shuffle_index_directory:str
                ) -> Tuple[str, List[str]]:
                    ...
                    return search_class, filenames

        process_samples_function: This allows for processing the samples BEFORE they're returned by this API.
            This allows for adding/removing samples.
            It has the following function signature:

            .. highlight:: python
            .. code-block:: python

                def process_samples(
                    directory:str, # The provided directory to this API
                    sample_paths:Dict[str,str], # A dictionary: <class name>, [<sample paths relative to directory>]
                    split:Tuple[float,float],
                    follow_links:bool,
                    white_list_formats:List[str],
                    shuffle:bool,
                    seed:int,
                    **kwargs
                ):
                    ...

    Returns:
        Returns a tuple of two tf.data.Dataset, (samples, label_ids)
    """
    white_list_formats = white_list_formats or ('.wav',)

    sample_paths, sample_labels = list_dataset_directory(
        directory=directory,
        classes=classes,
        max_samples_per_class=max_samples_per_class,
        list_valid_filenames_in_directory_function=list_valid_filenames_in_directory_function,
        shuffle_index_directory=shuffle_index_directory,
        unknown_class_percentage=unknown_class_percentage,
        unknown_class_label='_unknown_',
        empty_class_percentage=silence_class_percentage,
        empty_class_label='_silence_',
        split=split,
        shuffle=shuffle,
        seed=seed,
        white_list_formats=white_list_formats,
        return_absolute_paths=True,
        follow_links=follow_links,
        class_counts=class_counts,
        process_samples_function=process_samples_function
    )

    # Replace any "silence" placeholder entries with a generated all-zeros .wav file
    _update_silence_samples(sample_paths, sample_rate_hz)

    label_ds = tf.data.Dataset.from_tensor_slices(np.array(sample_labels, dtype=np.int32))
    path_ds = tf.data.Dataset.from_tensor_slices(sample_paths)

    if return_audio_data:
        feature_ds = path_ds.map(lambda x: read_audio_file(
            x,
            return_numpy=False,
            return_sample_rate=return_audio_sample_rate
        ))
    else:
        feature_ds = path_ds

    if onehot_encode:
        label_ds = label_ds.map(lambda x: tf.one_hot(x, depth=len(classes), dtype=tf.int32))

    return feature_ds, label_ds
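A minimal usage sketch of ``load_audio_directory`` (not part of the module source): the dataset path and keyword class names below are placeholders, and the split/batching choices are only illustrative.

import tensorflow as tf
from mltk.core.preprocess.utils.tf_dataset import load_audio_directory

class_counts = {}
features_ds, labels_ds = load_audio_directory(
    directory='/path/to/speech_dataset',                 # placeholder; expects <class>/<sample>.wav layout
    classes=['yes', 'no', '_unknown_', '_silence_'],     # placeholder keywords plus the special classes
    onehot_encode=True,
    shuffle=True,
    seed=42,
    split=(0, 0.8),              # first 80% of the samples, e.g. for training
    return_audio_data=True,      # decode the .wav files instead of returning paths
    class_counts=class_counts,   # populated with per-class sample counts
)

# Pair features with labels and batch for training
train_ds = tf.data.Dataset.zip((features_ds, labels_ds)).batch(32)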
def load_image_directory(
    directory:str,
    classes:List[str],
    unknown_class_percentage:float=1.0,
    class_counts:Dict[str,int]=None,
    onehot_encode=False,
    shuffle:bool=False,
    seed=None,
    split:Tuple[float,float]=None,
    max_samples_per_class:int=-1,
    return_image_data=False,
    white_list_formats:List[str]=None,
    follow_links=False,
    shuffle_index_directory:str=None,
    list_valid_filenames_in_directory_function=None,
    process_samples_function=None,
) -> Tuple[tf.data.Dataset, tf.data.Dataset]:
    """Load a directory of image samples and return a tuple of
    `Tensorflow Datasets <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ (samples, label_ids)

    The given image directory should have the structure::

        <class1>/sample1.png
        <class1>/sample2.jpg
        ...
        <class1>/optional sub directory/sample9.png
        <class2>/sample1.jpg
        <class2>/sample2.jpg
        ...
        <class3>/sample1.jpg
        <class3>/sample2.jpg

    Where each <class> is found in the given ``classes`` argument.

    .. seealso::
       See the `Tensorflow Dataset API <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_
       for more details on how to use the returned datasets

    Args:
        directory: Directory path to image dataset
        classes: List of class labels to include in the returned dataset

            * If ``_unknown_`` is added as an entry to the ``classes``, then this API will automatically
              add an 'unknown' class to the generated batches. Unused classes in the dataset directory
              will be randomly selected and used as an 'unknown' class.
              Use the ``unknown_class_percentage`` setting to control the size of this class.

        unknown_class_percentage: If an ``_unknown_`` class is added to the class list, then 'unknown' class
            samples will automatically be added to batches. This specifies the percentage of samples to generate
            relative to the smallest number of samples of the other classes. For instance, if another class has
            1000 samples and unknown_class_percentage=0.8, then 800 'unknown' class samples will be generated.
            Set this parameter to None to disable this feature.
        class_counts: Dictionary which will be populated with the sample counts for each class
        onehot_encode: If true then the image labels are onehot-encoded.
            If false, then only the class id (corresponding to its index in the ``classes`` argument) is returned.
        shuffle: If true, then shuffle the dataset
        seed: The seed to use for shuffling the dataset
        split: A tuple indicating the (start, stop) percentage of the dataset to return,
            e.g. (.75, 1.0) -> return last 25% of dataset.
            If omitted then return the entire dataset.
        max_samples_per_class: Maximum number of samples to return per class,
            this can be useful for debugging to limit the dataset size
        return_image_data: If true then the image file data is returned,
            if false then only the image file path is returned
        white_list_formats: List of file extensions to include in the search.
            If omitted then only return ``.png`` and ``.jpg`` files
        follow_links: If true then follow symbolic links when recursively searching the given dataset directory
        shuffle_index_directory: Path to directory to hold the generated index of the dataset.
            If omitted, then an index is generated at <directory>/.index
        list_valid_filenames_in_directory_function: This is a custom dataset processing callback.
            It should return a list of valid file names for the given class.
            It has the following function signature:

            .. highlight:: python
            .. code-block:: python

                def list_valid_filenames_in_directory(
                    base_directory:str,
                    search_class:str,
                    white_list_formats:List[str],
                    split:Tuple[float,float],
                    follow_links:bool,
                    shuffle_index_directory:str
                ) -> Tuple[str, List[str]]:
                    ...
                    return search_class, filenames

        process_samples_function: This allows for processing the samples BEFORE they're returned by this API.
            This allows for adding/removing samples.
            It has the following function signature:

            .. highlight:: python
            .. code-block:: python

                def process_samples(
                    directory:str, # The provided directory to this API
                    sample_paths:Dict[str,str], # A dictionary: <class name>, [<sample paths relative to directory>]
                    split:Tuple[float,float],
                    follow_links:bool,
                    white_list_formats:List[str],
                    shuffle:bool,
                    seed:int,
                    **kwargs
                ):
                    ...

    Returns:
        Returns a tuple of two tf.data.Dataset, (samples, label_ids)
    """
    white_list_formats = white_list_formats or ('.png', '.jpg')

    sample_paths, sample_labels = list_dataset_directory(
        directory=directory,
        classes=classes,
        max_samples_per_class=max_samples_per_class,
        list_valid_filenames_in_directory_function=list_valid_filenames_in_directory_function,
        shuffle_index_directory=shuffle_index_directory,
        unknown_class_percentage=unknown_class_percentage,
        unknown_class_label='_unknown_',
        split=split,
        shuffle=shuffle,
        seed=seed,
        white_list_formats=white_list_formats,
        return_absolute_paths=True,
        follow_links=follow_links,
        class_counts=class_counts,
        process_samples_function=process_samples_function
    )

    label_ds = tf.data.Dataset.from_tensor_slices(np.array(sample_labels, dtype=np.int32))
    path_ds = tf.data.Dataset.from_tensor_slices(sample_paths)

    if return_image_data:
        feature_ds = path_ds.map(lambda x: read_image_file(
            x,
            return_numpy=False,
            target_channels=0
        ))
    else:
        feature_ds = path_ds

    if onehot_encode:
        label_ds = label_ds.map(lambda x: tf.one_hot(x, depth=len(classes), dtype=tf.int32))

    return feature_ds, label_ds
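A minimal usage sketch of ``load_image_directory`` (not part of the module source): the dataset path and class names are placeholders, and the validation split shown is only one possible choice.

import tensorflow as tf
from mltk.core.preprocess.utils.tf_dataset import load_image_directory

features_ds, labels_ds = load_image_directory(
    directory='/path/to/image_dataset',   # placeholder; expects <class>/<sample>.jpg layout
    classes=['cat', 'dog', '_unknown_'],  # placeholder class names plus the special _unknown_ class
    onehot_encode=True,
    shuffle=True,
    seed=42,
    split=(0.8, 1.0),        # last 20% of the samples, e.g. for validation
    return_image_data=True,  # decode the image files instead of returning paths
)

val_ds = tf.data.Dataset.zip((features_ds, labels_ds)).batch(32)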
def parallel_process(
    dataset:tf.data.Dataset,
    callback:Callable,
    dtype:Union[np.dtype,Tuple[np.dtype]]=np.float32,
    n_jobs:Union[int,float]=4,
    job_batch_size:int=None,
    pool:ProcessPool=None,
    name='ParallelProcess',
    env:Dict[str,str]=None,
    disable_gpu_in_subprocesses=True,
) -> Tuple[tf.data.Dataset, ProcessPool]:
    """Parallel process the dataset

    This will invoke the given ``callback`` across the available CPUs in the system
    which can greatly improve throughput.

    .. note::
       This uses the `tf.numpy_function <https://www.tensorflow.org/api_docs/python/tf/numpy_function>`_ API
       which can slow processing in some instances.

    Args:
        dataset: The Tensorflow dataset to parallel process
        callback: The callback to invoke in the parallel processes.
            This callback must be at the root of the python module
            (i.e. it cannot be nested or a class method)
        dtype: The data type that the ``callback`` returns, this can also be a list of dtypes
            if the callback returns multiple np.ndarrays
        n_jobs: The number of jobs (i.e. CPU cores) to use for processing.
            This can either be an integer or a float between (0, 1.0]
        job_batch_size: The size of the batches to use for processing.
            If omitted, then use the calculated ``n_jobs``
        pool: An existing processing pool. If omitted then create a new pool
        name: The prefix to use in the model graph
        env: Optional OS environment variables to export in the parallel subprocesses
        disable_gpu_in_subprocesses: By default the GPU is disabled in the parallel subprocesses

    Returns:
        (tf.data.Dataset, ProcessPool), a tuple of the updated dataset with the parallel processing
        and the associated process pool
    """
    n_jobs = calculate_n_jobs(n_jobs)
    job_batch_size = job_batch_size or n_jobs

    process_pool = pool or ProcessPool(
        entry_point=callback,
        n_jobs=n_jobs,
        name=name,
        env=env,
        disable_gpu_in_subprocesses=disable_gpu_in_subprocesses,
        logger=get_mltk_logger()
    )

    def _np_parallel_process(*args):
        # Dispatch each sample of the batch to the process pool
        pool_batch = process_pool.create_batch(len(args[0]))

        if len(args) == 1:
            for x in args[0]:
                process_pool(x, pool_batch=pool_batch)
        else:
            for i in range(pool_batch.size):
                batch_x = []
                for a in args:
                    batch_x.append(a[i])
                process_pool(*batch_x, pool_batch=pool_batch)

        # Wait for all subprocesses to complete and collect the results
        results = pool_batch.wait()

        if isinstance(dtype, (list, tuple)):
            # The callback returns multiple arrays: allocate one output array per dtype
            retval = []
            r0 = results[0]
            for i, t in enumerate(dtype):
                r0_i = r0[i]
                r0_i_shape = (pool_batch.size,) + r0_i.shape
                retval.append(np.empty(r0_i_shape, dtype=t))

            for batch_i, r in enumerate(results):
                for r_i, x in enumerate(r):
                    retval[r_i][batch_i] = x

            return retval
        else:
            return np.array(results, dtype=dtype)

    @tf.function # (autograph=False)
    def _tf_parallel_process(*args):
        return tf.numpy_function(_np_parallel_process, args, dtype)

    ds = dataset.batch(job_batch_size)
    ds = ds.map(_tf_parallel_process)
    ds = ds.unbatch()

    return ds, process_pool
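A minimal usage sketch of ``parallel_process`` (not part of the module source): ``_scale_sample`` is a hypothetical, CPU-bound callback defined at module scope, as the API requires, and the random input data only stands in for a real dataset.

import numpy as np
import tensorflow as tf
from mltk.core.preprocess.utils.tf_dataset import parallel_process

def _scale_sample(x:np.ndarray) -> np.ndarray:
    # Placeholder for an expensive transform (e.g. spectrogram generation)
    return (x / 255.0).astype(np.float32)

# Stand-in dataset of 100 "images"
ds = tf.data.Dataset.from_tensor_slices(
    np.random.randint(0, 255, size=(100, 32, 32), dtype=np.uint8)
)

ds, pool = parallel_process(
    ds,
    callback=_scale_sample,
    dtype=np.float32,   # dtype returned by the callback
    n_jobs=0.5,         # use half of the available CPU cores
)

for sample in ds.take(1):
    print(sample.shape)  # (32, 32), now float32 and scaled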
def enable_numpy_behavior() -> bool:
    """Enable NumPy behavior on Tensors.

    NOTE: This requires Tensorflow 2.5+

    Enabling NumPy behavior has three effects:

    - It adds to tf.Tensor some common NumPy methods such as T, reshape and ravel.
    - It changes dtype promotion in tf.Tensor operators to be compatible with NumPy. For example,
      tf.ones([], tf.int32) + tf.ones([], tf.float32) used to throw a "dtype incompatible" error,
      but after this it will return a float64 tensor (obeying NumPy's promotion rules).
    - It enhances tf.Tensor's indexing capability to be on par with NumPy's.

    Refer to the `Tensorflow docs <https://tensorflow.org/versions/r2.5/api_docs/python/tf/experimental/numpy/experimental_enable_numpy_behavior>`_
    for more details.

    Returns:
        True if the numpy behavior was enabled, False otherwise
    """
    enabled_numpy_behavior = globals().get('enabled_numpy_behavior', None)
    if enabled_numpy_behavior is None:
        try:
            enabled_numpy_behavior = True
            from tensorflow.python.ops.numpy_ops import np_config
            np_config.enable_numpy_behavior()
        except:
            try:
                tf.experimental.numpy.experimental_enable_numpy_behavior()
            except:
                enabled_numpy_behavior = False

        # Cache the result so the behavior is only enabled once per process
        globals()['enabled_numpy_behavior'] = enabled_numpy_behavior

    return enabled_numpy_behavior
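A short sketch of what the enabled behavior allows (not part of the module source), assuming Tensorflow 2.5+ is installed; the printed values follow the promotion rules described in the docstring above.

import tensorflow as tf
from mltk.core.preprocess.utils.tf_dataset import enable_numpy_behavior

if enable_numpy_behavior():
    # Mixed-dtype arithmetic now follows NumPy's promotion rules
    x = tf.ones([2, 3], tf.int32) + tf.ones([2, 3], tf.float32)
    print(x.dtype)           # float64
    # NumPy-style methods such as reshape are now available on tf.Tensor
    print(x.reshape(3, 2))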
def _update_silence_samples(sample_paths:List[str], sample_rate_hz:int):
    """This will create a "silence" audio sample and add it to the dataset where necessary"""
    if sample_rate_hz == -1 or not sample_rate_hz:
        sample_rate_hz = 16000

    # Generate a 1 second, all-zeros, 16-bit mono .wav file (if it doesn't already exist)
    silence_wav_path = create_tempdir('temp') + f'/silence_{sample_rate_hz}hz.wav'
    if not os.path.exists(silence_wav_path):
        with wave.open(silence_wav_path, mode='wb') as wav:
            nchannels = 1
            nframes = sample_rate_hz * 1
            comptype = 'NONE'
            compname = 'not compressed'
            wav.setparams((nchannels, 2, sample_rate_hz, nframes, comptype, compname))
            wav.writeframes(bytearray([0] * nframes * 2))

    # Point every "silence" placeholder entry at the generated .wav file
    for i, p in enumerate(sample_paths):
        if p is None:
            sample_paths[i] = silence_wav_path
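For reference, a small verification sketch (not part of the module source) that reads back such a generated silence file and confirms it is one second of all-zero 16-bit mono PCM; the path below is illustrative, since create_tempdir() determines the real location.

import wave
import numpy as np

# Illustrative path; the actual file lives under the MLTK temp directory
with wave.open('/tmp/temp/silence_16000hz.wav', 'rb') as wav:
    assert wav.getframerate() == 16000
    assert wav.getnchannels() == 1
    frames = np.frombuffer(wav.readframes(wav.getnframes()), dtype=np.int16)
    assert frames.size == 16000 and not frames.any()  # exactly 1 second of zeros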