mltk.utils.process_pool.ProcessPool

class ProcessPool[source]

Parallel Processing Pool

This allows for executing a Python function across multiple CPU cores. The function executes in separate processes and thus is not limited by the Global Interpreter Lock (GIL) which can slow down parallel processing when using Python threads.

There are three ways of using this utility.

import time

from mltk.utils.process_pool import ProcessPool

# Assume we have the processing function at the root of a Python module:
def my_processing_func(x):
    y = x * x
    return y

# And we instantiate the pool, using 50% of the available CPUs:
pool = ProcessPool(my_processing_func, 0.5)

# ----------------------------
# Callback Driven
#
# Add all of the processing data to the pool and invoke a callback for each result

results = []
def _processing_callback(y):
    results.append(y)

# Process each entry in a subprocess
# and invoke the callback when each result is available
for i in range(1000):
    pool(i, pool_callback=_processing_callback)

# Main thread waits for the results to complete
while len(results) != 1000:
    time.sleep(1)

# ----------------------------
# Batch Driven
#
# Process the data in batches and wait for all of the results

# Create a batch
batch = pool.create_batch(1000)

# Process each entry in a subprocess
# Internally the given batch will be populated with the results
for i in range(1000):
    pool(i, pool_batch=batch)

# Main thread waits for the results to complete
results = batch.wait()

# ----------------------------
# Wait for each result
#
# Process the data and wait for the result
# NOTE: In this mode, the pool should be invoked from separate threads to see a speedup
# from the parallel processing (a thread-based sketch follows this example)

results = []
for i in range(1000):
    y = pool(i)
    results.append(y)
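
For example, a minimal sketch of driving this mode from multiple threads using the standard library's ThreadPoolExecutor (the worker count of 8 is arbitrary):

from concurrent.futures import ThreadPoolExecutor

# Each thread submits work to the pool and blocks until its result is ready,
# so several subprocess jobs can be in flight at once
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(pool, range(1000)))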

See the source code on GitHub: mltk/utils/process_pool

Parameters:
  • entry_point (Callable) – The Python function to execute in separate subprocesses. This function must exist at the root of its module (i.e. it must not be nested or a class method). Typically, the function accepts one or more arguments, processes the data, and returns a result

  • n_jobs (Union[int, float]) – The number of subprocesses to use for processing. Typically, more jobs means faster processing at the expense of more RAM/CPU usage. A float in (0, 1.0] specifies the percentage of the available CPUs; an integer specifies the exact number of subprocesses (automatically clamped to the number of CPUs in the system if necessary); -1 uses all available CPUs. See the sketch after this list

  • name – The name used as a prefix for the Python threads that monitor the subprocesses

  • start – Automatically start the subprocesses. If false, then start() must be called before processing

  • debug – If true, then the entry_point is executed in a single thread instead of multiple subprocesses. This reduces complexity, which can aid debugging

  • env (Dict[str, str]) – The OS environment variables to export in the subprocesses

  • disable_gpu_in_subprocesses – Disables NVIDIA GPU usage in the subprocesses. This is necessary if the TensorFlow Python package is imported in the entry_point's module

  • logger (Logger) – Optional Python logger
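
As a quick reference, a minimal sketch of common n_jobs and debug settings (the values are arbitrary and my_processing_func is the module-level function from the example above):

pool_half = ProcessPool(my_processing_func, n_jobs=0.5)  # use 50% of the available CPUs
pool_four = ProcessPool(my_processing_func, n_jobs=4)    # use exactly 4 subprocesses (clamped to the system's CPU count)
pool_all  = ProcessPool(my_processing_func, n_jobs=-1)   # use all available CPUs (the default)
pool_dbg  = ProcessPool(my_processing_func, debug=True)  # run the entry_point in a single thread to aid debugging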

Properties

  • detected_subprocess_error

  • is_running – Returns true if the processing pool is actively running

  • n_jobs – The number of subprocesses used

  • name – The name of this processing pool instance

Methods

  • __init__

  • create_batch – Create a processing pool batch

  • process – Process the given args, kwargs in the next available subprocess

  • shutdown – Shutdown the processing pool subprocesses immediately

  • start – Start the processing pool subprocesses

__init__(entry_point, n_jobs=-1, name='ProcessPool', start=True, debug=False, env=None, disable_gpu_in_subprocesses=True, logger=None)[source]
Parameters:
  • entry_point (Callable) –

  • n_jobs (Union[int, float]) –

  • env (Dict[str, str]) –

  • logger (Logger) –

property n_jobs: int

The number of subprocesses used

Return type:

int

property name: str

The name of this processing pool instance

Return type:

str

property is_running: bool

Returns true if the processing pool is actively running

Return type:

bool

start()[source]

Start the processing pool subprocesses

shutdown()[source]

Shutdown the processing pool subprocesses immediately
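
A minimal sketch of manually managing the pool's lifecycle with start() and shutdown(), assuming my_processing_func from the class example and a pool created with start=False:

pool = ProcessPool(my_processing_func, n_jobs=2, start=False)

# The subprocesses were not started automatically, so start them explicitly
pool.start()
assert pool.is_running

try:
    results = [pool(i) for i in range(10)]
finally:
    # Immediately stop the subprocesses once processing is complete
    pool.shutdown()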

create_batch(size)[source]

Create a processing pool batch. See this class’s docs for more info

Return type:

ProcessPoolBatch

Parameters:

size (int) –
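
For example, a minimal sketch where the batch size matches the number of items submitted to the pool (mirroring the batch-driven example in the class docs):

batch = pool.create_batch(100)   # the batch will be populated with 100 results
for i in range(100):
    pool(i, pool_batch=batch)
results = batch.wait()           # block until all 100 results are available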

process(*args, pool_callback=None, pool_batch=None, **kwargs)[source]

Process the given args, kwargs in the next available subprocess. See this class's docs for more info

Return type:

Union[ProcessPoolBatch, object]

Parameters:
  • pool_callback (Callable) –

  • pool_batch (ProcessPoolBatch) –
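
For reference, a minimal sketch of the three invocation styles, condensed from the class docs above and assuming the pool(...) calls shown earlier forward to this process() method:

# Blocking: wait for and return the entry_point's result
y = pool.process(3)

# Callback-driven: pool_callback is invoked once the result is available
pool.process(3, pool_callback=lambda y: print(y))

# Batch-driven: the result is collected into the given batch
batch = pool.create_batch(1)
pool.process(3, pool_batch=batch)
results = batch.wait()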