Source code for mltk.utils.path

"""File path utilities

See the source code on Github: `mltk/utils/path.py <https://github.com/siliconlabs/mltk/blob/master/mltk/utils/path.py>`_
"""
import os
import time
import re
import glob
import shutil
import tempfile
import datetime
import subprocess
import contextlib
from pathlib import Path
from typing import Callable, List, Union, Iterator, Tuple

from .system import get_username


# Base directory used for temporary files.
# It can be overridden with the MLTK_TEMP_DIR environment variable.
# The default includes the current user's name in the path
# to separate the temp files of different users that share the same tempdir.
TEMP_BASE_DIR = os.environ.get('MLTK_TEMP_DIR', f'{tempfile.gettempdir()}/{get_username()}/mltk')


def fullpath(path : str, cwd:str=None) -> str:
    """Return the absolute, normalized form of the given path

    Environment variables and a leading ``~`` are expanded, the path is
    normalized and made absolute, and back slashes are converted to
    forward slashes.

    Args:
        path: The path to resolve
        cwd: Optional directory used to resolve relative paths.
            The process working directory is switched to it while the path
            is resolved and restored afterwards.

    Returns:
        The resolved path with forward slashes. If get_actual_path() finds
        the path on the file system then its result is used, otherwise
        the normalized path is returned as-is.
    """
    previous_cwd = os.getcwd()
    if cwd:
        os.chdir(cwd)

    try:
        resolved = os.path.expanduser(os.path.expandvars(path))
        resolved = os.path.abspath(os.path.normpath(resolved))
    finally:
        # Always restore the caller's working directory
        os.chdir(previous_cwd)

    # Prefer the path as it actually exists on the FS,
    # fall back to the normalized path if it was not found
    on_disk = get_actual_path(resolved)
    if on_disk:
        resolved = on_disk

    return resolved.replace('\\', '/')
def get_actual_path(path: str):
    """Return the file path as it actually appears on the FS (including upper/lower case)

    On Windows, every component after the drive is turned into a glob pattern
    so that the file system reports the name with its on-disk casing.
    On other platforms the path is returned unchanged when it exists
    (its casing is not corrected there).

    Args:
        path: Path to look up. On Windows this is expected to be an absolute
            path that uses back slashes and starts with a drive letter.

    Returns:
        The path as found on the file system, or None if the path doesn't exist
    """
    if not path:
        return None

    if os.name != 'nt':
        # There is no drive letter or back slash separator to work with here.
        # Splitting on '\\' would treat the whole path as the "drive"
        # and upper-case it, which makes existing paths appear missing.
        return path if os.path.exists(path) else None

    dirs = path.split('\\')
    # disk letter
    test_name = [dirs[0].upper()]
    for d in dirs[1:]:
        if not d:
            # Empty component (e.g. doubled or trailing separator): cannot be matched
            return None
        # Wrapping the last character in [] turns the component into a glob
        # pattern, which forces glob to read the directory and return the
        # on-disk name. The rest of the component is escaped so that
        # characters such as '[' or '*' in a file name are matched literally.
        test_name.append(f'{glob.escape(d[:-1])}[{d[-1]}]')

    try:
        res = glob.glob('\\'.join(test_name))
    except Exception:
        # Best-effort lookup: treat any failure as "not found"
        return None

    if not res:
        # File not found
        return None
    return res[0]
def extension(path : str) -> str:
    """Return the extension of the given path

    Only the last path component is inspected, so a '.' in a directory name
    is never mistaken for an extension. Both '/' and '\\' are treated as
    directory separators.

    Args:
        path: File path or file name

    Returns:
        The text after the last '.' of the file name (without the '.').
        An empty string is returned if the file name has no '.',
        or if its only '.' is the first character (e.g. ``.gitignore``).
    """
    # Restrict the search to the file name; searching the whole path
    # would return e.g. 'c/d' for 'a/b.c/d'
    name = path.replace('\\', '/').rsplit('/', 1)[-1]
    idx = name.rfind('.')
    if idx <= 0:
        return ''
    return name[idx+1:]
def has_filename(path: str) -> bool:
    """Return if the given path ends with a file name that has an extension

    The path is resolved with fullpath() and only its last component is
    inspected.

    Args:
        path: The path to test

    Returns:
        True if the last component contains a '.' that is not its first
        character, False otherwise (including when path is empty)
    """
    if not path:
        return False

    # Everything after the final '/' (or the whole string if there is none)
    last_component = fullpath(path).rsplit('/', 1)[-1]
    return last_component.rfind('.') > 0
