Source code for gfw.common.beam.pipeline.factory

"""Factory for constructing Beam pipelines from configuration and DAG factories.

This module defines the PipelineFactory class, which builds a fully configured
Pipeline instance from a given PipelineConfig and DagFactory.
"""

from typing import Any

from gfw.common.config import PipelineConfig

from .base import Pipeline
from .dag import DagFactory


[docs] class PipelineFactory: """Builds a :class:`Pipeline` instance from :class:`PipelineConfig` and :class:`DagFactory`. Args: config: Configuration for the pipeline. dag_factory: Factory that produces the pipeline's :class:`~gfw.common.beam.pipeline.Dag`. **kwargs: Any additional parameters to be passed to :class:`Pipeline` constructor. """ def __init__( self, config: PipelineConfig, dag_factory: DagFactory, **kwargs: Any, ) -> None: """Initializes the factory with config, DAG factory, and optional name.""" self._config = config self._dag_factory = dag_factory self._kwargs = kwargs
[docs] def build_pipeline(self) -> Pipeline: """Constructs and returns a fully configured Pipeline instance. Returns: A pipeline with DAG, version, name, and CLI arguments. """ return Pipeline( name=self._config.name, version=self._config.version, dag=self._dag_factory.build_dag(), pre_hooks=self._config.pre_hooks, post_hooks=self._config.post_hooks, unparsed_args=self._config.unknown_unparsed_args, **self._config.unknown_parsed_args, **self._kwargs, )