gfw.common.beam.transforms.WriteToBigQueryWrapper

class WriteToBigQueryWrapper(table, schema=None, label=None, convert_timestamps=False, write_to_bigquery_factory=<class 'apache_beam.io.gcp.bigquery.WriteToBigQuery'>, **write_to_bigquery_kwargs)

Wrapper around WriteToBigQuery with extended functionality.

Key Features:
  • Provides a simpler interface to define a schema using a list of dictionaries.

  • Automatically selects the writing method based on pipeline mode (streaming vs. batch) and runner.

  • Automatically converts TIMESTAMP fields to Timestamp objects when needed (streaming with STORAGE_WRITE_API).

Parameters:
  • table (str) – The BigQuery table to write to (in the format project:dataset.table).

  • schema (list[dict[str, str]] | None) – The schema for the BigQuery table.

  • convert_timestamps (bool) – If True, converts TIMESTAMP fields to Timestamp objects when a streaming pipeline uses the STORAGE_WRITE_API method, which requires Apache Beam Timestamp objects. See https://beam.apache.org/documentation/io/built-in/google-bigquery/.

  • write_to_bigquery_factory – A factory function used to create a beam.io.WriteToBigQuery instance. This is primarily useful for testing, where you may want to inject a custom or fake implementation instead of the real transform. If not provided, the default class is used.

  • **write_to_bigquery_kwargs (Any) – Any additional keyword arguments to pass to the beam.io.WriteToBigQuery class. See the official Apache Beam documentation for the available options.

Example

from gfw.common.beam.transforms import WriteToBigQueryWrapper

pcoll | "Write" >> WriteToBigQueryWrapper(
    table="project:dataset.table",
    schema=[{"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, ...],
)

Methods

  • annotations
  • default_label
  • default_type_hints
  • display_data – Returns the display data associated to a pipeline component.
  • expand – Writes the input PCollection to BigQuery, creating the table if it does not exist.
  • float_to_beam_timestamp – Converts the specified fields of a dictionary, in place, from float to Beam Timestamp objects.
  • from_runner_api
  • get_client_factory – Returns a factory for beam.WriteToBigQuery objects.
  • get_resource_hints
  • get_type_hints – Gets and/or initializes type hints for this object.
  • get_windowing – Returns the window function to be associated with transform's output.
  • infer_output_type
  • register_urn
  • resolve_write_method – Resolves the appropriate write method to use to write to BigQuery.
  • runner_api_requires_keyed_input
  • to_runner_api
  • to_runner_api_parameter
  • to_runner_api_pickled
  • type_check_inputs
  • type_check_inputs_or_outputs
  • type_check_outputs
  • with_input_types – Annotates the input type of a PTransform with a type-hint.
  • with_output_types – Annotates the output type of a PTransform with a type-hint.
  • with_resource_hints – Adds resource hints to the PTransform.

Attributes

  • label
  • pipeline
  • schema – Returns the BigQuery schema in the format expected by beam.WriteToBigQuery.
  • side_inputs
  • timestamp_fields – Extract the field names of type TIMESTAMP from the schema.

classmethod get_client_factory(mocked=False)

Returns a factory for beam.WriteToBigQuery objects.

Return type:

Callable

property schema: dict[str, Any] | None

Returns the BigQuery schema in the format expected by beam.WriteToBigQuery.

The provided schema, a list of dictionaries (e.g., [{"name": ..., "type": ..., ...}]), is wrapped in a dictionary under the "fields" key.

Returns:

A dictionary of the form {"fields": [...]}.
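
The wrapping described above can be sketched in a few lines. This is only an illustration under stated assumptions, not the library's implementation: the helper name wrap_schema is hypothetical, and returning None when no schema was given is inferred from the property's dict[str, Any] | None type.

```python
from typing import Any, Optional


def wrap_schema(fields: Optional[list[dict[str, str]]]) -> Optional[dict[str, Any]]:
    """Wrap a list of field definitions under the "fields" key.

    Hypothetical helper. Returning None for a missing schema is an
    assumption based on the documented return type.
    """
    if fields is None:
        return None
    return {"fields": fields}


# Example field definitions (names chosen for illustration only).
schema = wrap_schema([
    {"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "ssvid", "type": "STRING", "mode": "NULLABLE"},
])
```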

property timestamp_fields: List[str]

Extract the field names of type TIMESTAMP from the schema.
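
A minimal sketch of this extraction, assuming the schema is the list of dictionaries passed to the constructor. The function name timestamp_field_names is hypothetical, and the exact-match comparison against "TIMESTAMP" is an assumption; the actual property may handle case or a missing schema differently.

```python
def timestamp_field_names(fields: list[dict[str, str]]) -> list[str]:
    """Return the names of all fields whose type is exactly "TIMESTAMP".

    Hypothetical helper; exact, case-sensitive matching is an assumption.
    """
    return [field["name"] for field in fields if field.get("type") == "TIMESTAMP"]


# Example field definitions (names chosen for illustration only).
names = timestamp_field_names([
    {"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "ssvid", "type": "STRING", "mode": "NULLABLE"},
    {"name": "received_at", "type": "TIMESTAMP", "mode": "NULLABLE"},
])
```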

expand(pcoll)

Writes the input PCollection to BigQuery, creating the table if it does not exist.

Before applying the WriteToBigQuery transform, this method ensures that the target table is created with the specified schema, partitioning, and clustering configurations.

Parameters:

pcoll (PCollection[dict[str, Any]]) – The input PCollection to write to BigQuery.

Returns:

An empty PCollection that acts as a signal for the completion of the write step. It typically contains no elements, but it can be used to chain additional transforms (e.g., logging or monitoring).

Return type:

PCollection[dict[str, Any]]

static resolve_write_method(standard_options)

Resolves the appropriate write method to use to write to BigQuery.

The selection logic is based on the StandardOptions of the pipeline in which the beam.WriteToBigQuery transform is used.

The default behavior differs from that of beam.WriteToBigQuery, which uses STREAMING_INSERTS for streaming pipelines. Here, we prefer STORAGE_WRITE_API for streaming pipelines, which is Google’s recommended method for high-throughput, low-latency streaming writes.

As of Apache Beam 2.64, STORAGE_API_AT_LEAST_ONCE is not available in Python, but STORAGE_WRITE_API can be used for at-least-once semantics.

See https://cloud.google.com/dataflow/docs/guides/write-to-bigquery.

Parameters:

standard_options (StandardOptions) – The StandardOptions of the pipeline in which the WriteToBigQuery transform is used.

Returns:

A string representing the selected write method. One of ("STREAMING_INSERTS", "FILE_LOADS", "STORAGE_WRITE_API").

Return type:

str
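
The selection described above might look roughly like the following sketch. It is not the library's implementation. FakeStandardOptions is a hypothetical stand-in for Beam's StandardOptions that exposes only the streaming and runner attributes, so the example runs without Apache Beam installed. Two rules are assumptions that the text above does not state: that batch pipelines use FILE_LOADS, and that streaming pipelines on the DirectRunner fall back to STREAMING_INSERTS.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeStandardOptions:
    """Hypothetical stand-in for Beam's StandardOptions (attributes used here only)."""
    streaming: bool = False
    runner: Optional[str] = None


def resolve_write_method_sketch(options: FakeStandardOptions) -> str:
    """Pick a BigQuery write method from the pipeline mode and runner."""
    if not options.streaming:
        # ASSUMPTION: batch pipelines use load jobs.
        return "FILE_LOADS"
    if options.runner == "DirectRunner":
        # ASSUMPTION: local streaming runs fall back to streaming inserts.
        return "STREAMING_INSERTS"
    # Stated in the documentation: streaming pipelines prefer STORAGE_WRITE_API.
    return "STORAGE_WRITE_API"


batch_method = resolve_write_method_sketch(FakeStandardOptions(streaming=False))
stream_method = resolve_write_method_sketch(
    FakeStandardOptions(streaming=True, runner="DataflowRunner")
)
```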

static float_to_beam_timestamp(row, fields)

Converts the specified fields of a dictionary, in place, from float to Beam Timestamp objects.

Parameters:
  • row (dict[str, Any]) – A dictionary containing data with potential float values.

  • fields (list[str]) – A list of field names to be converted to Timestamp.

Returns:

The input dictionary with specified fields converted to Timestamp objects.

Return type:

dict[str, Any]
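
The in-place conversion can be sketched as follows. This is an illustration under stated assumptions, not the library's code: FakeTimestamp is a hypothetical stand-in for apache_beam.utils.timestamp.Timestamp so the example runs without Beam, the function name floats_to_timestamps is hypothetical, and skipping fields that are missing or None is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class FakeTimestamp:
    """Hypothetical stand-in for a Beam Timestamp, stored as microseconds."""
    micros: int

    @classmethod
    def of(cls, seconds: float) -> "FakeTimestamp":
        # Convert float seconds since the epoch to integer microseconds.
        return cls(micros=int(round(seconds * 1_000_000)))


def floats_to_timestamps(
    row: dict[str, Any],
    fields: list[str],
    to_timestamp: Callable[[float], Any] = FakeTimestamp.of,
) -> dict[str, Any]:
    """Replace the named float fields of `row` with timestamp objects, in place."""
    for name in fields:
        # ASSUMPTION: fields that are absent or None are left untouched.
        if row.get(name) is not None:
            row[name] = to_timestamp(row[name])
    return row  # the same dictionary object that was passed in


row = {"timestamp": 1700000000.5, "ssvid": "123"}
converted = floats_to_timestamps(row, ["timestamp"])
```

With Apache Beam installed, apache_beam.utils.timestamp.Timestamp.of could be passed as to_timestamp in place of the stand-in.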