def create_dir(path:str) -> str:
    """Create the given path's directories

    If the path ends with a file name (see has_filename()) then only the
    file's parent directory is created.

    Args:
        path: Directory path, or file path whose parent directory should be created

    Returns:
        The resolved path of the created directory,
        or None if there is no directory to create
    """
    target_dir = fullpath(path)
    if has_filename(target_dir):
        # Strip the file name, only its directory is created
        target_dir = os.path.dirname(target_dir)

    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
        return target_dir

    return None
def create_tempdir(subdir='') -> str:
    """Create a temporary directory as <temp dir>/<username>/mltk

    Args:
        subdir: Optional sub-directory to create within the temp directory.
            Back slashes are converted to forward slashes and leading
            slashes are ignored, so the result is always located inside
            TEMP_BASE_DIR.

    Returns:
        The resolved path of the created directory
    """
    d = TEMP_BASE_DIR
    if subdir:
        # A leading '/' must not cause the sub-directory to be discarded,
        # it is simply treated as relative to the temp base directory
        subdir = subdir.replace('\\', '/').lstrip('/')
        if subdir:
            d += f'/{subdir}'

    d = fullpath(d)
    os.makedirs(d, exist_ok=True)
    return d
def create_user_dir(suffix:str='', base_dir:str=None) -> str:
    """Create a user directory

    This creates the directory in one of the following base directories based on availability:

    - base_dir argument
    - OS environment variable: MLTK_CACHE_DIR
    - ~/.mltk
    - <user temp dir>/<username>/mltk

    If the OS environment variable MLTK_READONLY is set, the directory
    is neither created nor checked for write access; its path is just returned.

    Args:
        suffix: Optional suffix to append to the base directory
        base_dir: Optional base directory, default to MLTK_CACHE_DIR, ~/.mltk, or <user temp dir>/<user name>/mltk if omitted

    Returns:
        path to created directory
    """
    if suffix:
        suffix = suffix.replace('\\', '/')
        if not suffix.startswith('/'):
            suffix = f'/{suffix}'

    is_read_only = os.environ.get('MLTK_READONLY')

    user_dir = base_dir if base_dir else os.environ.get('MLTK_CACHE_DIR', '~/.mltk')
    user_dir = fullpath(user_dir + suffix)

    # If the MLTK_READONLY environment variable is set
    # then don't check if the user_dir directory is writable
    if is_read_only:
        return user_dir

    try:
        # Try to create the directory in base_dir, MLTK_CACHE_DIR or ~/.mltk
        # if we have permission
        os.makedirs(user_dir, exist_ok=True)
        is_writable = os.access(user_dir, os.W_OK)
    except Exception:
        # Any failure to create the directory means it cannot be used
        is_writable = False

    if not is_writable:
        # Otherwise just create in the temp directory.
        # The suffix is passed as a relative sub-directory (no leading '/')
        # so that it is preserved in the fallback location.
        user_dir = create_tempdir(suffix.lstrip('/'))

    return user_dir
def get_user_setting(name:str, default=None):
    """Return the value of a user setting if it exists

    User settings are defined in the file:

    - Environment variable: MLTK_USER_SETTINGS_PATH
    - OR <user home>/.mltk/user_settings.yaml

    User settings include:

    - model_paths: list of directories to search for MLTK models
    - commander: Simplicity Commander options
        device: Device code
        serial_number: Adapter serial number
        ip_address: Adapter IP address

    See `settings_file <https://siliconlabs.github.io/mltk/docs/other/settings_file.html>`_

    Args:
        name: Name of the setting to look up
        default: Value returned if the settings file is missing,
            cannot be parsed, or does not contain the setting

    Returns:
        The setting's value, or ``default``
    """
    settings_file = fullpath(os.environ.get('MLTK_USER_SETTINGS_PATH', '~/.mltk/user_settings.yaml'))
    if not os.path.exists(settings_file):
        return default

    try:
        # Import the YAML package here
        # in-case it's not installed yet
        import yaml
        with open(settings_file, 'r') as fp:
            settings = yaml.load(fp, Loader=yaml.SafeLoader)
    except Exception as e:
        # Only report the failure once, a module-level flag remembers it
        module_globals = globals()
        if '_printed_user_settings_error' not in module_globals:
            module_globals['_printed_user_settings_error'] = True
            print(f'WARN: Failed to parse {settings_file}, err: {e}')
        return default

    if not settings or name not in settings:
        return default
    return settings[name]
