import random
import copy
import os
from typing import Dict
import numpy as np
from PIL import ImageEnhance
from mltk.core.keras import (
ImageDataGenerator,
array_to_img,
img_to_array
)
from .directory_iterator import ParallelDirectoryIterator
[docs]class ParallelImageDataGenerator(ImageDataGenerator):
'''Generate batches of tensor image data with real-time data augmentation.
The data will be looped over (in batches).
This class works the exact same as the `Keras ImageDataGenerator <https://keras.io/preprocessing/image>`_
except images are processed in the background using the standard Python
`multiprocessing <https://docs.python.org/3.10/library/multiprocessing.html>`_ module.
This can greatly improve training times as multiple CPU cores can process batch images in the background while
training is done in the foreground on the GPU(s).
(The standard Keras ImageDataGenerator module processes batch images then trains serially)
From the outside, this module works the exact same as `Keras ImageDataGenerator <https://keras.io/preprocessing/image>`_.
Args:
cores: The number of CPU cores to use for spawned image batch processes.
This number can be either an integer, which specifies the exact number
of CPU cores, or it can be a float < 1.0. The float is the percentage
of CPU cores to use for processing.
A large number of CPU cores will consume more system memory.
debug: Use a ThreadPool rather than a Multiprocessing Pool,
this allows for single-step debugging the processing function
max_batches_pending: This is the number of processed batches to queue.
A larger number can improve training times at the expense of
increased system memory usage.
validation_augmentation_enabled: If True, then augmentations will be applied to
validation data. If False, then no augmentations will be applied to validation data.
featurewise_center: Boolean.
Set input mean to 0 over the dataset, feature-wise.
samplewise_center: Boolean. Set each sample mean to 0.
featurewise_std_normalization: Boolean.
Divide inputs by std of the dataset, feature-wise.
samplewise_std_normalization: Boolean. Divide each input by its std.
zca_epsilon: epsilon for ZCA whitening. Default is 1e-6.
zca_whitening: Boolean. Apply ZCA whitening.
rotation_range: Int. Degree range for random rotations.
width_shift_range: Float, 1-D array-like or int
* float: fraction of total width, if < 1, or pixels if >= 1.
* 1-D array-like: random elements from the array.
* int: integer number of pixels from interval
``(-width_shift_range, +width_shift_range)``
* With ``width_shift_range=2`` possible values
are integers ``[-1, 0, +1]``,
same as with ``width_shift_range=[-1, 0, +1]``,
while with ``width_shift_range=1.0`` possible values are floats
in the interval [-1.0, +1.0).
height_shift_range: Float, 1-D array-like or int
* float: fraction of total height, if < 1, or pixels if >= 1.
* 1-D array-like: random elements from the array.
* int: integer number of pixels from interval
``(-height_shift_range, +height_shift_range)``
* With ``height_shift_range=2`` possible values
are integers ``[-1, 0, +1]``,
same as with ``height_shift_range=[-1, 0, +1]``,
while with ``height_shift_range=1.0`` possible values are floats
in the interval ``[-1.0, +1.0)``.
brightness_range: Tuple or list of two floats. Range for picking
a brightness shift value from.
shear_range: Float. Shear Intensity
(Shear angle in counter-clockwise direction in degrees)
zoom_range: Float or [lower, upper]. Range for random zoom.
If a float, ``[lower, upper] = [1-zoom_range, 1+zoom_range]``.
channel_shift_range: Float. Range for random channel shifts.
fill_mode: One of {"constant", "nearest", "reflect" or "wrap"}.
Default is 'nearest'.
Points outside the boundaries of the input are filled
according to the given mode:
* **constant** ``kkkkkkkk|abcd|kkkkkkkk (cval=k)``
* **nearest** ``aaaaaaaa|abcd|dddddddd``
* **reflect** ``abcddcba|abcd|dcbaabcd``
* **wrap** ``abcdabcd|abcd|abcdabcd``
cval: Float or Int.
Value used for points outside the boundaries
when ``fill_mode = "constant"``.
horizontal_flip: Boolean. Randomly flip inputs horizontally.
vertical_flip: Boolean. Randomly flip inputs vertically.
rescale: rescaling factor. Defaults to None.
If None or 0, no rescaling is applied,
otherwise we multiply the data by the value provided
(after applying all other transformations).
get_batch_function: function that should return the transformed batch.
If this is omitted, then iterator.get_batches_of_transformed_samples() is used.
NOTE: If this is supplied, then none of the other callbacks are used.
This function should have the following signature:
.. highlight:: python
.. code-block:: python
def get_batches_of_transformed_samples(
batch_index:int,
filenames:List[str],
classes:List[int],
params:ParallelProcessParams
) -> Tuple[int, Tuple[np.ndarray, np.ndarray]]:
...
return batch_index, (batch_x, batch_y)
noaug_preprocessing_function: function that will be applied on each input.
The function will run after the image is resized but before it is augmented or standardized.
The function should take at least two arguments and return the processed image:
.. highlight:: python
.. code-block:: python
def my_processing_func(
params: ParallelProcessParams,
x : np.ndarray,
class_id: Optional[int],
filename: Optional[str],
batch_index: Optional[int],
batch_class_ids: Optional[List[int]],
batch_filenames: Optional[List[str]]
) -> np.ndarray:
...
return processed_x
preprocessing_function: function that will be applied on each input.
The function will run after the image is resized (if ``interpolation != None``) and augmented but before it is standardized.
The function should take at least two arguments and return the processed image:
.. highlight:: python
.. code-block:: python
def my_processing_func(
params: ParallelProcessParams,
x : np.ndarray,
class_id: Optional[int],
filename: Optional[str],
batch_index: Optional[int],
batch_class_ids: Optional[List[int]],
batch_filenames: Optional[List[str]]
) -> np.ndarray:
...
return processed_x
validation_split: Float. Fraction of images reserved for validation
(strictly between 0 and 1).
dtype: Dtype to use for the generated arrays.
brightness_range: Tuple of two floats. Control the brightness of an image.
An enhancement factor of 0.0 gives a black image.
A factor of 1.0 gives the original image.
contrast_range: Tuple of two floats. Control the contrast of an image,
similar to the contrast control on a TV set.
An enhancement factor of 0.0 gives a solid grey image. A factor of 1.0 gives the original image.
noise: List of one or more of the following:
* **gauss** - Gaussian-distributed additive noise.
* **poisson** - Poisson-distributed noise generated from the data.
* **s&p** - Replaces random pixels with 0 or 255.
* **speckle** - Multiplicative noise using out = image + n*image,where
n is uniform noise with specified mean & variance.
The noise type used will be randomly selected from the provided list per image
random_transforms_enabled: Enable random data augmentations. Default True
max_samples_per_class: The maximum number of samples to use for a given class.
If ``-1`` then use all available samples. Default -1.
disable_gpu_in_subprocesses: Disable GPU usage in spawned subprocesses, default: true
batch_size: Generated batch size. This overrides the value given to flow_from_directory()
Set to ``-1`` to set the batch size to be the number of samples
'''
def __init__(
    self,
    *,
    cores=0.25,
    debug=False,
    max_batches_pending=4,
    get_batch_function=None,
    preprocessing_function=None,
    noaug_preprocessing_function=None,
    validation_augmentation_enabled=True,
    contrast_range=None,
    noise=None,
    random_transforms_enabled=True,
    max_samples_per_class=-1,
    disable_gpu_in_subprocesses=True,
    save_to_dir=None,
    save_prefix=None,
    save_format=None,
    batch_size=None,
    **kwargs
):
    """Initialise the base generator and record the parallel-processing options.

    All arguments are keyword-only. Keywords that are not named in this
    signature are forwarded to ``ImageDataGenerator.__init__``.
    The two preprocessing callbacks are kept on this instance under the
    ``parallel_*`` attribute names rather than being given to the base class.
    """
    # The base class must be initialised first so the attributes assigned
    # below are the ones that remain on the instance.
    base_kwargs = _remove_preprocessing_function_arg(kwargs)
    ImageDataGenerator.__init__(self, **base_kwargs)

    # Background worker configuration
    self.cores = cores
    self.debug = debug
    self.max_batches_pending = max_batches_pending
    self.disable_gpu_in_subprocesses = disable_gpu_in_subprocesses

    # User-supplied callbacks
    self.get_batch_function = get_batch_function
    self.parallel_preprocessing_function = preprocessing_function
    self.parallel_noaug_preprocessing_function = noaug_preprocessing_function

    # Augmentation switches and extra augmentations
    self.random_transforms_enabled = random_transforms_enabled
    self.validation_augmentation_enabled = validation_augmentation_enabled
    self.contrast_range = contrast_range
    self.noise = noise

    # Dataset sizing
    self.max_samples_per_class = max_samples_per_class
    self.batch_size = batch_size

    # Defaults used by flow() / flow_from_directory() when dumping batches to disk
    self.save_to_dir = save_to_dir
    self.save_prefix = save_prefix
    self.save_format = save_format
