Skip to content

Module arti.types.pyarrow

None

None

View Source
from __future__ import annotations

import json

from collections.abc import Callable

from typing import Any, cast

import pyarrow as pa

from arti import Type, TypeAdapter, TypeSystem, types

from arti.internal.utils import classproperty

# TypeSystem under which the adapters in this module are registered (via `register_adapter`).
pyarrow_type_system = TypeSystem(key="pyarrow")

# Not implemented:

#     decimal128(int precision, int scale=0),

#     dictionary(index_type, value_type, …),

#     large_binary(),

#     large_list(value_type),

#     large_string(),

class _PyarrowTypeAdapter(TypeAdapter):
    """Shared base for adapters between Artigraph types and pyarrow types.

    Subclasses set `artigraph` (the Artigraph type class) and `system` (the pyarrow type
    constructor, eg: `pa.string`). The conversions defined here call both with no arguments, so
    adapters for parameterized types override them.
    """

    @classproperty
    def _is_system(cls) -> Callable[[pa.DataType], bool]:
        # The `pa.types` checker is looked up from the constructor's name (pa.string -> is_string).
        checker_name = f"is_{cls.system.__name__}"
        return getattr(pa.types, checker_name)  # type: ignore[no-any-return]

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a default-constructed `cls.artigraph`; `type_` itself is not inspected."""
        build = cls.artigraph
        return build()

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow DataType accepted by this adapter's checker."""
        if not isinstance(type_, pa.DataType):
            return False
        return cls._is_system(type_)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return the result of calling `cls.system` with no arguments."""
        constructor = cls.system
        return constructor()

def _gen_adapter(*, artigraph: type[Type], system: Any, priority: int = 0) -> type[TypeAdapter]:
    """Create a `_PyarrowTypeAdapter` subclass for the pair and register it.

    The generated class is named after the pyarrow constructor (`Pyarrow` + `system.__name__`)
    and carries `artigraph`, `system` and `priority` as class attributes. Whatever
    `pyarrow_type_system.register_adapter` returns is passed back to the caller.
    """
    adapter_name = f"Pyarrow{system.__name__}"
    namespace = {"artigraph": artigraph, "system": system, "priority": priority}
    adapter = type(adapter_name, (_PyarrowTypeAdapter,), namespace)
    return pyarrow_type_system.register_adapter(adapter)

# Adapters for types whose pyarrow constructor takes no arguments.
_gen_adapter(artigraph=types.String, system=pa.string)
_gen_adapter(artigraph=types.Null, system=pa.null)

# Date matching requires `priority=_precision` since it is not 1:1, but the float/int ones are.
# (Both pa.date32 and pa.date64 are registered against types.Date, each with its own priority.)
for _precision in (32, 64):
    _gen_adapter(
        artigraph=types.Date,
        system=getattr(pa, f"date{_precision}"),
        priority=_precision,
    )

# One adapter per float width: types.Float{N} <-> pa.float{N}.
for _precision in (16, 32, 64):
    _gen_adapter(
        artigraph=getattr(types, f"Float{_precision}"),
        system=getattr(pa, f"float{_precision}"),
    )

# One signed and one unsigned adapter per integer width.
for _precision in (8, 16, 32, 64):
    _gen_adapter(
        artigraph=getattr(types, f"Int{_precision}"),
        system=getattr(pa, f"int{_precision}"),
    )
    _gen_adapter(
        artigraph=getattr(types, f"UInt{_precision}"),
        system=getattr(pa, f"uint{_precision}"),
    )

@pyarrow_type_system.register_adapter
class BinaryTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.Binary` and pyarrow's variable length or fixed size binary."""

    artigraph = types.Binary
    system = pa.binary

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a pyarrow binary type, carrying `byte_width` over as `byte_size` if fixed."""
        if isinstance(type_, pa.FixedSizeBinaryType):
            return cls.artigraph(byte_size=type_.byte_width)
        return cls.artigraph()

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a variable length or fixed size pyarrow binary type."""
        # pa.binary returns a DataType(binary) when length=-1, otherwise a FixedSizeBinaryType...
        # but pa.types.is_binary only checks for DataType(binary).
        if super().matches_system(type_, hints=hints):
            return True
        # The `pa.types.is_*` checkers expect a DataType; guard (as the base class does) so other
        # objects yield False instead of an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and pa.types.is_fixed_size_binary(type_)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.binary`, using length=-1 (variable length) when `byte_size` is unset."""
        assert isinstance(type_, cls.artigraph)
        length = -1 if type_.byte_size is None else type_.byte_size
        return cls.system(length=length)

# The pyarrow bool constructor and checker have different names

