Module arti.internal.utils


View Source
from __future__ import annotations

import importlib

import inspect

import pkgutil

import threading

from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, MutableMapping

from contextlib import contextmanager

from pathlib import Path

from tempfile import TemporaryDirectory

from types import GenericAlias, ModuleType

from typing import IO, Any, ClassVar, Optional, SupportsIndex, TypeVar, Union, cast

from box import Box

from arti.internal.type_hints import Self

from arti.internal.vendored.setuptools import find_namespace_packages

_K = TypeVar("_K")

_V = TypeVar("_V")

class ClassName:

    def __get__(self, obj: Any, type_: type[Any]) -> str:

        return type_.__name__

class_name = cast(Callable[[], str], ClassName)

PropReturn = TypeVar("PropReturn")

def classproperty(meth: Callable[..., PropReturn]) -> PropReturn:

    """Access a @classmethod like a @property."""

    # mypy doesn't understand class properties yet: https://github.com/python/mypy/issues/2563

    return classmethod(property(meth))  # type: ignore[arg-type,return-value]

class frozendict(Mapping[_K, _V]):

    def __init__(

        self, arg: Union[Mapping[_K, _V], Iterable[tuple[_K, _V]]] = (), **kwargs: _V

    ) -> None:

        self._data = dict[_K, _V](arg, **kwargs)

        # Eagerly evaluate the hash to confirm elements are also frozen (via frozenset) at

        # creation time, not just when hashed.

        self._hash = hash(frozenset(self._data.items()))

    def __getitem__(self, key: _K) -> _V:

        return self._data[key]

    def __hash__(self) -> int:

        return self._hash

    def __iter__(self) -> Iterator[_K]:

        return iter(self._data)

    def __len__(self) -> int:

        return len(self._data)

    def __or__(self, other: Mapping[_K, _V]) -> frozendict[_K, _V]:

        return type(self)({**self, **other})

    __ror__ = __or__

    def __repr__(self) -> str:

        return repr(self._data)

def get_module_name(depth: int = 1) -> Optional[str]:

    """Return the module name of a specific level in the stack.

    Depth describes how many levels to traverse, for example:

    - depth=0: return get_module_name's module

    - depth=1 (default): return the caller's module

    - depth=2: return the caller's calling module

    - ...

    """

    frame = inspect.currentframe()

    if frame is None:  # the interpreter doesn't support frame inspection

        return None  # pragma: no cover

    for _ in range(depth):

        frame = frame.f_back

        if frame is None:

            return None

    return frame.f_globals.get("__name__", "__main__")

def import_submodules(

    path: list[str],  # module.__path__ is a list[str]

    name: str,

    *,

    lock: threading.Lock = threading.Lock(),

) -> dict[str, ModuleType]:

    """Recursively import submodules.

    This can be useful with registry patterns to automatically discover and import submodules

    defining additional implementations.

    `path` and `name` are usually provided from an existing module's `__path__` and `__name__`.

    This function is thread-safe and supports namespace modules.

    NOTE: This inherently triggers eager imports, which has performance impacts and may cause import

    cycles. To reduce these issues, avoid calling during module definition.

    """

    # pkgutil.iter_modules is not recursive and pkgutil.walk_packages does not handle namespace

    # packages... however we can leverage setuptools.find_namespace_packages, which was built for

    # exactly this.

    path_names = {p: name for p in path}

    path_names.update(

        {

            str(Path(path).joinpath(*name.split("."))): f"{root_name}.{name}"

            for path, root_name in path_names.items()

            for name in find_namespace_packages(path)

        }

    )

    with lock:

        return {

            name: importlib.import_module(name)

            for path, name in path_names.items()

            for _, name, _ in pkgutil.iter_modules([path], prefix=f"{name}.")

        }

_int_sub = TypeVar("_int_sub", bound="_int")

class _int(int):

    def __repr__(self) -> str:

        return f"{qname(self)}({int(self)})"

    def __str__(self) -> str:

        return str(int(self))

    # Stock magics.

    def __add__(self, x: int) -> Self:

        return type(self)(super().__add__(x))

    def __and__(self, n: int) -> Self:

        return type(self)(super().__and__(n))

    def __ceil__(self) -> Self:

        return type(self)(super().__ceil__())

    def __floor__(self) -> Self:

        return type(self)(super().__floor__())

    def __floordiv__(self, x: int) -> Self:

        return type(self)(super().__floordiv__(x))

    def __invert__(self) -> Self:

        return type(self)(super().__invert__())

    def __lshift__(self, n: int) -> Self:

        return type(self)(super().__lshift__(n))

    def __mod__(self, x: int) -> Self:

        return type(self)(super().__mod__(x))

    def __mul__(self, x: int) -> Self:

        return type(self)(super().__mul__(x))

    def __neg__(self) -> Self:

        return type(self)(super().__neg__())

    def __or__(self, n: int) -> Self:

        return type(self)(super().__or__(n))

    def __pos__(self) -> Self:

        return type(self)(super().__pos__())

    def __radd__(self, x: int) -> Self:

        return type(self)(super().__radd__(x))

    def __rand__(self, n: int) -> Self:

        return type(self)(super().__rand__(n))

    def __rfloordiv__(self, x: int) -> Self:

        return type(self)(super().__rfloordiv__(x))

    def __rlshift__(self, n: int) -> Self:

        return type(self)(super().__rlshift__(n))

    def __rmod__(self, x: int) -> Self:

        return type(self)(super().__rmod__(x))

    def __rmul__(self, x: int) -> Self:

        return type(self)(super().__rmul__(x))

    def __ror__(self, n: int) -> Self:

        return type(self)(super().__ror__(n))

    def __round__(self, ndigits: SupportsIndex = 0) -> Self:

        return type(self)(super().__round__(ndigits))

    def __rrshift__(self, n: int) -> Self:

        return type(self)(super().__rrshift__(n))

    def __rshift__(self, n: int) -> Self:

        return type(self)(super().__rshift__(n))

    def __rsub__(self, x: int) -> Self:

        return type(self)(super().__rsub__(x))

    def __rxor__(self, n: int) -> Self:

        return type(self)(super().__rxor__(n))

    def __sub__(self, x: int) -> Self:

        return type(self)(super().__sub__(x))

    def __trunc__(self) -> Self:

        return type(self)(super().__trunc__())

    def __xor__(self, n: int) -> Self:

        return type(self)(super().__xor__(n))

class int64(_int):

    _min, _max = -(2**63), (2**63) - 1

    def __new__(cls, i: Union[int, int64, uint64]) -> int64:

        if i > cls._max:

            if isinstance(i, uint64):

                i = int(i) - uint64._max - 1

            else:

                raise ValueError(f"{i} is too large for int64. Hint: cast to uint64 first.")

        if i < cls._min:

            raise ValueError(f"{i} is too small for int64.")

        return super().__new__(cls, i)

class uint64(_int):

    _min, _max = 0, (2**64) - 1

    def __new__(cls, i: Union[int, int64, uint64]) -> uint64:

        if i > cls._max:

            raise ValueError(f"{i} is too large for uint64.")

        if i < cls._min:

            if isinstance(i, int64):

                i = int(i) + cls._max + 1

            else:

                raise ValueError(f"{i} is negative. Hint: cast to int64 first.")

        return super().__new__(cls, i)

@contextmanager

def named_temporary_file(mode: str = "w+b") -> Generator[IO[Any], None, None]:

    """Minimal alternative to tempfile.NamedTemporaryFile that can be re-opened on Windows."""

    with TemporaryDirectory() as d, (Path(d) / "contents").open(mode=mode) as f:

        yield f

def one_or_none(values: Optional[list[_V]], *, item_name: str) -> Optional[_V]:

    if values is None or len(values) == 0:

        return None

    if len(values) > 1:

        raise ValueError(f"multiple {item_name} values found: {values}")

    return values[0]

def ordinal(n: int) -> str:

    """Convert an integer into its ordinal representation."""

    n = int(n)

    suffix = ["th", "st", "nd", "rd", "th"][min(n % 10, 4)]

    if 11 <= (n % 100) <= 13:

        suffix = "th"

    return str(n) + suffix

def register(

    registry: dict[_K, _V],

    key: _K,

    value: _V,

    get_priority: Optional[Callable[[_V], int]] = None,

) -> _V:

    if key in registry:

        existing = registry[key]

        if get_priority is None:

            raise ValueError(f"{key} is already registered with: {existing}!")

        existing_priority, new_priority = get_priority(existing), get_priority(value)

        if existing_priority > new_priority:

            return value

        if existing_priority == new_priority:

            raise ValueError(

                f"{key} with matching priority ({existing_priority}) is already registered with: {existing}!"

            )

    registry[key] = value

    return value

def qname(val: Union[object, type]) -> str:

    if isinstance(val, type):

        return val.__qualname__

    return type(val).__qualname__

class NoCopyMixin:

    """Mixin to bypass (deep)copying.

    This is useful for objects that are *intended* to be stateful and preserved, despite usually

    preferring immutable data structures and Pydantic models, which (deep)copy often.

    """

    def __copy__(self) -> Self:

        return self  # pragma: no cover

    def __deepcopy__(self, memo: Any) -> Self:

        return self  # pragma: no cover

class NoCopyDict(dict[_K, _V], NoCopyMixin):

    pass

class TypedBox(Box, MutableMapping[str, Union[_V, MutableMapping[str, _V]]]):

    """TypedBox holds a collection of typed values.

    Subclasses must set the __target_type__ to a base class for the contained values.

    """

    __target_type__: ClassVar[type[_V]]  # type: ignore[misc]

    @classmethod

    def __class_getitem__(cls, item: type[_V]) -> GenericAlias:

        if isinstance(item, tuple):

            raise TypeError(f"{cls.__name__} expects a single value type")

        value_type = item

        return GenericAlias(

            type(

                cls.__name__,

                (cls,),

                {

                    "__module__": get_module_name(depth=2),  # Set to our caller's module

                    "__target_type__": value_type,

                },

            ),

            item,

        )

    def __setattr__(self, key: str, value: Any) -> None:

        # GenericAlias sets __orig_class__ after __init__, so preempt Box from storing that (or

        # erroring if frozen).

        if key == "__orig_class__":

            return object.__setattr__(self, key, value)

        super().__setattr__(key, value)

        return None

    def __cast_value(self, item: str, value: Any) -> _V:

        if isinstance(value, self.__target_type__):

            return value

        tgt_name = self.__target_type__.__name__

        if hasattr(self.__target_type__, "cast"):

            casted = cast(Any, self.__target_type__).cast(value)

            if isinstance(casted, self.__target_type__):

                return casted

            raise TypeError(

                f"Expected {tgt_name}.cast({value}) to return an instance of {tgt_name}, got: {casted}"

            )

        raise TypeError(f"Expected an instance of {tgt_name}, got: {value}")

    # NOTE: Box uses name mangling (double __) to prevent conflicts with contained values.

    def _Box__convert_and_store(self, item: str, value: _V) -> None:

        if isinstance(value, dict):

            super()._Box__convert_and_store(item, value)  # pylint: disable=no-member

        elif item in self:

            raise ValueError(f"{item} is already set!")

        else:

            super()._Box__convert_and_store(item, self.__cast_value(item, value))

    def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:

        for k, v in self.items():

            subroot = (*root, k)

            if isinstance(v, TypedBox):

                yield from v.walk(root=subroot)

            else:

                yield ".".join(subroot), v  # type: ignore[misc]

Variables

PropReturn

Functions

classproperty

def classproperty(
    meth: 'Callable[..., PropReturn]'
) -> 'PropReturn'

Access a @classmethod like a @property.

View Source
def classproperty(meth: Callable[..., PropReturn]) -> PropReturn:

    """Access a @classmethod like a @property."""

    # mypy doesn't understand class properties yet: https://github.com/python/mypy/issues/2563

    return classmethod(property(meth))  # type: ignore[arg-type,return-value]
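
A minimal illustrative sketch (not from the module's docs); it assumes a Python version, roughly 3.9 through 3.12, where classmethod may wrap a property:

>>> class Foo:
...     @classproperty
...     def label(cls) -> str:
...         return f"class {cls.__name__}"
...
>>> Foo.label
'class Foo'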

get_module_name

def get_module_name(
    depth: 'int' = 1
) -> 'Optional[str]'

Return the module name of a specific level in the stack.

Depth describes how many levels to traverse, for example:

  • depth=0: return get_module_name's module
  • depth=1 (default): return the caller's module
  • depth=2: return the caller's calling module
  • ...

View Source
def get_module_name(depth: int = 1) -> Optional[str]:

    """Return the module name of a specific level in the stack.

    Depth describes how many levels to traverse, for example:

    - depth=0: return get_module_name's module

    - depth=1 (default): return the caller's module

    - depth=2: return the caller's calling module

    - ...

    """

    frame = inspect.currentframe()

    if frame is None:  # the interpreter doesn't support frame inspection

        return None  # pragma: no cover

    for _ in range(depth):

        frame = frame.f_back

        if frame is None:

            return None

    return frame.f_globals.get("__name__", "__main__")
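
For example (illustrative; the value at depth=1 depends on where the call is made):

>>> get_module_name(depth=0)
'arti.internal.utils'
>>> get_module_name()  # from an interactive session, the caller's module is __main__
'__main__'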

import_submodules

def import_submodules(
    path: 'list[str]',
    name: 'str',
    *,
    lock: 'threading.Lock' = threading.Lock()
) -> 'dict[str, ModuleType]'

Recursively import submodules.

This can be useful with registry patterns to automatically discover and import submodules defining additional implementations.

path and name are usually provided from an existing module's __path__ and __name__.

This function is thread-safe and supports namespace modules.

NOTE: This inherently triggers eager imports, which has performance impacts and may cause import cycles. To reduce these issues, avoid calling during module definition.

View Source
def import_submodules(

    path: list[str],  # module.__path__ is a list[str]

    name: str,

    *,

    lock: threading.Lock = threading.Lock(),

) -> dict[str, ModuleType]:

    """Recursively import submodules.

    This can be useful with registry patterns to automatically discover and import submodules

    defining additional implementations.

    `path` and `name` are usually provided from an existing module's `__path__` and `__name__`.

    This function is thread-safe and supports namespace modules.

    NOTE: This inherently triggers eager imports, which has performance impacts and may cause import

    cycles. To reduce these issues, avoid calling during module definition.

    """

    # pkgutil.iter_modules is not recursive and pkgutil.walk_packages does not handle namespace

    # packages... however we can leverage setuptools.find_namespace_packages, which was built for

    # exactly this.

    path_names = {p: name for p in path}

    path_names.update(

        {

            str(Path(path).joinpath(*name.split("."))): f"{root_name}.{name}"

            for path, root_name in path_names.items()

            for name in find_namespace_packages(path)

        }

    )

    with lock:

        return {

            name: importlib.import_module(name)

            for path, name in path_names.items()

            for _, name, _ in pkgutil.iter_modules([path], prefix=f"{name}.")

        }
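
A usage sketch, assuming a hypothetical mypkg.plugins package whose submodules register implementations as an import side effect:

>>> import mypkg.plugins  # hypothetical package
>>> modules = import_submodules(mypkg.plugins.__path__, mypkg.plugins.__name__)
>>> # modules maps fully qualified submodule names to the imported module objects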

named_temporary_file

def named_temporary_file(
    mode: 'str' = 'w+b'
) -> 'Generator[IO[Any], None, None]'

Minimal alternative to tempfile.NamedTemporaryFile that can be re-opened on Windows.

View Source
@contextmanager

def named_temporary_file(mode: str = "w+b") -> Generator[IO[Any], None, None]:

    """Minimal alternative to tempfile.NamedTemporaryFile that can be re-opened on Windows."""

    with TemporaryDirectory() as d, (Path(d) / "contents").open(mode=mode) as f:

        yield f
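
A minimal sketch of typical usage; the file lives inside a TemporaryDirectory and is removed when the context exits:

>>> with named_temporary_file(mode="w+t") as f:
...     _ = f.write("hello")
...     f.flush()
...     print(Path(f.name).read_text())
...
hello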

one_or_none

def one_or_none(
    values: 'Optional[list[_V]]',
    *,
    item_name: 'str'
) -> 'Optional[_V]'
View Source
def one_or_none(values: Optional[list[_V]], *, item_name: str) -> Optional[_V]:

    if values is None or len(values) == 0:

        return None

    if len(values) > 1:

        raise ValueError(f"multiple {item_name} values found: {values}")

    return values[0]
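
For example:

>>> one_or_none([5], item_name="id")
5
>>> one_or_none([], item_name="id") is None
True
>>> one_or_none([1, 2], item_name="id")
Traceback (most recent call last):
    ...
ValueError: multiple id values found: [1, 2]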

ordinal

def ordinal(
    n: 'int'
) -> 'str'

Convert an integer into its ordinal representation.

View Source
def ordinal(n: int) -> str:

    """Convert an integer into its ordinal representation."""

    n = int(n)

    suffix = ["th", "st", "nd", "rd", "th"][min(n % 10, 4)]

    if 11 <= (n % 100) <= 13:

        suffix = "th"

    return str(n) + suffix
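
For example:

>>> [ordinal(n) for n in (1, 2, 3, 4, 11, 12, 13, 21)]
['1st', '2nd', '3rd', '4th', '11th', '12th', '13th', '21st']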

qname

def qname(
    val: 'Union[object, type]'
) -> 'str'
View Source
def qname(val: Union[object, type]) -> str:

    if isinstance(val, type):

        return val.__qualname__

    return type(val).__qualname__
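
For example, qname accepts either a class or an instance:

>>> qname(int64)
'int64'
>>> qname(uint64(1))
'uint64'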

register

def register(
    registry: 'dict[_K, _V]',
    key: '_K',
    value: '_V',
    get_priority: 'Optional[Callable[[_V], int]]' = None
) -> '_V'
View Source
def register(

    registry: dict[_K, _V],

    key: _K,

    value: _V,

    get_priority: Optional[Callable[[_V], int]] = None,

) -> _V:

    if key in registry:

        existing = registry[key]

        if get_priority is None:

            raise ValueError(f"{key} is already registered with: {existing}!")

        existing_priority, new_priority = get_priority(existing), get_priority(value)

        if existing_priority > new_priority:

            return value

        if existing_priority == new_priority:

            raise ValueError(

                f"{key} with matching priority ({existing_priority}) is already registered with: {existing}!"

            )

    registry[key] = value

    return value
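
A small registry sketch (the registry and keys here are made up for illustration):

>>> formats: dict[str, type] = {}
>>> register(formats, "json", dict)
<class 'dict'>
>>> register(formats, "json", list)
Traceback (most recent call last):
    ...
ValueError: json is already registered with: <class 'dict'>!

When get_priority is supplied, the higher-priority value wins silently and equal priorities raise instead.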

Classes

ClassName

class ClassName(
    /,
    *args,
    **kwargs
)
View Source
class ClassName:

    def __get__(self, obj: Any, type_: type[Any]) -> str:

        return type_.__name__

NoCopyDict

class NoCopyDict(
    /,
    *args,
    **kwargs
)
View Source
class NoCopyDict(dict[_K, _V], NoCopyMixin):

    pass

Ancestors (in MRO)

  • builtins.dict
  • arti.internal.utils.NoCopyMixin

Methods

clear

def clear(
    ...
)

D.clear() -> None. Remove all items from D.

copy

def copy(
    ...
)

D.copy() -> a shallow copy of D

fromkeys

def fromkeys(
    iterable,
    value=None,
    /
)

Create a new dictionary with keys from iterable and values set to value.

get

def get(
    self,
    key,
    default=None,
    /
)

Return the value for key if key is in the dictionary, else default.

items

def items(
    ...
)

D.items() -> a set-like object providing a view on D's items

keys

def keys(
    ...
)

D.keys() -> a set-like object providing a view on D's keys

pop

def pop(
    ...
)

D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem

def popitem(
    self,
    /
)

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault

def setdefault(
    self,
    key,
    default=None,
    /
)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update

def update(
    ...
)

D.update([E, ]**F) -> None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v
In either case, this is followed by: for k in F: D[k] = F[k]

values

def values(
    ...
)

D.values() -> an object providing a view on D's values

NoCopyMixin

class NoCopyMixin(
    /,
    *args,
    **kwargs
)
View Source
class NoCopyMixin:

    """Mixin to bypass (deep)copying.

    This is useful for objects that are *intended* to be stateful and preserved, despite usually

    preferring immutable data structures and Pydantic models, which (deep)copy often.

    """

    def __copy__(self) -> Self:

        return self  # pragma: no cover

    def __deepcopy__(self, memo: Any) -> Self:

        return self  # pragma: no cover
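
For example, copying a NoCopyDict returns the same object rather than a new one:

>>> import copy
>>> d = NoCopyDict({"a": 1})
>>> copy.copy(d) is d
True
>>> copy.deepcopy(d) is d
True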

Descendants

  • arti.internal.utils.NoCopyDict
  • arti.backends.memory._NoCopyContainer

TypedBox

class TypedBox(
    *args: Any,
    default_box: bool = False,
    default_box_attr: Any = <sentinel object>,
    default_box_none_transform: bool = True,
    default_box_create_on_get: bool = True,
    frozen_box: bool = False,
    camel_killer_box: bool = False,
    conversion_box: bool = True,
    modify_tuples_box: bool = False,
    box_safe_prefix: str = 'x',
    box_duplicates: str = 'ignore',
    box_intact_types: Union[Tuple, List] = (),
    box_recast: Optional[Dict] = None,
    box_dots: bool = False,
    box_class: Union[Dict, Type[ForwardRef('Box')], NoneType] = None,
    box_namespace: Tuple[str, ...] = (),
    **kwargs: Any
)
View Source
class TypedBox(Box, MutableMapping[str, Union[_V, MutableMapping[str, _V]]]):

    """TypedBox holds a collection of typed values.

    Subclasses must set the __target_type__ to a base class for the contained values.

    """

    __target_type__: ClassVar[type[_V]]  # type: ignore[misc]

    @classmethod

    def __class_getitem__(cls, item: type[_V]) -> GenericAlias:

        if isinstance(item, tuple):

            raise TypeError(f"{cls.__name__} expects a single value type")

        value_type = item

        return GenericAlias(

            type(

                cls.__name__,

                (cls,),

                {

                    "__module__": get_module_name(depth=2),  # Set to our caller's module

                    "__target_type__": value_type,

                },

            ),

            item,

        )

    def __setattr__(self, key: str, value: Any) -> None:

        # GenericAlias sets __orig_class__ after __init__, so preempt Box from storing that (or

        # erroring if frozen).

        if key == "__orig_class__":

            return object.__setattr__(self, key, value)

        super().__setattr__(key, value)

        return None

    def __cast_value(self, item: str, value: Any) -> _V:

        if isinstance(value, self.__target_type__):

            return value

        tgt_name = self.__target_type__.__name__

        if hasattr(self.__target_type__, "cast"):

            casted = cast(Any, self.__target_type__).cast(value)

            if isinstance(casted, self.__target_type__):

                return casted

            raise TypeError(

                f"Expected {tgt_name}.cast({value}) to return an instance of {tgt_name}, got: {casted}"

            )

        raise TypeError(f"Expected an instance of {tgt_name}, got: {value}")

    # NOTE: Box uses name mangling (double __) to prevent conflicts with contained values.

    def _Box__convert_and_store(self, item: str, value: _V) -> None:

        if isinstance(value, dict):

            super()._Box__convert_and_store(item, value)  # pylint: disable=no-member

        elif item in self:

            raise ValueError(f"{item} is already set!")

        else:

            super()._Box__convert_and_store(item, self.__cast_value(item, value))

    def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:

        for k, v in self.items():

            subroot = (*root, k)

            if isinstance(v, TypedBox):

                yield from v.walk(root=subroot)

            else:

                yield ".".join(subroot), v  # type: ignore[misc]
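
A minimal usage sketch with a made-up value type (within arti, TypedBox is parametrized with Artifact-like classes elsewhere):

>>> class Fruit:
...     pass
...
>>> FruitBox = TypedBox[Fruit]
>>> basket = FruitBox()
>>> basket.apple = Fruit()
>>> [name for name, _ in basket.walk()]
['apple']

Assigning a value that is neither a Fruit nor castable to one (via a cast classmethod on the target type) is expected to raise a TypeError, and re-assigning an existing key raises a ValueError.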

Ancestors (in MRO)

  • box.box.Box
  • builtins.dict
  • collections.abc.MutableMapping
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Descendants

  • arti.graphs.TypedBox

Static methods

from_json

def from_json(
    json_string: Optional[str] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'

Transform a json object string into a Box object. If the incoming json is a list, you must use BoxList.from_json.

Parameters:

  • json_string: string to pass to json.loads
  • filename: filename to open and pass to json.load
  • encoding: File encoding
  • errors: How to handle encoding errors
  • kwargs: parameters to pass to Box() or json.loads

Returns: Box object from json data
View Source
    @classmethod

    def from_json(

        cls,

        json_string: Optional[str] = None,

        filename: Optional[Union[str, PathLike]] = None,

        encoding: str = "utf-8",

        errors: str = "strict",

        **kwargs,

    ) -> "Box":

        """

        Transform a json object string into a Box object. If the incoming

        json is a list, you must use BoxList.from_json.

        :param json_string: string to pass to `json.loads`

        :param filename: filename to open and pass to `json.load`

        :param encoding: File encoding

        :param errors: How to handle encoding errors

        :param kwargs: parameters to pass to `Box()` or `json.loads`

        :return: Box object from json data

        """

        box_args = {}

        for arg in kwargs.copy():

            if arg in BOX_PARAMETERS:

                box_args[arg] = kwargs.pop(arg)

        data = _from_json(json_string, filename=filename, encoding=encoding, errors=errors, **kwargs)

        if not isinstance(data, dict):

            raise BoxError(f"json data not returned as a dictionary, but rather a {type(data).__name__}")

        return cls(data, **box_args)

from_msgpack

def from_msgpack(
    msgpack_bytes: Optional[bytes] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'
View Source
        @classmethod

        def from_msgpack(

            cls,

            msgpack_bytes: Optional[bytes] = None,

            filename: Optional[Union[str, PathLike]] = None,

            encoding: str = "utf-8",

            errors: str = "strict",

            **kwargs,

        ) -> "Box":

            raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')

from_toml

def from_toml(
    toml_string: Optional[str] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'

Transforms a toml string or file into a Box object

Parameters:

  • toml_string: string to pass to toml.load
  • filename: filename to open and pass to toml.load
  • encoding: File encoding
  • errors: How to handle encoding errors
  • kwargs: parameters to pass to Box()

Returns: Box object
View Source
        @classmethod

        def from_toml(

            cls,

            toml_string: Optional[str] = None,

            filename: Optional[Union[str, PathLike]] = None,

            encoding: str = "utf-8",

            errors: str = "strict",

            **kwargs,

        ) -> "Box":

            """

            Transforms a toml string or file into a Box object

            :param toml_string: string to pass to `toml.load`

            :param filename: filename to open and pass to `toml.load`

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :param kwargs: parameters to pass to `Box()`

            :return: Box object

            """

            box_args = {}

            for arg in kwargs.copy():

                if arg in BOX_PARAMETERS:

                    box_args[arg] = kwargs.pop(arg)

            data = _from_toml(toml_string=toml_string, filename=filename, encoding=encoding, errors=errors)

            return cls(data, **box_args)

from_yaml

def from_yaml(
    yaml_string: Optional[str] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'

Transform a yaml object string into a Box object. By default will use SafeLoader.

Parameters:

  • yaml_string: string to pass to yaml.load
  • filename: filename to open and pass to yaml.load
  • encoding: File encoding
  • errors: How to handle encoding errors
  • kwargs: parameters to pass to Box() or yaml.load

Returns: Box object from yaml data
View Source
        @classmethod

        def from_yaml(

            cls,

            yaml_string: Optional[str] = None,

            filename: Optional[Union[str, PathLike]] = None,

            encoding: str = "utf-8",

            errors: str = "strict",

            **kwargs,

        ) -> "Box":

            """

            Transform a yaml object string into a Box object. By default will use SafeLoader.

            :param yaml_string: string to pass to `yaml.load`

            :param filename: filename to open and pass to `yaml.load`

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :param kwargs: parameters to pass to `Box()` or `yaml.load`

            :return: Box object from yaml data

            """

            box_args = {}

            for arg in kwargs.copy():

                if arg in BOX_PARAMETERS:

                    box_args[arg] = kwargs.pop(arg)

            data = _from_yaml(yaml_string=yaml_string, filename=filename, encoding=encoding, errors=errors, **kwargs)

            if not data:

                return cls(**box_args)

            if not isinstance(data, dict):

                raise BoxError(f"yaml data not returned as a dictionary but rather a {type(data).__name__}")

            return cls(data, **box_args)

Methods

clear

def clear(
    self
)

D.clear() -> None. Remove all items from D.

View Source
    def clear(self):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        super().clear()

        self._box_config["__safe_keys"].clear()

copy

def copy(
    self
) -> 'Box'

D.copy() -> a shallow copy of D

View Source
    def copy(self) -> "Box":

        config = self.__box_config()

        config.pop("box_namespace")  # Detach namespace; it will be reassigned if we nest again

        return Box(super().copy(), **config)

fromkeys

def fromkeys(
    iterable,
    value=None,
    /
)

Create a new dictionary with keys from iterable and values set to value.

get

def get(
    self,
    key,
    default=NO_DEFAULT
)

Return the value for key if key is in the dictionary, else default.

View Source
    def get(self, key, default=NO_DEFAULT):

        if key not in self:

            if default is NO_DEFAULT:

                if self._box_config["default_box"] and self._box_config["default_box_none_transform"]:

                    return self.__get_default(key)

                else:

                    return None

            if isinstance(default, dict) and not isinstance(default, Box):

                return Box(default)

            if isinstance(default, list) and not isinstance(default, box.BoxList):

                return box.BoxList(default)

            return default

        return self[key]

items

def items(
    self,
    dotted: bool = False
)

D.items() -> a set-like object providing a view on D's items

View Source
    def items(self, dotted: Union[bool] = False):

        if not dotted:

            return super().items()

        if not self._box_config["box_dots"]:

            raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")

        return [(k, self[k]) for k in self.keys(dotted=True)]

keys

def keys(
    self,
    dotted: bool = False
)

D.keys() -> a set-like object providing a view on D's keys

View Source
    def keys(self, dotted: Union[bool] = False):

        if not dotted:

            return super().keys()

        if not self._box_config["box_dots"]:

            raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")

        keys = set()

        for key, value in self.items():

            added = False

            if isinstance(key, str):

                if isinstance(value, Box):

                    for sub_key in value.keys(dotted=True):

                        keys.add(f"{key}.{sub_key}")

                        added = True

                elif isinstance(value, box.BoxList):

                    for pos in value._dotted_helper():

                        keys.add(f"{key}{pos}")

                        added = True

                if not added:

                    keys.add(key)

        return sorted(keys, key=lambda x: str(x))

merge_update

def merge_update(
    self,
    *args,
    **kwargs
)
View Source
    def merge_update(self, *args, **kwargs):

        merge_type = None

        if "box_merge_lists" in kwargs:

            merge_type = kwargs.pop("box_merge_lists")

        def convert_and_set(k, v):

            intact_type = self._box_config["box_intact_types"] and isinstance(v, self._box_config["box_intact_types"])

            if isinstance(v, dict) and not intact_type:

                # Box objects must be created in case they are already

                # in the `converted` box_config set

                v = self._box_config["box_class"](v, **self.__box_config(extra_namespace=k))

                if k in self and isinstance(self[k], dict):

                    self[k].merge_update(v)

                    return

            if isinstance(v, list) and not intact_type:

                v = box.BoxList(v, **self.__box_config(extra_namespace=k))

                if merge_type == "extend" and k in self and isinstance(self[k], list):

                    self[k].extend(v)

                    return

                if merge_type == "unique" and k in self and isinstance(self[k], list):

                    for item in v:

                        if item not in self[k]:

                            self[k].append(item)

                    return

            self.__setitem__(k, v)

        if (len(args) + int(bool(kwargs))) > 1:

            raise BoxTypeError(f"merge_update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")

        single_arg = next(iter(args), None)

        if single_arg:

            if hasattr(single_arg, "keys"):

                for k in single_arg:

                    convert_and_set(k, single_arg[k])

            else:

                for k, v in single_arg:

                    convert_and_set(k, v)

        for key in kwargs:

            convert_and_set(key, kwargs[key])

pop

def pop(
    self,
    key,
    *args
)

D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

View Source
    def pop(self, key, *args):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        if args:

            if len(args) != 1:

                raise BoxError('pop() takes only one optional argument "default"')

            try:

                item = self[key]

            except KeyError:

                return args[0]

            else:

                del self[key]

                return item

        try:

            item = self[key]

        except KeyError:

            raise BoxKeyError(f"{key}") from None

        else:

            del self[key]

            return item

popitem

def popitem(
    self
)

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

View Source
    def popitem(self):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        try:

            key = next(self.__iter__())

        except StopIteration:

            raise BoxKeyError("Empty box") from None

        return key, self.pop(key)

setdefault

def setdefault(
    self,
    item,
    default=None
)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

View Source
    def setdefault(self, item, default=None):

        if item in self:

            return self[item]

        if self._box_config["box_dots"]:

            if item in _get_dot_paths(self):

                return self[item]

        if isinstance(default, dict):

            default = self._box_config["box_class"](default, **self.__box_config(extra_namespace=item))

        if isinstance(default, list):

            default = box.BoxList(default, **self.__box_config(extra_namespace=item))

        self[item] = default

        return self[item]

to_dict

def to_dict(
    self
) -> Dict

Turn the Box and sub Boxes back into a native python dictionary.

Returns: python dictionary of this Box
View Source
    def to_dict(self) -> Dict:

        """

        Turn the Box and sub Boxes back into a native python dictionary.

        :return: python dictionary of this Box

        """

        out_dict = dict(self)

        for k, v in out_dict.items():

            if v is self:

                out_dict[k] = out_dict

            elif isinstance(v, Box):

                out_dict[k] = v.to_dict()

            elif isinstance(v, box.BoxList):

                out_dict[k] = v.to_list()

        return out_dict

to_json

def to_json(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **json_kwargs
)

Transform the Box object into a JSON string.

Parameters:

  • filename: If provided will save to file
  • encoding: File encoding
  • errors: How to handle encoding errors
  • json_kwargs: additional arguments to pass to json.dump(s)

Returns: string of JSON (if no filename provided)
View Source
    def to_json(

        self,

        filename: Optional[Union[str, PathLike]] = None,

        encoding: str = "utf-8",

        errors: str = "strict",

        **json_kwargs,

    ):

        """

        Transform the Box object into a JSON string.

        :param filename: If provided will save to file

        :param encoding: File encoding

        :param errors: How to handle encoding errors

        :param json_kwargs: additional arguments to pass to json.dump(s)

        :return: string of JSON (if no filename provided)

        """

        return _to_json(self.to_dict(), filename=filename, encoding=encoding, errors=errors, **json_kwargs)

to_msgpack

def to_msgpack(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    **kwargs
)
View Source
        def to_msgpack(self, filename: Optional[Union[str, PathLike]] = None, **kwargs):

            raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')

to_toml

def to_toml(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict'
)

Transform the Box object into a toml string.

Parameters:

  • filename: File to write the TOML object to
  • encoding: File encoding
  • errors: How to handle encoding errors

Returns: string of TOML (if no filename provided)
View Source
        def to_toml(

            self, filename: Optional[Union[str, PathLike]] = None, encoding: str = "utf-8", errors: str = "strict"

        ):

            """

            Transform the Box object into a toml string.

            :param filename: File to write toml object too

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :return: string of TOML (if no filename provided)

            """

            return _to_toml(self.to_dict(), filename=filename, encoding=encoding, errors=errors)

to_yaml

def to_yaml(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    default_flow_style: bool = False,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **yaml_kwargs
)

Transform the Box object into a YAML string.

Parameters:

  • filename: If provided will save to file
  • default_flow_style: False will recursively dump dicts
  • encoding: File encoding
  • errors: How to handle encoding errors
  • yaml_kwargs: additional arguments to pass to yaml.dump

Returns: string of YAML (if no filename provided)
View Source
        def to_yaml(

            self,

            filename: Optional[Union[str, PathLike]] = None,

            default_flow_style: bool = False,

            encoding: str = "utf-8",

            errors: str = "strict",

            **yaml_kwargs,

        ):

            """

            Transform the Box object into a YAML string.

            :param filename:  If provided will save to file

            :param default_flow_style: False will recursively dump dicts

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :param yaml_kwargs: additional arguments to pass to yaml.dump

            :return: string of YAML (if no filename provided)

            """

            return _to_yaml(

                self.to_dict(),

                filename=filename,

                default_flow_style=default_flow_style,

                encoding=encoding,

                errors=errors,

                **yaml_kwargs,

            )

update

def update(
    self,
    *args,
    **kwargs
)

D.update([E, ]**F) -> None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v
In either case, this is followed by: for k in F: D[k] = F[k]

View Source
    def update(self, *args, **kwargs):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        if (len(args) + int(bool(kwargs))) > 1:

            raise BoxTypeError(f"update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")

        single_arg = next(iter(args), None)

        if single_arg:

            if hasattr(single_arg, "keys"):

                for k in single_arg:

                    self.__convert_and_store(k, single_arg[k])

            else:

                for k, v in single_arg:

                    self.__convert_and_store(k, v)

        for k in kwargs:

            self.__convert_and_store(k, kwargs[k])

values

def values(
    ...
)

D.values() -> an object providing a view on D's values

walk

def walk(
    self,
    root: 'tuple[str, ...]' = ()
) -> 'Iterator[tuple[str, _V]]'
View Source
    def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:

        for k, v in self.items():

            subroot = (*root, k)

            if isinstance(v, TypedBox):

                yield from v.walk(root=subroot)

            else:

                yield ".".join(subroot), v  # type: ignore[misc]

class_name

class class_name(
    /,
    *args,
    **kwargs
)
View Source
class ClassName:

    def __get__(self, obj: Any, type_: type[Any]) -> str:

        return type_.__name__
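
For illustration, class_name is a descriptor that reports the owning class's name on both the class and its instances:

>>> class Widget:
...     name = class_name()
...
>>> Widget.name
'Widget'
>>> Widget().name
'Widget'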

frozendict

class frozendict(
    arg: 'Union[Mapping[_K, _V], Iterable[tuple[_K, _V]]]' = (),
    **kwargs: '_V'
)
View Source
class frozendict(Mapping[_K, _V]):

    def __init__(

        self, arg: Union[Mapping[_K, _V], Iterable[tuple[_K, _V]]] = (), **kwargs: _V

    ) -> None:

        self._data = dict[_K, _V](arg, **kwargs)

        # Eagerly evaluate the hash to confirm elements are also frozen (via frozenset) at

        # creation time, not just when hashed.

        self._hash = hash(frozenset(self._data.items()))

    def __getitem__(self, key: _K) -> _V:

        return self._data[key]

    def __hash__(self) -> int:

        return self._hash

    def __iter__(self) -> Iterator[_K]:

        return iter(self._data)

    def __len__(self) -> int:

        return len(self._data)

    def __or__(self, other: Mapping[_K, _V]) -> frozendict[_K, _V]:

        return type(self)({**self, **other})

    __ror__ = __or__

    def __repr__(self) -> str:

        return repr(self._data)
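
A small usage sketch; values must themselves be hashable, since the hash is computed eagerly at construction:

>>> fd = frozendict(a=1, b=2)
>>> fd["a"]
1
>>> fd | {"c": 3}
{'a': 1, 'b': 2, 'c': 3}
>>> hash(fd) == hash(frozendict({"a": 1, "b": 2}))
True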

Ancestors (in MRO)

  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Methods

get

def get(
    self,
    key,
    default=None
)

D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.

items

def items(
    self
)

D.items() -> a set-like object providing a view on D's items

keys

def keys(
    self
)

D.keys() -> a set-like object providing a view on D's keys

values

def values(
    self
)

D.values() -> an object providing a view on D's values

int64

class int64(
    /,
    *args,
    **kwargs
)
View Source
class int64(_int):

    _min, _max = -(2**63), (2**63) - 1

    def __new__(cls, i: Union[int, int64, uint64]) -> int64:

        if i > cls._max:

            if isinstance(i, uint64):

                i = int(i) - uint64._max - 1

            else:

                raise ValueError(f"{i} is too large for int64. Hint: cast to uint64 first.")

        if i < cls._min:

            raise ValueError(f"{i} is too small for int64.")

        return super().__new__(cls, i)
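
For example, values above the signed maximum are only accepted when they arrive as uint64, in which case they wrap to the two's-complement interpretation:

>>> int64(uint64(2**64 - 1))
int64(-1)
>>> int64(2**63)
Traceback (most recent call last):
    ...
ValueError: 9223372036854775808 is too large for int64. Hint: cast to uint64 first.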

Ancestors (in MRO)

  • arti.internal.utils._int
  • builtins.int

Class variables

denominator
imag
numerator
real

Methods

as_integer_ratio

def as_integer_ratio(
    self,
    /
)

Return integer ratio.

Return a pair of integers, whose ratio is exactly equal to the original int and with a positive denominator.

>>> (10).as_integer_ratio()
(10, 1)
>>> (-10).as_integer_ratio()
(-10, 1)
>>> (0).as_integer_ratio()
(0, 1)

bit_count

def bit_count(
    self,
    /
)

Number of ones in the binary representation of the absolute value of self.

Also known as the population count.

>>> bin(13)
'0b1101'
>>> (13).bit_count()
3

bit_length

def bit_length(
    self,
    /
)

Number of bits necessary to represent self in binary.

>>> bin(37)
'0b100101'
>>> (37).bit_length()
6

conjugate

def conjugate(
    ...
)

Returns self, the complex conjugate of any int.

from_bytes

def from_bytes(
    bytes,
    byteorder='big',
    *,
    signed=False
)

Return the integer represented by the given array of bytes.

bytes
  Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol.
byteorder
  The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is 'big'.
signed
  Indicates whether two's complement is used to represent the integer.

to_bytes

def to_bytes(
    self,
    /,
    length=1,
    byteorder='big',
    *,
    signed=False
)

Return an array of bytes representing an integer.

length
  Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1.
byteorder
  The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is 'big'.
signed
  Determines whether two's complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.

uint64

class uint64(
    /,
    *args,
    **kwargs
)
View Source
class uint64(_int):

    _min, _max = 0, (2**64) - 1

    def __new__(cls, i: Union[int, int64, uint64]) -> uint64:

        if i > cls._max:

            raise ValueError(f"{i} is too large for uint64.")

        if i < cls._min:

            if isinstance(i, int64):

                i = int(i) + cls._max + 1

            else:

                raise ValueError(f"{i} is negative. Hint: cast to int64 first.")

        return super().__new__(cls, i)
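
Conversely, negative values are only accepted when they arrive as int64, wrapping to the unsigned interpretation:

>>> uint64(int64(-1))
uint64(18446744073709551615)
>>> uint64(-1)
Traceback (most recent call last):
    ...
ValueError: -1 is negative. Hint: cast to int64 first.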

Ancestors (in MRO)

  • arti.internal.utils._int
  • builtins.int

Class variables

denominator
imag
numerator
real

Methods

as_integer_ratio

def as_integer_ratio(
    self,
    /
)

Return integer ratio.

Return a pair of integers, whose ratio is exactly equal to the original int and with a positive denominator.

>>> (10).as_integer_ratio()
(10, 1)
>>> (-10).as_integer_ratio()
(-10, 1)
>>> (0).as_integer_ratio()
(0, 1)

bit_count

def bit_count(
    self,
    /
)

Number of ones in the binary representation of the absolute value of self.

Also known as the population count.

>>> bin(13)
'0b1101'
>>> (13).bit_count()
3

bit_length

def bit_length(
    self,
    /
)

Number of bits necessary to represent self in binary.

>>> bin(37)
'0b100101'
>>> (37).bit_length()
6

conjugate

def conjugate(
    ...
)

Returns self, the complex conjugate of any int.

from_bytes

def from_bytes(
    bytes,
    byteorder='big',
    *,
    signed=False
)

Return the integer represented by the given array of bytes.

bytes
  Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol.
byteorder
  The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is 'big'.
signed
  Indicates whether two's complement is used to represent the integer.

to_bytes

def to_bytes(
    self,
    /,
    length=1,
    byteorder='big',
    *,
    signed=False
)

Return an array of bytes representing an integer.

length
  Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1.
byteorder
  The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is 'big'.
signed
  Determines whether two's complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.