def flow_from_directory(
    self,
    directory,
    target_size=(256, 256),
    color_mode='rgb',
    classes=None,
    class_mode='categorical',
    batch_size=32,
    batch_shape=None,
    shuffle=True,
    shuffle_index_dir=None,
    seed=None,
    save_to_dir=None,
    save_prefix=None,
    save_format='png',
    follow_links=False,
    subset=None,
    interpolation='bilinear',
    list_valid_filenames_in_directory_function=None,
    class_counts: Dict[str, int] = None,
):
    """Create a ParallelDirectoryIterator for the given dataset directory

    Takes the path to a directory & generates batches of augmented data.

    Args:
        directory: string, path to the target directory. It should contain one subdirectory per class.
            Any PNG, JPG, BMP, PPM or TIF images inside each of the subdirectories directory tree will be included in the generator.
        target_size: Tuple of integers (height, width), defaults to (256, 256).
            The dimensions to which all images found will be resized.
        color_mode: One of "grayscale", "rgb", "rgba". Default: "rgb".
            Whether the images will be converted to have 1, 3, or 4 channels.
            Any value other than "rgb" or "rgba" is treated as single-channel
            when the batch shape is derived.
        classes: Required, list of class subdirectories (e.g. ['dogs', 'cats']).
        class_mode: One of "categorical", "binary", "sparse", "input", or None.
            Default: "categorical". Determines the type of label arrays that are returned:

            * **categorical** will be 2D one-hot encoded labels,
            * **binary** will be 1D binary labels,
            * **sparse** will be 1D integer labels,
            * **input** will be images identical to input images (mainly used to work with autoencoders).
            * **None** no labels are returned (the generator will only yield batches of image data, which is useful to use with model.predict()).
              Please note that in case of class_mode None, the data still needs to reside in a subdirectory of directory for it to work correctly.
        batch_size: Size of the batches of data (default: 32).
            Ignored when a ``batch_size`` was given to the constructor.
        batch_shape: Shape of the batches of data. If omitted, this defaults to (batch_size, target_size, channels)
        shuffle: Whether to shuffle the data (default: True) If set to False, sorts the data in alphanumeric order.
        shuffle_index_dir: If given, the dataset directory will be shuffled the first time it is processed
            and an index file containing the shuffled file names is generated at the directory specified
            by ``shuffle_index_dir``. The index file is reused to maintain the shuffled order for subsequent processing.
            If ``None``, then the dataset samples are sorted alphabetically and saved to an index file in the dataset directory.
            The alphabetical index file is used for subsequent processing.
            Default: ``None``
        seed: Optional random seed for shuffling and transformations.
        save_to_dir: Optional directory forwarded to the iterator; it is created if it does not exist.
            A value given to the constructor takes precedence.
        save_prefix: Forwarded to the iterator. A value given to the constructor takes precedence.
        save_format: Forwarded to the iterator (default: "png"). A value given to the constructor takes precedence.
        follow_links: Whether to follow symlinks inside class subdirectories (default: False).
        subset: Subset of data ("training" or "validation") if validation_split is set in ParallelImageDataGenerator.
        interpolation: Interpolation method used to resample the image if the target size is different from that of the loaded image.
            Supported methods are ``none``, ``nearest``, ``bilinear``, ``bicubic``, ``lanczos``, ``box``, and ``hamming``.
            If ``none`` is used, then the image is **not automatically resized**.
            By default, ``bilinear`` is used.
        list_valid_filenames_in_directory_function: This is a custom function called for each class,
            that should return a list of valid file names for the given class.
            It has the following function signature:

            .. highlight:: python
            .. code-block:: python

                def list_valid_filenames_in_directory(
                    base_directory:str,
                    search_class:str,
                    white_list_formats:List[str],
                    split:float,
                    follow_links:bool,
                    shuffle_index_directory:str
                ) -> Tuple[str, List[str]]
                    ...
                    return search_class, filenames
        class_counts: Optional dictionary forwarded unchanged to ``ParallelDirectoryIterator``.

    Returns:
        A DirectoryIterator yielding tuples of (x, y) where x is a numpy array containing a batch of images with
        shape (batch_size, target_size, channels) and y is a numpy array of corresponding labels.
    """
    # Save settings given to the constructor take precedence over the call arguments
    save_to_dir = self.save_to_dir or save_to_dir
    save_prefix = self.save_prefix or save_prefix
    save_format = self.save_format or save_format

    if batch_shape is None:
        # Derive the channel count from the color_mode *argument*; the generator
        # itself has no color_mode attribute, so it must not be read from self.
        if color_mode == 'rgba':
            channels = 4
        elif color_mode == 'rgb':
            channels = 3
        else:
            channels = 1
        batch_shape = (batch_size,) + tuple(target_size) + (channels,)
    else:
        batch_shape = tuple(batch_shape)

    # If the batch_size was specified in the constructor
    # then that should override everything else
    if self.batch_size is not None:
        batch_shape = (self.batch_size,) + batch_shape[1:]

    if save_to_dir:
        os.makedirs(save_to_dir, exist_ok=True)

    return ParallelDirectoryIterator(
        directory,
        self,
        target_size=target_size,
        color_mode=color_mode,
        classes=classes,
        class_mode=class_mode,
        dtype=self.dtype,
        data_format=self.data_format,
        batch_shape=batch_shape,
        shuffle=shuffle,
        shuffle_index_dir=shuffle_index_dir,
        seed=seed,
        save_to_dir=save_to_dir,
        save_prefix=save_prefix,
        save_format=save_format,
        follow_links=follow_links,
        subset=subset,
        interpolation=interpolation,
        cores=self.cores,
        debug=self.debug,
        max_batches_pending=self.max_batches_pending,
        get_batch_function=self.get_batch_function,
        preprocessing_function=self.parallel_preprocessing_function,
        noaug_preprocessing_function=self.parallel_noaug_preprocessing_function,
        list_valid_filenames_in_directory_function=list_valid_filenames_in_directory_function,
        max_samples_per_class=self.max_samples_per_class,
        disable_gpu_in_subprocesses=self.disable_gpu_in_subprocesses,
        class_counts=class_counts,
    )