@pyarrow_type_system.register_adapter
class BoolTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.Boolean` and `pa.bool_`."""

    artigraph = types.Boolean
    system = pa.bool_

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is the pyarrow boolean type."""
        # The constructor is `bool_` but the checker is `is_boolean`, so the name-derived lookup
        # from the base class cannot be used. Keep the base class's DataType guard so that
        # non-DataType objects yield False rather than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_boolean(type_))

@pyarrow_type_system.register_adapter
class GeographyTypeAdapter(_PyarrowTypeAdapter):
    """One-way adapter emitting `types.Geography` as a pyarrow binary or string type."""

    # TODO: Can we do something with pa.field metadata to round trip (eg: format, srid, etc) or
    # infer GeoParquet?
    artigraph = types.Geography
    system = pa.string  # or pa.binary if geography.format == "WKB"

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Never match: pyarrow types are not converted back into geographies."""
        # We don't have any metadata to differentiate normal strings from geographies, so avoid
        # matching. This will prevent round tripping.
        return False

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.binary()` for the "WKB" format and `pa.string()` for any other format."""
        assert isinstance(type_, cls.artigraph)
        if type_.format == "WKB":
            return pa.binary()
        return pa.string()

@pyarrow_type_system.register_adapter
class ListTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.List` and `pa.list_`, converting the element type recursively."""

    artigraph = types.List
    system = pa.list_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a pyarrow list type by converting its `value_type` via `type_system`."""
        element = type_system.to_artigraph(type_.value_type, hints=hints)
        return cls.artigraph(element=element)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow list type (per `pa.types.is_list`)."""
        # `pa.types.is_list` expects a DataType; guard (as the base class does) so that other
        # objects yield False rather than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_list(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.list_` from the converted element type."""
        assert isinstance(type_, cls.artigraph)
        value_type = type_system.to_system(type_.element, hints=hints)
        return cls.system(value_type=value_type)

