Module arti.internal.utils
None
None
View Source
from __future__ import annotations
import importlib
import inspect
import pkgutil
import threading
from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, MutableMapping
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from types import GenericAlias, ModuleType
from typing import IO, Any, ClassVar, Optional, SupportsIndex, TypeVar, Union, cast
from box import Box
from arti.internal.type_hints import Self
from arti.internal.vendored.setuptools import find_namespace_packages
_K = TypeVar("_K")
_V = TypeVar("_V")
class ClassName:
def __get__(self, obj: Any, type_: type[Any]) -> str:
return type_.__name__
class_name = cast(Callable[[], str], ClassName)
PropReturn = TypeVar("PropReturn")
def classproperty(meth: Callable[..., PropReturn]) -> PropReturn:
"""Access a @classmethod like a @property."""
# mypy doesn't understand class properties yet: https://github.com/python/mypy/issues/2563
return classmethod(property(meth)) # type: ignore[arg-type,return-value]
class frozendict(Mapping[_K, _V]):
def __init__(
self, arg: Union[Mapping[_K, _V], Iterable[tuple[_K, _V]]] = (), **kwargs: _V
) -> None:
self._data = dict[_K, _V](arg, **kwargs)
# Eagerly evaluate the hash to confirm elements are also frozen (via frozenset) at
# creation time, not just when hashed.
self._hash = hash(frozenset(self._data.items()))
def __getitem__(self, key: _K) -> _V:
return self._data[key]
def __hash__(self) -> int:
return self._hash
def __iter__(self) -> Iterator[_K]:
return iter(self._data)
def __len__(self) -> int:
return len(self._data)
def __or__(self, other: Mapping[_K, _V]) -> frozendict[_K, _V]:
return type(self)({**self, **other})
__ror__ = __or__
def __repr__(self) -> str:
return repr(self._data)
def get_module_name(depth: int = 1) -> Optional[str]:
"""Return the module name of a specific level in the stack.
Depth describes how many levels to traverse, for example:
- depth=0: return get_module_name's module
- depth=1 (default): return the caller's module
- depth=2: return the caller's calling module
- ...
"""
frame = inspect.currentframe()
if frame is None: # the interpreter doesn't support frame inspection
return None # pragma: no cover
for _ in range(depth):
frame = frame.f_back
if frame is None:
return None
return frame.f_globals.get("__name__", "__main__")
def import_submodules(
path: list[str], # module.__path__ is a list[str]
name: str,
*,
lock: threading.Lock = threading.Lock(),
) -> dict[str, ModuleType]:
"""Recursively import submodules.
This can be useful with registry patterns to automatically discover and import submodules
defining additional implementations.
`path` and `name` are usually provided from an existing module's `__path__` and `__name__`.
This function is thread-safe and supports namespace modules.
NOTE: This inherently triggers eager imports, which has performance impacts and may cause import
cycles. To reduce these issues, avoid calling during module definition.
"""
# pkgutil.iter_modules is not recursive and pkgutil.walk_packages does not handle namespace
# packages... however we can leverage setuptools.find_namespace_packages, which was built for
# exactly this.
path_names = {p: name for p in path}
path_names.update(
{
str(Path(path).joinpath(*name.split("."))): f"{root_name}.{name}"
for path, root_name in path_names.items()
for name in find_namespace_packages(path)
}
)
with lock:
return {
name: importlib.import_module(name)
for path, name in path_names.items()
for _, name, _ in pkgutil.iter_modules([path], prefix=f"{name}.")
}
_int_sub = TypeVar("_int_sub", bound="_int")
class _int(int):
def __repr__(self) -> str:
return f"{qname(self)}({int(self)})"
def __str__(self) -> str:
return str(int(self))
# Stock magics.
def __add__(self, x: int) -> Self:
return type(self)(super().__add__(x))
def __and__(self, n: int) -> Self:
return type(self)(super().__and__(n))
def __ceil__(self) -> Self:
return type(self)(super().__ceil__())
def __floor__(self) -> Self:
return type(self)(super().__floor__())
def __floordiv__(self, x: int) -> Self:
return type(self)(super().__floordiv__(x))
def __invert__(self) -> Self:
return type(self)(super().__invert__())
def __lshift__(self, n: int) -> Self:
return type(self)(super().__lshift__(n))
def __mod__(self, x: int) -> Self:
return type(self)(super().__mod__(x))
def __mul__(self, x: int) -> Self:
return type(self)(super().__mul__(x))
def __neg__(self) -> Self:
return type(self)(super().__neg__())
def __or__(self, n: int) -> Self:
return type(self)(super().__or__(n))
def __pos__(self) -> Self:
return type(self)(super().__pos__())
def __radd__(self, x: int) -> Self:
return type(self)(super().__radd__(x))
def __rand__(self, n: int) -> Self:
return type(self)(super().__rand__(n))
def __rfloordiv__(self, x: int) -> Self:
return type(self)(super().__rfloordiv__(x))
def __rlshift__(self, n: int) -> Self:
return type(self)(super().__rlshift__(n))
def __rmod__(self, x: int) -> Self:
return type(self)(super().__rmod__(x))
def __rmul__(self, x: int) -> Self:
return type(self)(super().__rmul__(x))
def __ror__(self, n: int) -> Self:
return type(self)(super().__ror__(n))
def __round__(self, ndigits: SupportsIndex = 0) -> Self:
return type(self)(super().__round__(ndigits))
def __rrshift__(self, n: int) -> Self:
return type(self)(super().__rrshift__(n))
def __rshift__(self, n: int) -> Self:
return type(self)(super().__rshift__(n))
def __rsub__(self, x: int) -> Self:
return type(self)(super().__rsub__(x))
def __rxor__(self, n: int) -> Self:
return type(self)(super().__rxor__(n))
def __sub__(self, x: int) -> Self:
return type(self)(super().__sub__(x))
def __trunc__(self) -> Self:
return type(self)(super().__trunc__())
def __xor__(self, n: int) -> Self:
return type(self)(super().__xor__(n))
class int64(_int):
_min, _max = -(2**63), (2**63) - 1
def __new__(cls, i: Union[int, int64, uint64]) -> int64:
if i > cls._max:
if isinstance(i, uint64):
i = int(i) - uint64._max - 1
else:
raise ValueError(f"{i} is too large for int64. Hint: cast to uint64 first.")
if i < cls._min:
raise ValueError(f"{i} is too small for int64.")
return super().__new__(cls, i)
class uint64(_int):
_min, _max = 0, (2**64) - 1
def __new__(cls, i: Union[int, int64, uint64]) -> uint64:
if i > cls._max:
raise ValueError(f"{i} is too large for uint64.")
if i < cls._min:
if isinstance(i, int64):
i = int(i) + cls._max + 1
else:
raise ValueError(f"{i} is negative. Hint: cast to int64 first.")
return super().__new__(cls, i)
@contextmanager
def named_temporary_file(mode: str = "w+b") -> Generator[IO[Any], None, None]:
"""Minimal alternative to tempfile.NamedTemporaryFile that can be re-opened on Windows."""
with TemporaryDirectory() as d, (Path(d) / "contents").open(mode=mode) as f:
yield f
def one_or_none(values: Optional[list[_V]], *, item_name: str) -> Optional[_V]:
if values is None or len(values) == 0:
return None
if len(values) > 1:
raise ValueError(f"multiple {item_name} values found: {values}")
return values[0]
def ordinal(n: int) -> str:
"""Convert an integer into its ordinal representation."""
n = int(n)
suffix = ["th", "st", "nd", "rd", "th"][min(n % 10, 4)]
if 11 <= (n % 100) <= 13:
suffix = "th"
return str(n) + suffix
def register(
registry: dict[_K, _V],
key: _K,
value: _V,
get_priority: Optional[Callable[[_V], int]] = None,
) -> _V:
if key in registry:
existing = registry[key]
if get_priority is None:
raise ValueError(f"{key} is already registered with: {existing}!")
existing_priority, new_priority = get_priority(existing), get_priority(value)
if existing_priority > new_priority:
return value
if existing_priority == new_priority:
raise ValueError(
f"{key} with matching priority ({existing_priority}) is already registered with: {existing}!"
)
registry[key] = value
return value
def qname(val: Union[object, type]) -> str:
if isinstance(val, type):
return val.__qualname__
return type(val).__qualname__
class NoCopyMixin:
"""Mixin to bypass (deep)copying.
This is useful for objects that are *intended* to be stateful and preserved, despite usually
preferring immutable data structures and Pydantic models, which (deep)copy often.
"""
def __copy__(self) -> Self:
return self # pragma: no cover
def __deepcopy__(self, memo: Any) -> Self:
return self # pragma: no cover
class NoCopyDict(dict[_K, _V], NoCopyMixin):
pass
class TypedBox(Box, MutableMapping[str, Union[_V, MutableMapping[str, _V]]]):
"""TypedBox holds a collection of typed values.
Subclasses must set the __target_type__ to a base class for the contained values.
"""
__target_type__: ClassVar[type[_V]] # type: ignore[misc]
@classmethod
def __class_getitem__(cls, item: type[_V]) -> GenericAlias:
if isinstance(item, tuple):
raise TypeError(f"{cls.__name__} expects a single value type")
value_type = item
return GenericAlias(
type(
cls.__name__,
(cls,),
{
"__module__": get_module_name(depth=2), # Set to our caller's module
"__target_type__": value_type,
},
),
item,
)
def __setattr__(self, key: str, value: Any) -> None:
# GenericAlias sets __orig_class__ after __init__, so preempt Box from storing that (or
# erroring if frozen).
if key == "__orig_class__":
return object.__setattr__(self, key, value)
super().__setattr__(key, value)
return None
def __cast_value(self, item: str, value: Any) -> _V:
if isinstance(value, self.__target_type__):
return value
tgt_name = self.__target_type__.__name__
if hasattr(self.__target_type__, "cast"):
casted = cast(Any, self.__target_type__).cast(value)
if isinstance(casted, self.__target_type__):
return casted
raise TypeError(
f"Expected {tgt_name}.cast({value}) to return an instance of {tgt_name}, got: {casted}"
)
raise TypeError(f"Expected an instance of {tgt_name}, got: {value}")
# NOTE: Box uses name mangling (double __) to prevent conflicts with contained values.
def _Box__convert_and_store(self, item: str, value: _V) -> None:
if isinstance(value, dict):
super()._Box__convert_and_store(item, value) # pylint: disable=no-member
elif item in self:
raise ValueError(f"{item} is already set!")
else:
super()._Box__convert_and_store(item, self.__cast_value(item, value))
def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:
for k, v in self.items():
subroot = (*root, k)
if isinstance(v, TypedBox):
yield from v.walk(root=subroot)
else:
yield ".".join(subroot), v # type: ignore[misc]
Variables
PropReturn
Functions
classproperty
def classproperty(
meth: 'Callable[..., PropReturn]'
) -> 'PropReturn'
Access a @classmethod like a @property.
View Source
def classproperty(meth: Callable[..., PropReturn]) -> PropReturn:
"""Access a @classmethod like a @property."""
# mypy doesn't understand class properties yet: https://github.com/python/mypy/issues/2563
return classmethod(property(meth)) # type: ignore[arg-type,return-value]
get_module_name
def get_module_name(
depth: 'int' = 1
) -> 'Optional[str]'
Return the module name of a specific level in the stack.
Depth describes how many levels to traverse, for example: - depth=0: return get_module_name's module - depth=1 (default): return the caller's module - depth=2: return the caller's calling module - ...
View Source
def get_module_name(depth: int = 1) -> Optional[str]:
"""Return the module name of a specific level in the stack.
Depth describes how many levels to traverse, for example:
- depth=0: return get_module_name's module
- depth=1 (default): return the caller's module
- depth=2: return the caller's calling module
- ...
"""
frame = inspect.currentframe()
if frame is None: # the interpreter doesn't support frame inspection
return None # pragma: no cover
for _ in range(depth):
frame = frame.f_back
if frame is None:
return None
return frame.f_globals.get("__name__", "__main__")
import_submodules
def import_submodules(
path: 'list[str]',
name: 'str',
*,
lock: 'threading.Lock' = <unlocked _thread.lock object at 0x7f7a490779c0>
) -> 'dict[str, ModuleType]'
Recursively import submodules.
This can be useful with registry patterns to automatically discover and import submodules defining additional implementations.
path
and name
are usually provided from an existing module's __path__
and __name__
.
This function is thread-safe and supports namespace modules.
NOTE: This inherently triggers eager imports, which has performance impacts and may cause import cycles. To reduce these issues, avoid calling during module definition.
View Source
def import_submodules(
path: list[str], # module.__path__ is a list[str]
name: str,
*,
lock: threading.Lock = threading.Lock(),
) -> dict[str, ModuleType]:
"""Recursively import submodules.
This can be useful with registry patterns to automatically discover and import submodules
defining additional implementations.
`path` and `name` are usually provided from an existing module's `__path__` and `__name__`.
This function is thread-safe and supports namespace modules.
NOTE: This inherently triggers eager imports, which has performance impacts and may cause import
cycles. To reduce these issues, avoid calling during module definition.
"""
# pkgutil.iter_modules is not recursive and pkgutil.walk_packages does not handle namespace
# packages... however we can leverage setuptools.find_namespace_packages, which was built for
# exactly this.
path_names = {p: name for p in path}
path_names.update(
{
str(Path(path).joinpath(*name.split("."))): f"{root_name}.{name}"
for path, root_name in path_names.items()
for name in find_namespace_packages(path)
}
)
with lock:
return {
name: importlib.import_module(name)
for path, name in path_names.items()
for _, name, _ in pkgutil.iter_modules([path], prefix=f"{name}.")
}
named_temporary_file
def named_temporary_file(
mode: 'str' = 'w+b'
) -> 'Generator[IO[Any], None, None]'
Minimal alternative to tempfile.NamedTemporaryFile that can be re-opened on Windows.
View Source
@contextmanager
def named_temporary_file(mode: str = "w+b") -> Generator[IO[Any], None, None]:
"""Minimal alternative to tempfile.NamedTemporaryFile that can be re-opened on Windows."""
with TemporaryDirectory() as d, (Path(d) / "contents").open(mode=mode) as f:
yield f
one_or_none
def one_or_none(
values: 'Optional[list[_V]]',
*,
item_name: 'str'
) -> 'Optional[_V]'
View Source
def one_or_none(values: Optional[list[_V]], *, item_name: str) -> Optional[_V]:
if values is None or len(values) == 0:
return None
if len(values) > 1:
raise ValueError(f"multiple {item_name} values found: {values}")
return values[0]
ordinal
def ordinal(
n: 'int'
) -> 'str'
Convert an integer into its ordinal representation.
View Source
def ordinal(n: int) -> str:
"""Convert an integer into its ordinal representation."""
n = int(n)
suffix = ["th", "st", "nd", "rd", "th"][min(n % 10, 4)]
if 11 <= (n % 100) <= 13:
suffix = "th"
return str(n) + suffix
qname
def qname(
val: 'Union[object, type]'
) -> 'str'
View Source
def qname(val: Union[object, type]) -> str:
if isinstance(val, type):
return val.__qualname__
return type(val).__qualname__
register
def register(
registry: 'dict[_K, _V]',
key: '_K',
value: '_V',
get_priority: 'Optional[Callable[[_V], int]]' = None
) -> '_V'
View Source
def register(
registry: dict[_K, _V],
key: _K,
value: _V,
get_priority: Optional[Callable[[_V], int]] = None,
) -> _V:
if key in registry:
existing = registry[key]
if get_priority is None:
raise ValueError(f"{key} is already registered with: {existing}!")
existing_priority, new_priority = get_priority(existing), get_priority(value)
if existing_priority > new_priority:
return value
if existing_priority == new_priority:
raise ValueError(
f"{key} with matching priority ({existing_priority}) is already registered with: {existing}!"
)
registry[key] = value
return value
Classes
ClassName
class ClassName(
/,
*args,
**kwargs
)
View Source
class ClassName:
def __get__(self, obj: Any, type_: type[Any]) -> str:
return type_.__name__
NoCopyDict
class NoCopyDict(
/,
*args,
**kwargs
)
View Source
class NoCopyDict(dict[_K, _V], NoCopyMixin):
pass
Ancestors (in MRO)
- builtins.dict
- arti.internal.utils.NoCopyMixin
Methods
clear
def clear(
...
)
D.clear() -> None. Remove all items from D.
copy
def copy(
...
)
D.copy() -> a shallow copy of D
fromkeys
def fromkeys(
iterable,
value=None,
/
)
Create a new dictionary with keys from iterable and values set to value.
get
def get(
self,
key,
default=None,
/
)
Return the value for key if key is in the dictionary, else default.
items
def items(
...
)
D.items() -> a set-like object providing a view on D's items
keys
def keys(
...
)
D.keys() -> a set-like object providing a view on D's keys
pop
def pop(
...
)
D.pop(k[,d]) -> v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
popitem
def popitem(
self,
/
)
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
setdefault
def setdefault(
self,
key,
default=None,
/
)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
update
def update(
...
)
D.update([E, ]**F) -> None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
values
def values(
...
)
D.values() -> an object providing a view on D's values
NoCopyMixin
class NoCopyMixin(
/,
*args,
**kwargs
)
View Source
class NoCopyMixin:
"""Mixin to bypass (deep)copying.
This is useful for objects that are *intended* to be stateful and preserved, despite usually
preferring immutable data structures and Pydantic models, which (deep)copy often.
"""
def __copy__(self) -> Self:
return self # pragma: no cover
def __deepcopy__(self, memo: Any) -> Self:
return self # pragma: no cover
Descendants
- arti.internal.utils.NoCopyDict
- arti.backends.memory._NoCopyContainer
TypedBox
class TypedBox(
*args: Any,
default_box: bool = False,
default_box_attr: Any = <object object at 0x7f7a490f8080>,
default_box_none_transform: bool = True,
default_box_create_on_get: bool = True,
frozen_box: bool = False,
camel_killer_box: bool = False,
conversion_box: bool = True,
modify_tuples_box: bool = False,
box_safe_prefix: str = 'x',
box_duplicates: str = 'ignore',
box_intact_types: Union[Tuple, List] = (),
box_recast: Optional[Dict] = None,
box_dots: bool = False,
box_class: Union[Dict, Type[ForwardRef('Box')], NoneType] = None,
box_namespace: Tuple[str, ...] = (),
**kwargs: Any
)
View Source
class TypedBox(Box, MutableMapping[str, Union[_V, MutableMapping[str, _V]]]):
"""TypedBox holds a collection of typed values.
Subclasses must set the __target_type__ to a base class for the contained values.
"""
__target_type__: ClassVar[type[_V]] # type: ignore[misc]
@classmethod
def __class_getitem__(cls, item: type[_V]) -> GenericAlias:
if isinstance(item, tuple):
raise TypeError(f"{cls.__name__} expects a single value type")
value_type = item
return GenericAlias(
type(
cls.__name__,
(cls,),
{
"__module__": get_module_name(depth=2), # Set to our caller's module
"__target_type__": value_type,
},
),
item,
)
def __setattr__(self, key: str, value: Any) -> None:
# GenericAlias sets __orig_class__ after __init__, so preempt Box from storing that (or
# erroring if frozen).
if key == "__orig_class__":
return object.__setattr__(self, key, value)
super().__setattr__(key, value)
return None
def __cast_value(self, item: str, value: Any) -> _V:
if isinstance(value, self.__target_type__):
return value
tgt_name = self.__target_type__.__name__
if hasattr(self.__target_type__, "cast"):
casted = cast(Any, self.__target_type__).cast(value)
if isinstance(casted, self.__target_type__):
return casted
raise TypeError(
f"Expected {tgt_name}.cast({value}) to return an instance of {tgt_name}, got: {casted}"
)
raise TypeError(f"Expected an instance of {tgt_name}, got: {value}")
# NOTE: Box uses name mangling (double __) to prevent conflicts with contained values.
def _Box__convert_and_store(self, item: str, value: _V) -> None:
if isinstance(value, dict):
super()._Box__convert_and_store(item, value) # pylint: disable=no-member
elif item in self:
raise ValueError(f"{item} is already set!")
else:
super()._Box__convert_and_store(item, self.__cast_value(item, value))
def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:
for k, v in self.items():
subroot = (*root, k)
if isinstance(v, TypedBox):
yield from v.walk(root=subroot)
else:
yield ".".join(subroot), v # type: ignore[misc]
Ancestors (in MRO)
- box.box.Box
- builtins.dict
- collections.abc.MutableMapping
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Descendants
- arti.graphs.TypedBox
Static methods
from_json
def from_json(
json_string: Optional[str] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
Transform a json object string into a Box object. If the incoming
json is a list, you must use BoxList.from_json.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_string | None | string to pass to json.loads |
None |
filename | None | filename to open and pass to json.load |
None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
kwargs | None | parameters to pass to Box() or json.loads |
None |
Returns:
Type | Description |
---|---|
None | Box object from json data |
View Source
@classmethod
def from_json(
cls,
json_string: Optional[str] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
"""
Transform a json object string into a Box object. If the incoming
json is a list, you must use BoxList.from_json.
:param json_string: string to pass to `json.loads`
:param filename: filename to open and pass to `json.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()` or `json.loads`
:return: Box object from json data
"""
box_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
box_args[arg] = kwargs.pop(arg)
data = _from_json(json_string, filename=filename, encoding=encoding, errors=errors, **kwargs)
if not isinstance(data, dict):
raise BoxError(f"json data not returned as a dictionary, but rather a {type(data).__name__}")
return cls(data, **box_args)
from_msgpack
def from_msgpack(
msgpack_bytes: Optional[bytes] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
View Source
@classmethod
def from_msgpack(
cls,
msgpack_bytes: Optional[bytes] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')
from_toml
def from_toml(
toml_string: Optional[str] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
Transforms a toml string or file into a Box object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
toml_string | None | string to pass to toml.load |
None |
filename | None | filename to open and pass to toml.load |
None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
kwargs | None | parameters to pass to Box() |
None |
Returns:
Type | Description |
---|---|
None | Box object |
View Source
@classmethod
def from_toml(
cls,
toml_string: Optional[str] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
"""
Transforms a toml string or file into a Box object
:param toml_string: string to pass to `toml.load`
:param filename: filename to open and pass to `toml.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()`
:return: Box object
"""
box_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
box_args[arg] = kwargs.pop(arg)
data = _from_toml(toml_string=toml_string, filename=filename, encoding=encoding, errors=errors)
return cls(data, **box_args)
from_yaml
def from_yaml(
yaml_string: Optional[str] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
Transform a yaml object string into a Box object. By default will use SafeLoader.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
yaml_string | None | string to pass to yaml.load |
None |
filename | None | filename to open and pass to yaml.load |
None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
kwargs | None | parameters to pass to Box() or yaml.load |
None |
Returns:
Type | Description |
---|---|
None | Box object from yaml data |
View Source
@classmethod
def from_yaml(
cls,
yaml_string: Optional[str] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
"""
Transform a yaml object string into a Box object. By default will use SafeLoader.
:param yaml_string: string to pass to `yaml.load`
:param filename: filename to open and pass to `yaml.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()` or `yaml.load`
:return: Box object from yaml data
"""
box_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
box_args[arg] = kwargs.pop(arg)
data = _from_yaml(yaml_string=yaml_string, filename=filename, encoding=encoding, errors=errors, **kwargs)
if not data:
return cls(**box_args)
if not isinstance(data, dict):
raise BoxError(f"yaml data not returned as a dictionary but rather a {type(data).__name__}")
return cls(data, **box_args)
Methods
clear
def clear(
self
)
D.clear() -> None. Remove all items from D.
View Source
def clear(self):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
super().clear()
self._box_config["__safe_keys"].clear()
copy
def copy(
self
) -> 'Box'
D.copy() -> a shallow copy of D
View Source
def copy(self) -> "Box":
config = self.__box_config()
config.pop("box_namespace") # Detach namespace; it will be reassigned if we nest again
return Box(super().copy(), **config)
fromkeys
def fromkeys(
iterable,
value=None,
/
)
Create a new dictionary with keys from iterable and values set to value.
get
def get(
self,
key,
default=<object object at 0x7f7a490f8080>
)
Return the value for key if key is in the dictionary, else default.
View Source
def get(self, key, default=NO_DEFAULT):
if key not in self:
if default is NO_DEFAULT:
if self._box_config["default_box"] and self._box_config["default_box_none_transform"]:
return self.__get_default(key)
else:
return None
if isinstance(default, dict) and not isinstance(default, Box):
return Box(default)
if isinstance(default, list) and not isinstance(default, box.BoxList):
return box.BoxList(default)
return default
return self[key]
items
def items(
self,
dotted: bool = False
)
D.items() -> a set-like object providing a view on D's items
View Source
def items(self, dotted: Union[bool] = False):
if not dotted:
return super().items()
if not self._box_config["box_dots"]:
raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")
return [(k, self[k]) for k in self.keys(dotted=True)]
keys
def keys(
self,
dotted: bool = False
)
D.keys() -> a set-like object providing a view on D's keys
View Source
def keys(self, dotted: Union[bool] = False):
if not dotted:
return super().keys()
if not self._box_config["box_dots"]:
raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")
keys = set()
for key, value in self.items():
added = False
if isinstance(key, str):
if isinstance(value, Box):
for sub_key in value.keys(dotted=True):
keys.add(f"{key}.{sub_key}")
added = True
elif isinstance(value, box.BoxList):
for pos in value._dotted_helper():
keys.add(f"{key}{pos}")
added = True
if not added:
keys.add(key)
return sorted(keys, key=lambda x: str(x))
merge_update
def merge_update(
self,
*args,
**kwargs
)
View Source
def merge_update(self, *args, **kwargs):
merge_type = None
if "box_merge_lists" in kwargs:
merge_type = kwargs.pop("box_merge_lists")
def convert_and_set(k, v):
intact_type = self._box_config["box_intact_types"] and isinstance(v, self._box_config["box_intact_types"])
if isinstance(v, dict) and not intact_type:
# Box objects must be created in case they are already
# in the `converted` box_config set
v = self._box_config["box_class"](v, **self.__box_config(extra_namespace=k))
if k in self and isinstance(self[k], dict):
self[k].merge_update(v)
return
if isinstance(v, list) and not intact_type:
v = box.BoxList(v, **self.__box_config(extra_namespace=k))
if merge_type == "extend" and k in self and isinstance(self[k], list):
self[k].extend(v)
return
if merge_type == "unique" and k in self and isinstance(self[k], list):
for item in v:
if item not in self[k]:
self[k].append(item)
return
self.__setitem__(k, v)
if (len(args) + int(bool(kwargs))) > 1:
raise BoxTypeError(f"merge_update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")
single_arg = next(iter(args), None)
if single_arg:
if hasattr(single_arg, "keys"):
for k in single_arg:
convert_and_set(k, single_arg[k])
else:
for k, v in single_arg:
convert_and_set(k, v)
for key in kwargs:
convert_and_set(key, kwargs[key])
pop
def pop(
self,
key,
*args
)
D.pop(k[,d]) -> v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
View Source
def pop(self, key, *args):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
if args:
if len(args) != 1:
raise BoxError('pop() takes only one optional argument "default"')
try:
item = self[key]
except KeyError:
return args[0]
else:
del self[key]
return item
try:
item = self[key]
except KeyError:
raise BoxKeyError(f"{key}") from None
else:
del self[key]
return item
popitem
def popitem(
self
)
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
View Source
def popitem(self):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
try:
key = next(self.__iter__())
except StopIteration:
raise BoxKeyError("Empty box") from None
return key, self.pop(key)
setdefault
def setdefault(
self,
item,
default=None
)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
View Source
def setdefault(self, item, default=None):
if item in self:
return self[item]
if self._box_config["box_dots"]:
if item in _get_dot_paths(self):
return self[item]
if isinstance(default, dict):
default = self._box_config["box_class"](default, **self.__box_config(extra_namespace=item))
if isinstance(default, list):
default = box.BoxList(default, **self.__box_config(extra_namespace=item))
self[item] = default
return self[item]
to_dict
def to_dict(
self
) -> Dict
Turn the Box and sub Boxes back into a native python dictionary.
Returns:
Type | Description |
---|---|
None | python dictionary of this Box |
View Source
def to_dict(self) -> Dict:
"""
Turn the Box and sub Boxes back into a native python dictionary.
:return: python dictionary of this Box
"""
out_dict = dict(self)
for k, v in out_dict.items():
if v is self:
out_dict[k] = out_dict
elif isinstance(v, Box):
out_dict[k] = v.to_dict()
elif isinstance(v, box.BoxList):
out_dict[k] = v.to_list()
return out_dict
to_json
def to_json(
self,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**json_kwargs
)
Transform the Box object into a JSON string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | None | If provided will save to file | None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
json_kwargs | None | additional arguments to pass to json.dump(s) | None |
Returns:
Type | Description |
---|---|
None | string of JSON (if no filename provided) |
View Source
def to_json(
self,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**json_kwargs,
):
"""
Transform the Box object into a JSON string.
:param filename: If provided will save to file
:param encoding: File encoding
:param errors: How to handle encoding errors
:param json_kwargs: additional arguments to pass to json.dump(s)
:return: string of JSON (if no filename provided)
"""
return _to_json(self.to_dict(), filename=filename, encoding=encoding, errors=errors, **json_kwargs)
to_msgpack
def to_msgpack(
self,
filename: Union[str, os.PathLike, NoneType] = None,
**kwargs
)
View Source
def to_msgpack(self, filename: Optional[Union[str, PathLike]] = None, **kwargs):
raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')
to_toml
def to_toml(
self,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict'
)
Transform the Box object into a toml string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | None | File to write toml object too | None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
Returns:
Type | Description |
---|---|
None | string of TOML (if no filename provided) |
View Source
def to_toml(
self, filename: Optional[Union[str, PathLike]] = None, encoding: str = "utf-8", errors: str = "strict"
):
"""
Transform the Box object into a toml string.
:param filename: File to write toml object too
:param encoding: File encoding
:param errors: How to handle encoding errors
:return: string of TOML (if no filename provided)
"""
return _to_toml(self.to_dict(), filename=filename, encoding=encoding, errors=errors)
to_yaml
def to_yaml(
self,
filename: Union[str, os.PathLike, NoneType] = None,
default_flow_style: bool = False,
encoding: str = 'utf-8',
errors: str = 'strict',
**yaml_kwargs
)
Transform the Box object into a YAML string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | None | If provided will save to file | None |
default_flow_style | None | False will recursively dump dicts | None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
yaml_kwargs | None | additional arguments to pass to yaml.dump | None |
Returns:
Type | Description |
---|---|
None | string of YAML (if no filename provided) |
View Source
def to_yaml(
self,
filename: Optional[Union[str, PathLike]] = None,
default_flow_style: bool = False,
encoding: str = "utf-8",
errors: str = "strict",
**yaml_kwargs,
):
"""
Transform the Box object into a YAML string.
:param filename: If provided will save to file
:param default_flow_style: False will recursively dump dicts
:param encoding: File encoding
:param errors: How to handle encoding errors
:param yaml_kwargs: additional arguments to pass to yaml.dump
:return: string of YAML (if no filename provided)
"""
return _to_yaml(
self.to_dict(),
filename=filename,
default_flow_style=default_flow_style,
encoding=encoding,
errors=errors,
**yaml_kwargs,
)
update
def update(
self,
*args,
**kwargs
)
D.update([E, ]**F) -> None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
View Source
def update(self, *args, **kwargs):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
if (len(args) + int(bool(kwargs))) > 1:
raise BoxTypeError(f"update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")
single_arg = next(iter(args), None)
if single_arg:
if hasattr(single_arg, "keys"):
for k in single_arg:
self.__convert_and_store(k, single_arg[k])
else:
for k, v in single_arg:
self.__convert_and_store(k, v)
for k in kwargs:
self.__convert_and_store(k, kwargs[k])
values
def values(
...
)
D.values() -> an object providing a view on D's values
walk
def walk(
self,
root: 'tuple[str, ...]' = ()
) -> 'Iterator[tuple[str, _V]]'
View Source
def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:
for k, v in self.items():
subroot = (*root, k)
if isinstance(v, TypedBox):
yield from v.walk(root=subroot)
else:
yield ".".join(subroot), v # type: ignore[misc]
class_name
class class_name(
/,
*args,
**kwargs
)
View Source
class ClassName:
def __get__(self, obj: Any, type_: type[Any]) -> str:
return type_.__name__
frozendict
class frozendict(
arg: 'Union[Mapping[_K, _V], Iterable[tuple[_K, _V]]]' = (),
**kwargs: '_V'
)
View Source
class frozendict(Mapping[_K, _V]):
def __init__(
self, arg: Union[Mapping[_K, _V], Iterable[tuple[_K, _V]]] = (), **kwargs: _V
) -> None:
self._data = dict[_K, _V](arg, **kwargs)
# Eagerly evaluate the hash to confirm elements are also frozen (via frozenset) at
# creation time, not just when hashed.
self._hash = hash(frozenset(self._data.items()))
def __getitem__(self, key: _K) -> _V:
return self._data[key]
def __hash__(self) -> int:
return self._hash
def __iter__(self) -> Iterator[_K]:
return iter(self._data)
def __len__(self) -> int:
return len(self._data)
def __or__(self, other: Mapping[_K, _V]) -> frozendict[_K, _V]:
return type(self)({**self, **other})
__ror__ = __or__
def __repr__(self) -> str:
return repr(self._data)
Ancestors (in MRO)
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Methods
get
def get(
self,
key,
default=None
)
D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.
items
def items(
self
)
D.items() -> a set-like object providing a view on D's items
keys
def keys(
self
)
D.keys() -> a set-like object providing a view on D's keys
values
def values(
self
)
D.values() -> an object providing a view on D's values
int64
class int64(
/,
*args,
**kwargs
)
View Source
class int64(_int):
_min, _max = -(2**63), (2**63) - 1
def __new__(cls, i: Union[int, int64, uint64]) -> int64:
if i > cls._max:
if isinstance(i, uint64):
i = int(i) - uint64._max - 1
else:
raise ValueError(f"{i} is too large for int64. Hint: cast to uint64 first.")
if i < cls._min:
raise ValueError(f"{i} is too small for int64.")
return super().__new__(cls, i)
Ancestors (in MRO)
- arti.internal.utils._int
- builtins.int
Class variables
denominator
imag
numerator
real
Methods
as_integer_ratio
def as_integer_ratio(
self,
/
)
Return integer ratio.
Return a pair of integers, whose ratio is exactly equal to the original int and with a positive denominator.
(10).as_integer_ratio() (10, 1) (-10).as_integer_ratio() (-10, 1) (0).as_integer_ratio() (0, 1)
bit_count
def bit_count(
self,
/
)
Number of ones in the binary representation of the absolute value of self.
Also known as the population count.
bin(13) '0b1101' (13).bit_count() 3
bit_length
def bit_length(
self,
/
)
Number of bits necessary to represent self in binary.
bin(37) '0b100101' (37).bit_length() 6
conjugate
def conjugate(
...
)
Returns self, the complex conjugate of any int.
from_bytes
def from_bytes(
bytes,
byteorder='big',
*,
signed=False
)
Return the integer represented by the given array of bytes.
bytes Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol. byteorder The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder' as the byte order value. Default is to use 'big'. signed Indicates whether two's complement is used to represent the integer.
to_bytes
def to_bytes(
self,
/,
length=1,
byteorder='big',
*,
signed=False
)
Return an array of bytes representing an integer.
length Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1. byteorder The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder' as the byte order value. Default is to use 'big'. signed Determines whether two's complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.
uint64
class uint64(
/,
*args,
**kwargs
)
View Source
class uint64(_int):
_min, _max = 0, (2**64) - 1
def __new__(cls, i: Union[int, int64, uint64]) -> uint64:
if i > cls._max:
raise ValueError(f"{i} is too large for uint64.")
if i < cls._min:
if isinstance(i, int64):
i = int(i) + cls._max + 1
else:
raise ValueError(f"{i} is negative. Hint: cast to int64 first.")
return super().__new__(cls, i)
Ancestors (in MRO)
- arti.internal.utils._int
- builtins.int
Class variables
denominator
imag
numerator
real
Methods
as_integer_ratio
def as_integer_ratio(
self,
/
)
Return integer ratio.
Return a pair of integers, whose ratio is exactly equal to the original int and with a positive denominator.
(10).as_integer_ratio() (10, 1) (-10).as_integer_ratio() (-10, 1) (0).as_integer_ratio() (0, 1)
bit_count
def bit_count(
self,
/
)
Number of ones in the binary representation of the absolute value of self.
Also known as the population count.
bin(13) '0b1101' (13).bit_count() 3
bit_length
def bit_length(
self,
/
)
Number of bits necessary to represent self in binary.
bin(37) '0b100101' (37).bit_length() 6
conjugate
def conjugate(
...
)
Returns self, the complex conjugate of any int.
from_bytes
def from_bytes(
bytes,
byteorder='big',
*,
signed=False
)
Return the integer represented by the given array of bytes.
bytes Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol. byteorder The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder' as the byte order value. Default is to use 'big'. signed Indicates whether two's complement is used to represent the integer.
to_bytes
def to_bytes(
self,
/,
length=1,
byteorder='big',
*,
signed=False
)
Return an array of bytes representing an integer.
length Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1. byteorder The byte order used to represent the integer. If byteorder is 'big', the most significant byte is at the beginning of the byte array. If byteorder is 'little', the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder' as the byte order value. Default is to use 'big'. signed Determines whether two's complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.