class ProcessPool:
    """Parallel Processing Pool

    This allows for executing a Python function across multiple CPU cores.
    The function executes in separate processes and thus is not limited by
    the Global Interpreter Lock (GIL) which can slow down parallel processing
    when using Python threads.

    There are three ways of using this utility.

    .. highlight:: python
    .. code-block:: python

        # Assume we have the processing function at the root of a Python module:
        def my_processing_func(x):
            y = x * x
            return y

        # And we instantiate the pool as
        pool = ProcessPool(my_processing_func, 0.5)

        # ----------------------------
        # Callback Driven
        #
        # Add all of the processing data to the pool and invoke a callback for each result

        results = []
        def _processing_callback(y):
            results.append(y)

        # Process each entry in a subprocess
        # and call the callback when the result is available
        for i in range(1000):
            pool(i, pool_callback=_processing_callback)

        # Main thread waits for the results to complete
        while len(results) != 1000:
            time.sleep(1)

        # ----------------------------
        # Batch Driven
        #
        # Process the data in batches and wait for all of the results

        # Create a batch
        batch = pool.create_batch(1000)

        # Process each entry in a subprocess
        # Internally the given batch will be populated with the results
        for i in range(1000):
            pool(i, pool_batch=batch)

        # Main thread waits for the results to complete
        results = batch.wait()

        # ----------------------------
        # Wait for each result
        #
        # Process the data and wait for the result
        # NOTE: In this mode, the pool should be invoked from separate threads to see a speedup
        #       from the parallel processing

        results = []
        for i in range(1000):
            y = pool(i)
            results.append(y)

    See the source code on Github: `mltk/utils/process_pool <https://github.com/siliconlabs/mltk/blob/master/mltk/utils/process_pool>`_

    Args:
        entry_point: The Python function to execute in separate subprocesses.
            This function must exist at the root of its module (i.e. it must not be nested or a class method)
            Typically the function will accept one or more arguments, process the data, and return a result
        n_jobs: The number of subprocesses to use for processing. Typically more jobs means faster processing
            at the expense of more RAM/CPU usage

            - A float value between (0, 1.0], which specifies the percentage of the available CPUs
            - Integer specifying the exact number of CPUs. This will automatically clamp to the maximum CPUs in the system if necessary
            - If `-1` then use all available CPUs
        name: The name to prefix to the python threads used to monitor the subprocesses
        start: Automatically start the subprocesses. If false, then the start() must be called before processing
        debug: If true then the entry_point is executed in a single thread instead of multiple subprocesses.
            This reduces complexity which can aid debugging
        env: The OS environment variables to export in the subprocesses
        disable_gpu_in_subprocesses: Disables NVidia GPU usage in the subprocesses.
            This is necessary if the Tensorflow python package is imported in the entry_point's module
        logger: Optional Python logger
    """

    def __init__(
        self,
        entry_point: Callable,
        n_jobs: Union[int, float] = -1,
        name='ProcessPool',
        start=True,
        debug=False,
        env: Dict[str, str] = None,
        disable_gpu_in_subprocesses=True,
        logger: logging.Logger = None,
    ):
        # When this marker variable is set the constructor is a no-op and the
        # instance is left uninitialized.
        # NOTE(review): presumably the marker is exported inside the pool's own
        # subprocesses to stop them from recursively spawning pools - confirm
        # against the subprocess bootstrap script.
        if os.environ.get('MLTK_PROCESS_POOL_SUBPROCESS', ''):
            return

        self.logger = logger or logging.getLogger(name)
        # Debug mode runs the entry point in exactly one in-process worker
        self._n_jobs = 1 if debug else calculate_n_jobs(n_jobs)
        self._name = name
        self._entry_point = entry_point
        self._running_event = threading.Event()
        # Holds the workers that are currently idle and can accept a job
        self._ready_q = queue.Queue(maxsize=self._n_jobs)
        self._processes: List[_Subprocess] = []
        self._debug = debug
        # Copy so that the caller's dictionary is never modified
        self._env = copy.deepcopy(env) if env else {}
        self._lock = threading.Lock()
        self._detected_pthread_error = False
        self._detected_subprocess_error: str = None

        self.logger.info(f'{self.name} is using {self.n_jobs} subprocesses')

        if disable_gpu_in_subprocesses:
            self._env['CUDA_VISIBLE_DEVICES'] = '-1'
            self._env['TF_CPP_MIN_LOG_LEVEL'] = '3'

        # Limit the number of threads spun up in the subprocesses
        # as they can quickly deplete the system resources.
        # NOTE: If any of these variables are defined in the env arg or global env
        #       then that value will be used instead
        max_num_threads_vars = (
            'OMP_NUM_THREADS',
            'OPENBLAS_NUM_THREADS',
            'MKL_NUM_THREADS',
            'BLIS_NUM_THREADS',
            'VECLIB_MAXIMUM_THREADS',
            'NUMBA_NUM_THREADS',
            'NUMEXPR_NUM_THREADS',
        )
        for key in max_num_threads_vars:
            self._env.setdefault(key, os.environ.get(key, '1'))

        # Make sure the subprocesses are cleaned up when the interpreter exits
        atexit.register(self.shutdown)

        if start:
            self.start()

    @property
    def n_jobs(self) -> int:
        """The number of subprocesses used"""
        return self._n_jobs

    @property
    def name(self) -> str:
        """The name of this processing pool instance"""
        return self._name

    @property
    def is_running(self) -> bool:
        """Returns true if the processing pool is actively running"""
        return self._running_event.is_set()

    @property
    def detected_subprocess_error(self) -> str:
        """Name of the first worker that reported an error, or None"""
        with self._lock:
            return self._detected_subprocess_error

    @detected_subprocess_error.setter
    def detected_subprocess_error(self, v: str):
        with self._lock:
            # Only the first reported error is recorded; later ones are ignored
            if self._detected_subprocess_error is None:
                self._detected_subprocess_error = v

    def start(self):
        """Start the processing pool subprocesses

        Raises:
            RuntimeError: If the pool is already running
        """
        if self.is_running:
            raise RuntimeError(f'Process pool: {self.name} already running')

        self._running_event.set()
        for i in range(self._n_jobs):
            worker = _Subprocess(
                name=f'{self._name}-{i}',
                pool=self,
                entry_point=self._entry_point,
                debug=self._debug,
                env=self._env,
                logger=self.logger,
            )
            self._processes.append(worker)
            # A freshly created worker is immediately available for jobs
            self._ready_q.put(worker)

    def shutdown(self):
        """Shutdown the processing pool subprocesses immediately"""
        if not self.is_running:
            return

        # Clear the flag first so that anything polling is_running
        # observes the shutdown while the workers are being torn down
        self._running_event.clear()
        for worker in self._processes:
            worker.shutdown()

        if self._detected_pthread_error:
            self.logger.warning(
                '\n***\nYour system may be running low on resources.\n'
                'Trying reducing the n_jobs (or cores) used for parallel processing.\n***\n\n'
            )

    def create_batch(self, size: int) -> 'ProcessPoolBatch':
        """Create a processing pool batch.
        See this class's docs for more info
        """
        return ProcessPoolBatch(pool=self, size=size)

    def process(
        self,
        *args,
        pool_callback: Callable = None,
        pool_batch: 'ProcessPoolBatch' = None,
        **kwargs
    ) -> Union['ProcessPoolBatch', object]:
        """Process the given args, kwargs in the next available subprocess
        See this class's docs for more info

        Args:
            args: Positional arguments forwarded to the entry point
            pool_callback: Optional function invoked with the result once it is available
            pool_batch: Optional batch that is populated with the result
            kwargs: Keyword arguments forwarded to the entry point

        Returns:
            The entry point's result if neither pool_callback nor pool_batch
            is given (this call blocks until it is available),
            otherwise the batch tracking the invocation

        Raises:
            RuntimeError: If the pool was not started or is shut down
        """
        if not self.is_running:
            raise RuntimeError(f'ProcessPool: {self.name} not started')

        batch = pool_batch or ProcessPoolBatch(self, pool_callback=pool_callback)

        # Poll for an idle worker with a short timeout
        # so that a pool shutdown is noticed while waiting
        while True:
            if not self.is_running:
                raise RuntimeError(f'ProcessPool: {self.name} shutdown')
            try:
                worker: _Subprocess = self._ready_q.get(block=True, timeout=0.100)
            except queue.Empty:
                continue
            break

        worker.invoke(args, kwargs, batch=batch)

        # Asynchronous modes: the caller gets the result via the callback or batch
        if pool_callback is not None or pool_batch is not None:
            return batch

        # Synchronous mode: block until the result is available
        results = batch.wait()
        if not self.is_running:
            raise RuntimeError(f'ProcessPool: {self.name} shutdown')
        return results

    def __call__(self, *args, **kwargs) -> Union['ProcessPoolBatch', object]:
        """Alias of process() so the pool can be invoked directly, e.g. ``pool(x)``"""
        return self.process(*args, **kwargs)