def flow(
    self,
    x,
    y=None,
    batch_size=32,
    shuffle=True,
    sample_weight=None,
    seed=None,
    save_to_dir=None,
    save_prefix='',
    save_format='png',
    subset=None
):
    """Generate batches of augmented data from in-memory data & label arrays.

    Values for ``save_to_dir``, ``save_prefix``, ``save_format`` and
    ``batch_size`` that were given to the constructor replace the
    corresponding arguments of this call. The actual iteration is delegated
    to ``ImageDataGenerator.flow``.

    Args:
        x: Input data. Numpy array of rank 4 or a tuple. If tuple, the first
            element should contain the images and the second element another numpy
            array or a list of numpy arrays that gets passed to the output without
            any modifications. Can be used to feed the model miscellaneous data
            along with the images. In case of grayscale data, the channels axis of
            the image array should have value 1, in case of RGB data, it should
            have value 3, and in case of RGBA data, it should have value 4.
        y: Labels.
        batch_size: Int (default: 32).
        shuffle: Boolean (default: True).
        sample_weight: Sample weights.
        seed: Int (default: None).
        save_to_dir: None or str (default: None). Optional directory to which
            the augmented pictures being generated are saved (useful for
            visualizing what you are doing). It is created if it does not exist.
        save_prefix: Str (default: ``''``). Prefix to use for filenames of saved
            pictures (only relevant if ``save_to_dir`` is set).
        save_format: one of "png", "jpeg", "bmp", "pdf", "ppm", "gif",
            "tif", "jpg" (only relevant if ``save_to_dir`` is set). Default: "png".
        subset: Subset of data (``"training"`` or ``"validation"``) if
            ``validation_split`` is set in ``ImageDataGenerator``.

    Returns:
        An ``Iterator`` yielding tuples of ``(x, y)``
        where ``x`` is a numpy array of image data
        (in the case of a single image input) or a list
        of numpy arrays (in the case with
        additional inputs) and ``y`` is a numpy array
        of corresponding labels. If 'sample_weight' is not None,
        the yielded tuples are of the form ``(x, y, sample_weight)``.
        If ``y`` is None, only the numpy array ``x`` is returned.

    Raises:
        ValueError: If the Value of the argument, ``subset`` is other than
            "training" or "validation".
    """
    # A truthy constructor value wins over the per-call argument.
    out_dir = self.save_to_dir if self.save_to_dir else save_to_dir
    out_prefix = self.save_prefix if self.save_prefix else save_prefix
    out_format = self.save_format if self.save_format else save_format
    effective_batch_size = self.batch_size if self.batch_size else batch_size

    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    flow_kwargs = {
        'x': x,
        'y': y,
        'batch_size': effective_batch_size,
        'shuffle': shuffle,
        'sample_weight': sample_weight,
        'seed': seed,
        'save_to_dir': out_dir,
        'save_prefix': out_prefix,
        'save_format': out_format,
        'subset': subset,
    }
    return ImageDataGenerator.flow(self, **flow_kwargs)
