"""Common Python utilitiesSee the source code on Github: `mltk/utils/python.py <https://github.com/siliconlabs/mltk/blob/master/mltk/utils/python.py>`_"""importcollectionsimportsysimportosimportreimportjsonimportloggingimportimportlibimportinspectimporttimeimportcopyfromenumimportEnumimportwarningsfromtypingimportIterable,Any,UnionfrommltkimportMLTK_ROOT_DIRfrom.loggerimportDummyLogger,make_filelikefrom.shell_cmdimportrun_shell_cmdfrom.pathimportfullpathSHORT_VERSION='.'.join([str(x)forxinsys.version_info[:2]])"""The Python version as <major>.<minor>e.g.: 3.9"""def_defaultdict_not_found():returnNone
[docs]defDefaultDict(d:dict=None,**kwargs)->collections.defaultdict:"""Creates a directory that returns None if a key does not exist NOTE: Nested dictionaries are also updated to a defaultdict """def_convert_to_default_dict(obj):ifisinstance(obj,dict):obj=DefaultDict(obj)elifisinstance(obj,list):fori,xinenumerate(obj):obj[i]=_convert_to_default_dict(x)returnobjifdisnotNone:kwargs.update(d)forkey,valueinkwargs.items():kwargs[key]=_convert_to_default_dict(value)returncollections.defaultdict(_defaultdict_not_found,kwargs)
[docs]classDictObject(dict):"""Standard Python dictionary that allows for accessing entries as object properties, e.g.: my_dict_obj = DictObject({'foo': 1, 'bar': False}) # Both lines do the same thing foo = my_dict_obj.foo foo = my_dict_obj['foo'] my_dict_obj.bar = True my_dict_obj['bar'] = True """
[docs]defmerge_dict(destination:dict,source:dict,copy_destination=False)->dict:"""Recursively merge the source dictionary into the destination and return the destination """ifcopy_destination:destination=copy.deepcopy(destination)forkey,valueinsource.items():ifisinstance(value,dict):# get node or create onenode=destination.setdefault(key,{})merge_dict(node,value)else:destination[key]=valuereturndestination
[docs]defas_list(obj:Any,split:str=None)->list:"""Convert the given object to a list - If obj is None, then return empty list - If obj is a string, If the `split` argument is given then return obj.split(split) else just wrap the string in a list """ifobjisNone:return[]elifisinstance(obj,list):returnobjelifisinstance(obj,str):return[obj]ifnotsplitelse[x.strip()forxinobj.split(split)]elifisinstance(obj,collections.abc.Iterable):return[xforxinobj]else:return[obj]
[docs]defflatten_list(l:Iterable)->list:"""Flatten the given iterable object to a list"""retval=[]forxinl:try:iter(x)exceptTypeError:is_iterable=Falseelse:is_iterable=Trueifis_iterable:retval.extend(flatten_list(x))else:retval.append(x)returnretval
[docs]deflist_rindex(lst:Iterable,value:Any)->int:"""Reverse find element index that is given value"""fori,vinenumerate(reversed(lst)):ifv==value:returnlen(lst)-i-1# return the index in the original listreturn-1
[docs]defcontains_class_type(l:Iterable,cls:Any)->bool:"""Return if the given list contains a class with the given type"""foreinl:ifisinstance(e,cls):returnTruereturnFalse
[docs]defget_case_insensitive(value:str,l:Iterable)->str:"""Get the given string with case-insensitive comparsion"""ifvalueisNone:returnNonevalue=value.lower()forvinl:ifv.lower()==value:returnvreturnNone
[docs]defis_true(arg)->bool:"""Return if the given argument is a True value"""ifisinstance(arg,str):returnarg.lower()in('yes','true','on','1')ifisinstance(arg,bool):returnargifisinstance(arg,int):returnarg!=0raiseValueError(f'Invalid boolean arg: {arg}')
[docs]defis_false(arg)->bool:"""Return if the given argument is a False value"""ifisinstance(arg,str):returnarg.lower()in('no','false','off','0')ifisinstance(arg,bool):returnargifisinstance(arg,int):returnarg==0raiseValueError(f'Invalid boolean arg: {arg}')
[docs]defforward_method_kwargs(**kwargs)->dict:"""Return all the keyword-arguments of a method, excluding the 'self' argument"""retval={}forkey,valueinkwargs.items():ifkey=='self'orkey.startswith('_'):continueelifkey=='kwargs':retval.update(value)else:retval[key]=valuereturnretval
[docs]defprepend_exception_msg(e:Exception,msg:str)->Exception:"""Prepend a message to the given exception"""e.args=(msg,*e.args)all_str=Trueforxine.args:try:str(x)except:all_str=Falsebreak# If every entry in the exception msg is a string# then make it look pretty by combining into a coma-separated stringifall_str:s=', '.join(str(x)forxine.args)e.args=(s,)returne
[docs]defappend_exception_msg(e:Exception,msg:str)->Exception:"""Append a message to the given exception"""e.args=(*e.args,msg)all_str=Trueforxine.args:try:str(x)except:all_str=Falsebreak# If every entry in the exception msg is a string# then make it look pretty by combining into a coma-separated stringifall_str:s=', '.join(str(x)forxine.args)e.args=(s,)returne
[docs]defdebugger_is_active()->bool:"""Return if the debugger is currently active"""gettrace=getattr(sys,'gettrace',lambda:None)returngettrace()isnotNone
[docs]defnotebook_is_active()->bool:"""Return if script is executing in a IPython notebook (e.g. Jupyter notebook)"""forxinsys.modules:ifx.lower()=='ipykernel':returnTruereturnFalse
[docs]definstall_pip_package(package:str,module_name:str=None,logger:logging.Logger=None,install_dir:str=None,upgrade=False,no_deps=False):"""Install the given pip package is necessary"""logger=loggerorDummyLogger()ifinstall_dir:install_dir=fullpath(install_dir)ifinstall_dirnotinsys.path:os.makedirs(install_dir,exist_ok=True)logger.info(f'Adding {install_dir} to sys.path')sys.path.append(install_dir)version_match=re.match(r'([\w\_\-]+)([=<>]).*',package)ifnotmodule_name:ifversion_match:module_name=package[:version_match.start(2)]else:module_name=package# Only try to import the module without running pip if no version is specified and upgrade=Falseifnotupgradeandnotversion_match:try:importlib.import_module(module_name)returnexcept:passmake_filelike(logger)cmd=[sys.executable,"-m","pip","install"]ifupgrade:cmd.append('-U')ifno_deps:cmd.append('--no-deps')cmd.append(package)ifinstall_dir:cmd.extend(['--target',install_dir])logger.warning(f'Running cmd: {" ".join(cmd)}\n(This may take awhile, please be patient ...)')retcode,retval=run_shell_cmd(cmd,outfile=logger)ifretcode!=0:raiseRuntimeError(f'Failed to install pip package: {package}, err:\n{retval}')
[docs]defimport_module_at_path(path:str,reload=False):"""Import the Python module at the given path and return the imported module """module_package=Nonepath=fullpath(path)mltk_root_path=fullpath(MLTK_ROOT_DIR)# If the path is within the mltk directory# Then generate the module path relative to the mltk packageifpath.startswith(f'{mltk_root_path}/'):mltk_rel_path=os.path.relpath(path,mltk_root_path).replace('\\','/')module_package=Nonemodule_name=mltk_rel_path.replace('.py','').replace('/','.')# Else if the path to an external .py script was providedelifpath.endswith('.py'):path_dir=os.path.dirname(path).replace('\\','/')module_name=os.path.basename(path).replace('.py','')ifos.path.exists(f'{path_dir}/__init__.py'):# Do a relative import if the module is in a parent packagemodule_name='.'+module_nameparent_dir=os.path.dirname(path_dir)module_package=os.path.basename(path_dir)ifparent_dirnotinsys.path:sys.path.insert(0,parent_dir)else:# Otherwise, do an absolute importifpath_dirnotinsys.path:sys.path.insert(0,path_dir)# Else the path to external Python directory was providedelse:ifnotos.path.exists(f'{path}/__init__.py'):raiseRuntimeError(f'Given path to directory: {path} does not contain a __init__.py file')parent_dir=os.path.dirname(path).replace('\\','/')module_name=os.path.basename(path)ifos.path.exists(f'{parent_dir}/__init__.py'):# Do a relative import if the module is in a parent packagemodule_name='.'+module_namemodule_package=os.path.basename(parent_dir)parent_dir=os.path.dirname(parent_dir).replace('\\','/')ifparent_dirnotinsys.path:sys.path.insert(0,parent_dir)else:# Otherwise, do an absolute importifparent_dirnotinsys.path:sys.path.insert(0,parent_dir)# If the module has already been imported# then reload it if necessaryifmodule_nameinsys.modules:ifreload:returnimportlib.reload(sys.modules[module_name])else:returnsys.modules[module_name]# Otherwise import the modulereturnimportlib.import_module(module_name,package=module_package)
[docs]defload_json_safe(path:str,*args,**kwargs)->object:"""Load a JSON file and ignoring any single-line, multi-line comments and trailing commas Args: path: Path to JSON file args, kwargs: Arguments to pass into json.loads Return: Loaded JSON object """RE_SINGLE_LINE_COMMENT=re.compile(r'("(?:(?=(\\?))\2.)*?")|(?:\/{2,}.*)')RE_MULTI_LINE_COMMENT=re.compile(r'("(?:(?=(\\?))\2.)*?")|(?:\/\*(?:(?!\*\/).)+\*\/)',flags=re.M|re.DOTALL)RE_TRAILING_COMMA=re.compile(r',(?=\s*?[\}\]])')withopen(path,'r')asfp:unfiltered_json_string=fp.read()filtered_json_string=RE_SINGLE_LINE_COMMENT.sub(r'\1',unfiltered_json_string)filtered_json_string=RE_MULTI_LINE_COMMENT.sub(r'\1',filtered_json_string)filtered_json_string=RE_TRAILING_COMMA.sub('',filtered_json_string)returnjson.loads(filtered_json_string,*args,**kwargs)
[docs]deffind_object_key_with_value(obj:object,needle:object,throw_exception=False)->str:"""Given an class or class instance, search the attribute values of the object for the given "needle" and return its corresponding key. Note: If a class if given then it must be instantiable using a default constructor. Args: obj: Class or class instance needle: Class attribute value to find in class instance throw_exception: If true, throw an exception if the needle is not found, return 'none' otherwise Return: Lowercase key of found attribute value or "none" if value is not found """ifinspect.isclass(obj)andnotissubclass(obj,Enum):obj=obj()forkeyindir(obj):ifgetattr(obj,key)==needle:returnkey.lower()ifthrow_exception:raiseValueError(f'{needle} not found in {obj}')return'none'
[docs]deffind_object_value_with_key(obj:object,needle:str,ignore_case=False,throw_exception=False):"""Given a class or class instance, search the attribute keys of the object for the given "needle" and return its corresponding value. NOTE: If a class if given then it must be instantiable using a default constructor (except of Enum classes). Args: obj: Class or class instance needle: Class attribute key to find in class instance ignore_case: Ignore the key's case if True throw_exception: If true, throw an exception if the needle is not found, return None otherwise Return: Value of found attribute key or None if key is not found """ifneedleisNone:returnNoneifinspect.isclass(obj)andnotissubclass(obj,Enum):obj=obj()ifignore_case:needle=needle.lower()forkeyindir(obj):ifignore_case:ifkey.lower()==needle:returngetattr(obj,key)else:ifkey==needle:returngetattr(obj,key)ifthrow_exception:raiseValueError(f'{needle} not found in {obj}')returnNone
[docs]deffind_object_value_with_key_or_value(obj:object,needle:Union[str,object],ignore_case=False,throw_exception=False):"""Given a class or class instance, search the attribute keys and values of the object for the given "needle" and return its corresponding value. NOTE: If a class if given then it must be instantiable using a default constructor (except of Enum classes). Args: obj: Class or class instance needle: Class attribute key or value to find in class instance ignore_case: Ignore the key's case if True (needle must be a string) throw_exception: If true, throw an exception if the needle is not found, return None otherwise Return: Value of found attribute key or None if key/value is not found """ifneedleisNone:returnNoneifinspect.isclass(obj)andnotissubclass(obj,Enum):obj=obj()needle_lower=Noneifignore_caseandisinstance(needle,str):needle_lower=needle.lower()forkeyindir(obj):value=getattr(obj,key)if(needle_lowerisNoneandkey==needle)or \
(needle_lowerisnotNoneandkey.lower()==needle_lower)or \
(value==needle):returnvalueifthrow_exception:raiseValueError(f'{needle} not found in {obj}')returnNone
[docs]deftimeit(method):"""Decorator to measure time it takes for method or function to execute"""deftimed(*args,**kw):ts=time.time()try:returnmethod(*args,**kw)finally:te=time.time()diff=(te-ts)*1000print(f'{method.__name__}{diff:4f}ms')returntimed
[docs]defset_absl_log_level(level):"""Set the absl.logging library log level and return previous level The absl.logging library, a google Python package, can be very verbose. This sets the log level and returns the previous log level Args: level: The new log level Returns: Previous log level """prev_level=Nonetry:importabsl.loggingprev_level=absl.logging.get_verbosity()absl.logging.set_verbosity(getattr(absl.logging,level))except:passreturnprev_level
[docs]defdisable_warnings(func,level='ERROR'):"""Function decorator hide warnings from the Python warnings library and absl.logging library while the decorated function executes"""def_wrapper(*args,**kwargs):prev_level=set_absl_log_level(level)try:withwarnings.catch_warnings():warnings.simplefilter('ignore')returnfunc(*args,**kwargs)finally:set_absl_log_level(prev_level)return_wrapper
Important: We use cookies only for functional and traffic analytics.
We DO NOT use cookies for any marketing purposes. By using our site you acknowledge you have read and understood our Cookie Policy.