gfw.common.beam.pipeline#

Simplifies Apache Beam pipeline configuration and DAG management.

These components help build configurable, maintainable Beam pipelines with less boilerplate.

Classes#

Pipeline

Wrapper around beam.Pipeline with extended functionality.

PipelineFactory

Builds a Pipeline instance from PipelineConfig and DagFactory.

Dag

Abstract base class for DAG construction logic.

DagFactory

Abstract base class for DAG factories producing Dag objects.

LinearDag

A linear DAG implementation for Apache Beam pipelines.

LinearDagFactory

Abstract base class for factories producing LinearDag objects.

Modules#

hooks

Pipeline hooks for pre- or post-processing operations.


class Dag[source]#

Abstract base class for DAG construction logic.

abstractmethod apply(pipeline)[source]#

Applies a series of PTransforms to the pipeline.

Return type:

PCollection

class DagFactory(config)[source]#

Abstract base class for DAG factories producing Dag objects.

Provides factory properties for BigQuery read/write clients and helpers.

property bigquery_helper_factory: Callable[[...], BigQueryHelper]#

Returns a factory for BigQueryHelper instances.

Returns:

Callable that creates BigQueryHelper instances with the appropriate client factory.

abstractmethod build_dag()[source]#

Builds the DAG.

Must be implemented in subclasses.

Returns:

A Dag instance that encapsulates the pipeline's transforms.

Return type:

Dag

property read_from_bigquery_factory: Callable[[...], ReadFromBigQuery]#

Returns a factory for ReadFromBigQuery clients.

Uses mocked clients if configured.

property write_to_bigquery_factory: Callable[[...], WriteToBigQuery]#

Returns a factory for WriteToPartitionedBigQuery clients.

Uses mocked clients if configured.

class LinearDag(sources=(), core=None, side_inputs=None, sinks=())[source]#

A linear DAG implementation for Apache Beam pipelines.

This DAG:
  1. Applies multiple source PTransforms and merges their outputs into a single PCollection.

  2. Applies a single core PTransform, optionally fed by a side-inputs PTransform.

  3. Applies multiple sink PTransforms.

Parameters:
  • sources (Tuple[PTransform[Any, Any], ...]) – A tuple of PTransforms that read input data.

  • core (PTransform[Any, Any] | None) – The core PTransform that processes the data.

  • side_inputs (PTransform[Any, Any] | None) – A PTransform used to read side inputs that will be injected into the core transform.

  • sinks (Tuple[PTransform[Any, Any], ...]) – A tuple of PTransforms that write the output data.

output_paths#

A list of output paths for each sink, if they contain the path attribute.

apply(p)[source]#

Applies the linear DAG implementation to an Apache Beam pipeline.

Return type:

PCollection
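
The three steps listed for LinearDag can be sketched without Beam, using plain callables in place of PTransforms and lists in place of PCollections. Everything below (`apply_linear` and the lambdas) is a hypothetical stand-in for illustration, not the library's implementation. In particular, the way side inputs are handed to the core step is an assumption.

```python
from itertools import chain
from typing import Any, Callable, Iterable, List, Optional, Sequence

# Hypothetical stand-ins: a "source" produces records, the "core" maps the
# merged records (plus an optional side input) to outputs, and a "sink"
# consumes the outputs.
Source = Callable[[], Iterable[Any]]
Core = Callable[..., List[Any]]
Sink = Callable[[List[Any]], Any]


def apply_linear(
    sources: Sequence[Source] = (),
    core: Optional[Core] = None,
    side_inputs: Optional[Source] = None,
    sinks: Sequence[Sink] = (),
) -> List[Any]:
    # Step 1: run every source and merge the outputs into one collection.
    merged = list(chain.from_iterable(src() for src in sources))

    # Step 2: apply the single core step, injecting side inputs if present.
    if core is not None:
        if side_inputs is not None:
            merged = core(merged, list(side_inputs()))
        else:
            merged = core(merged)

    # Step 3: hand the same result to every sink.
    for sink in sinks:
        sink(merged)
    return merged


written: List[Any] = []
result = apply_linear(
    sources=(lambda: [1, 2], lambda: [3]),
    core=lambda rows, side: [r * side[0] for r in rows],
    side_inputs=lambda: [10],
    sinks=(written.extend,),
)
```

In a real pipeline each callable would be a PTransform and the merge would happen on PCollections; the sketch only shows the order of the steps.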

property output_paths: List[Path | str]#

Resolves and returns a list of output paths for each sink in the pipeline.
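
A minimal sketch of how such a list could be collected, assuming that sinks without a `path` attribute are simply skipped. `FileSink`, `TableSink`, and `collect_output_paths` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List, Sequence, Union


@dataclass
class FileSink:
    # Hypothetical sink that writes to a file and therefore exposes `path`.
    path: Union[Path, str]


@dataclass
class TableSink:
    # Hypothetical sink that writes to a table and has no `path` attribute.
    table: str


def collect_output_paths(sinks: Sequence[Any]) -> List[Union[Path, str]]:
    # Keep the path of every sink that defines one, preserving sink order.
    return [sink.path for sink in sinks if hasattr(sink, "path")]


paths = collect_output_paths(
    [FileSink("out/a.json"), TableSink("dataset.table"), FileSink(Path("out/b.json"))]
)
```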

class LinearDagFactory(config)[source]#

Abstract base class for factories producing LinearDag objects.

property bigquery_helper_factory: Callable[[...], BigQueryHelper]#

Returns a factory for BigQueryHelper instances.

Returns:

Callable that creates BigQueryHelper instances with the appropriate client factory.

build_dag()[source]#

Builds a LinearDag instance from the configured pipeline parts.

Returns:

A LinearDag composed of sources, core, side inputs, and sinks.

Return type:

LinearDag

abstract property core: PTransform#

Returns the core PTransform for data processing.

property read_from_bigquery_factory: Callable[[...], ReadFromBigQuery]#

Returns a factory for ReadFromBigQuery clients.

Uses mocked clients if configured.

property side_inputs: PTransform | None#

Returns optional side inputs for the core PTransform.

abstract property sinks: Tuple[PTransform, ...]#

Returns the sink PTransforms to write data outputs.

abstract property sources: Tuple[PTransform, ...]#

Returns the source PTransforms.

Returns:

Tuple of PTransforms serving as data sources.

property write_to_bigquery_factory: Callable[[...], WriteToBigQuery]#

Returns a factory for WriteToPartitionedBigQuery clients.

Uses mocked clients if configured.
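
The split between abstract properties (sources, core, sinks) and the optional side_inputs property can be illustrated with a small stand-in hierarchy. `LinearDagSketch`, `LinearDagFactorySketch`, and `ExampleFactory` are hypothetical classes that only mirror the documented shape; they are not the library's code, and strings stand in for PTransforms.

```python
from abc import ABC, abstractmethod
from typing import Any, NamedTuple, Optional, Tuple


class LinearDagSketch(NamedTuple):
    # Stand-in for LinearDag: just records the four parts.
    sources: Tuple[Any, ...]
    core: Any
    side_inputs: Optional[Any]
    sinks: Tuple[Any, ...]


class LinearDagFactorySketch(ABC):
    def __init__(self, config: Any) -> None:
        self.config = config

    @property
    @abstractmethod
    def sources(self) -> Tuple[Any, ...]: ...

    @property
    @abstractmethod
    def core(self) -> Any: ...

    @property
    def side_inputs(self) -> Optional[Any]:
        # Optional part: subclasses override this only when needed.
        return None

    @property
    @abstractmethod
    def sinks(self) -> Tuple[Any, ...]: ...

    def build_dag(self) -> LinearDagSketch:
        # Compose the configured parts into a single DAG object.
        return LinearDagSketch(self.sources, self.core, self.side_inputs, self.sinks)


class ExampleFactory(LinearDagFactorySketch):
    @property
    def sources(self) -> Tuple[Any, ...]:
        return (f"read:{self.config['input']}",)

    @property
    def core(self) -> Any:
        return "transform"

    @property
    def sinks(self) -> Tuple[Any, ...]:
        return (f"write:{self.config['output']}",)


dag = ExampleFactory({"input": "in.json", "output": "out.json"}).build_dag()
```

A subclass that omits any of the three abstract properties cannot be instantiated, which is the usual reason for declaring them abstract.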

class Pipeline(name='', version='0.1.0', dag=None, pre_hooks=(), post_hooks=(), unparsed_args=(), **options)[source]#

Wrapper around beam.Pipeline with extended functionality.

Features:
  • Merges unparsed, parsed, and default options.

  • Supports custom DAG definitions.

  • Enables Google Cloud Profiler integration.

  • Automatically adds ./setup.py when sdk_container_image is not specified.

You can implement your own Dag object to be injected in the constructor, reuse the provided LinearDag, or just override the apply_dag() method of this class.

Parameters:
  • name (str) – The name of the pipeline. Defaults to an empty string.

  • version (str) – The version of the pipeline. Defaults to 0.1.0.

  • dag (Dag | None) – The DAG to be applied to the pipeline. If None, an empty LinearDag is used.

  • pre_hooks (Sequence[Callable[[...], None]]) – Sequence of callables executed before the pipeline runs. Each callable receives the pipeline instance as its only argument.

  • post_hooks (Sequence[Callable[[...], None]]) – Sequence of callables executed after the pipeline run completes successfully. Each callable receives the pipeline instance as its only argument.

  • unparsed_args (Tuple[str, ...]) – A tuple of unparsed arguments to pass to Beam options. Defaults to an empty tuple.

  • **options (Any) – Additional options to pass to the Beam pipeline.

parsed_args#

The parsed arguments from the unparsed_args list.

pipeline_options#

The merged pipeline options, including parsed args, user options, and defaults.

pipeline#

The initialized beam Pipeline object.

apply_dag()[source]#

Applies the provided DAG implementation to self.pipeline.

Return type:

PCollection

property cloud_options: GoogleCloudOptions#

Returns the GoogleCloudOptions view of the PipelineOptions.

static default_options()[source]#

Returns the default options for the pipeline.

Return type:

dict[str, Any]

property is_streaming: bool#

Returns whether the pipeline is running in streaming mode.

property parsed_args: dict[str, Any]#

Parses the unparsed arguments using Beam’s PipelineOptions.

Returns:

A dictionary of parsed arguments.

property pipeline: Pipeline#

Returns the initialized beam.Pipeline object.

property pipeline_options: PipelineOptions#

Resolves pipeline options.

Combines the arguments parsed from the Beam CLI, the constructor parameters, and the defaults into a single PipelineOptions object.

Returns:

The merged pipeline options.
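
The merge of the three option sources can be pictured as layered dictionaries. The precedence used below (defaults, then constructor options, then parsed CLI arguments, with unset CLI values ignored) is an assumption made for this sketch; the documentation above does not state the actual order. `merge_options` and the option values are hypothetical.

```python
from typing import Any, Dict


def merge_options(
    defaults: Dict[str, Any],
    user_options: Dict[str, Any],
    parsed_args: Dict[str, Any],
) -> Dict[str, Any]:
    # Assumed precedence: later layers override earlier ones.
    merged = dict(defaults)
    merged.update(user_options)
    # Assumption: CLI flags that were not provided (None) do not override.
    merged.update({k: v for k, v in parsed_args.items() if v is not None})
    return merged


merged = merge_options(
    defaults={"runner": "DirectRunner", "max_num_workers": 100},
    user_options={"runner": "DataflowRunner", "project": "demo"},
    parsed_args={"project": "demo-cli", "region": None},
)
```

In the wrapper itself the result is a PipelineOptions object rather than a plain dict.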

run(wait_until_finish=None)[source]#

Executes the Apache Beam pipeline.

Runs the configured DAG and returns the pipeline result along with its main output(s).

The execution can be either blocking or non-blocking depending on the pipeline type and the wait_until_finish parameter:

  • If wait_until_finish is None (default):
      - Batch pipelines block until completion.
      - Streaming pipelines return immediately after submission.

  • If wait_until_finish is explicitly set:
      - True blocks until the pipeline finishes.
      - False returns immediately.

Note that waiting on a streaming pipeline will block indefinitely unless the job is externally cancelled.

Post-hooks are only executed when the pipeline finishes successfully (i.e., in blocking mode and when the final state is DONE).

Parameters:

wait_until_finish (bool | None) – Whether to wait for the pipeline execution to complete. If None, the behavior is inferred from whether the pipeline is running in streaming mode.

Returns:

A tuple containing:

  • The PipelineResult of the executed pipeline.

  • The main output(s) produced by the DAG, which may be a PCollection, a tuple, a dict, or None.

Return type:

tuple
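
The blocking rules and the hook ordering described for run() can be summarized in a few lines of plain Python. `should_block` and `run_sketch` are hypothetical helpers written for this illustration; the final state is passed in as a string instead of being read from a real PipelineResult.

```python
from typing import Any, Callable, Optional, Sequence

Hook = Callable[[Any], None]


def should_block(wait_until_finish: Optional[bool], is_streaming: bool) -> bool:
    # None means "infer": batch pipelines block, streaming pipelines do not.
    if wait_until_finish is None:
        return not is_streaming
    return wait_until_finish


def run_sketch(
    is_streaming: bool,
    final_state: str,
    wait_until_finish: Optional[bool] = None,
    pre_hooks: Sequence[Hook] = (),
    post_hooks: Sequence[Hook] = (),
) -> bool:
    pipeline = object()  # placeholder for the pipeline instance

    # Pre-hooks always run before execution starts.
    for hook in pre_hooks:
        hook(pipeline)

    blocking = should_block(wait_until_finish, is_streaming)

    # Post-hooks run only in blocking mode and only when the job ended DONE.
    if blocking and final_state == "DONE":
        for hook in post_hooks:
            hook(pipeline)
    return blocking


calls = []
blocking = run_sketch(
    is_streaming=False,
    final_state="DONE",
    pre_hooks=(lambda p: calls.append("pre"),),
    post_hooks=(lambda p: calls.append("post"),),
)
```

With a streaming pipeline and the default `wait_until_finish=None`, the same call would run only the pre-hooks, because the run returns right after submission.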

class PipelineFactory(config, dag_factory, **kwargs)[source]#

Builds a Pipeline instance from PipelineConfig and DagFactory.

Parameters:
  • config (PipelineConfig) – Configuration for the pipeline.

  • dag_factory (DagFactory) – Factory that produces the pipeline’s Dag.

  • **kwargs (Any) – Any additional parameters to be passed to Pipeline constructor.

build_pipeline()[source]#

Constructs and returns a fully configured Pipeline instance.

Returns:

A pipeline with DAG, version, name, and CLI arguments.

Return type:

Pipeline