Skip to content

Module arti.types.bigquery

None

None

View Source
from __future__ import annotations

import warnings

from copy import deepcopy

from typing import Any

from google.cloud import bigquery

from google.cloud.bigquery.enums import SqlTypeNames

from arti import Type, TypeAdapter, TypeSystem, types

# The BigQuery types are enumerated in [1], but a few are not (yet) implemented:

# - BIGNUMERIC

# - INTERVAL

# - JSON

# - NUMERIC

#

# 1: https://github.com/googleapis/python-bigquery/blob/76d88fbb1316317a61fa1a63c101bc6f42f23af8/google/cloud/bigquery/enums.py#L252-L274

bigquery_type_system = TypeSystem(key="bigquery")

class BIGQUERY_MODE:

    REQUIRED = "REQUIRED"

    NULLABLE = "NULLABLE"

    REPEATED = "REPEATED"

# BigQuery Structs contain list[SchemaField], each with an embedded name. Artigraph Structs contain

# dict[name, Type]. Therefore, converting a Type to a SchemaField requires the field name to be

# passed in from higher up, which is handled via this key in the `hints`.

BIGQUERY_HINT_FIELD_NAME = f"{bigquery_type_system.key}.field_name"

def _create_schema_field(

    field_type: str, type_: Type, hints: dict[str, Any], **kwargs: Any

) -> bigquery.SchemaField:

    # TODO: Support default values (which would need support in arti.Type)

    if type_.description is not None:

        kwargs.setdefault("description", type_.description)

    return bigquery.SchemaField(

        name=hints.get(BIGQUERY_HINT_FIELD_NAME, types.DEFAULT_ANONYMOUS_NAME),

        field_type=field_type,

        mode=BIGQUERY_MODE.NULLABLE if type_.nullable else BIGQUERY_MODE.REQUIRED,

        **kwargs,

    )

