Module arti.types.pyarrow
None
None
View Source
from __future__ import annotations
import json
from collections.abc import Callable
from typing import Any, cast
import pyarrow as pa
from arti import Type, TypeAdapter, TypeSystem, types
from arti.internal.utils import classproperty
pyarrow_type_system = TypeSystem(key="pyarrow")
# Not implemented:
# decimal128(int precision, int scale=0),
# dictionary(index_type, value_type, …),
# large_binary(),
# large_list(value_type),
# large_string(),
class _PyarrowTypeAdapter(TypeAdapter):
    """Shared base for the pyarrow adapters.

    Subclasses set `artigraph` (the artigraph type) and `system` (the pyarrow constructor).
    The defaults here suit types that take no arguments on either side.
    """

    @classproperty
    def _is_system(cls) -> Callable[[pa.DataType], bool]:
        """Return the `pa.types.is_<name>` checker named after the `system` constructor."""
        checker_name = f"is_{cls.system.__name__}"
        return getattr(pa.types, checker_name)  # type: ignore[no-any-return]

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Instantiate the artigraph type with no arguments; `type_` is not inspected."""
        return cls.artigraph()

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow DataType accepted by this adapter's checker."""
        # Anything that is not a pyarrow DataType can never match; bail out before the checker.
        if not isinstance(type_, pa.DataType):
            return False
        return cls._is_system(type_)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Call the pyarrow constructor with no arguments; `type_` is not inspected."""
        return cls.system()
def _gen_adapter(*, artigraph: type[Type], system: Any, priority: int = 0) -> type[TypeAdapter]:
    """Create and register a `_PyarrowTypeAdapter` subclass for a simple type pair.

    The generated class is named `Pyarrow<system.__name__>` and carries the given
    `artigraph`, `system` and `priority` as class attributes. The value returned by
    `pyarrow_type_system.register_adapter` is passed straight back to the caller.
    """
    class_attrs = {"artigraph": artigraph, "system": system, "priority": priority}
    adapter = type(f"Pyarrow{system.__name__}", (_PyarrowTypeAdapter,), class_attrs)
    return pyarrow_type_system.register_adapter(adapter)
# String and Null use the generated adapter as-is: both sides are built with no arguments.
_gen_adapter(artigraph=types.String, system=pa.string)
_gen_adapter(artigraph=types.Null, system=pa.null)
# Date matching requires `priority=_precision` since it is not 1:1, but the float/int ones are.
# Both pa.date32 and pa.date64 are registered against the single types.Date, each using its
# bit width as the adapter priority.
# NOTE(review): presumably the higher priority (date64) is preferred when several adapters
# match - confirm against TypeSystem.
for _precision in (32, 64):
    _gen_adapter(
        artigraph=types.Date,
        system=getattr(pa, f"date{_precision}"),
        priority=_precision,
    )
# types.Float16/32/64 pair up by name with pa.float16/32/64 (default priority).
for _precision in (16, 32, 64):
    _gen_adapter(
        artigraph=getattr(types, f"Float{_precision}"),
        system=getattr(pa, f"float{_precision}"),
    )
# Signed and unsigned integers are registered together for each bit width.
for _precision in (8, 16, 32, 64):
    _gen_adapter(
        artigraph=getattr(types, f"Int{_precision}"),
        system=getattr(pa, f"int{_precision}"),
    )
    _gen_adapter(
        artigraph=getattr(types, f"UInt{_precision}"),
        system=getattr(pa, f"uint{_precision}"),
    )
@pyarrow_type_system.register_adapter
class BinaryTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Binary` and pyarrow binary types (variable or fixed size)."""

    artigraph = types.Binary
    system = pa.binary

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.Binary`, carrying `byte_size` over when `type_` is fixed size."""
        if isinstance(type_, pa.FixedSizeBinaryType):
            return cls.artigraph(byte_size=type_.byte_width)
        return cls.artigraph()

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a variable-length or fixed-size pyarrow binary type.

        Objects that are not `pa.DataType` instances never match.
        """
        # pa.binary returns a DataType(binary) when length=-1, otherwise a FixedSizeBinaryType...
        # but pa.types.is_binary only checks for DataType(binary).
        # The isinstance guard keeps the fixed-size check from being applied to foreign objects
        # (the base matcher returns False for them, which previously fell through to the `or`).
        return isinstance(type_, pa.DataType) and (
            super().matches_system(type_, hints=hints) or pa.types.is_fixed_size_binary(type_)
        )

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.binary(length)`, using -1 (variable length) when `byte_size` is None."""
        assert isinstance(type_, cls.artigraph)
        return cls.system(length=-1 if type_.byte_size is None else type_.byte_size)
# The pyarrow bool constructor and checker have different names
@pyarrow_type_system.register_adapter
class BoolTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Boolean` and `pa.bool_()`."""

    artigraph = types.Boolean
    system = pa.bool_

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is the pyarrow boolean type.

        The constructor is `bool_` while the checker is `is_boolean`, so the name-derived
        lookup of the base adapter cannot be used. Objects that are not `pa.DataType`
        instances never match.
        """
        # Same isinstance guard as the base adapter: only hand real DataTypes to the checker.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_boolean(type_))
@pyarrow_type_system.register_adapter
class GeographyTypeAdapter(_PyarrowTypeAdapter):
    """One-way adapter that writes `types.Geography` as a pyarrow string or binary."""

    # TODO: Can we do something with pa.field metadata to round trip (eg: format, srid, etc) or
    # infer GeoParquet?
    artigraph = types.Geography
    system = pa.string  # or pa.binary if geography.format == "WKB"

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Never match: a pyarrow type carries nothing that marks it as a geography."""
        # We don't have any metadata to differentiate normal strings from geographies, so avoid
        # matching. This will prevent round tripping.
        return False

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.binary()` for the "WKB" format and `pa.string()` for anything else."""
        assert isinstance(type_, cls.artigraph)
        if type_.format == "WKB":
            return pa.binary()
        return pa.string()
@pyarrow_type_system.register_adapter
class ListTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.List` and the pyarrow list type built by `pa.list_`."""

    artigraph = types.List
    system = pa.list_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.List` whose element is the converted `type_.value_type`."""
        return cls.artigraph(
            element=type_system.to_artigraph(type_.value_type, hints=hints),
        )

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow list type according to `pa.types.is_list`.

        The constructor is `list_` while the checker is `is_list`, so the name-derived
        lookup of the base adapter cannot be used. Objects that are not `pa.DataType`
        instances never match.
        """
        # Same isinstance guard as the base adapter: only hand real DataTypes to the checker.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_list(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.list_` over the converted element type."""
        assert isinstance(type_, cls.artigraph)
        return cls.system(value_type=type_system.to_system(type_.element, hints=hints))