class ProcessPoolBatch:
    """Processing Pool Batch

    This is used to store the results of a processing batch of data.

    Args:
        pool: The processing pool this batch belongs to
        pool_callback: Optional function invoked with each result
            instead of storing it in the batch
        size: Number of results the batch holds.
            0 means a single result that wait() returns as a scalar
    """

    # How often wait() re-checks that the pool is still running.
    # Without a timeout a shutdown that never notifies this batch would hang the waiter.
    _WAIT_POLL_SECONDS = 0.1

    def __init__(self, pool: 'ProcessPool', pool_callback: Callable = None, size: int = 0):
        self.pool = pool
        self.pool_callback = pool_callback
        self._condition = threading.Condition()
        self._size = size
        self._return_scalar = size == 0
        # A scalar batch (size=0) still has exactly one result slot
        self._remaining = max(1, size)
        self._next_index_count = 0
        self._results = [None] * self._remaining

    @property
    def size(self) -> int:
        """The size of the batch"""
        return self._size

    def wait(self) -> Union[object, List[object]]:
        """Wait for all of the data in the processing batch to complete and return the results.

        Returns:
            Either a single object if size=0 or a list of objects if size>0.
            None if the pool stops running before the batch completes
        """
        with self._condition:
            while self._remaining > 0:
                if not self.pool.is_running:
                    return None
                self._condition.wait(timeout=self._WAIT_POLL_SECONDS)
            return self._results[0] if self._return_scalar else self._results

    def _next_index(self) -> int:
        """Reserve the result slot for the next invocation

        Raises:
            RuntimeError: If the batch has no free result slot
        """
        with self._condition:
            # Results handed to a callback are not stored by index,
            # so the slot capacity only applies to storing batches
            out_of_slots = (
                self.pool_callback is None
                and self._next_index_count >= len(self._results)
            )
            if self._remaining <= 0 or out_of_slots:
                raise RuntimeError('Batch overflow')
            retval = self._next_index_count
            self._next_index_count += 1
            return retval

    def _add_results(self, index: int, results):
        """Record the results of one invocation

        An index of -1 means the invocation was aborted and produced no results
        """
        if self.pool_callback is not None:
            if index != -1:
                self.pool_callback(results)
            return

        with self._condition:
            if index != -1:
                self._results[index] = results
            self._remaining -= 1
            # Wake every waiter, several threads may wait on the same batch
            self._condition.notify_all()