@property
def default_transform(self) -> dict:
    """Return a fresh dictionary holding the default transformation parameters"""
    transform = {}

    # No rotation, translation or shear
    for key in ('theta', 'tx', 'ty', 'shear'):
        transform[key] = 0

    # Unit zoom on both axes
    for key in ('zx', 'zy'):
        transform[key] = 1

    # No flipping
    for key in ('flip_horizontal', 'flip_vertical'):
        transform[key] = False

    # Remaining adjustments are left unset
    for key in ('channel_shift_intensity', 'brightness', 'contrast', 'noise'):
        transform[key] = None

    # A new dict is built on every access, so callers may mutate the result freely.
    return transform
def _apply_noise(self, x, noise):
    """Return ``x`` with one kind of random noise applied.

    Random numbers are drawn from the global ``np.random`` state.

    Args:
        x: Image array. The salt & pepper branch treats a trailing axis of
            length 1 as a grayscale channel axis.
        noise: One of ``"gauss"``, ``"s&p"``, ``"poisson"`` or ``"speckle"``.
            Any other value (including ``None``) leaves ``x`` untouched.

    Returns:
        A new array containing the noisy image, or ``x`` itself when ``noise``
        is not one of the recognised names. The input array is never modified.
    """
    if noise == "gauss":
        # Additive zero-mean Gaussian noise with a variance of 0.1
        mean = 0
        var = 0.1
        sigma = var ** 0.5
        return x + np.random.normal(mean, sigma, x.shape)

    if noise == "s&p":
        s_vs_p = 0.5     # fraction of the corrupted entries that become salt
        amount = 0.004   # fraction of all array entries that are corrupted
        out = np.copy(x)

        # If grayscale, ignore the last dimension so that each picked
        # (row, col) location is overwritten as a whole.
        shape = x.shape
        if shape[-1] == 1:
            shape = shape[:-1]

        # np.random.randint() excludes its upper bound, so the bound must be
        # the axis length itself: every index, including the last one, can be
        # drawn and an axis of length 1 is valid.

        # Salt mode
        num_salt = int(np.ceil(amount * x.size * s_vs_p))
        coords = tuple(np.random.randint(0, dim, num_salt) for dim in shape)
        out[coords] = 255

        # Pepper mode
        num_pepper = int(np.ceil(amount * x.size * (1. - s_vs_p)))
        coords = tuple(np.random.randint(0, dim, num_pepper) for dim in shape)
        out[coords] = 0
        return out

    if noise == "poisson":
        # Scale by the number of distinct values, rounded up to a power of two,
        # draw Poisson samples, then scale back.
        vals = len(np.unique(x))
        vals = 2 ** np.ceil(np.log2(vals))
        return np.random.poisson(x * vals) / float(vals)

    if noise == "speckle":
        # Multiplicative noise: out = image + n * image, n ~ standard normal
        return x + x * np.random.randn(*x.shape)

    return x
def _remove_preprocessing_function_arg(kwargs):
    """Drop the ``preprocessing_function`` entry from ``kwargs`` (in place) and return that same dict."""
    # pop() with a default is a no-op when the key is absent
    kwargs.pop('preprocessing_function', None)
    return kwargs