@pyarrow_type_system.register_adapter
class MapTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Map` and the pyarrow map type built by `pa.map_`."""

    artigraph = types.Map
    system = pa.map_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.Map` built from the converted `key_type` and `item_type`."""
        return cls.artigraph(
            key=type_system.to_artigraph(type_.key_type, hints=hints),
            value=type_system.to_artigraph(type_.item_type, hints=hints),
        )

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow map type according to `pa.types.is_map`.

        The constructor is `map_` while the checker is `is_map`, so the name-derived
        lookup of the base adapter cannot be used. Objects that are not `pa.DataType`
        instances never match.
        """
        # Same isinstance guard as the base adapter: only hand real DataTypes to the checker.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_map(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.map_` over the converted key and value types."""
        assert isinstance(type_, cls.artigraph)
        return cls.system(
            key_type=type_system.to_system(type_.key, hints=hints),
            item_type=type_system.to_system(type_.value, hints=hints),
        )
@pyarrow_type_system.register_adapter
class StructTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Struct` and `pa.struct`, field by field."""

    artigraph = types.Struct
    system = pa.struct

    @classmethod
    def _field_to_artigraph(
        cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Type:
        """Convert one pyarrow field, copying its `nullable` flag onto the result."""
        converted = type_system.to_artigraph(type_.type, hints=hints)
        # Avoid setting nullable if matching to minimize repr
        if type_.nullable == converted.nullable:
            return converted
        return converted.copy(update={"nullable": type_.nullable})

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.Struct` keyed by field name, built by iterating over `type_`."""
        fields = {}
        for field in type_:
            fields[field.name] = cls._field_to_artigraph(
                field, hints=hints, type_system=type_system
            )
        return cls.artigraph(fields=fields)

    @classmethod
    def _field_to_system(
        cls, name: str, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Any:
        """Build a `pa.field` called `name` from the converted type and its `nullable` flag."""
        system_type = type_system.to_system(type_, hints=hints)
        return pa.field(name, system_type, nullable=type_.nullable)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return a `pa.struct` with one field per entry of `type_.fields`, in order."""
        assert isinstance(type_, cls.artigraph)
        pa_fields = [
            cls._field_to_system(field_name, field_type, hints=hints, type_system=type_system)
            for field_name, field_type in type_.fields.items()
        ]
        return cls.system(pa_fields)
