gfw.common.beam.transforms.WriteToBigQueryWrapper
- class WriteToBigQueryWrapper(table, schema=None, label=None, convert_timestamps=False, write_to_bigquery_factory=<class 'apache_beam.io.gcp.bigquery.WriteToBigQuery'>, **write_to_bigquery_kwargs)
Wrapper around WriteToBigQuery with extended functionality.
Key Features:
- Provides a simpler interface to define a schema using a list of dictionaries.
- Automatically selects the write method based on pipeline mode (streaming vs. batch) and runner.
- Automatically converts TIMESTAMP fields to Timestamp objects when needed (streaming with STORAGE_WRITE_API).
- Parameters:
table (str) – The BigQuery table to write to (in the format project:dataset.table).
schema (list[dict[str, str]] | None) – The schema for the BigQuery table.
convert_timestamps (bool) – If True, converts TIMESTAMP fields to Timestamp objects when a streaming pipeline uses the STORAGE_WRITE_API method, which requires Apache Beam Timestamp objects. See https://beam.apache.org/documentation/io/built-in/google-bigquery/.
write_to_bigquery_factory – A factory function used to create a beam.io.WriteToBigQuery instance. This is primarily useful for testing, where you may want to inject a custom or fake implementation instead of using the real transform. If not provided, the default class will be used.
**write_to_bigquery_kwargs (Any) – Any additional keyword arguments to be passed to the beam.io.WriteToBigQuery class. See the official Apache Beam documentation.
Example
    from gfw.common.beam.transforms import WriteToBigQueryWrapper

    pcoll | "Write" >> WriteToBigQueryWrapper(
        table="project:dataset.table",
        schema=[{"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, ...],
    )
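The convert_timestamps option described in the parameters above can be illustrated with a minimal sketch. This is not the wrapper's actual implementation: the function name convert_fields_in_place, the to_timestamp argument, and the utc_datetime stand-in are all hypothetical, and the stand-in exists only so the sketch runs without Apache Beam installed.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Iterable


def convert_fields_in_place(
    row: dict[str, Any],
    fields: Iterable[str],
    to_timestamp: Callable[[float], Any],
) -> dict[str, Any]:
    """Replace float epoch seconds with timestamp objects, mutating ``row``.

    Hypothetical helper: the name and signature are assumptions, not the
    wrapper's real method.
    """
    for name in fields:
        value = row.get(name)
        # Leave missing or NULL values untouched.
        if value is not None:
            row[name] = to_timestamp(value)
    return row


def utc_datetime(seconds: float) -> datetime:
    """Stand-in converter so the sketch runs without Apache Beam."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


row = {"timestamp": 1700000000.5, "speed": 12.3, "received_at": None}
convert_fields_in_place(row, ["timestamp", "received_at"], utc_datetime)
```

In a Beam pipeline, the converter would presumably be apache_beam.utils.timestamp.Timestamp.of rather than the datetime stand-in used here.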
Methods
annotations
default_label
default_type_hints
display_data – Returns the display data associated to a pipeline component.
expand – Writes the input PCollection to BigQuery, creating the table if it does not exist.
Converts in-place specified fields in a dictionary from float to Beam Timestamp objects.
from_runner_api
get_client_factory – Returns a factory for beam.WriteToBigQuery objects.
get_resource_hints
get_type_hints – Gets and/or initializes type hints for this object.
get_windowing – Returns the window function to be associated with transform's output.
infer_output_type
register_urn
resolve_write_method – Resolves the appropriate write method to use to write to BigQuery.
runner_api_requires_keyed_input
to_runner_api
to_runner_api_parameter
to_runner_api_pickled
type_check_inputs
type_check_inputs_or_outputs
type_check_outputs
with_input_types – Annotates the input type of a PTransform with a type-hint.
with_output_types – Annotates the output type of a PTransform with a type-hint.
with_resource_hints – Adds resource hints to the PTransform.
Attributes
label
pipeline
schema – Returns the BigQuery schema in the format expected by beam.WriteToBigQuery.
side_inputs
Extract the field names of type TIMESTAMP from the schema.
- classmethod get_client_factory(mocked=False)
Returns a factory for beam.WriteToBigQuery objects.
- Return type:
- property schema: dict[str, Any] | None
Returns the BigQuery schema in the format expected by beam.WriteToBigQuery.
The provided schema, a list of dictionaries (e.g., [{"name": ..., "type": ..., ...}]), is wrapped in a dictionary under the "fields" key.
- Returns:
A dictionary of the form {"fields": [...]}.
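The wrapping described for the schema property can be sketched in a few lines. The function names to_bigquery_schema and timestamp_field_names are hypothetical, and passing None through unchanged is an assumption based only on the property's dict[str, Any] | None annotation.

```python
from typing import Any, Optional


def to_bigquery_schema(
    fields: Optional[list[dict[str, str]]],
) -> Optional[dict[str, Any]]:
    """Wrap a list of field dictionaries under the "fields" key.

    Hypothetical helper; returning None for a missing schema is an assumption.
    """
    if fields is None:
        return None
    return {"fields": list(fields)}


def timestamp_field_names(schema: Optional[dict[str, Any]]) -> list[str]:
    """Collect the names of fields whose type is TIMESTAMP (hypothetical helper)."""
    if schema is None:
        return []
    return [f["name"] for f in schema["fields"] if f.get("type") == "TIMESTAMP"]


fields = [
    {"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "speed", "type": "FLOAT", "mode": "NULLABLE"},
]
wrapped = to_bigquery_schema(fields)
```

The second helper mirrors the attribute that extracts TIMESTAMP field names from the schema; how the wrapper actually computes it is not shown in this page.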
- expand(pcoll)
Writes the input PCollection to BigQuery, creating the table if it does not exist.
Before applying the WriteToBigQuery transform, this method ensures that the target table is created with the specified schema, partitioning, and clustering configurations.
- Parameters:
pcoll (PCollection[dict[str, Any]]) – The input PCollection to write to BigQuery.
- Returns:
An empty PCollection that signals completion of the write step. It can be used to chain additional transforms (e.g., logging or monitoring), but it typically contains no elements.
- Return type:
- static resolve_write_method(standard_options)
Resolves the appropriate write method to use to write to BigQuery.
The selection logic is based on the StandardOptions of the pipeline in which the beam.WriteToBigQuery transform is used.
The default behavior differs from the one in beam.WriteToBigQuery, where STREAMING_INSERTS is used for streaming pipelines. Here, we prefer STORAGE_WRITE_API for streaming pipelines, which is Google's recommended method for high-throughput, low-latency streaming writes.
As of Apache Beam 2.64, STORAGE_API_AT_LEAST_ONCE is not available in Python, but STORAGE_WRITE_API can be used for at-least-once semantics.
See https://cloud.google.com/dataflow/docs/guides/write-to-bigquery.
- Parameters:
standard_options (StandardOptions) – The StandardOptions of the pipeline in which the WriteToBigQuery transform is used.
- Returns:
A string representing the selected write method. One of ("STREAMING_INSERTS", "FILE_LOADS", "STORAGE_WRITE_API").
- Return type:
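The streaming preference described above can be sketched as follows. This is a simplified illustration, not the method's real logic: the function name and its boolean argument are hypothetical, the batch branch assumes the wrapper keeps Beam's own batch default of FILE_LOADS, and the runner-dependent case that yields STREAMING_INSERTS is not modeled because this page does not describe it.

```python
def resolve_write_method_sketch(streaming: bool) -> str:
    """Pick a BigQuery write method from the pipeline mode (hypothetical sketch)."""
    if streaming:
        # Stated in the docs: prefer STORAGE_WRITE_API for streaming pipelines.
        return "STORAGE_WRITE_API"
    # Assumption: batch pipelines keep Beam's default of FILE_LOADS.
    return "FILE_LOADS"


streaming_method = resolve_write_method_sketch(streaming=True)
batch_method = resolve_write_method_sketch(streaming=False)
```

With real pipeline options, the boolean would presumably come from the streaming attribute of StandardOptions.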