Source code for gfw.common.beam.transforms.read_from_bigquery

"""Module with reusable PTransforms for reading input PCollections."""

from __future__ import annotations

from typing import Any, Callable, Optional, Sequence

import apache_beam as beam

from apache_beam import io
from apache_beam.pvalue import PCollection

from gfw.common.query import Query


class FakeReadFromBigQuery(io.ReadFromBigQuery):
    """Mocks beam.io.ReadFromBigQuery.

    Args:
        elements:
            Elements to use as output Pcollection.
    """

    def __init__(self, elements: Sequence[dict] = (), **kwargs: Any) -> None:
        self._elements = elements

    def expand(self, pcoll: PCollection) -> PCollection:
        """Returns injected elements in the constructor."""
        return pcoll | beam.Create(self._elements)


[docs] class ReadFromBigQuery(beam.PTransform): """Wrapper around :class:`~beam.io.ReadFromBigQuery` with optional casting. Args: query: The query to execute. output_type: The Beam type hint for the output (e.g., a :class:`~typing.NamedTuple`). If not provided, defaults to dict. method: The method to use to read from BigQuery. It may be ``EXPORT`` or ``DIRECT_READ``. use_standard_sql: Specifies whether to use BigQuery's standard SQL dialect for this query. Defaults to True. read_from_bigquery_factory: A factory function used to create a :class:`~beam.io.ReadFromBigQuery` instance. This is primarily useful for testing, where you may want to inject a custom or fake implementation instead of using the real transform. If not provided, the default class will be used. write_to_bigquery_kwargs: Any additional keyword arguments to be passed to :class:`~beam.io.ReadFromBigQuery` class. Check `official Apache Beam documentation <https://beam.apache.org/releases/pydoc/2.64.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.ReadFromBigQuery>`_. **kwargs: Additional keyword arguments passed to base PTransform class. """ def __init__( self, query: str, output_type: type = dict, method: str = beam.io.ReadFromBigQuery.Method.EXPORT, use_standard_sql: bool = True, read_from_bigquery_factory: Callable[..., io.ReadFromBigQuery] = io.ReadFromBigQuery, read_from_bigquery_kwargs: Optional[dict[Any, Any]] = None, **kwargs: Any, ) -> None: """Initializes a ReadFromBigQuery instance.""" super().__init__(**kwargs) self._query = query self._output_type = output_type self._method = method self._use_standard_sql = use_standard_sql self._read_from_bigquery_factory = read_from_bigquery_factory self._read_from_bigquery_kwargs = read_from_bigquery_kwargs or {}
[docs] @classmethod def get_client_factory(cls, mocked: bool = False) -> Callable: """Returns a factory for ReadFromPubSub objects.""" if mocked: return FakeReadFromBigQuery return io.ReadFromBigQuery
[docs] @classmethod def from_query(cls, query: Query, use_type: bool = False, **kwargs: Any) -> ReadFromBigQuery: """Creates a ReadFromBigQuery PTransform from a Query object. Args: query: An instance of a :class:`~gfw.common.query.Query` subclass. Its :meth:`render <gfw.common.query.Query.render>` method is used to produce the SQL query string. use_type: If True, sets PTransform type to the provided ``output_type``. **kwargs: Any additional arguments for :class:`~beam.io.ReadFromBigQuery` constructor. Returns: A configured :class:`~beam.io.ReadFromBigQuery` instance. """ rendered_query = query.render(formatted=False) output_type: type = dict if use_type: output_type = type(query.output_type) return cls(query=rendered_query, output_type=output_type, **kwargs)
[docs] def expand(self, pcoll: PCollection) -> PCollection[Any]: """Applies PCollection to read from BigQuery.""" output = pcoll | self._read_from_bigquery_factory( use_standard_sql=self._use_standard_sql, query=self._query, method=self._method, **self._read_from_bigquery_kwargs, ).with_output_types(dict) if self._output_type not in (None, dict): output = output | beam.Map(lambda d: self._output_type(**d)).with_output_types( self._output_type ) return output