# NOTE: pa.schema and pa.struct are structurally similar, but pa.schema has additional attributes
# (eg: .metadata) and cannot be nested (like Collection).
@pyarrow_type_system.register_adapter
class SchemaTypeAdapter(_PyarrowTypeAdapter):
    """Convert between a `types.Collection` of structs and `pa.schema`.

    The collection's `name`, `partition_by` and `cluster_by` are stored as JSON under the
    "artigraph" schema metadata key.
    """

    artigraph = types.Collection
    system = pa.schema
    priority = ListTypeAdapter.priority + 1

    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Match only Collections whose element is a `types.Struct`."""
        # Collection can hold arbitrary types, but `pa.schema` is only a struct (but with arbitrary
        # metadata)
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return isinstance(type_.element, types.Struct)  # type: ignore[attr-defined]

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a Collection whose element is the schema's fields converted as a Struct.

        Keyword arguments are taken from the JSON stored under the b"artigraph" metadata
        key, when present.
        """
        kwargs = {}
        metadata = type_.metadata
        # NOTE: pyarrow converts all metadata keys/values to bytes
        if metadata and b"artigraph" in metadata:
            kwargs = json.loads(metadata[b"artigraph"].decode())
            # JSON has no tuple, so these come back as lists and are converted here.
            for key in ("partition_by", "cluster_by"):
                if key in kwargs:  # pragma: no cover
                    kwargs[key] = tuple(kwargs[key])
        element = StructTypeAdapter.to_artigraph(type_, hints=hints, type_system=type_system)
        return cls.artigraph(element=element, **kwargs)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a `pa.lib.Schema` instance."""
        return isinstance(type_, pa.lib.Schema)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return a `pa.schema` from the element struct, with the collection info as metadata."""
        assert isinstance(type_, cls.artigraph)
        assert isinstance(type_.element, types.Struct)
        struct = StructTypeAdapter.to_system(type_.element, hints=hints, type_system=type_system)
        collection_info = {
            "name": type_.name,
            "partition_by": type_.partition_by,
            "cluster_by": type_.cluster_by,
        }
        return cls.system(struct, metadata={"artigraph": json.dumps(collection_info)})
class _BaseTimeTypeAdapter(_PyarrowTypeAdapter):
    """Base for adapters whose pyarrow constructor takes a time unit.

    `precision_to_unit` maps artigraph precision names to pyarrow unit codes.
    """

    precision_to_unit = {
        "second": "s",
        "millisecond": "ms",
        "microsecond": "us",
        "nanosecond": "ns",
    }

    @classproperty
    def unit_to_precision(cls) -> dict[str, str]:
        """Return `precision_to_unit` inverted: pyarrow unit code -> precision name."""
        return {unit: precision for precision, unit in cls.precision_to_unit.items()}

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return the artigraph type with the precision matching `type_.unit`.

        Raises ValueError when the unit has no entry in `unit_to_precision`.
        """
        precision = cls.unit_to_precision.get(type_.unit)
        if precision is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
            )
        assert issubclass(cls.artigraph, types._TimeMixin)
        return cls.artigraph(precision=precision)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return the pyarrow type built with the unit matching `type_.precision`.

        Raises ValueError when the precision has no entry in `precision_to_unit`.
        """
        precision = type_.precision  # type: ignore[attr-defined]
        unit = cls.precision_to_unit.get(precision)
        if unit is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
            )
        return cls.system(unit)
@pyarrow_type_system.register_adapter
class DateTimeTypeAdapter(_BaseTimeTypeAdapter):
    """Convert between `types.DateTime` and `pa.timestamp` without a time zone."""

    artigraph = types.DateTime
    system = pa.timestamp

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match pyarrow timestamps whose `tz` is None."""
        if not super().matches_system(type_, hints=hints):
            return False
        return type_.tz is None
@pyarrow_type_system.register_adapter
class TimestampTypeAdapter(_BaseTimeTypeAdapter):
    """Convert between `types.Timestamp` and `pa.timestamp` with a UTC time zone."""

    artigraph = types.Timestamp
    system = pa.timestamp

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a UTC timestamp; raise ValueError for any other time zone.

        The time zone name is compared case-insensitively.
        """
        tz = type_.tz.upper()
        if tz == "UTC":
            return super().to_artigraph(type_, hints=hints, type_system=type_system)
        raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {tz}")

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match pyarrow timestamps whose `tz` is set."""
        if not super().matches_system(type_, hints=hints):
            return False
        return type_.tz is not None

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.timestamp(unit, "UTC")` for the unit matching `type_.precision`."""
        # The base implementation validates the precision and picks the unit; only the unit
        # of its result is reused here.
        without_tz = super().to_system(type_, hints=hints, type_system=type_system)
        return cls.system(without_tz.unit, "UTC")
class _BaseSizedTimeTypeAdapter(_BaseTimeTypeAdapter):
    """Base for the `types.Time` adapters, each restricted to its own `precision_to_unit`."""

    artigraph = types.Time

    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Match only when `type_.precision` is a key of this adapter's `precision_to_unit`."""
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return type_.precision in cls.precision_to_unit  # type: ignore[attr-defined]

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match only when `type_.unit` is a key of this adapter's `unit_to_precision`."""
        if not super().matches_system(type_=type_, hints=hints):
            return False
        return type_.unit in cls.unit_to_precision
