Apache Beam (gfw.common.beam)

This package provides utilities for managing Apache Beam pipelines.

Defining the DAG

The Pipeline class accepts an instance of Dag, which is responsible for defining the Apache Beam pipeline graph. Subclasses of Dag must implement the abstract method Dag.apply(). This package provides one concrete implementation, LinearDag.
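To illustrate the Dag contract, here is a minimal, framework-free sketch of the pattern. It uses neither gfw.common nor Apache Beam: `Dag`, `ToyLinearDag`, and the plain-list "transforms" are hypothetical stand-ins. The assumption that `apply()` receives the pipeline root and returns the output mirrors Beam's `expand()` style, not the documented signature of `Dag.apply()`, and the sources/core/sinks split is only an assumed reading of what "linear" means here.

```python
from abc import ABC, abstractmethod
from typing import Callable, List, Sequence

# Hypothetical stand-ins for Beam PTransforms: a source builds elements from
# the pipeline root, and a transform maps a list of elements to a new list.
Source = Callable[[object], List[str]]
Transform = Callable[[List[str]], List[str]]


class Dag(ABC):
    """Illustrative base class: subclasses decide how the graph is built."""

    @abstractmethod
    def apply(self, p: object) -> List[str]:
        """Build the graph starting from the pipeline root ``p``."""


class ToyLinearDag(Dag):
    """Toy linear graph: merged sources, then core transforms, then sinks."""

    def __init__(
        self,
        sources: Sequence[Source] = (),
        core: Sequence[Transform] = (),
        sinks: Sequence[Callable[[List[str]], None]] = (),
    ) -> None:
        self.sources = list(sources)
        self.core = list(core)
        self.sinks = list(sinks)

    def apply(self, p: object) -> List[str]:
        # Read every source and merge the outputs (similar to beam.Flatten).
        data = [x for source in self.sources for x in source(p)]
        # Apply the core transforms one after another.
        for transform in self.core:
            data = transform(data)
        # Hand the final elements to every sink.
        for sink in self.sinks:
            sink(data)
        return data


written: List[str] = []
dag = ToyLinearDag(
    sources=[lambda _: ["a", "b"], lambda _: ["c"]],
    core=[lambda xs: [x.upper() for x in xs]],
    sinks=[written.extend],
)
print(dag.apply(None))  # ['A', 'B', 'C']
```

Because `apply()` is abstract in this sketch, calling `Dag()` directly raises `TypeError`; only subclasses that implement it can be instantiated, which is the same guarantee the abstract `Dag.apply()` gives.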

Pipeline Configuration

The PipelineConfig class provides a standard way of configuring pipelines. The following code shows how to subclass it to add custom parameters.

import math
from dataclasses import dataclass, field
from datetime import date, timedelta
from typing import Optional

from gfw.common import PipelineConfig


@dataclass
class RawGapsConfig(PipelineConfig):
    filter_not_overlapping_and_short: bool = False
    filter_good_seg: bool = False
    open_gaps_start_date: str = "2019-01-01"
    skip_open_gaps: bool = False
    ssvids: tuple = field(default_factory=tuple)
    min_gap_length: float = 6
    n_hours_before: int = 12
    window_period_d: Optional[int] = None
    eval_last: bool = True
    normalize_output: bool = True
    json_input_messages: Optional[str] = None
    json_input_open_gaps: Optional[str] = None
    bq_read_method: str = "EXPORT"
    bq_input_messages: Optional[str] = None
    bq_input_segments: str = "pipe_ais_v3_published.segs_activity"
    bq_input_open_gaps: Optional[str] = None
    bq_output_gaps: Optional[str] = None
    bq_output_gaps_description: bool = False
    bq_write_disposition: str = "WRITE_APPEND"
    mock_bq_clients: bool = False
    save_json: bool = False
    work_dir: str = "workdir"

    name = "pipe-gaps"

    def __post_init__(self) -> None:
        # Require either a JSON messages input, or both BQ messages and segments tables.
        if (
            self.json_input_messages is None
            and (self.bq_input_messages is None or self.bq_input_segments is None)
        ):
            raise ValueError("You need to provide either JSON inputs or BQ inputs.")

    @property
    def open_gaps_start(self) -> date:
        # Parse the ISO-formatted string (e.g. "2019-01-01") into a date.
        return date.fromisoformat(self.open_gaps_start_date)

    @property
    def messages_query_start_date(self) -> date:
        # Move start_date back by n_hours_before, rounded up to whole days,
        # so that messages from before start_date are also included.
        buffer_days = math.ceil(self.n_hours_before / 24)
        return self.start_date - timedelta(days=buffer_days)
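The validation rule and the date arithmetic in RawGapsConfig can be checked in isolation. The sketch below is a hypothetical, self-contained reduction: `ToyGapsConfig` is not part of gfw.common, it declares `start_date` itself because in the real class that field is assumed to come from PipelineConfig, and `"dataset.messages"` is a made-up table name.

```python
import math
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Optional


@dataclass
class ToyGapsConfig:
    # Assumed to be inherited from PipelineConfig in the real class.
    start_date: date
    n_hours_before: int = 12
    json_input_messages: Optional[str] = None
    bq_input_messages: Optional[str] = None
    bq_input_segments: Optional[str] = "pipe_ais_v3_published.segs_activity"

    def __post_init__(self) -> None:
        # Same rule as RawGapsConfig: JSON input, or both BQ tables.
        if self.json_input_messages is None and (
            self.bq_input_messages is None or self.bq_input_segments is None
        ):
            raise ValueError("You need to provide either JSON inputs or BQ inputs.")

    @property
    def messages_query_start_date(self) -> date:
        # Round the hour buffer up to whole days: 12 h -> 1 day, 30 h -> 2 days.
        buffer_days = math.ceil(self.n_hours_before / 24)
        return self.start_date - timedelta(days=buffer_days)


cfg = ToyGapsConfig(start_date=date(2024, 1, 10), bq_input_messages="dataset.messages")
print(cfg.messages_query_start_date)  # 2024-01-09
```

With the default `n_hours_before=12`, `math.ceil(12 / 24)` is 1, so the messages query starts one day before `start_date`; omitting both the JSON input and the BQ messages table makes the constructor raise `ValueError`.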

Creating a Pipeline

The Pipeline class provides an easy way to instantiate and manage Apache Beam pipelines.

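import logging

import apache_beam as beam

from gfw.common.beam.pipeline import LinearDag, Pipeline

logger = logging.getLogger(__name__)


class SourcePTransform(beam.PTransform):
    """Toy source that creates a PCollection with three elements."""

    def expand(self, p):
        return p | beam.Create(["a", "b", "c"])


# Build a LinearDag with a single source and pass it to Pipeline.
dag = LinearDag(sources=[SourcePTransform()])
pipeline = Pipeline(dag=dag, project="world-fishing-827")

# run() returns a tuple; the first element is the pipeline result.
result, _ = pipeline.run()
print(result.state)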

Using DAG and Pipeline factories

The DagFactory and PipelineFactory classes help simplify and structure the construction of a pipeline. To use DagFactory, create a subclass and implement the abstract method DagFactory.build_dag(). The subclass LinearDagFactory inherits from DagFactory and declares abstract methods that simplify instantiating the LinearDag class. To use PipelineFactory, inject a PipelineConfig instance and a DagFactory instance.

The following code shows an example of how to use these classes.

import logging
from types import SimpleNamespace

from gfw.common.beam.pipeline.factory import PipelineFactory

from pipe_gaps.pipeline.config import RawGapsConfig
from pipe_gaps.pipeline.factory import RawGapsLinearDagFactory
from pipe_gaps.version import __version__


logger = logging.getLogger(__name__)


def run(args: SimpleNamespace) -> None:
    # Build the typed config from the namespace of parsed arguments.
    config = RawGapsConfig.from_namespace(args, version=__version__)
    # The DAG factory knows how to build the graph for this pipeline.
    dag_factory = RawGapsLinearDagFactory(config)
    # PipelineFactory combines the config and the injected DAG factory.
    pipeline_factory = PipelineFactory(config, dag_factory=dag_factory)
    pipeline = pipeline_factory.build_pipeline()
    result, _ = pipeline.run()
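The division of labor between the two factories can be sketched without Beam. In this hypothetical, stdlib-only version, `ToyConfig`, `ToyDagFactory`, `UpperCaseDagFactory`, `ToyPipeline`, and `ToyPipelineFactory` are illustrative names, not gfw.common classes, and representing a DAG as a plain callable is an assumption. Only the overall shape follows the description above: a DagFactory subclass implements `build_dag()`, and a PipelineFactory receives a config plus an injected DagFactory and exposes `build_pipeline()`.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ToyConfig:
    # Hypothetical config fields.
    name: str
    items: List[str]


class ToyDagFactory(ABC):
    """Mirrors the role of DagFactory: subclasses must implement build_dag()."""

    def __init__(self, config: ToyConfig) -> None:
        self.config = config

    @abstractmethod
    def build_dag(self) -> Callable[[], List[str]]:
        """Return a callable that runs the graph and returns its output."""


class UpperCaseDagFactory(ToyDagFactory):
    def build_dag(self) -> Callable[[], List[str]]:
        # The toy DAG reads items from the config and upper-cases them.
        return lambda: [item.upper() for item in self.config.items]


@dataclass
class ToyPipeline:
    name: str
    dag: Callable[[], List[str]]

    def run(self) -> List[str]:
        return self.dag()


class ToyPipelineFactory:
    """Mirrors the role of PipelineFactory: config plus an injected DAG factory."""

    def __init__(self, config: ToyConfig, dag_factory: ToyDagFactory) -> None:
        self.config = config
        self.dag_factory = dag_factory

    def build_pipeline(self) -> ToyPipeline:
        return ToyPipeline(name=self.config.name, dag=self.dag_factory.build_dag())


config = ToyConfig(name="toy-pipeline", items=["a", "b"])
factory = ToyPipelineFactory(config, dag_factory=UpperCaseDagFactory(config))
pipeline = factory.build_pipeline()
print(pipeline.name, pipeline.run())  # toy-pipeline ['A', 'B']
```

In this sketch, injecting the DAG factory keeps pipeline-level concerns (configuration, naming, running) separate from graph construction, so using a different graph only requires a different DagFactory subclass.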

Collection of PTransforms

This package provides a series of custom PTransforms that add extra functionality and sensible defaults on top of Apache Beam's built-in PTransforms.