def add_user_setting(name:str, value:object):
    """Add an entry to the user settings

    User settings are defined in the file:

    - <user home>/.mltk/user_settings.yaml

    The file (and its directory) is created if it does not exist yet.
    Existing entries are kept; an entry with the same name is overwritten.

    Args:
        name: Name of the setting
        value: Value of the setting, it must be serializable by the YAML safe dumper
    """
    # Import the YAML package here
    # in-case it's not installed yet
    import yaml

    user_settings_path = fullpath('~/.mltk/user_settings.yaml')

    user_settings = None
    if os.path.exists(user_settings_path):
        with open(user_settings_path, 'r') as fp:
            user_settings = yaml.load(fp, Loader=yaml.SafeLoader)

    # An empty settings file is parsed as None, start from an empty dict then
    if not user_settings:
        user_settings = {}

    user_settings[name] = value

    # The settings directory may not exist on a fresh installation
    os.makedirs(os.path.dirname(user_settings_path), exist_ok=True)
    with open(user_settings_path, 'w') as f:
        yaml.dump(user_settings, f, Dumper=yaml.SafeDumper)
def remove_directory(path:str):
    """Remove the directory at the given path

    This will remove non-empty directories and retry a few times if necessary.
    The removal is delegated to the platform's shell command
    (``rmdir /S /Q`` on Windows, ``rm -rf`` elsewhere).
    Failures are ignored: after the retries are exhausted the function
    returns even if the directory still exists.

    Args:
        path: Directory to remove, nothing is done if it is empty or None
    """
    if not path:
        return

    def _remove_dir(d):
        if os.name == 'nt':
            subprocess.check_output(['cmd', '/C', 'rmdir', '/S', '/Q', os.path.abspath(d)])
        else:
            subprocess.check_output(['rm', '-rf', os.path.abspath(d)])

    retries = 5
    while retries > 0 and os.path.exists(path):
        try:
            _remove_dir(path)
        except Exception:
            # Best-effort: a failed attempt is simply retried.
            # KeyboardInterrupt/SystemExit are intentionally not caught
            # so the loop can still be interrupted.
            pass
        finally:
            # Give the OS a moment before checking/retrying
            time.sleep(.001)
            retries -= 1
def clean_directory(path:str):
    """Remove all files within directory and subdirectories

    The directories themselves are kept. Each file removal is attempted up
    to three times; a file that still cannot be removed is skipped.

    Args:
        path: Directory to clean
    """
    for root, _, files in os.walk(path):
        for fn in files:
            p = os.path.join(root, fn)
            for _ in range(3):
                try:
                    os.remove(p)
                    break
                except OSError:
                    # e.g. the file is temporarily locked, wait and retry
                    time.sleep(.001)
def copy_directory(src, dst, exclude_dirs=None):
    """Recursively copy a directory. Only copy files that are new or out-dated

    A file is copied if it does not exist in the destination, or if the
    source's modification time is more than one second newer than the
    destination's.

    Args:
        src: Source directory
        dst: Destination directory, created if it does not exist
        exclude_dirs: Optional list of path prefixes. A source directory whose
            path starts with one of them (compared with forward slashes) is skipped
    """
    if exclude_dirs:
        src_normalized = src.replace('\\', '/')
        if any(src_normalized.startswith(excluded.replace('\\', '/')) for excluded in exclude_dirs):
            return

    if not os.path.exists(dst):
        os.makedirs(dst)

    for entry in os.listdir(src):
        src_entry = os.path.join(src, entry)
        dst_entry = os.path.join(dst, entry)

        if os.path.isdir(src_entry):
            copy_directory(src_entry, dst_entry, exclude_dirs=exclude_dirs)
            continue

        # Skip files whose destination copy is already up-to-date
        if os.path.exists(dst_entry) and os.stat(src_entry).st_mtime - os.stat(dst_entry).st_mtime <= 1:
            continue

        shutil.copy2(src_entry, dst_entry)
def set_file_last_modified(file_path: str, dt: datetime.datetime = None):
    """Set the access and modification times of the given file

    Args:
        file_path: Path of the file to update
        dt: The timestamp to apply, truncated to whole seconds.
            Defaults to 2000-01-01 if omitted
    """
    when = dt if dt is not None else datetime.datetime.fromisoformat('2000-01-01')
    seconds = int(when.timestamp())
    # The same value is used for both the access and the modification time
    os.utime(file_path, (seconds, seconds))