@pyarrow_type_system.register_adapter
class Time32TypeAdapter(_BaseSizedTimeTypeAdapter):
    # Only the second and millisecond precisions are mapped for pa.time32; this narrows the
    # four-entry table declared on _BaseTimeTypeAdapter.
    precision_to_unit = {
        "second": "s",
        "millisecond": "ms",
    }
    # pyarrow constructor called with the unit code by the inherited to_system.
    system = pa.time32
@pyarrow_type_system.register_adapter
class Time64TypeAdapter(_BaseSizedTimeTypeAdapter):
    # Only the microsecond and nanosecond precisions are mapped for pa.time64; this narrows
    # the four-entry table declared on _BaseTimeTypeAdapter.
    precision_to_unit = {
        "microsecond": "us",
        "nanosecond": "ns",
    }
    # pyarrow constructor called with the unit code by the inherited to_system.
    system = pa.time64
Variables
pyarrow_type_system
Classes
BinaryTypeAdapter
class BinaryTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class BinaryTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Binary` and pyarrow binary types (variable or fixed size)."""

    artigraph = types.Binary
    system = pa.binary

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.Binary`, carrying `byte_size` over when `type_` is fixed size."""
        if isinstance(type_, pa.FixedSizeBinaryType):
            return cls.artigraph(byte_size=type_.byte_width)
        return cls.artigraph()

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a variable-length or fixed-size pyarrow binary type.

        Objects that are not `pa.DataType` instances never match.
        """
        # pa.binary returns a DataType(binary) when length=-1, otherwise a FixedSizeBinaryType...
        # but pa.types.is_binary only checks for DataType(binary).
        # The isinstance guard keeps the fixed-size check from being applied to foreign objects
        # (the base matcher returns False for them, which previously fell through to the `or`).
        return isinstance(type_, pa.DataType) and (
            super().matches_system(type_, hints=hints) or pa.types.is_fixed_size_binary(type_)
        )

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.binary(length)`, using -1 (variable length) when `byte_size` is None."""
        assert isinstance(type_, cls.artigraph)
        return cls.system(length=-1 if type_.byte_size is None else type_.byte_size)
Ancestors (in MRO)
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
    """Return whether `type_` is a variable-length or fixed-size pyarrow binary type.

    Objects that are not `pa.DataType` instances never match.
    """
    # pa.binary returns a DataType(binary) when length=-1, otherwise a FixedSizeBinaryType...
    # but pa.types.is_binary only checks for DataType(binary).
    # The isinstance guard keeps the fixed-size check from being applied to foreign objects
    # (the base matcher returns False for them, which previously fell through to the `or`).
    return isinstance(type_, pa.DataType) and (
        super().matches_system(type_, hints=hints) or pa.types.is_fixed_size_binary(type_)
    )
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
if isinstance(type_, pa.FixedSizeBinaryType):
return cls.artigraph(byte_size=type_.byte_width)
return cls.artigraph()
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
assert isinstance(type_, cls.artigraph)
return cls.system(length=-1 if type_.byte_size is None else type_.byte_size)
Methods
system
def system(
...
)
binary(int length=-1)
Create variable-length binary type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
length | int, optional, default -1 | If length == -1 then return a variable length binary type. If length is greater than or equal to 0 then return a fixed size binary type of width length. | None |
BoolTypeAdapter
class BoolTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class BoolTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Boolean` and `pa.bool_()`."""

    artigraph = types.Boolean
    system = pa.bool_

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is the pyarrow boolean type.

        The constructor is `bool_` while the checker is `is_boolean`, so the name-derived
        lookup of the base adapter cannot be used. Objects that are not `pa.DataType`
        instances never match.
        """
        # Same isinstance guard as the base adapter: only hand real DataTypes to the checker.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_boolean(type_))