@pyarrow_type_system.register_adapter
class MapTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.Map` and `pa.map_`, converting key and value types recursively."""

    artigraph = types.Map
    system = pa.map_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a pyarrow map type from its `key_type` and `item_type`."""
        key = type_system.to_artigraph(type_.key_type, hints=hints)
        value = type_system.to_artigraph(type_.item_type, hints=hints)
        return cls.artigraph(key=key, value=value)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow map type (per `pa.types.is_map`)."""
        # `pa.types.is_map` expects a DataType; guard (as the base class does) so that other
        # objects yield False rather than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_map(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.map_` from the converted key and value types."""
        assert isinstance(type_, cls.artigraph)
        key_type = type_system.to_system(type_.key, hints=hints)
        item_type = type_system.to_system(type_.value, hints=hints)
        return cls.system(key_type=key_type, item_type=item_type)

@pyarrow_type_system.register_adapter
class StructTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.Struct` and `pa.struct`, converting each field recursively."""

    artigraph = types.Struct
    system = pa.struct

    @classmethod
    def _field_to_artigraph(
        cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Type:
        """Convert one pyarrow field, carrying its nullability over to the converted type."""
        converted = type_system.to_artigraph(type_.type, hints=hints)
        if type_.nullable == converted.nullable:
            # Avoid setting nullable if matching to minimize repr
            return converted
        return converted.copy(update={"nullable": type_.nullable})

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert every field yielded by iterating `type_`, keyed by field name."""
        fields = {}
        for field in type_:
            fields[field.name] = cls._field_to_artigraph(
                field, hints=hints, type_system=type_system
            )
        return cls.artigraph(fields=fields)

    @classmethod
    def _field_to_system(
        cls, name: str, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Any:
        """Build a `pa.field` called `name` with the converted type and matching nullability."""
        system_type = type_system.to_system(type_, hints=hints)
        return pa.field(name, system_type, nullable=type_.nullable)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.struct` from the converted fields, in `type_.fields` order."""
        assert isinstance(type_, cls.artigraph)
        pa_fields = [
            cls._field_to_system(field_name, field_type, hints=hints, type_system=type_system)
            for field_name, field_type in type_.fields.items()
        ]
        return cls.system(pa_fields)

# NOTE: pa.schema and pa.struct are structurally similar, but pa.schema has additional attributes

# (eg: .metadata) and cannot be nested (like Collection).

@pyarrow_type_system.register_adapter
class SchemaTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between a `types.Collection` of structs and `pa.schema`.

    The collection's `name`, `partition_by` and `cluster_by` are stored as JSON under the
    "artigraph" key of the schema metadata and read back from there when converting.
    """

    artigraph = types.Collection
    system = pa.schema
    priority = ListTypeAdapter.priority + 1

    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Match only collections whose element is a `types.Struct`."""
        # Collection can hold arbitrary types, but `pa.schema` is only a struct (but with arbitrary
        # metadata)
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return isinstance(type_.element, types.Struct)  # type: ignore[attr-defined]

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a schema to a collection, restoring settings from the schema metadata."""
        metadata = type_.metadata
        extra: dict[str, Any] = {}
        # NOTE: pyarrow converts all metadata keys/values to bytes
        if metadata and b"artigraph" in metadata:
            extra = json.loads(metadata[b"artigraph"].decode())
            # JSON has no tuples, so these come back as lists.
            for key in ("partition_by", "cluster_by"):
                if key in extra:  # pragma: no cover
                    extra[key] = tuple(extra[key])
        element = StructTypeAdapter.to_artigraph(type_, hints=hints, type_system=type_system)
        return cls.artigraph(element=element, **extra)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow Schema."""
        return isinstance(type_, pa.lib.Schema)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.schema` from the element struct, embedding the collection settings."""
        assert isinstance(type_, cls.artigraph)
        assert isinstance(type_.element, types.Struct)
        struct = StructTypeAdapter.to_system(type_.element, hints=hints, type_system=type_system)
        settings = json.dumps(
            {
                "name": type_.name,
                "partition_by": type_.partition_by,
                "cluster_by": type_.cluster_by,
            }
        )
        return cls.system(struct, metadata={"artigraph": settings})

class _BaseTimeTypeAdapter(_PyarrowTypeAdapter):
    """Base for adapters whose pyarrow constructor takes a time unit ("s", "ms", "us", "ns")."""

    # Artigraph precision name -> pyarrow unit abbreviation.
    precision_to_unit = {
        "second": "s",
        "millisecond": "ms",
        "microsecond": "us",
        "nanosecond": "ns",
    }

    @classproperty
    def unit_to_precision(cls) -> dict[str, str]:
        # Inverse of `precision_to_unit`, rebuilt from the (possibly overridden) class attribute.
        return {unit: precision for precision, unit in cls.precision_to_unit.items()}

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert using the precision matching `type_.unit`.

        Raises:
            ValueError: if the unit is not one this adapter knows.
        """
        precision = cls.unit_to_precision.get(type_.unit)
        if precision is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
            )
        assert issubclass(cls.artigraph, types._TimeMixin)
        return cls.artigraph(precision=precision)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Call `cls.system` with the unit matching `type_.precision`.

        Raises:
            ValueError: if the precision is not one this adapter knows.
        """
        precision = type_.precision  # type: ignore[attr-defined]
        unit = cls.precision_to_unit.get(precision)
        if unit is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
            )
        return cls.system(unit)

@pyarrow_type_system.register_adapter
class DateTimeTypeAdapter(_BaseTimeTypeAdapter):
    """Adapter between `types.DateTime` and pyarrow timestamps without a time zone."""

    artigraph = types.DateTime
    system = pa.timestamp

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match pyarrow timestamps whose `tz` is None."""
        if not super().matches_system(type_, hints=hints):
            return False
        return type_.tz is None

@pyarrow_type_system.register_adapter
class TimestampTypeAdapter(_BaseTimeTypeAdapter):
    """Adapter between `types.Timestamp` and pyarrow timestamps that carry a time zone."""

    artigraph = types.Timestamp
    system = pa.timestamp

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a UTC timestamp (zone name compared case-insensitively).

        Raises:
            ValueError: if the time zone is anything other than UTC.
        """
        tz = type_.tz.upper()
        if tz == "UTC":
            return super().to_artigraph(type_, hints=hints, type_system=type_system)
        raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {tz}")

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match pyarrow timestamps whose `tz` is set."""
        if not super().matches_system(type_, hints=hints):
            return False
        return type_.tz is not None

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build a "UTC" `pa.timestamp` with the unit matching `type_.precision`."""
        # The base class validates the precision and resolves the unit; reuse that unit here.
        zoneless = super().to_system(type_, hints=hints, type_system=type_system)
        return cls.system(zoneless.unit, "UTC")

class _BaseSizedTimeTypeAdapter(_BaseTimeTypeAdapter):
    """Base for `types.Time` adapters that only support the units in `precision_to_unit`."""

    artigraph = types.Time

    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Match `types.Time` values whose precision this adapter supports."""
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return type_.precision in cls.precision_to_unit  # type: ignore[attr-defined]

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match pyarrow types accepted by the base check whose unit this adapter supports."""
        if not super().matches_system(type_=type_, hints=hints):
            return False
        return type_.unit in cls.unit_to_precision

@pyarrow_type_system.register_adapter
class Time32TypeAdapter(_BaseSizedTimeTypeAdapter):
    """`types.Time` <-> `pa.time32`, limited to second and millisecond precision."""

    precision_to_unit = dict(second="s", millisecond="ms")
    system = pa.time32

@pyarrow_type_system.register_adapter
class Time64TypeAdapter(_BaseSizedTimeTypeAdapter):
    """`types.Time` <-> `pa.time64`, limited to microsecond and nanosecond precision."""

    precision_to_unit = dict(microsecond="us", nanosecond="ns")
    system = pa.time64

Variables

pyarrow_type_system

Classes

BinaryTypeAdapter

class BinaryTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class BinaryTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.Binary` and pyarrow's variable length or fixed size binary."""

    artigraph = types.Binary
    system = pa.binary

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a pyarrow binary type, carrying `byte_width` over as `byte_size` if fixed."""
        if isinstance(type_, pa.FixedSizeBinaryType):
            return cls.artigraph(byte_size=type_.byte_width)
        return cls.artigraph()

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a variable length or fixed size pyarrow binary type."""
        # pa.binary returns a DataType(binary) when length=-1, otherwise a FixedSizeBinaryType...
        # but pa.types.is_binary only checks for DataType(binary).
        if super().matches_system(type_, hints=hints):
            return True
        # The `pa.types.is_*` checkers expect a DataType; guard (as the base class does) so other
        # objects yield False instead of an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and pa.types.is_fixed_size_binary(type_)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.binary`, using length=-1 (variable length) when `byte_size` is unset."""
        assert isinstance(type_, cls.artigraph)
        length = -1 if type_.byte_size is None else type_.byte_size
        return cls.system(length=length)

Ancestors (in MRO)

  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is an instance of this adapter's `artigraph` class."""
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a variable length or fixed size pyarrow binary type."""
        # pa.binary returns a DataType(binary) when length=-1, otherwise a FixedSizeBinaryType...
        # but pa.types.is_binary only checks for DataType(binary).
        if super().matches_system(type_, hints=hints):
            return True
        # The `pa.types.is_*` checkers expect a DataType; guard so that other objects yield False
        # instead of an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and pa.types.is_fixed_size_binary(type_)

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a pyarrow binary type, carrying `byte_width` over as `byte_size` if fixed."""
        if not isinstance(type_, pa.FixedSizeBinaryType):
            return cls.artigraph()
        return cls.artigraph(byte_size=type_.byte_width)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Call `cls.system` with `byte_size` as the length, or -1 when `byte_size` is None."""
        assert isinstance(type_, cls.artigraph)
        if type_.byte_size is None:
            length = -1
        else:
            length = type_.byte_size
        return cls.system(length=length)

Methods

system

def system(
    ...
)

binary(int length=-1)

Create variable-length binary type.

Parameters:

Name Type Description Default
length int, optional, default -1 If length == -1 then return a variable length binary type. If length is
greater than or equal to 0 then return a fixed size binary type of
width length. None

BoolTypeAdapter

class BoolTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class BoolTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.Boolean` and `pa.bool_`."""

    artigraph = types.Boolean
    system = pa.bool_

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is the pyarrow boolean type."""
        # The constructor is `bool_` but the checker is `is_boolean`, so the checker is called
        # directly. `pa.types.is_*` checkers expect a DataType; guard so that other objects
        # yield False rather than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_boolean(type_))

Ancestors (in MRO)

  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is an instance of this adapter's `artigraph` class."""
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is the pyarrow boolean type."""
        # `pa.types.is_boolean` expects a DataType; guard so that other objects yield False
        # rather than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_boolean(type_))

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a default-constructed `cls.artigraph`; `type_` itself is not inspected."""
        build = cls.artigraph
        return build()

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return the result of calling `cls.system` with no arguments."""
        constructor = cls.system
        return constructor()

Methods

system

def system(
    ...
)

bool_()

Create instance of boolean type.

DateTimeTypeAdapter

class DateTimeTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class DateTimeTypeAdapter(_BaseTimeTypeAdapter):
    """Adapter between `types.DateTime` and pyarrow timestamps without a time zone."""

    artigraph = types.DateTime
    system = pa.timestamp

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match pyarrow timestamps whose `tz` is None."""
        if not super().matches_system(type_, hints=hints):
            return False
        return type_.tz is None

Ancestors (in MRO)

  • arti.types.pyarrow._BaseTimeTypeAdapter
  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
precision_to_unit
priority
unit_to_precision

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is an instance of this adapter's `artigraph` class."""
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Match types accepted by the parent check whose `tz` is None."""
        if not super().matches_system(type_, hints=hints):
            return False
        return type_.tz is None

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert using the precision matching `type_.unit`.

        Raises:
            ValueError: if the unit is not in `cls.unit_to_precision`.
        """
        precision = cls.unit_to_precision.get(type_.unit)
        if precision is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
            )
        assert issubclass(cls.artigraph, types._TimeMixin)
        return cls.artigraph(precision=precision)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Call `cls.system` with the unit matching `type_.precision`.

        Raises:
            ValueError: if the precision is not in `cls.precision_to_unit`.
        """
        precision = type_.precision  # type: ignore[attr-defined]
        unit = cls.precision_to_unit.get(precision)
        if unit is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
            )
        return cls.system(unit)

Methods

system

def system(
    ...
)

timestamp(unit, tz=None)

Create instance of timestamp type with resolution and optional time zone.

Parameters:

Name Type Description Default
unit str one of 's' [second], 'ms' [millisecond], 'us' [microsecond], or 'ns'
[nanosecond] None
tz str, default None Time zone name. None indicates time zone naive None

Returns:

Type Description
TimestampType None

GeographyTypeAdapter

class GeographyTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class GeographyTypeAdapter(_PyarrowTypeAdapter):
    """One-way adapter emitting `types.Geography` as a pyarrow binary or string type."""

    # TODO: Can we do something with pa.field metadata to round trip (eg: format, srid, etc) or
    # infer GeoParquet?
    artigraph = types.Geography
    system = pa.string  # or pa.binary if geography.format == "WKB"

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Never match: pyarrow types are not converted back into geographies."""
        # We don't have any metadata to differentiate normal strings from geographies, so avoid
        # matching. This will prevent round tripping.
        return False

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.binary()` for the "WKB" format and `pa.string()` for any other format."""
        assert isinstance(type_, cls.artigraph)
        if type_.format == "WKB":
            return pa.binary()
        return pa.string()

Ancestors (in MRO)

  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is an instance of this adapter's `artigraph` class."""
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Never match, whatever `type_` is."""
        # We don't have any metadata to differentiate normal strings from geographies, so avoid
        # matching. This will prevent round tripping.
        matched = False
        return matched

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Return a default-constructed `cls.artigraph`; `type_` itself is not inspected."""
        build = cls.artigraph
        return build()

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Return `pa.binary()` for the "WKB" format and `pa.string()` for any other format."""
        assert isinstance(type_, cls.artigraph)
        if type_.format == "WKB":
            return pa.binary()
        return pa.string()

Methods

system

def system(
    ...
)

string()

Create UTF8 variable-length string type.

ListTypeAdapter

class ListTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class ListTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.List` and `pa.list_`, converting the element type recursively."""

    artigraph = types.List
    system = pa.list_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a pyarrow list type by converting its `value_type` via `type_system`."""
        element = type_system.to_artigraph(type_.value_type, hints=hints)
        return cls.artigraph(element=element)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow list type (per `pa.types.is_list`)."""
        # `pa.types.is_list` expects a DataType; guard (as the base class does) so that other
        # objects yield False rather than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_list(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.list_` from the converted element type."""
        assert isinstance(type_, cls.artigraph)
        value_type = type_system.to_system(type_.element, hints=hints)
        return cls.system(value_type=value_type)