class _Subprocess(threading.Thread):
    """Worker thread that feeds jobs to one subprocess and collects the results

    In debug mode no subprocess is spawned and the entry point
    is executed directly in this thread.

    Args:
        name: Name of the worker thread
        pool: The processing pool that owns this worker
        entry_point: The function to execute
        debug: If true, execute the entry point in this thread
        env: Environment variables to export in the subprocess
        logger: Logger used to report the subprocess logs and errors
    """

    def __init__(
        self,
        name: str,
        pool: 'ProcessPool',
        entry_point: Callable,
        debug: bool,
        env: Dict[str, str],
        logger: logging.Logger,
    ):
        threading.Thread.__init__(
            self,
            name=name,
            daemon=True,
            target=self._process_thread_loop
        )
        self.pool = pool
        self.logger = logger
        self._entry_point = entry_point
        # Released once per pending job (and once more on shutdown)
        self._invoke_sem = threading.Semaphore(value=0)
        self._invoke_args = None
        self._invoke_kwargs = None
        self._invoke_batch: ProcessPoolBatch = None
        self._invoke_batch_index: int = -1
        self._shutdown_event = threading.Event()
        self._monitor_thread = None

        if debug:
            self._subprocess = None
            # Route the thread loop to the in-process implementation
            self._process_thread_loop_unsafe = self._debug_thread_loop_unsafe
        else:
            curdir = os.path.dirname(os.path.abspath(__file__)).replace('\\', '/')
            subprocess_main_path = f'{curdir}/_subprocess_main.py'

            # The subprocess script is told what to run via environment variables
            os_env = os.environ.copy()
            os_env.update(dict(
                MODULE_PATH=entry_point.__code__.co_filename,
                FUNCTION_NAME=entry_point.__name__,
                PROCESS_POOL_NAME=name,
            ))
            # The pool's variables take precedence over the above
            os_env.update(env)

            self._subprocess = subprocess.Popen(
                [sys.executable, '-u', subprocess_main_path],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=False,
                env=os_env,
                close_fds=True,
                bufsize=-1,
            )
            self._monitor_thread = threading.Thread(
                name=f'{name}-monitor',
                target=self._monitor_process_logs,
                daemon=True
            )
            self._monitor_thread.start()

        self.start()

    def invoke(self, args, kwargs, batch: ProcessPoolBatch):
        """Hand a job to this worker, its results are added to the given batch"""
        assert self._invoke_args is None, 'Bad processing Q state'
        self._invoke_args = args
        self._invoke_kwargs = kwargs
        self._invoke_batch = batch
        self._invoke_batch_index = batch._next_index()
        self._invoke_sem.release()

    def _process_thread_loop(self):
        """Thread entry point, the whole pool is shut down when the loop exits"""
        try:
            self._process_thread_loop_unsafe()
        except (KeyboardInterrupt, OSError):
            pass
        except Exception as e:
            # Errors raised while the pool is shutting down are not reported
            if self.pool.is_running:
                self.logger.error(f'{self.name}: {e}', exc_info=e)
        finally:
            self.pool.shutdown()

    def _process_thread_loop_unsafe(self):
        """Forward each pending job to the subprocess and collect its result"""
        while True:
            self._invoke_sem.acquire()

            retcode = self._subprocess.poll()
            if retcode is not None:
                if retcode != 0:
                    raise RuntimeError(f'{self.name} terminated with error code: {retcode}')
                return

            write_data(
                self._subprocess.stdin,
                self._invoke_args,
                self._invoke_kwargs,
            )
            self._invoke_args = None
            self._invoke_kwargs = None

            result, _ = read_data(self._subprocess.stdout)

            # If the subprocess failed to return data
            if len(result) == 0:
                # Wait a moment for the subprocess to complete.
                # A timeout just means it is still running, which is handled below
                try:
                    self._subprocess.wait(1)
                except subprocess.TimeoutExpired:
                    pass

                # Poll the subprocesses exit code
                retcode = self._subprocess.poll()

                # If an exit code was returned
                if retcode is not None:
                    # If the retcode is non-zero then the subprocess failed
                    # So throw an exception
                    if retcode != 0:
                        raise RuntimeError(f'{self.name} terminated with error code: {retcode}')
                    # Otherwise just return as the subprocess is being gracefully terminated
                    return

                # Otherwise the subprocess is not properly generating data
                # So throw an exception
                raise RuntimeError(f'{self.name} did not return a result')

            self._deliver_result(self._unwrap_result(result))

    def _debug_thread_loop_unsafe(self):
        """Execute each pending job directly in this thread (debug mode)"""
        while True:
            self._invoke_sem.acquire()
            if self._shutdown_event.is_set():
                break

            result = self._entry_point(*self._invoke_args, **self._invoke_kwargs)
            self._invoke_args = None
            self._invoke_kwargs = None

            # An empty tuple/list means "no result" and is delivered as None.
            # Anything else is delivered untouched, including falsy scalars such as 0
            if isinstance(result, (tuple, list)) and len(result) == 0:
                result = None

            self._deliver_result(self._unwrap_result(result))

    @staticmethod
    def _unwrap_result(result):
        """Collapse a one-element tuple/list to its single item"""
        if isinstance(result, (tuple, list)) and len(result) == 1:
            return result[0]
        return result

    def _deliver_result(self, result):
        """Store the result in the pending batch and mark this worker as idle"""
        self._invoke_batch._add_results(self._invoke_batch_index, result)
        self._invoke_batch = None
        self._invoke_batch_index = -1
        self.pool._ready_q.put(self)

    @staticmethod
    def _best_effort(action: Callable):
        """Run a teardown step and ignore any failure

        The catch is deliberately as broad as possible: every teardown step of
        every worker must be attempted even if an earlier step is interrupted,
        otherwise subprocesses could be left running.
        """
        try:
            action()
        except BaseException:
            pass

    def shutdown(self):
        """Terminate the subprocess and release anything waiting on this worker"""
        proc = self._subprocess
        if proc is not None:
            # First try to gracefully terminate the subprocess
            # and wait a moment for it to complete
            self._best_effort(lambda: (proc.terminate(), proc.wait(1)))
            # Close the data input pipe
            self._best_effort(lambda: proc.stdin.close())
            # Close the data output pipe
            self._best_effort(lambda: proc.stdout.close())
            # Wait a moment for the log monitor thread to complete
            self._best_effort(lambda: self._monitor_thread.join(3))
            # Close the log pipe
            self._best_effort(lambda: proc.stderr.close())
            # Forcefully kill the subprocess if necessary
            self._best_effort(proc.kill)

        # Wake up the thread loop so that it can exit
        self._shutdown_event.set()
        self._invoke_sem.release()

        # Unblock anything waiting on the job that was in flight
        batch = self._invoke_batch
        self._invoke_batch = None
        if batch is not None:
            batch._add_results(-1, None)

    def _monitor_process_logs(self):
        """Forward the subprocess's stderr lines to the logger"""
        try:
            while True:
                line = self._subprocess.stderr.readline()
                # An empty read means the pipe was closed
                if not line:
                    return

                # A non-zero exit code marks this worker as the failing one
                if not self.pool.detected_subprocess_error and self._subprocess.poll():
                    self.pool.detected_subprocess_error = self.name

                failed_worker = self.pool.detected_subprocess_error
                # Only log while the pool is healthy or if this is the failing worker
                if (self.pool.is_running and failed_worker is None) or failed_worker == self.name:
                    # Undecodable bytes must not stop the monitor,
                    # otherwise the stderr pipe would no longer be drained
                    line = line.decode('utf-8', errors='replace').rstrip()
                    if not self.pool._detected_pthread_error and 'pthread_create() failed' in line:
                        self.pool._detected_pthread_error = True
                    if 'Traceback' in line:
                        self.pool.detected_subprocess_error = self.name
                    self.logger.info(f'{self.name}: {line}')
        except Exception:
            # Monitoring is best-effort, e.g. the pipe may be closed during shutdown
            pass


def calculate_n_jobs(n_jobs: Union[float, int] = -1) -> int:
    """Calculate the number of subprocesses to use for the processing pool

    Args:
        n_jobs: This should be one of:

            - A float value between (0, 1.0], which specifies the percentage of the available CPUs
            - Integer specifying the exact number of CPUs. This will automatically clamp to the maximum CPUs in the system if necessary
            - If `-1` then use all available CPUs

    Returns:
        The calculated number of jobs to use for subprocessing

    Raises:
        ValueError: If n_jobs is a float outside the range (0, 1.0]
    """
    n_cpus = get_cpu_count()
    if n_jobs == -1:
        n_jobs = n_cpus
    elif isinstance(n_jobs, float):
        if n_jobs <= 0 or n_jobs > 1:
            raise ValueError('Must either be an integer or a float in the range (0, 1.0]')
        n_jobs = round(n_cpus * n_jobs)

    # Always use at least one job and never more than the available CPUs
    return min(max(n_jobs, 1), n_cpus)
Important: We use cookies only for functional and traffic analytics.
We DO NOT use cookies for any marketing purposes. By using our site you acknowledge you have read and understood our Cookie Policy.