Ancestors (in MRO)
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
    """Return whether `type_` is the pyarrow boolean type.

    Objects that are not `pa.DataType` instances never match.
    """
    # Only hand real DataTypes to the pyarrow checker; anything else simply does not match.
    return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_boolean(type_))
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
return cls.artigraph()
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
return cls.system()
Methods
system
def system(
...
)
bool_()
Create instance of boolean type.
DateTimeTypeAdapter
class DateTimeTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class DateTimeTypeAdapter(_BaseTimeTypeAdapter):
    """Convert between `types.DateTime` and `pa.timestamp` without a time zone."""

    artigraph = types.DateTime
    system = pa.timestamp

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match pyarrow timestamps whose `tz` is None."""
        if not super().matches_system(type_, hints=hints):
            return False
        return type_.tz is None
Ancestors (in MRO)
- arti.types.pyarrow._BaseTimeTypeAdapter
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
precision_to_unit
priority
unit_to_precision
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
return super().matches_system(type_, hints=hints) and type_.tz is None
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
if (precision := cls.unit_to_precision.get(type_.unit)) is None: # pragma: no cover
raise ValueError(
f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
)
assert issubclass(cls.artigraph, types._TimeMixin)
return cls.artigraph(precision=precision)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
precision = type_.precision # type: ignore[attr-defined]
if (unit := cls.precision_to_unit.get(precision)) is None: # pragma: no cover
raise ValueError(
f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
)
return cls.system(unit)
Methods
system
def system(
...
)
timestamp(unit, tz=None)
Create instance of timestamp type with resolution and optional time zone.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
unit | str | one of 's' [second], 'ms' [millisecond], 'us' [microsecond], or 'ns' [nanosecond] | None |
tz | str, default None | Time zone name. None indicates time zone naive | None |
Returns:
Type | Description |
---|---|
TimestampType | None |
GeographyTypeAdapter
class GeographyTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class GeographyTypeAdapter(_PyarrowTypeAdapter):
    """One-way adapter that writes `types.Geography` as a pyarrow string or binary."""

    # TODO: Can we do something with pa.field metadata to round trip (eg: format, srid, etc) or
    # infer GeoParquet?
    artigraph = types.Geography
    system = pa.string  # or pa.binary if geography.format == "WKB"

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Never match: a pyarrow type carries nothing that marks it as a geography."""
        # We don't have any metadata to differentiate normal strings from geographies, so avoid
        # matching. This will prevent round tripping.
        return False

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.binary()` for the "WKB" format and `pa.string()` for anything else."""
        assert isinstance(type_, cls.artigraph)
        if type_.format == "WKB":
            return pa.binary()
        return pa.string()
Ancestors (in MRO)
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
# We don't have any metadata to differentiate normal strings from geographies, so avoid
# matching. This will prevent round tripping.
return False
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
return cls.artigraph()
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
assert isinstance(type_, cls.artigraph)
return pa.binary() if type_.format == "WKB" else pa.string()
Methods
system
def system(
...
)
string()
Create UTF8 variable-length string type.
ListTypeAdapter
class ListTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class ListTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.List` and the pyarrow list type built by `pa.list_`."""

    artigraph = types.List
    system = pa.list_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.List` whose element is the converted `type_.value_type`."""
        return cls.artigraph(
            element=type_system.to_artigraph(type_.value_type, hints=hints),
        )

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow list type according to `pa.types.is_list`.

        The constructor is `list_` while the checker is `is_list`, so the name-derived
        lookup of the base adapter cannot be used. Objects that are not `pa.DataType`
        instances never match.
        """
        # Same isinstance guard as the base adapter: only hand real DataTypes to the checker.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_list(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.list_` over the converted element type."""
        assert isinstance(type_, cls.artigraph)
        return cls.system(value_type=type_system.to_system(type_.element, hints=hints))
Ancestors (in MRO)
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
    """Return whether `type_` is a pyarrow list type according to `pa.types.is_list`.

    Objects that are not `pa.DataType` instances never match.
    """
    # Only hand real DataTypes to the pyarrow checker; anything else simply does not match.
    return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_list(type_))
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
return cls.artigraph(
element=type_system.to_artigraph(type_.value_type, hints=hints),
)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
assert isinstance(type_, cls.artigraph)
return cls.system(value_type=type_system.to_system(type_.element, hints=hints))
Methods
system
def system(
...
)
list_(value_type, int list_size=-1)
Create ListType instance from child data type or field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value_type | DataType or Field | None | None |
list_size | int, optional, default -1 | If list_size == -1 then return a variable length list type. If list_size is greater than or equal to 0 then return a fixed size list type. | None |
Returns:
Type | Description |
---|---|
DataType | None |
MapTypeAdapter
class MapTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class MapTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Map` and the pyarrow map type built by `pa.map_`."""

    artigraph = types.Map
    system = pa.map_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.Map` built from the converted `key_type` and `item_type`."""
        return cls.artigraph(
            key=type_system.to_artigraph(type_.key_type, hints=hints),
            value=type_system.to_artigraph(type_.item_type, hints=hints),
        )

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow map type according to `pa.types.is_map`.

        The constructor is `map_` while the checker is `is_map`, so the name-derived
        lookup of the base adapter cannot be used. Objects that are not `pa.DataType`
        instances never match.
        """
        # Same isinstance guard as the base adapter: only hand real DataTypes to the checker.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_map(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.map_` over the converted key and value types."""
        assert isinstance(type_, cls.artigraph)
        return cls.system(
            key_type=type_system.to_system(type_.key, hints=hints),
            item_type=type_system.to_system(type_.value, hints=hints),
        )