Ancestors (in MRO)

  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is an instance of this adapter's `artigraph` class."""
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow list type (per `pa.types.is_list`)."""
        # `pa.types.is_list` expects a DataType; guard so that other objects yield False rather
        # than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_list(type_))

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert `type_.value_type` via `type_system` and wrap it as the `element`."""
        element = type_system.to_artigraph(type_.value_type, hints=hints)
        return cls.artigraph(element=element)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert `type_.element` via `type_system` and pass it to `cls.system` as `value_type`."""
        assert isinstance(type_, cls.artigraph)
        value_type = type_system.to_system(type_.element, hints=hints)
        return cls.system(value_type=value_type)

Methods

system

def system(
    ...
)

list_(value_type, int list_size=-1)

Create ListType instance from child data type or field.

Parameters:

Name Type Description Default
value_type DataType or Field None None
list_size int, optional, default -1 If list_size == -1 then return a variable length list type. If list_size is
greater than or equal to 0 then return a fixed size list type. None

Returns:

Type Description
DataType None

MapTypeAdapter

class MapTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class MapTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between `types.Map` and `pa.map_`, converting key and value types recursively."""

    artigraph = types.Map
    system = pa.map_

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a pyarrow map type from its `key_type` and `item_type`."""
        key = type_system.to_artigraph(type_.key_type, hints=hints)
        value = type_system.to_artigraph(type_.item_type, hints=hints)
        return cls.artigraph(key=key, value=value)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow map type (per `pa.types.is_map`)."""
        # `pa.types.is_map` expects a DataType; guard (as the base class does) so that other
        # objects yield False rather than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_map(type_))

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.map_` from the converted key and value types."""
        assert isinstance(type_, cls.artigraph)
        key_type = type_system.to_system(type_.key, hints=hints)
        item_type = type_system.to_system(type_.value, hints=hints)
        return cls.system(key_type=key_type, item_type=item_type)

Ancestors (in MRO)

  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is an instance of this adapter's `artigraph` class."""
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow map type (per `pa.types.is_map`)."""
        # `pa.types.is_map` expects a DataType; guard so that other objects yield False rather
        # than an error from inside pyarrow.
        return isinstance(type_, pa.DataType) and cast(bool, pa.types.is_map(type_))

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert `type_.key_type` and `type_.item_type` via `type_system` into key and value."""
        key = type_system.to_artigraph(type_.key_type, hints=hints)
        value = type_system.to_artigraph(type_.item_type, hints=hints)
        return cls.artigraph(key=key, value=value)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert `type_.key` and `type_.value` via `type_system` and pass both to `cls.system`."""
        assert isinstance(type_, cls.artigraph)
        key_type = type_system.to_system(type_.key, hints=hints)
        item_type = type_system.to_system(type_.value, hints=hints)
        return cls.system(key_type=key_type, item_type=item_type)

Methods

system

def system(
    ...
)

map_(key_type, item_type, keys_sorted=False) -> MapType

Create MapType instance from key and item data types or fields.

Parameters:

Name Type Description Default
key_type DataType None None
item_type DataType None None
keys_sorted bool None None

Returns:

Type Description
DataType None

SchemaTypeAdapter

class SchemaTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class SchemaTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between a `types.Collection` of structs and `pa.schema`.

    The collection's `name`, `partition_by` and `cluster_by` are stored as JSON under the
    "artigraph" key of the schema metadata and read back from there when converting.
    """

    artigraph = types.Collection
    system = pa.schema
    priority = ListTypeAdapter.priority + 1

    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Match only collections whose element is a `types.Struct`."""
        # Collection can hold arbitrary types, but `pa.schema` is only a struct (but with arbitrary
        # metadata)
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return isinstance(type_.element, types.Struct)  # type: ignore[attr-defined]

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a schema to a collection, restoring settings from the schema metadata."""
        metadata = type_.metadata
        extra: dict[str, Any] = {}
        # NOTE: pyarrow converts all metadata keys/values to bytes
        if metadata and b"artigraph" in metadata:
            extra = json.loads(metadata[b"artigraph"].decode())
            # JSON has no tuples, so these come back as lists.
            for key in ("partition_by", "cluster_by"):
                if key in extra:  # pragma: no cover
                    extra[key] = tuple(extra[key])
        element = StructTypeAdapter.to_artigraph(type_, hints=hints, type_system=type_system)
        return cls.artigraph(element=element, **extra)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether `type_` is a pyarrow Schema."""
        return isinstance(type_, pa.lib.Schema)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build `pa.schema` from the element struct, embedding the collection settings."""
        assert isinstance(type_, cls.artigraph)
        assert isinstance(type_.element, types.Struct)
        struct = StructTypeAdapter.to_system(type_.element, hints=hints, type_system=type_system)
        settings = json.dumps(
            {
                "name": type_.name,
                "partition_by": type_.partition_by,
                "cluster_by": type_.cluster_by,
            }
        )
        return cls.system(struct, metadata={"artigraph": settings})

Ancestors (in MRO)

  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Match only values accepted by the parent check whose element is a `types.Struct`."""
        # Collection can hold arbitrary types, but `pa.schema` is only a struct (but with arbitrary
        # metadata)
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return isinstance(type_.element, types.Struct)  # type: ignore[attr-defined]

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Report whether ``type_`` is a ``pa.lib.Schema`` instance.

        ``hints`` is not consulted by this check.
        """
        is_schema = isinstance(type_, pa.lib.Schema)
        return is_schema

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a system schema into the artigraph collection type.

        The schema's fields become the collection's ``element`` via
        ``StructTypeAdapter.to_artigraph``. If the schema metadata has an ``artigraph`` entry, its
        JSON payload is decoded and passed to ``cls.artigraph`` as extra keyword arguments.
        """
        metadata = type_.metadata
        collection_kwargs: dict[str, Any] = {}
        # NOTE: pyarrow converts all metadata keys/values to bytes
        if metadata and b"artigraph" in metadata:
            collection_kwargs = json.loads(metadata[b"artigraph"].decode())
            # json.loads yields lists for JSON arrays; these two entries are turned into tuples
            # (presumably what the artigraph type expects -- confirm against its definition).
            for key in ("partition_by", "cluster_by"):
                if key in collection_kwargs:  # pragma: no cover
                    collection_kwargs[key] = tuple(collection_kwargs[key])
        element = StructTypeAdapter.to_artigraph(type_, hints=hints, type_system=type_system)
        return cls.artigraph(element=element, **collection_kwargs)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Build the system schema for an artigraph collection whose element is a struct.

        The element is converted with ``StructTypeAdapter.to_system`` and passed to ``cls.system``
        together with a ``metadata`` mapping. That mapping holds a single ``"artigraph"`` entry:
        the JSON encoding of the collection's ``name``, ``partition_by`` and ``cluster_by``.
        """
        assert isinstance(type_, cls.artigraph)
        element = type_.element
        assert isinstance(element, types.Struct)
        fields = StructTypeAdapter.to_system(element, hints=hints, type_system=type_system)
        # Collection-level attributes have no slot in the struct itself, so they are carried in
        # the metadata as JSON.
        collection_attrs = {
            "name": type_.name,
            "partition_by": type_.partition_by,
            "cluster_by": type_.cluster_by,
        }
        return cls.system(fields, metadata={"artigraph": json.dumps(collection_attrs)})

Methods

system

def system(
    ...
)

schema(fields, metadata=None)

Construct pyarrow.Schema from collection of fields.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| fields | iterable of Fields or tuples, or mapping of strings to DataTypes | None | None |
| metadata | dict, default None | Keys and values must be coercible to bytes. | None |

Returns:

Type Description
pyarrow.Schema None

StructTypeAdapter

class StructTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class StructTypeAdapter(_PyarrowTypeAdapter):
    """Adapter between ``types.Struct`` and ``pa.struct``, registered with ``pyarrow_type_system``.

    Field types are converted through the ``type_system`` passed to each call, and each field's
    nullability is carried across in both directions.
    """

    artigraph = types.Struct
    system = pa.struct

    @classmethod
    def _field_to_artigraph(
        cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Type:
        """Convert one system field (``type_``) to an artigraph type with matching nullability."""
        converted = type_system.to_artigraph(type_.type, hints=hints)
        # Avoid setting nullable if matching to minimize repr
        if type_.nullable == converted.nullable:
            return converted
        return converted.copy(update={"nullable": type_.nullable})

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert an iterable of system fields into ``cls.artigraph``, keyed by field name."""
        converted_fields = {
            field.name: cls._field_to_artigraph(field, hints=hints, type_system=type_system)
            for field in type_
        }
        return cls.artigraph(fields=converted_fields)

    @classmethod
    def _field_to_system(
        cls, name: str, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem
    ) -> Any:
        """Build a ``pa.field`` named ``name`` from an artigraph type, keeping its nullability."""
        system_type = type_system.to_system(type_, hints=hints)
        return pa.field(name, system_type, nullable=type_.nullable)

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert an artigraph struct into ``cls.system`` with one system field per entry."""
        assert isinstance(type_, cls.artigraph)
        system_fields = [
            cls._field_to_system(name, subtype, hints=hints, type_system=type_system)
            for name, subtype in type_.fields.items()
        ]
        return cls.system(system_fields)

Ancestors (in MRO)

  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` is an instance of this adapter's ``artigraph`` type.

        ``hints`` is not consulted by this check.
        """
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` is a ``pa.DataType`` accepted by ``cls._is_system``."""
        # Only hand actual pyarrow data types to the `_is_system` predicate.
        if not isinstance(type_, pa.DataType):
            return False
        return cls._is_system(type_)

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert an iterable of system fields into ``cls.artigraph``, keyed by field name."""
        converted_fields = {
            field.name: cls._field_to_artigraph(field, hints=hints, type_system=type_system)
            for field in type_
        }
        return cls.artigraph(fields=converted_fields)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert an artigraph struct into ``cls.system`` with one system field per entry."""
        assert isinstance(type_, cls.artigraph)
        system_fields = [
            cls._field_to_system(name, subtype, hints=hints, type_system=type_system)
            for name, subtype in type_.fields.items()
        ]
        return cls.system(system_fields)