class _BigQueryTypeAdapter(TypeAdapter):

    @classmethod

    def to_artigraph(

        cls, type_: bigquery.SchemaField, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        return cls.artigraph(description=type_.description, nullable=type_.is_nullable)

    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.field_type.upper() == cls.system  # type: ignore[no-any-return]

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        return _create_schema_field(cls.system, type_, hints)

def _gen_adapter(*, artigraph: type[Type], system: Any, priority: int = 0) -> type[TypeAdapter]:

    return bigquery_type_system.register_adapter(

        type(

            f"BigQuery{system}{artigraph}",

            (_BigQueryTypeAdapter,),

            {"artigraph": artigraph, "system": system, "priority": priority},

        )

    )

_gen_adapter(artigraph=types.Binary, system=SqlTypeNames.BYTES)

_gen_adapter(artigraph=types.Boolean, system=SqlTypeNames.BOOL)

_gen_adapter(artigraph=types.Date, system=SqlTypeNames.DATE)

_gen_adapter(artigraph=types.Geography, system=SqlTypeNames.GEOGRAPHY)

_gen_adapter(artigraph=types.String, system=SqlTypeNames.STRING)

# BQ only supports 64-bit ints and floats (aside from numerics), so round tripping results in eg:

#     arti Float16 -> bq FLOAT64 -> arti Float64

for _precision in (16, 32, 64):

    _gen_adapter(

        artigraph=getattr(types, f"Float{_precision}"),

        system=SqlTypeNames.FLOAT64,

        priority=_precision,

    )

for _precision in (8, 16, 32, 64):

    _gen_adapter(

        artigraph=getattr(types, f"Int{_precision}"), system=SqlTypeNames.INT64, priority=_precision

    )

class _BaseTimeTypeAdapter(_BigQueryTypeAdapter):

    # BQ time precision is microsecond (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type)

    precision = "microsecond"

    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:

        assert issubclass(cls.artigraph, types._TimeMixin)

        return cls.artigraph(

            description=type_.description, nullable=type_.is_nullable, precision=cls.precision

        )

@bigquery_type_system.register_adapter

class DateTimeTypeAdapter(_BaseTimeTypeAdapter):

    artigraph = types.DateTime

    system = SqlTypeNames.DATETIME

@bigquery_type_system.register_adapter

class TimeTypeAdapter(_BaseTimeTypeAdapter):

    artigraph = types.Time

    system = SqlTypeNames.TIME

@bigquery_type_system.register_adapter

class TimestampTypeAdapter(_BaseTimeTypeAdapter):

    artigraph = types.Timestamp

    system = SqlTypeNames.TIMESTAMP

@bigquery_type_system.register_adapter

class StructTypeAdapter(_BigQueryTypeAdapter):

    # See https://cloud.google.com/bigquery/docs/nested-repeated

    artigraph = types.Struct

    system = SqlTypeNames.STRUCT

    @classmethod

    def to_artigraph(

        cls, type_: bigquery.SchemaField, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        return cls.artigraph(

            description=type_.description,

            fields={

                field.name: type_system.to_artigraph(field, hints=hints) for field in type_.fields

            },

            nullable=type_.is_nullable,

        )

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        return _create_schema_field(

            cls.system,

            type_,

            hints,

            fields=[

                type_system.to_system(subtype, hints=hints | {BIGQUERY_HINT_FIELD_NAME: name})

                for name, subtype in type_.fields.items()

            ],

        )

@bigquery_type_system.register_adapter

class ListFieldTypeAdapter(TypeAdapter):

    # See https://cloud.google.com/bigquery/docs/nested-repeated

    artigraph = types.List

    system = bigquery.SchemaField

    priority = int(1e9)  # Bump the priority so we can catch all with `mode == "REPEATED"`

    @classmethod

    def to_artigraph(

        cls, type_: bigquery.SchemaField, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        # Convert the REPEATED field to REQUIRED (BigQuery only supports non-nullable array

        # elements) for subsequent conversion by other TypeAdapters.

        element_type = deepcopy(type_)

        element_type._properties["mode"] = BIGQUERY_MODE.REQUIRED

        return types.List(

            description=type_.description,

            element=type_system.to_artigraph(element_type, hints=hints),

            nullable=False,  # Cannot be nullable

        )

    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.mode == BIGQUERY_MODE.REPEATED  # type: ignore[no-any-return]

    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        # Collection is a subclass of List - but handled by a separate TypeAdapter

        return super().matches_artigraph(type_, hints=hints) and not isinstance(

            type_, types.Collection

        )

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        if type_.nullable:

            warnings.warn("BigQuery doesn't support nullable arrays", stacklevel=2)

        if type_.element.nullable:

            warnings.warn("BigQuery doesn't support nullable array elements", stacklevel=2)

            type_ = type_.copy(update={"element": type_.element.copy(update={"nullable": False})})

        if isinstance(type_.element, types.List):

            raise ValueError("BigQuery doesn't support nested arrays")

        field = type_system.to_system(type_.element, hints=hints)

        assert field.mode == BIGQUERY_MODE.REQUIRED

        field._properties["mode"] = BIGQUERY_MODE.REPEATED

        return field

@bigquery_type_system.register_adapter

class TableTypeAdapter(TypeAdapter):

    artigraph = types.Collection

    system = bigquery.Table

    priority = ListFieldTypeAdapter.priority + 1

    @classmethod

    def to_artigraph(

        cls, type_: bigquery.Table, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        kwargs: dict[str, Any] = {}

        if type_.time_partitioning:

            if type_.time_partitioning.type_ != bigquery.TimePartitioningType.DAY:

                raise NotImplementedError(

                    f"BigQuery time partitioning other than 'DAY' is not implemented (got '{type_.time_partitioning.type_}')"

                )

            kwargs["partition_by"] = (type_.time_partitioning.field,)

        if type_.range_partitioning:

            raise NotImplementedError("BigQuery integer range partitioning is not implemented")

        if type_.clustering_fields:

            kwargs["cluster_by"] = tuple(type_.clustering_fields)

        return cls.artigraph(

            name=f"{type_.project}.{type_.dataset_id}.{type_.table_id}",

            element=type_system.to_artigraph(

                bigquery.SchemaField(

                    types.DEFAULT_ANONYMOUS_NAME,

                    SqlTypeNames.STRUCT,

                    fields=type_.schema,

                    mode="REQUIRED",

                ),

                hints=hints,

            ),

            nullable=False,

            **kwargs,

        )

    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.system)

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        assert isinstance(type_.element, types.Struct)

        name = type_.name

        # Override invalid default (must be project/dataset qualified)

        if name == types.DEFAULT_ANONYMOUS_NAME:

            name = "project.dataset.table"

        table = cls.system(name, schema=type_system.to_system(type_.element, hints=hints).fields)

        partition, cluster = type_.partition_by, type_.cluster_by

        if partition:

            # BigQuery only supports a single partitioning field. We'll move the rest to the

            # beginning of the cluster_by. This shouldn't matter much anyway since, depending on the

            # Storage, we'll have separate tables for each unique composite key.

            head, *tail = partition

            if tail:

                cluster = (*tail, *cluster)

            if isinstance(

                type_.element.fields[head], (types.Date, types.DateTime, types.Timestamp)

            ):

                # TODO: Support other granularities than DAY

                table.time_partitioning = bigquery.TimePartitioning(

                    field=head, type_=bigquery.TimePartitioningType.DAY

                )

                table.require_partition_filter = True

            elif isinstance(type_.element.fields[head], types._Int):

                raise NotImplementedError("BigQuery integer range partitioning is not implemented")

            else:

                raise ValueError("BigQuery only supports integer range or time partitioning")

        if cluster:

            table.clustering_fields = cluster

        return table

Variables

BIGQUERY_HINT_FIELD_NAME
bigquery_type_system

Classes

BIGQUERY_MODE

class BIGQUERY_MODE(
    /,
    *args,
    **kwargs
)
View Source
class BIGQUERY_MODE:

    REQUIRED = "REQUIRED"

    NULLABLE = "NULLABLE"

    REPEATED = "REPEATED"

Class variables

NULLABLE
REPEATED
REQUIRED

DateTimeTypeAdapter

class DateTimeTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@bigquery_type_system.register_adapter

class DateTimeTypeAdapter(_BaseTimeTypeAdapter):

    artigraph = types.DateTime

    system = SqlTypeNames.DATETIME

Ancestors (in MRO)

  • arti.types.bigquery._BaseTimeTypeAdapter
  • arti.types.bigquery._BigQueryTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
precision
priority
system

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.artigraph)

matches_system

def matches_system(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.field_type.upper() == cls.system  # type: ignore[no-any-return]

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:

        assert issubclass(cls.artigraph, types._TimeMixin)

        return cls.artigraph(

            description=type_.description, nullable=type_.is_nullable, precision=cls.precision

        )

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        return _create_schema_field(cls.system, type_, hints)

ListFieldTypeAdapter

class ListFieldTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@bigquery_type_system.register_adapter

class ListFieldTypeAdapter(TypeAdapter):

    # See https://cloud.google.com/bigquery/docs/nested-repeated

    artigraph = types.List

    system = bigquery.SchemaField

    priority = int(1e9)  # Bump the priority so we can catch all with `mode == "REPEATED"`

    @classmethod

    def to_artigraph(

        cls, type_: bigquery.SchemaField, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        # Convert the REPEATED field to REQUIRED (BigQuery only supports non-nullable array

        # elements) for subsequent conversion by other TypeAdapters.

        element_type = deepcopy(type_)

        element_type._properties["mode"] = BIGQUERY_MODE.REQUIRED

        return types.List(

            description=type_.description,

            element=type_system.to_artigraph(element_type, hints=hints),

            nullable=False,  # Cannot be nullable

        )

    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.mode == BIGQUERY_MODE.REPEATED  # type: ignore[no-any-return]

    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        # Collection is a subclass of List - but handled by a separate TypeAdapter

        return super().matches_artigraph(type_, hints=hints) and not isinstance(

            type_, types.Collection

        )

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        if type_.nullable:

            warnings.warn("BigQuery doesn't support nullable arrays", stacklevel=2)

        if type_.element.nullable:

            warnings.warn("BigQuery doesn't support nullable array elements", stacklevel=2)

            type_ = type_.copy(update={"element": type_.element.copy(update={"nullable": False})})

        if isinstance(type_.element, types.List):

            raise ValueError("BigQuery doesn't support nested arrays")

        field = type_system.to_system(type_.element, hints=hints)

        assert field.mode == BIGQUERY_MODE.REQUIRED

        field._properties["mode"] = BIGQUERY_MODE.REPEATED

        return field

Ancestors (in MRO)

  • arti.types.TypeAdapter

Class variables

artigraph
key
priority
system

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        # Collection is a subclass of List - but handled by a separate TypeAdapter

        return super().matches_artigraph(type_, hints=hints) and not isinstance(

            type_, types.Collection

        )

matches_system

def matches_system(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.mode == BIGQUERY_MODE.REPEATED  # type: ignore[no-any-return]

to_artigraph

def to_artigraph(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(

        cls, type_: bigquery.SchemaField, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        # Convert the REPEATED field to REQUIRED (BigQuery only supports non-nullable array

        # elements) for subsequent conversion by other TypeAdapters.

        element_type = deepcopy(type_)

        element_type._properties["mode"] = BIGQUERY_MODE.REQUIRED

        return types.List(

            description=type_.description,

            element=type_system.to_artigraph(element_type, hints=hints),

            nullable=False,  # Cannot be nullable

        )

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        if type_.nullable:

            warnings.warn("BigQuery doesn't support nullable arrays", stacklevel=2)

        if type_.element.nullable:

            warnings.warn("BigQuery doesn't support nullable array elements", stacklevel=2)

            type_ = type_.copy(update={"element": type_.element.copy(update={"nullable": False})})

        if isinstance(type_.element, types.List):

            raise ValueError("BigQuery doesn't support nested arrays")

        field = type_system.to_system(type_.element, hints=hints)

        assert field.mode == BIGQUERY_MODE.REQUIRED

        field._properties["mode"] = BIGQUERY_MODE.REPEATED

        return field

StructTypeAdapter

class StructTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@bigquery_type_system.register_adapter

class StructTypeAdapter(_BigQueryTypeAdapter):

    # See https://cloud.google.com/bigquery/docs/nested-repeated

    artigraph = types.Struct

    system = SqlTypeNames.STRUCT

    @classmethod

    def to_artigraph(

        cls, type_: bigquery.SchemaField, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        return cls.artigraph(

            description=type_.description,

            fields={

                field.name: type_system.to_artigraph(field, hints=hints) for field in type_.fields

            },

            nullable=type_.is_nullable,

        )

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        return _create_schema_field(

            cls.system,

            type_,

            hints,

            fields=[

                type_system.to_system(subtype, hints=hints | {BIGQUERY_HINT_FIELD_NAME: name})

                for name, subtype in type_.fields.items()

            ],

        )

Ancestors (in MRO)

  • arti.types.bigquery._BigQueryTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
priority
system

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.artigraph)

matches_system

def matches_system(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.field_type.upper() == cls.system  # type: ignore[no-any-return]

to_artigraph

def to_artigraph(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(

        cls, type_: bigquery.SchemaField, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        return cls.artigraph(

            description=type_.description,

            fields={

                field.name: type_system.to_artigraph(field, hints=hints) for field in type_.fields

            },

            nullable=type_.is_nullable,

        )

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        return _create_schema_field(

            cls.system,

            type_,

            hints,

            fields=[

                type_system.to_system(subtype, hints=hints | {BIGQUERY_HINT_FIELD_NAME: name})

                for name, subtype in type_.fields.items()

            ],

        )

TableTypeAdapter

class TableTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@bigquery_type_system.register_adapter

class TableTypeAdapter(TypeAdapter):

    artigraph = types.Collection

    system = bigquery.Table

    priority = ListFieldTypeAdapter.priority + 1

    @classmethod

    def to_artigraph(

        cls, type_: bigquery.Table, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        kwargs: dict[str, Any] = {}

        if type_.time_partitioning:

            if type_.time_partitioning.type_ != bigquery.TimePartitioningType.DAY:

                raise NotImplementedError(

                    f"BigQuery time partitioning other than 'DAY' is not implemented (got '{type_.time_partitioning.type_}')"

                )

            kwargs["partition_by"] = (type_.time_partitioning.field,)

        if type_.range_partitioning:

            raise NotImplementedError("BigQuery integer range partitioning is not implemented")

        if type_.clustering_fields:

            kwargs["cluster_by"] = tuple(type_.clustering_fields)

        return cls.artigraph(

            name=f"{type_.project}.{type_.dataset_id}.{type_.table_id}",

            element=type_system.to_artigraph(

                bigquery.SchemaField(

                    types.DEFAULT_ANONYMOUS_NAME,

                    SqlTypeNames.STRUCT,

                    fields=type_.schema,

                    mode="REQUIRED",

                ),

                hints=hints,

            ),

            nullable=False,

            **kwargs,

        )

    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.system)

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        assert isinstance(type_.element, types.Struct)

        name = type_.name

        # Override invalid default (must be project/dataset qualified)

        if name == types.DEFAULT_ANONYMOUS_NAME:

            name = "project.dataset.table"

        table = cls.system(name, schema=type_system.to_system(type_.element, hints=hints).fields)

        partition, cluster = type_.partition_by, type_.cluster_by

        if partition:

            # BigQuery only supports a single partitioning field. We'll move the rest to the

            # beginning of the cluster_by. This shouldn't matter much anyway since, depending on the

            # Storage, we'll have separate tables for each unique composite key.

            head, *tail = partition

            if tail:

                cluster = (*tail, *cluster)

            if isinstance(

                type_.element.fields[head], (types.Date, types.DateTime, types.Timestamp)

            ):

                # TODO: Support other granularities than DAY

                table.time_partitioning = bigquery.TimePartitioning(

                    field=head, type_=bigquery.TimePartitioningType.DAY

                )

                table.require_partition_filter = True

            elif isinstance(type_.element.fields[head], types._Int):

                raise NotImplementedError("BigQuery integer range partitioning is not implemented")

            else:

                raise ValueError("BigQuery only supports integer range or time partitioning")

        if cluster:

            table.clustering_fields = cluster

        return table

Ancestors (in MRO)

  • arti.types.TypeAdapter

Class variables

artigraph
key
priority
system

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.artigraph)

matches_system

def matches_system(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.system)

to_artigraph

def to_artigraph(
    type_: 'bigquery.Table',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(

        cls, type_: bigquery.Table, *, hints: dict[str, Any], type_system: TypeSystem

    ) -> Type:

        kwargs: dict[str, Any] = {}

        if type_.time_partitioning:

            if type_.time_partitioning.type_ != bigquery.TimePartitioningType.DAY:

                raise NotImplementedError(

                    f"BigQuery time partitioning other than 'DAY' is not implemented (got '{type_.time_partitioning.type_}')"

                )

            kwargs["partition_by"] = (type_.time_partitioning.field,)

        if type_.range_partitioning:

            raise NotImplementedError("BigQuery integer range partitioning is not implemented")

        if type_.clustering_fields:

            kwargs["cluster_by"] = tuple(type_.clustering_fields)

        return cls.artigraph(

            name=f"{type_.project}.{type_.dataset_id}.{type_.table_id}",

            element=type_system.to_artigraph(

                bigquery.SchemaField(

                    types.DEFAULT_ANONYMOUS_NAME,

                    SqlTypeNames.STRUCT,

                    fields=type_.schema,

                    mode="REQUIRED",

                ),

                hints=hints,

            ),

            nullable=False,

            **kwargs,

        )

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        assert isinstance(type_, cls.artigraph)

        assert isinstance(type_.element, types.Struct)

        name = type_.name

        # Override invalid default (must be project/dataset qualified)

        if name == types.DEFAULT_ANONYMOUS_NAME:

            name = "project.dataset.table"

        table = cls.system(name, schema=type_system.to_system(type_.element, hints=hints).fields)

        partition, cluster = type_.partition_by, type_.cluster_by

        if partition:

            # BigQuery only supports a single partitioning field. We'll move the rest to the

            # beginning of the cluster_by. This shouldn't matter much anyway since, depending on the

            # Storage, we'll have separate tables for each unique composite key.

            head, *tail = partition

            if tail:

                cluster = (*tail, *cluster)

            if isinstance(

                type_.element.fields[head], (types.Date, types.DateTime, types.Timestamp)

            ):

                # TODO: Support other granularities than DAY

                table.time_partitioning = bigquery.TimePartitioning(

                    field=head, type_=bigquery.TimePartitioningType.DAY

                )

                table.require_partition_filter = True

            elif isinstance(type_.element.fields[head], types._Int):

                raise NotImplementedError("BigQuery integer range partitioning is not implemented")

            else:

                raise ValueError("BigQuery only supports integer range or time partitioning")

        if cluster:

            table.clustering_fields = cluster

        return table

TimeTypeAdapter

class TimeTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@bigquery_type_system.register_adapter

class TimeTypeAdapter(_BaseTimeTypeAdapter):

    artigraph = types.Time

    system = SqlTypeNames.TIME

Ancestors (in MRO)

  • arti.types.bigquery._BaseTimeTypeAdapter
  • arti.types.bigquery._BigQueryTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
precision
priority
system

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.artigraph)

matches_system

def matches_system(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.field_type.upper() == cls.system  # type: ignore[no-any-return]

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:

        assert issubclass(cls.artigraph, types._TimeMixin)

        return cls.artigraph(

            description=type_.description, nullable=type_.is_nullable, precision=cls.precision

        )

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        return _create_schema_field(cls.system, type_, hints)

TimestampTypeAdapter

class TimestampTypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
@bigquery_type_system.register_adapter

class TimestampTypeAdapter(_BaseTimeTypeAdapter):

    artigraph = types.Timestamp

    system = SqlTypeNames.TIMESTAMP

Ancestors (in MRO)

  • arti.types.bigquery._BaseTimeTypeAdapter
  • arti.types.bigquery._BigQueryTypeAdapter
  • arti.types.TypeAdapter

Class variables

artigraph
key
precision
priority
system

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.artigraph)

matches_system

def matches_system(
    type_: 'bigquery.SchemaField',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: bigquery.SchemaField, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, bigquery.SchemaField) and type_.field_type.upper() == cls.system  # type: ignore[no-any-return]

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:

        assert issubclass(cls.artigraph, types._TimeMixin)

        return cls.artigraph(

            description=type_.description, nullable=type_.is_nullable, precision=cls.precision

        )

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        return _create_schema_field(cls.system, type_, hints)