Ancestors (in MRO)
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
    """Return whether `type_` is a pyarrow map type according to `pa.types.is_map`.

    Objects that are not `pa.DataType` instances never match.
    """
    # Only hand real DataTypes to the pyarrow checker; anything else simply does not match.
    return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_map(type_))
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
return cls.artigraph(
key=type_system.to_artigraph(type_.key_type, hints=hints),
value=type_system.to_artigraph(type_.item_type, hints=hints),
)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
assert isinstance(type_, cls.artigraph)
return cls.system(
key_type=type_system.to_system(type_.key, hints=hints),
item_type=type_system.to_system(type_.value, hints=hints),
)
Methods
system
def system(
...
)
map_(key_type, item_type, keys_sorted=False) -> MapType
Create MapType instance from key and item data types or fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key_type | DataType | None | None |
item_type | DataType | None | None |
keys_sorted | bool | None | None |
Returns:
Type | Description |
---|---|
DataType | None |
SchemaTypeAdapter
class SchemaTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class SchemaTypeAdapter(_PyarrowTypeAdapter):
    """Convert between a `types.Collection` of structs and `pa.schema`.

    The collection's `name`, `partition_by` and `cluster_by` are stored as JSON under the
    "artigraph" schema metadata key.
    """

    artigraph = types.Collection
    system = pa.schema
    priority = ListTypeAdapter.priority + 1

    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Match only Collections whose element is a `types.Struct`."""
        # Collection can hold arbitrary types, but `pa.schema` is only a struct (but with arbitrary
        # metadata)
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return isinstance(type_.element, types.Struct)  # type: ignore[attr-defined]

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a Collection whose element is the schema's fields converted as a Struct.

        Keyword arguments are taken from the JSON stored under the b"artigraph" metadata
        key, when present.
        """
        kwargs = {}
        metadata = type_.metadata
        # NOTE: pyarrow converts all metadata keys/values to bytes
        if metadata and b"artigraph" in metadata:
            kwargs = json.loads(metadata[b"artigraph"].decode())
            # JSON has no tuple, so these come back as lists and are converted here.
            for key in ("partition_by", "cluster_by"):
                if key in kwargs:  # pragma: no cover
                    kwargs[key] = tuple(kwargs[key])
        element = StructTypeAdapter.to_artigraph(type_, hints=hints, type_system=type_system)
        return cls.artigraph(element=element, **kwargs)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a `pa.lib.Schema` instance."""
        return isinstance(type_, pa.lib.Schema)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return a `pa.schema` from the element struct, with the collection info as metadata."""
        assert isinstance(type_, cls.artigraph)
        assert isinstance(type_.element, types.Struct)
        struct = StructTypeAdapter.to_system(type_.element, hints=hints, type_system=type_system)
        collection_info = {
            "name": type_.name,
            "partition_by": type_.partition_by,
            "cluster_by": type_.cluster_by,
        }
        return cls.system(struct, metadata={"artigraph": json.dumps(collection_info)})
Ancestors (in MRO)
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
# Collection can hold arbitrary types, but `pa.schema` is only a struct (but with arbitrary
# metadata)
return super().matches_artigraph(type_=type_, hints=hints) and isinstance(
type_.element, types.Struct # type: ignore[attr-defined]
)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, pa.lib.Schema)
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
kwargs = {}
# NOTE: pyarrow converts all metadata keys/values to bytes
if type_.metadata and b"artigraph" in type_.metadata:
kwargs = json.loads(type_.metadata[b"artigraph"].decode())
for key in ["partition_by", "cluster_by"]:
if key in kwargs: # pragma: no cover
kwargs[key] = tuple(kwargs[key])
return cls.artigraph(
element=StructTypeAdapter.to_artigraph(type_, hints=hints, type_system=type_system),
**kwargs,
)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
assert isinstance(type_, cls.artigraph)
assert isinstance(type_.element, types.Struct)
return cls.system(
StructTypeAdapter.to_system(type_.element, hints=hints, type_system=type_system),
metadata={
"artigraph": json.dumps(
{
"name": type_.name,
"partition_by": type_.partition_by,
"cluster_by": type_.cluster_by,
}
)
},
)
Methods
system
def system(
...
)
schema(fields, metadata=None)
Construct pyarrow.Schema from collection of fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fields | iterable of Fields or tuples, or mapping of strings to DataTypes | None | None |
metadata | dict, default None | Keys and values must be coercible to bytes. | None |
Returns:
Type | Description |
---|---|
pyarrow.Schema | None |
StructTypeAdapter
class StructTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class StructTypeAdapter(_PyarrowTypeAdapter):
    """Convert between `types.Struct` and `pa.struct`, field by field."""

    artigraph = types.Struct
    system = pa.struct

    @classmethod
    def _field_to_artigraph(
        cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Type:
        """Convert one pyarrow field, copying its `nullable` flag onto the result."""
        converted = type_system.to_artigraph(type_.type, hints=hints)
        # Avoid setting nullable if matching to minimize repr
        if type_.nullable == converted.nullable:
            return converted
        return converted.copy(update={"nullable": type_.nullable})

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a `types.Struct` keyed by field name, built by iterating over `type_`."""
        fields = {}
        for field in type_:
            fields[field.name] = cls._field_to_artigraph(
                field, hints=hints, type_system=type_system
            )
        return cls.artigraph(fields=fields)

    @classmethod
    def _field_to_system(
        cls, name: str, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Any:
        """Build a `pa.field` called `name` from the converted type and its `nullable` flag."""
        system_type = type_system.to_system(type_, hints=hints)
        return pa.field(name, system_type, nullable=type_.nullable)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return a `pa.struct` with one field per entry of `type_.fields`, in order."""
        assert isinstance(type_, cls.artigraph)
        pa_fields = [
            cls._field_to_system(field_name, field_type, hints=hints, type_system=type_system)
            for field_name, field_type in type_.fields.items()
        ]
        return cls.system(pa_fields)
Ancestors (in MRO)
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, pa.DataType) and cls._is_system(type_)
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
return cls.artigraph(
fields={
field.name: cls._field_to_artigraph(field, hints=hints, type_system=type_system)
for field in type_
}
)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
    """Convert an artigraph type into the system type by converting each of its fields.

    Every ``(name, subtype)`` entry of ``type_.fields`` is turned into a system field via
    ``_field_to_system``; the resulting list is handed to ``cls.system``.
    """
    assert isinstance(type_, cls.artigraph)
    system_fields = [
        cls._field_to_system(field_name, field_type, hints=hints, type_system=type_system)
        for field_name, field_type in type_.fields.items()
    ]
    return cls.system(system_fields)
