mltk.utils.process_pool.ProcessPool
- class ProcessPool
Parallel Processing Pool
This allows a Python function to be executed across multiple CPU cores. Because the function runs in separate processes, it is not limited by the Global Interpreter Lock (GIL), which prevents multiple Python threads in one CPython process from executing Python code at the same time and therefore limits the speedup from parallel processing with threads.
There are three ways of using this utility.
    import time

    from mltk.utils.process_pool import ProcessPool

    # Assume the processing function is defined at the root of a Python module:
    def my_processing_func(x):
        y = x * x
        return y

    # Instantiate the pool, using 50% of the available CPUs
    pool = ProcessPool(my_processing_func, 0.5)

    # ----------------------------
    # Callback Driven
    #
    # Add all of the processing data to the pool and invoke a callback for each result
    results = []
    def _processing_callback(y):
        results.append(y)

    # Process each entry in a subprocess
    # and call the callback when the result is available
    for i in range(1000):
        pool(i, pool_callback=_processing_callback)

    # Main thread waits for the results to complete
    while len(results) != 1000:
        time.sleep(1)

    # ----------------------------
    # Batch Driven
    #
    # Process the data in batches and wait for all of the results

    # Create a batch
    batch = pool.create_batch(1000)

    # Process each entry in a subprocess
    # Internally the given batch will be populated with the results
    for i in range(1000):
        pool(i, pool_batch=batch)

    # Main thread waits for the results to complete
    results = batch.wait()

    # ----------------------------
    # Wait for each result
    #
    # Process the data and wait for the result
    # NOTE: In this mode, the pool should be invoked from separate threads
    # to see a speedup from the parallel processing
    results = []
    for i in range(1000):
        y = pool(i)
        results.append(y)
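In the batch-driven mode, the batch is filled in the background while the main thread blocks in `batch.wait()`. The following is a minimal sketch of how such an object could work using only the standard library. `ResultBatch` and `add_result()` are hypothetical names, not mltk's API, and storing each result at its submission index (so `wait()` returns results in submission order) is an assumption about the real batch.

```python
import threading
from typing import Any, List, Optional


class ResultBatch:
    """Hypothetical stand-in for the object returned by create_batch().

    Holds a fixed number of result slots that worker threads fill in any
    order; wait() blocks until every slot has been filled.
    """

    def __init__(self, size: int) -> None:
        self._results: List[Any] = [None] * size
        self._remaining = size
        self._cond = threading.Condition()

    def add_result(self, index: int, result: Any) -> None:
        # Called once per finished job, e.g. by a thread monitoring a subprocess
        with self._cond:
            self._results[index] = result
            self._remaining -= 1
            if self._remaining == 0:
                self._cond.notify_all()

    def wait(self, timeout: Optional[float] = None) -> List[Any]:
        # Sleep until all results are in, instead of polling
        with self._cond:
            if not self._cond.wait_for(lambda: self._remaining == 0, timeout=timeout):
                raise TimeoutError("batch did not complete in time")
            return list(self._results)


# Simulate 8 jobs finishing on separate threads
batch = ResultBatch(8)
for i in range(8):
    threading.Thread(target=batch.add_result, args=(i, i * i)).start()

print(batch.wait(timeout=5))  # [0, 1, 4, 9, 16, 25, 36, 49]
```

Using a `threading.Condition` lets the waiting thread sleep until the last result arrives, rather than waking up periodically to check a counter as the callback-driven loop does with `time.sleep(1)`.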
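The third mode blocks the calling thread until its result is ready, so one thread only ever has one job in flight. This sketch shows the multi-threaded calling pattern with the standard library's `ThreadPoolExecutor`. `blocking_call` is a hypothetical stand-in for `pool(i)`; its `time.sleep()` only models waiting for a subprocess.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(x: int) -> int:
    """Hypothetical stand-in for a blocking `pool(x)` call."""
    time.sleep(0.05)  # models waiting for a subprocess to return the result
    return x * x


# Each worker thread has at most one blocking call outstanding, so 8 threads
# keep up to 8 calls in flight at the same time.
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(blocking_call, range(32)))

print(results[:5])  # [0, 1, 4, 9, 16]
```

`executor.map()` yields results in input order, so `results` lines up with `range(32)` even though the calls complete in arbitrary order.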
See the source code on GitHub: mltk/utils/process_pool
- Parameters:
entry_point (Callable) – The Python function to execute in separate subprocesses. This function must exist at the root of its module (i.e. it must not be nested or a class method). Typically the function accepts one or more arguments, processes the data, and returns a result.
n_jobs (Union[int, float]) – The number of subprocesses to use for processing. Typically, more jobs means faster processing at the expense of more RAM/CPU usage:
  - A float value in the range (0, 1.0], which specifies the fraction of the available CPUs to use
  - An integer specifying the exact number of CPUs. This is automatically clamped to the number of CPUs in the system if necessary
  - -1 to use all available CPUs
name – The name prefixed to the Python threads used to monitor the subprocesses.
start – Automatically start the subprocesses. If false, then start() must be called before processing.
debug – If true, then the entry_point is executed in a single thread instead of multiple subprocesses. This reduces complexity, which can aid debugging.
env (Dict[str, str]) – The OS environment variables to export in the subprocesses.
disable_gpu_in_subprocesses – Disables NVIDIA GPU usage in the subprocesses. This is necessary if the TensorFlow Python package is imported in the entry_point's module.
logger (Logger) – Optional Python logger.
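A likely reason for the root-of-module requirement on `entry_point` is that Python's `pickle` serializes a function by reference, as its module plus qualified name, so the receiving process must be able to look it up by that name. Whether `ProcessPool` transfers `entry_point` with `pickle` is an assumption here; the sketch only demonstrates standard `pickle` behavior, and `is_picklable()` is a hypothetical helper.

```python
import pickle


def square(x):
    # Module-level function: pickled as a reference (module + qualified name)
    return x * x


def make_nested():
    def nested_square(x):
        # Local function: its qualified name cannot be looked up from the module
        return x * x

    return nested_square


def is_picklable(obj) -> bool:
    """Hypothetical helper: True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable(square))         # True
print(is_picklable(make_nested()))  # False
print(is_picklable(lambda x: x))    # False
```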
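The three accepted forms of `n_jobs` can be summarized as a small resolution function. This is a hypothetical helper, not part of mltk. Rounding a fractional value down and never returning fewer than one job are assumptions, since the parameter description does not say how fractions are rounded.

```python
import os
from typing import Optional, Union


def resolve_n_jobs(n_jobs: Union[int, float], cpu_count: Optional[int] = None) -> int:
    """Hypothetical helper mirroring the documented n_jobs rules."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1
    if n_jobs == -1:
        # -1: use every available CPU
        return cpu_count
    if isinstance(n_jobs, float):
        if not 0.0 < n_jobs <= 1.0:
            raise ValueError("float n_jobs must be in (0, 1.0]")
        # Fraction of the available CPUs, rounded down but at least one (assumed)
        return max(1, int(n_jobs * cpu_count))
    if n_jobs < 1:
        raise ValueError("integer n_jobs must be >= 1, or -1 for all CPUs")
    # Exact count, clamped to the number of CPUs in the system
    return min(n_jobs, cpu_count)


print(resolve_n_jobs(0.5, cpu_count=8))  # 4
print(resolve_n_jobs(16, cpu_count=8))   # 8 (clamped)
print(resolve_n_jobs(-1, cpu_count=8))   # 8
```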
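The `env` and `disable_gpu_in_subprocesses` parameters both concern the environment a subprocess starts with. The sketch below builds such an environment and checks it from a child Python process. It assumes the extra variables are layered on top of the parent's environment, and that GPUs are hidden by setting `CUDA_VISIBLE_DEVICES` to `-1`, a common way to make CUDA (and therefore TensorFlow) see no devices; mltk's actual mechanism may differ. `build_child_env` and `MY_SETTING` are hypothetical.

```python
import os
import subprocess
import sys
from typing import Dict, Optional


def build_child_env(extra: Optional[Dict[str, str]] = None,
                    disable_gpu: bool = True) -> Dict[str, str]:
    """Hypothetical helper: environment for a worker subprocess."""
    env = dict(os.environ)  # inherit PATH etc. from the parent
    if extra:
        env.update(extra)   # caller-supplied variables override inherited ones
    if disable_gpu:
        # Assumed mechanism: an invalid device index hides all CUDA GPUs
        env["CUDA_VISIBLE_DEVICES"] = "-1"
    return env


child_env = build_child_env({"MY_SETTING": "42"})
out = subprocess.run(
    [sys.executable, "-c",
     "import os; print(os.environ['MY_SETTING'], os.environ['CUDA_VISIBLE_DEVICES'])"],
    env=child_env, capture_output=True, text=True, check=True,
)
print(out.stdout.strip())  # 42 -1
```

In this sketch the variable is set in the environment the child is launched with, so it is already in place before the child imports anything such as TensorFlow.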
Properties
detected_subprocess_error
is_running – Returns true if the processing pool is actively running
n_jobs – The number of subprocesses used
name – The name of this processing pool instance
Methods
Create a processing pool batch.
Process the given args and kwargs in the next available subprocess. See this class's docs for more info.
Shut down the processing pool subprocesses immediately
Start the processing pool subprocesses
- __init__(entry_point, n_jobs=-1, name='ProcessPool', start=True, debug=False, env=None, disable_gpu_in_subprocesses=True, logger=None)
- Parameters:
entry_point (Callable)
n_jobs (Union[int, float])
env (Dict[str, str])
logger (Logger)
- property n_jobs: int
The number of subprocesses used
- Return type:
int
- property name: str
The name of this processing pool instance
- Return type:
str
- property is_running: bool
Returns true if the processing pool is actively running
- Return type:
bool