Skip to content

Core Module

The core module provides essential utilities for file handling, timing, and data comparison.

File Handling

Main class for handling file operations across different formats.

Source code in suthing/file_handle.py
class FileHandle:
    """Main class for handling file operations across different formats.

    The class is used as a namespace of classmethods. The file format is
    derived from the file extension (see ``_find_mode``); a trailing ``.gz``
    suffix turns gzip (de)compression on.
    """

    @classmethod
    def _find_mode(cls, lemma: str):
        """Determine file type from file extension.

        Args:
            lemma: File extension string including the leading dot
                (e.g. ``".json"``); the comparison is case-sensitive

        Returns:
            FileType enum value corresponding to the extension;
            ``FileType.TXT`` for any extension not listed here
        """
        if lemma in (".yml", ".yaml"):
            return FileType.YAML
        elif lemma == ".json":
            return FileType.JSON
        elif lemma == ".jsonld":
            return FileType.JSONLD
        elif lemma in (".pkl", ".pickle"):
            return FileType.PICKLE
        elif lemma == ".csv":
            return FileType.CSV
        elif lemma == ".env":
            return FileType.ENV
        else:
            return FileType.TXT

    @classmethod
    def _infer_format(cls, lemmas, how: FileType, compression: str | None):
        """Derive file type and compression from the suffixes of a file name.

        Args:
            lemmas: Suffixes of the file name, as produced by ``suffixes``;
                the last entry is inspected for ``".gz"``
            how: File type to keep when no type is detected
            compression: Compression requested by the caller, or None

        Returns:
            Tuple ``(how, compression)``; ``compression`` becomes ``"gz"``
            when the last suffix is ``".gz"``
        """
        if lemmas[-1] == ".gz":
            compression = "gz"
            detected = cls._find_mode(lemmas[-2])
        else:
            detected = cls._find_mode(lemmas[-1])
        # NOTE(review): _find_mode always returns a FileType member (TXT by
        # default), so unless FileType members can be falsy the caller's
        # `how` is always replaced here -- confirm this is intended.
        if detected:
            how = detected
        return how, compression

    @classmethod
    def _dump_pointer(cls, item, p, how: FileType, bytes_: bool = True) -> None:
        """Write data to file pointer in specified format.

        Nothing is written for ``FileType.ENV``, nor for ``FileType.CSV``
        when ``item`` is not a pandas DataFrame/Series.

        Args:
            item: Data to write
            p: File pointer
            how: FileType indicating format to write in
            bytes_: Whether textual output must be encoded to UTF-8 bytes
                before writing (needed when ``p`` is a binary stream)
        """
        if how == FileType.PICKLE:
            pickle.dump(item, p, pickle.HIGHEST_PROTOCOL)
        elif how == FileType.YAML:
            yc = yaml.dump(item)
            if bytes_:
                yc = yc.encode("utf-8")  # type: ignore
            p.write(yc)
        elif how == FileType.JSON:
            jc = json.dumps(item, indent=2) + "\n"
            if bytes_:
                jc = jc.encode("utf-8")  # type: ignore
            p.write(jc)
        elif how == FileType.JSONLD:
            # one JSON document per line
            for subitem in item:
                jc = json.dumps(subitem) + "\n"
                if bytes_:
                    jc = jc.encode("utf-8")  # type: ignore
                p.write(jc)
        elif how == FileType.CSV and isinstance(item, (pd.DataFrame, pd.Series)):
            r = item.to_csv()
            if bytes_:
                r = r.encode("utf-8")  # type: ignore
            p.write(r)
        elif how == FileType.TXT:
            tc = str(item)
            # A gzip stream is binary: writing a str to it raises TypeError,
            # so encode exactly like the other textual formats do.
            if bytes_:
                tc = tc.encode("utf-8")  # type: ignore
            p.write(tc)

    @classmethod
    def _open_pointer(cls, p: io.BytesIO | gzip.GzipFile, how: FileType, **kwargs):
        """Read data from file pointer in specified format.

        Args:
            p: Binary file pointer
            how: FileType indicating format to read
            **kwargs: Additional arguments passed to ``pd.read_csv``
                (only used for ``FileType.CSV``)

        Returns:
            Data read from file in appropriate format; an empty dict when
            ``how`` matches none of the handled file types

        Raises:
            ValueError: If trying to read gzipped env files
        """
        if how == FileType.PICKLE:
            # NOTE: unpickling can execute arbitrary code -- trusted files only.
            r = pickle.load(p)
        elif how == FileType.YAML:
            # NOTE(review): FullLoader is not meant for untrusted input;
            # consider yaml.SafeLoader if files may come from outside.
            r = yaml.load(p, Loader=yaml.FullLoader)
        elif how == FileType.JSON:
            r = json.load(p)
        elif how == FileType.JSONLD:
            r = [json.loads(s.decode()) for s in p.readlines()]
        elif how == FileType.CSV:
            r = pd.read_csv(p, **kwargs)  # type: ignore[arg-type]
        elif how == FileType.TXT:
            r = p.read().decode()
        elif how == FileType.ENV:
            if isinstance(p, io.BytesIO):
                config = io.StringIO(p.getvalue().decode("UTF-8"))
                # the result is whatever load_dotenv returns for the stream
                r = load_dotenv(stream=config)
            else:
                raise ValueError("Will not read gzipped env files")
        else:
            r = dict()
        return r

    @classmethod
    def load(
        cls,
        ppath: str | pathlib.Path | None = None,
        pname: str | None = None,
        how: FileType = FileType.YAML,
        **kwargs,
    ):
        """Load data from a package resource or from the filesystem.

        When ``pname`` is given the data is read with
        ``pkgutil.get_data(ppath, pname)``; otherwise a file is opened at
        ``fpath`` (keyword) or, if that is absent, at ``ppath``.

        Args:
            ppath: Package name (when ``pname`` is given, must be a str),
                otherwise the filesystem path used when ``fpath`` is absent
            pname: Resource name inside the package
            how: File type; replaced by the type derived from the file
                extension whenever one is detected
            **kwargs: ``compression`` (``"gz"`` forces gzip decoding) and
                ``fpath`` are consumed here; the rest is forwarded to the
                reader (see ``_open_pointer``)

        Returns:
            Data read from the source in the appropriate format

        Raises:
            ValueError: If ``pname`` is given without a string ``ppath``,
                if neither ``fpath`` nor ``ppath`` is given, or if no bytes
                could be obtained from the package
        """
        compression = kwargs.pop("compression", None)
        fpath: str | pathlib.Path | None = kwargs.pop("fpath", None)

        # assume loading from a package
        if pname is not None:
            how, compression = cls._infer_format(suffixes(pname), how, compression)
            if isinstance(ppath, str):
                bytes_ = pkgutil.get_data(ppath, pname)
            else:
                raise ValueError(
                    "package name provided, package path (as a string) needed"
                )

        # interpret as filesystem load
        else:
            if fpath is None:
                if ppath is not None:
                    fpath = ppath
                else:
                    raise ValueError("either fpath or ppath should be provided")
            fpath = pathlib.Path(fpath).expanduser().as_posix()
            how, compression = cls._infer_format(suffixes(fpath), how, compression)
            with open(fpath, "rb") as fp:
                bytes_ = fp.read()

        if bytes_ is None:
            raise ValueError("None received as Bytes")

        if compression == "gz":
            with gzip.GzipFile(fileobj=io.BytesIO(bytes_), mode="r") as p:
                r = cls._open_pointer(p, how, **kwargs)
        else:
            with io.BytesIO(bytes_) as p:
                r = cls._open_pointer(p, how, **kwargs)
        return r

    @classmethod
    def dump(cls, item, path: str | pathlib.Path, how: FileType = FileType.YAML):
        """Write ``item`` to ``path`` in the format implied by its extension.

        Args:
            item: Data to write
            path: Destination path; ``~`` is expanded. If path ends with
                ".gz" the output will be gzip compressed
            how: File type; replaced by the type derived from the file
                extension whenever one is detected
        """
        lemmas = suffixes(path)
        path = pathlib.Path(path).expanduser().as_posix()
        how, compression = cls._infer_format(lemmas, how, None)

        if compression == "gz":
            if not path.endswith(".gz"):
                path += ".gz"
            # gzip streams are binary, so _dump_pointer encodes text itself
            with gzip.GzipFile(path, mode="wb") as p:
                cls._dump_pointer(item, p, how)
        elif how == FileType.PICKLE:
            with open(path, mode="wb") as p:
                cls._dump_pointer(item, p, how, bytes_=False)
        else:
            # load() always decodes UTF-8, so write UTF-8 instead of the
            # locale's default encoding to keep dump/load symmetric.
            with open(path, mode="w", encoding="utf-8") as p:
                cls._dump_pointer(item, p, how, bytes_=False)

dump(item, path, how=FileType.YAML) classmethod

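Parameters: `item` – the data to write; `path` – the destination path (if `path` ends with ".gz" the output will be gzip compressed); `how` – the file format, replaced by the format derived from the file extension when one is detected. Returns nothing.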

Source code in suthing/file_handle.py
@classmethod
def dump(cls, item, path: str | pathlib.Path, how: FileType = FileType.YAML):
    """Write ``item`` to ``path`` in the format implied by its extension.

    Args:
        item: Data to write
        path: Destination path; ``~`` is expanded. If path ends with
            ".gz" the output will be gzip compressed
        how: File type; replaced by the type derived from the file
            extension whenever one is detected
    """
    lemmas = suffixes(path)
    path = pathlib.Path(path).expanduser().as_posix()
    if lemmas[-1] == ".gz":
        compression = "gz"
        how_ = cls._find_mode(lemmas[-2])
    else:
        compression = None
        how_ = cls._find_mode(lemmas[-1])
    if how_:
        how = how_

    if compression == "gz":
        if not path.endswith(".gz"):
            path += ".gz"
        # gzip streams are always binary; _dump_pointer defaults to bytes
        with gzip.GzipFile(path, mode="wb") as p:
            cls._dump_pointer(item, p, how)
    elif how == FileType.PICKLE:
        with open(path, mode="wb") as p:
            cls._dump_pointer(item, p, how, bytes_=False)
    else:
        # Text output is written as UTF-8 rather than in the locale's
        # default encoding, so the file content does not depend on the
        # platform it was written on.
        with open(path, mode="w", encoding="utf-8") as p:
            cls._dump_pointer(item, p, how, bytes_=False)

load(ppath=None, pname=None, how=FileType.YAML, **kwargs) classmethod

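Parameters: `ppath` – the package name when `pname` is given, otherwise the filesystem path used when no `fpath` keyword is supplied; `pname` – the resource name inside the package; `how` – the file format, replaced by the format derived from the file extension when one is detected; `kwargs` – may carry `fpath` and `compression`, the remaining entries are forwarded to the reader. Returns the loaded data.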

Source code in suthing/file_handle.py
@classmethod
def load(
    cls,
    ppath: str | pathlib.Path | None = None,
    pname: str | None = None,
    how: FileType = FileType.YAML,
    **kwargs,
):
    """Load data from a package resource or from the filesystem.

    With ``pname`` set, the bytes come from ``pkgutil.get_data(ppath, pname)``;
    otherwise a file is read from ``fpath`` (keyword) or, failing that, from
    ``ppath``.

    Args:
        ppath: Package name (a str, when ``pname`` is given), otherwise the
            filesystem path used when ``fpath`` is absent
        pname: Resource name inside the package
        how: File type; replaced by the type derived from the file
            extension whenever one is detected
        **kwargs: ``compression`` and ``fpath`` are consumed here; the rest
            is forwarded to ``_open_pointer``

    Returns:
        Whatever ``_open_pointer`` produces for the detected file type

    Raises:
        ValueError: If ``pname`` is given without a string ``ppath``, if
            neither ``fpath`` nor ``ppath`` is given, or if no bytes were
            obtained
    """
    compression = kwargs.pop("compression", None)
    fpath: str | pathlib.Path | None = kwargs.pop("fpath", None)

    from_package = pname is not None

    # Work out which name carries the extension we should look at.
    if from_package:
        source_name = pname
    else:
        if fpath is None:
            if ppath is None:
                raise ValueError("either fpath or ppath should be provided")
            fpath = ppath
        fpath = pathlib.Path(fpath).expanduser().as_posix()
        source_name = fpath

    # A trailing ".gz" means gzip; the format then sits one suffix earlier.
    extensions = suffixes(source_name)
    gzipped = extensions[-1] == ".gz"
    if gzipped:
        compression = "gz"
    detected = cls._find_mode(extensions[-2] if gzipped else extensions[-1])
    if detected:
        how = detected

    # Fetch the raw bytes from the package or from disk.
    if from_package:
        if not isinstance(ppath, str):
            raise ValueError(
                "package name provided, package path (as a string) needed"
            )
        raw = pkgutil.get_data(ppath, pname)
    else:
        with open(fpath, "rb") as fp:
            raw = fp.read()

    if raw is None:
        raise ValueError("None received as Bytes")

    if compression == "gz":
        stream = gzip.GzipFile(fileobj=io.BytesIO(raw), mode="r")
    else:
        stream = io.BytesIO(raw)
    with stream as p:
        return cls._open_pointer(p, how, **kwargs)

Timer

Context manager for timing code execution.

Source code in suthing/timer.py
class Timer:
    """Context manager for timing code execution.

    Example:
        with Timer() as t:
            do_work()
        print(t.elapsed_str)

    Attributes are filled in when the ``with`` block exits; ``start`` only
    exists once the block has been entered.
    """

    def __init__(self):
        """Initialize timer with default settings."""
        self.timer = default_timer
        self.mins = 0
        self.secs = 0
        self.elapsed = 0

    def __enter__(self):
        """Start timing on context enter.

        Returns:
            self: Timer instance
        """
        self.start = self.timer()
        return self

    def __exit__(self, *args):
        """Stop timing on context exit and calculate elapsed time.

        Nothing is returned, so an exception raised inside the ``with``
        block is not suppressed.
        """
        end = self.timer()
        self.elapsed = end - self.start
        # whole minutes, and the whole seconds left over after them
        self.mins = int(self.elapsed / seconds_per_minute)
        self.secs = int(self.elapsed - (self.mins * seconds_per_minute))

    def format_elapsed(self, digits: int = 2) -> str:
        """Format the elapsed time with a chosen precision.

        Args:
            digits: Number of decimal places the seconds are rounded to

        Returns:
            Formatted time string (e.g. "1 min 30.5 sec"); the minutes part
            is omitted when it is zero
        """
        mins = int(self.elapsed / seconds_per_minute)
        secs = round(self.elapsed - (mins * seconds_per_minute), digits)
        r = f"{secs} sec"
        if mins > 0:
            r = f"{mins} min " + r
        return r

    @property
    def elapsed_str(self, digits: int = 2) -> str:
        """Get formatted string of elapsed time.

        Attribute access cannot pass arguments, so ``digits`` always takes
        its default here; call ``format_elapsed(digits)`` to choose another
        precision.

        Returns:
            Formatted time string (e.g. "1 min 30.5 sec")
        """
        return self.format_elapsed(digits)

elapsed_str property

Get formatted string of elapsed time.

Parameters:

Name Type Description Default
digits

Number of decimal places for seconds

required

Returns:

Type Description
str

Formatted time string (e.g. "1 min 30.5 sec")

__enter__()

Start timing on context enter.

Returns:

Name Type Description
self

Timer instance

Source code in suthing/timer.py
def __enter__(self):
    """Record the start time when the ``with`` block is entered.

    Returns:
        self: This Timer instance, so it can be bound with ``as``
    """
    started_at = self.timer()
    self.start = started_at
    return self

__exit__(*args)

Stop timing on context exit and calculate elapsed time.

Source code in suthing/timer.py
def __exit__(self, *args):
    """Take the end time and derive the elapsed time from it.

    Stores the elapsed time in ``elapsed`` and its split into whole minutes
    (``mins``) and remaining whole seconds (``secs``).
    """
    self.elapsed = self.timer() - self.start
    whole_minutes = int(self.elapsed / seconds_per_minute)
    self.mins = whole_minutes
    self.secs = int(self.elapsed - (whole_minutes * seconds_per_minute))

__init__()

Initialize timer with default settings.

Source code in suthing/timer.py
def __init__(self):
    """Set up the clock function and zero all measurements."""
    # nothing has been measured yet
    self.mins = self.secs = self.elapsed = 0
    # clock used to take the start and end readings
    self.timer = default_timer

Profiler

Source code in suthing/decorate.py
class SProfiler:
    """Collects values under keys and hands out copies of the collection."""

    def __init__(self):
        """Start with an empty accumulator mapping each key to a list."""
        self._accumulator: defaultdict[str, list] = defaultdict(list)

    def add_metric(self, hkey, metric_key=None, value=0):
        """Append a value to the list stored under ``hkey``.

        Args:
            hkey: Key under which the value is accumulated
            metric_key: Accepted but not used by this implementation
            value: Value to record
        """
        self._accumulator[hkey].append(value)

    def view_stats(self):
        """Return a deep copy of everything accumulated so far.

        Returns:
            A copy of the accumulator; changing it does not affect the
            profiler's own data
        """
        snapshot = deepcopy(self._accumulator)
        return snapshot

Comparison

Deep comparison of two objects.

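Recursively compares nested dictionaries and iterables. Dictionaries are compared key by key, other iterables element by element; values whose type is listed in `excluded_types` are treated as atomic and compared with `==`.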

Parameters:

Name Type Description Default
a Any

First object to compare

required
b Any

Second object to compare

required

Returns:

Type Description
bool

True if objects are equal, False otherwise

Source code in suthing/compare.py
def equals(a: Any, b: Any) -> bool:
    """Deep comparison of two objects.

    Recursively compares nested dictionaries and iterables. Dictionaries
    are compared key by key, other iterables element by element and must
    have the same number of elements. Values whose type is listed in
    ``excluded_types`` are treated as atomic and compared with ``==``.

    Args:
        a: First object to compare
        b: Second object to compare

    Returns:
        True if objects are equal, False otherwise
    """
    # Function-scope import so the block does not depend on the file's
    # import section.
    from itertools import zip_longest

    if isinstance(a, dict) and isinstance(b, dict):
        if a.keys() != b.keys():
            logger.error(f"a: {a.keys()} ; b: {b.keys()}")
            return False
        # list (not generator): every value pair is visited, so each nested
        # key mismatch is still logged
        return all([equals(a[k], b[k]) for k in a.keys()])

    if (isinstance(a, Iterable) and not isinstance(a, excluded_types)) and (
        isinstance(b, Iterable) and not isinstance(b, excluded_types)
    ):
        # Plain zip() stops at the shorter input, which made [1, 2] equal to
        # [1, 2, 3]. Pad with a unique sentinel so a length mismatch shows up
        # as a pair with one side missing.
        missing = object()
        return all(
            [
                ea is not missing and eb is not missing and equals(ea, eb)
                for ea, eb in zip_longest(a, b, fillvalue=missing)
            ]
        )

    return a == b