Methods
system
def system(
...
)
struct(fields)
Create StructType instance from fields.
A struct is a nested type parameterized by an ordered sequence of types (which can all be distinct), called its fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fields | iterable of Fields or tuples, or mapping of strings to DataTypes | Each field must have a UTF8-encoded name, and these field names are part of the type metadata. | None |
Returns:
Type | Description |
---|---|
DataType | None |
Time32TypeAdapter
class Time32TypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class Time32TypeAdapter(_BaseSizedTimeTypeAdapter):
    """Adapter registered with ``pyarrow_type_system`` for ``pa.time32`` types."""

    # Precision names mapped to the unit strings passed to ``pa.time32``.
    precision_to_unit = {
        "second": "s",
        "millisecond": "ms",
    }
    # pyarrow factory for the system type.
    system = pa.time32
Ancestors (in MRO)
- arti.types.pyarrow._BaseSizedTimeTypeAdapter
- arti.types.pyarrow._BaseTimeTypeAdapter
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
precision_to_unit
priority
unit_to_precision
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
    """Match when the parent adapter matches and ``type_.precision`` has a known unit."""
    if not super().matches_artigraph(type_=type_, hints=hints):
        return False
    return type_.precision in cls.precision_to_unit  # type: ignore[attr-defined]
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
    """Match when the parent adapter matches and ``type_.unit`` has a known precision."""
    if not super().matches_system(type_=type_, hints=hints):
        return False
    return type_.unit in cls.unit_to_precision
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
    """Build the artigraph type whose precision corresponds to ``type_.unit``.

    Raises:
        ValueError: if ``type_.unit`` is not a key of ``cls.unit_to_precision``.
    """
    precision = cls.unit_to_precision.get(type_.unit)
    if precision is None:  # pragma: no cover
        raise ValueError(
            f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
        )
    assert issubclass(cls.artigraph, types._TimeMixin)
    return cls.artigraph(precision=precision)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
    """Build the system type whose unit corresponds to ``type_.precision``.

    Raises:
        ValueError: if ``type_.precision`` is not a key of ``cls.precision_to_unit``.
    """
    precision = type_.precision  # type: ignore[attr-defined]
    unit = cls.precision_to_unit.get(precision)
    if unit is None:  # pragma: no cover
        raise ValueError(
            f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
        )
    return cls.system(unit)
Methods
system
def system(
...
)
time32(unit)
Create instance of 32-bit time (time of day) type with unit resolution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
unit | str | one of 's' [second], or 'ms' [millisecond] | None |
Returns:
Type | Description |
---|---|
pyarrow.Time32Type | None |
Time64TypeAdapter
class Time64TypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class Time64TypeAdapter(_BaseSizedTimeTypeAdapter):
    """Adapter registered with ``pyarrow_type_system`` for ``pa.time64`` types."""

    # Precision names mapped to the unit strings passed to ``pa.time64``.
    precision_to_unit = {
        "microsecond": "us",
        "nanosecond": "ns",
    }
    # pyarrow factory for the system type.
    system = pa.time64