Methods

system

def system(
    ...
)

struct(fields)

Create StructType instance from fields.

A struct is a nested type parameterized by an ordered sequence of types (which can all be distinct), called its fields.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| fields | iterable of Fields or tuples, or mapping of strings to DataTypes | Each field must have a UTF8-encoded name, and these field names are part of the type metadata. | None |

Returns:

Type Description
DataType None

Time32TypeAdapter

class Time32TypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class Time32TypeAdapter(_BaseSizedTimeTypeAdapter):
    """Sized time adapter for ``pa.time32``, registered with ``pyarrow_type_system``."""

    system = pa.time32
    # Precision name -> unit string passed to `pa.time32`.
    precision_to_unit = {"second": "s", "millisecond": "ms"}

Ancestors (in MRO)

  • arti.types.pyarrow._BaseSizedTimeTypeAdapter
  • arti.types.pyarrow._BaseTimeTypeAdapter
  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
precision_to_unit
priority
unit_to_precision

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` passes the parent check and has a supported precision.

        A precision is supported when it is a key of ``cls.precision_to_unit``.
        """
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return type_.precision in cls.precision_to_unit  # type: ignore[attr-defined]

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` passes the parent check and has a supported unit.

        A unit is supported when it is a key of ``cls.unit_to_precision``.
        """
        if not super().matches_system(type_=type_, hints=hints):
            return False
        return type_.unit in cls.unit_to_precision

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a system time type into ``cls.artigraph`` with the matching precision.

        The precision is looked up from ``type_.unit`` in ``cls.unit_to_precision``.

        Raises:
            ValueError: if ``type_.unit`` is not a key of ``cls.unit_to_precision``.
        """
        precision = cls.unit_to_precision.get(type_.unit)
        if precision is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
            )
        # Checked before `precision=` is passed below.
        assert issubclass(cls.artigraph, types._TimeMixin)
        return cls.artigraph(precision=precision)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert an artigraph time type into ``cls.system`` with the matching unit.

        The unit is looked up from ``type_.precision`` in ``cls.precision_to_unit``.

        Raises:
            ValueError: if ``type_.precision`` is not a key of ``cls.precision_to_unit``.
        """
        precision = type_.precision  # type: ignore[attr-defined]
        unit = cls.precision_to_unit.get(precision)
        if unit is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
            )
        return cls.system(unit)

Methods

system

def system(
    ...
)

time32(unit)

Create instance of 32-bit time (time of day) type with unit resolution.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| unit | str | one of 's' [second], or 'ms' [millisecond] | None |

Returns:

Type Description
pyarrow.Time32Type None

Time64TypeAdapter

class Time64TypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class Time64TypeAdapter(_BaseSizedTimeTypeAdapter):
    """Sized time adapter for ``pa.time64``, registered with ``pyarrow_type_system``."""

    system = pa.time64
    # Precision name -> unit string passed to `pa.time64`.
    precision_to_unit = {"microsecond": "us", "nanosecond": "ns"}

Ancestors (in MRO)

  • arti.types.pyarrow._BaseSizedTimeTypeAdapter
  • arti.types.pyarrow._BaseTimeTypeAdapter
  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
precision_to_unit
priority
unit_to_precision

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` passes the parent check and has a supported precision.

        A precision is supported when it is a key of ``cls.precision_to_unit``.
        """
        if not super().matches_artigraph(type_=type_, hints=hints):
            return False
        return type_.precision in cls.precision_to_unit  # type: ignore[attr-defined]

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` passes the parent check and has a supported unit.

        A unit is supported when it is a key of ``cls.unit_to_precision``.
        """
        if not super().matches_system(type_=type_, hints=hints):
            return False
        return type_.unit in cls.unit_to_precision

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a system time type into ``cls.artigraph`` with the matching precision.

        The precision is looked up from ``type_.unit`` in ``cls.unit_to_precision``.

        Raises:
            ValueError: if ``type_.unit`` is not a key of ``cls.unit_to_precision``.
        """
        precision = cls.unit_to_precision.get(type_.unit)
        if precision is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.unit must be one of {tuple(cls.unit_to_precision)}, got {type_.unit}"
            )
        # Checked before `precision=` is passed below.
        assert issubclass(cls.artigraph, types._TimeMixin)
        return cls.artigraph(precision=precision)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert an artigraph time type into ``cls.system`` with the matching unit.

        The unit is looked up from ``type_.precision`` in ``cls.precision_to_unit``.

        Raises:
            ValueError: if ``type_.precision`` is not a key of ``cls.precision_to_unit``.
        """
        precision = type_.precision  # type: ignore[attr-defined]
        unit = cls.precision_to_unit.get(precision)
        if unit is None:  # pragma: no cover
            raise ValueError(
                f"{type_}.precision must be one of {tuple(cls.precision_to_unit)}, got {precision}"
            )
        return cls.system(unit)

Methods

system

def system(
    ...
)

time64(unit)

Create instance of 64-bit time (time of day) type with unit resolution.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| unit | str | One of 'us' [microsecond], or 'ns' [nanosecond]. | None |

Returns:

Type Description
pyarrow.Time64Type None

TimestampTypeAdapter

class TimestampTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@pyarrow_type_system.register_adapter
class TimestampTypeAdapter(_BaseTimeTypeAdapter):
    """Adapter between ``types.Timestamp`` and ``pa.timestamp``.

    Only time zone aware system timestamps are matched, and only UTC ones can be converted to
    artigraph. Conversion to the system type always produces a UTC timestamp.
    """

    artigraph = types.Timestamp
    system = pa.timestamp

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a UTC system timestamp to the artigraph type via the parent implementation.

        Raises:
            ValueError: if ``type_.tz`` is ``None`` (time zone naive) or is not UTC (compared
                case-insensitively).
        """
        # `matches_system` filters out tz-naive timestamps, but a direct call may still pass one.
        # Reject it with the same ValueError as any other non-UTC input, not an AttributeError
        # from `None.upper()`.
        if type_.tz is None:
            raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {type_.tz}")
        tz = type_.tz.upper()
        if tz != "UTC":
            raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {tz}")
        return super().to_artigraph(type_, hints=hints, type_system=type_system)

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` passes the parent check and has a time zone set."""
        return super().matches_system(type_, hints=hints) and type_.tz is not None

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert an artigraph timestamp to a ``"UTC"`` system timestamp.

        The unit is taken from the parent implementation's result.
        """
        ts = super().to_system(type_, hints=hints, type_system=type_system)
        return cls.system(ts.unit, "UTC")