def file_is_newer(source_path: str, other_path: str):
    """Return if the source file was modified more recently than the other file

    Args:
        source_path: Path of the file expected to be newer
        other_path: Path of the file to compare against

    Returns:
        True if both paths exist and source_path's modification time is
        greater than other_path's, False otherwise
    """
    if os.path.exists(source_path) and os.path.exists(other_path):
        source_mtime = os.stat(source_path).st_mtime
        other_mtime = os.stat(other_path).st_mtime
        return source_mtime > other_mtime

    return False
def file_is_in_use(file_path:str) -> bool:
    """Return if the file is currently opened

    The check renames the file to its own name and reports the file as
    in use if that is refused with a PermissionError.

    Args:
        file_path: Path of the file to test

    Returns:
        True if the rename was refused with a PermissionError, else False

    Raises:
        FileNotFoundError: If the file does not exist
    """
    target = Path(file_path)
    if not target.exists():
        raise FileNotFoundError

    try:
        # Renaming to the same name leaves the file untouched
        target.rename(target)
    except PermissionError:
        return True

    return False
def recursive_listdir(
    base_dir:str,
    followlinks=True,
    regex:Union[str,re.Pattern,Callable[[str],bool]]=None,
    return_relative_paths:bool=False
) -> List[str]:
    """Return list of all files recursively found in base_dir

    Args:
        base_dir: The base directory to recursively search
        followlinks: If true then follow symbolic links
        regex: Optional filter of file paths to INCLUDE in the returned list
            This can either be a string, re.Pattern, or a callback function.
            A string or re.Pattern is applied with ``match()``,
            i.e. it is tested from the start of the path.
            If return_relative_paths=False then the tested path is the absolute path with forward slashes
            If return_relative_paths=True then the tested path is the path relative to the base_dir with forward slashes
            If a callback function is given, if the function returns True then the path is INCLUDED, else it is excluded
        return_relative_paths: If true then return paths relative to the base_dir, else return absolute paths

    Returns:
        List of file paths with forward slashes for directory delimiters
    """
    base_dir = fullpath(base_dir)

    # Resolve the filter into a single callable that takes the file path
    if regex is None:
        is_included = lambda p: True
    elif isinstance(regex, str):
        is_included = re.compile(regex).match
    elif isinstance(regex, re.Pattern):
        is_included = regex.match
    else:
        is_included = regex

    found_paths = []
    for current_dir, _, filenames in os.walk(base_dir, followlinks=followlinks):
        for filename in filenames:
            file_path = os.path.join(current_dir, filename).replace('\\', '/')
            if return_relative_paths:
                file_path = os.path.relpath(file_path, base_dir).replace('\\', '/')
            if is_included(file_path):
                found_paths.append(file_path)

    return found_paths
def walk_with_depth(
    base_dir:str,
    depth=1,
    followlinks=True,
) -> Iterator[Tuple[str, List[str], List[str]]]:
    """Walk a directory with a max depth.

    This is similar to os.walk except it has an optional
    maximum directory depth that it will walk.
    The depth is measured by counting path separators relative to base_dir:
    with depth=0 only base_dir is visited, with depth=1 base_dir and its
    immediate sub-directories are visited, and so on.

    Args:
        base_dir: The directory to walk, it must exist
        depth: Maximum number of directory levels to descend below base_dir
        followlinks: If true then follow symbolic links

    Yields:
        The same (root, dirs, files) tuples as os.walk
    """
    top_dir = base_dir.rstrip(os.path.sep)
    assert os.path.isdir(top_dir)

    # Directories with at least this many separators are not descended into
    sep_limit = top_dir.count(os.path.sep) + depth

    for current_dir, subdirs, filenames in os.walk(top_dir, followlinks=followlinks):
        yield current_dir, subdirs, filenames
        if current_dir.count(os.path.sep) >= sep_limit:
            # Emptying the list in-place stops os.walk from descending further
            subdirs.clear()
@contextlib.contextmanager
def pushd(new_dir:str):
    """Change to the given directory, execute, and return to the previous directory

    The previous working directory is restored when the ``with`` block
    exits, including when it exits with an exception.

    Example:

        print(f'Old path: {os.getcwd()}')
        with pushd('some/path'):
            print(f'New path: {os.getcwd()}')
        print(f'Old path: {os.getcwd()}')

    Args:
        new_dir: The directory to change to for the duration of the block
    """
    previous_dir = os.getcwd()
    os.chdir(new_dir)
    try:
        yield
    finally:
        os.chdir(previous_dir)