Ancestors (in MRO)
- arti.types.pyarrow._BaseSizedTimeTypeAdapter
- arti.types.pyarrow._BaseTimeTypeAdapter
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
precision_to_unit
priority
unit_to_precision
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
    """Match when the parent adapter matches and ``type_.precision`` has a known unit."""
    if not super().matches_artigraph(type_=type_, hints=hints):
        return False
    return type_.precision in cls.precision_to_unit  # type: ignore[attr-defined]
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
    """Match when the parent adapter matches and ``type_.unit`` has a known precision."""
    if not super().matches_system(type_=type_, hints=hints):
        return False
    return type_.unit in cls.unit_to_precision
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
    """Build the artigraph type whose precision corresponds to ``type_.unit``.

    Raises:
        ValueError: if ``type_.unit`` is not a key of ``cls.unit_to_precision``.
    """
    precision = cls.unit_to_precision.get(type_.unit)
    if precision is None:  # pragma: no cover
        raise ValueError(
            f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
        )
    assert issubclass(cls.artigraph, types._TimeMixin)
    return cls.artigraph(precision=precision)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
    """Build the system type whose unit corresponds to ``type_.precision``.

    Raises:
        ValueError: if ``type_.precision`` is not a key of ``cls.precision_to_unit``.
    """
    precision = type_.precision  # type: ignore[attr-defined]
    unit = cls.precision_to_unit.get(precision)
    if unit is None:  # pragma: no cover
        raise ValueError(
            f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
        )
    return cls.system(unit)
Methods
system
def system(
...
)
time64(unit)
Create instance of 64-bit time (time of day) type with unit resolution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
unit | str | One of 'us' [microsecond], or 'ns' [nanosecond]. | None |
Returns:
Type | Description |
---|---|
pyarrow.Time64Type | None |
TimestampTypeAdapter
class TimestampTypeAdapter(
/,
*args,
**kwargs
)
View Source
@pyarrow_type_system.register_adapter
class TimestampTypeAdapter(_BaseTimeTypeAdapter):
    """Adapter between ``types.Timestamp`` and UTC ``pa.timestamp`` types."""

    artigraph = types.Timestamp
    system = pa.timestamp

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a UTC ``pa.timestamp`` into the artigraph type via the parent adapter.

        Raises:
            ValueError: if ``type_`` has no timezone or its timezone is not UTC
                (compared case-insensitively).
        """
        # A timezone-naive timestamp has ``tz is None``; fail with a clear error instead of
        # an AttributeError from calling ``.upper()`` on None.
        if type_.tz is None:
            raise ValueError(f"Timestamp {type_}.tz must be in UTC, got a timezone-naive type")
        tz = type_.tz.upper()
        if tz != "UTC":
            raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {tz}")
        return super().to_artigraph(type_, hints=hints, type_system=type_system)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match when the parent adapter matches and ``type_`` carries a timezone."""
        return super().matches_system(type_, hints=hints) and type_.tz is not None

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build a ``pa.timestamp`` in UTC using the unit chosen by the parent adapter."""
        ts = super().to_system(type_, hints=hints, type_system=type_system)
        # Only the unit of the parent's result is reused; the timezone is always "UTC".
        return cls.system(ts.unit, "UTC")
Ancestors (in MRO)
- arti.types.pyarrow._BaseTimeTypeAdapter
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.TypeAdapter
Class variables
artigraph
key
precision_to_unit
priority
unit_to_precision
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
    """Return whether ``type_`` is an instance of this adapter's artigraph type."""
    expected = cls.artigraph
    return isinstance(type_, expected)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
    """Match when the parent adapter matches and ``type_`` carries a timezone."""
    parent_matches = super().matches_system(type_, hints=hints)
    if not parent_matches:
        # Mirror ``and`` semantics: hand back the parent's falsy result unchanged.
        return parent_matches
    return type_.tz is not None
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
    """Convert a UTC ``pa.timestamp`` into the artigraph type via the parent adapter.

    Raises:
        ValueError: if ``type_`` has no timezone or its timezone is not UTC
            (compared case-insensitively).
    """
    # A timezone-naive timestamp has ``tz is None``; fail with a clear error instead of
    # an AttributeError from calling ``.upper()`` on None.
    if type_.tz is None:
        raise ValueError(f"Timestamp {type_}.tz must be in UTC, got a timezone-naive type")
    tz = type_.tz.upper()
    if tz != "UTC":
        raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {tz}")
    return super().to_artigraph(type_, hints=hints, type_system=type_system)
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
    """Build a system timestamp in UTC using the unit chosen by the parent adapter."""
    parent_result = super().to_system(type_, hints=hints, type_system=type_system)
    # Only the unit of the parent's result is reused; the timezone is always "UTC".
    unit = parent_result.unit
    return cls.system(unit, "UTC")
Methods
system
def system(
...
)
timestamp(unit, tz=None)
Create instance of timestamp type with resolution and optional time zone.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
unit | str | one of 's' [second], 'ms' [millisecond], 'us' [microsecond], or 'ns' [nanosecond] | None |
tz | str, default None | Time zone name. None indicates time zone naive | None |
Returns:
Type | Description |
---|---|
TimestampType | None |