Ancestors (in MRO)

  • arti.types.pyarrow._BaseTimeTypeAdapter
  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
precision_to_unit
priority
unit_to_precision

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` is an instance of this adapter's ``artigraph`` type.

        ``hints`` is not consulted by this check.
        """
        expected = cls.artigraph
        return isinstance(type_, expected)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        """Return whether ``type_`` passes the parent check and has a time zone set."""
        if not super().matches_system(type_, hints=hints):
            return False
        # A `tz` of None is excluded here, so such types are not matched by this adapter.
        return type_.tz is not None

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        """Convert a UTC system timestamp to the artigraph type via the parent implementation.

        Raises:
            ValueError: if ``type_.tz`` is ``None`` (time zone naive) or is not UTC (compared
                case-insensitively).
        """
        # `matches_system` filters out tz-naive timestamps, but a direct call may still pass one.
        # Reject it with the same ValueError as any other non-UTC input, not an AttributeError
        # from `None.upper()`.
        if type_.tz is None:
            raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {type_.tz}")
        tz = type_.tz.upper()
        if tz != "UTC":
            raise ValueError(f"Timestamp {type_}.tz must be in UTC, got {tz}")
        return super().to_artigraph(type_, hints=hints, type_system=type_system)

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        """Convert an artigraph timestamp to a ``"UTC"`` system timestamp.

        The parent implementation supplies the unit; the time zone is always set to ``"UTC"``.
        """
        base = super().to_system(type_, hints=hints, type_system=type_system)
        unit = base.unit
        return cls.system(unit, "UTC")

Methods

system

def system(
    ...
)

timestamp(unit, tz=None)

Create instance of timestamp type with resolution and optional time zone.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| unit | str | one of 's' [second], 'ms' [millisecond], 'us' [microsecond], or 'ns' [nanosecond] | None |
| tz | str, default None | Time zone name. None indicates time zone naive | None |

Returns:

Type Description
TimestampType None