Source code for gfw.common.config
"""Defines the `PipelineConfig` class used to configure data pipeline executions.
It includes:
- A dataclass `PipelineConfig` that stores date ranges and any unknown arguments.
- A custom exception `PipelineConfigError` for handling invalid configuration inputs.
Intended for use in CLI-based or programmatic pipeline setups where date ranges
and additional arguments need to be passed and validated.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from datetime import date
from functools import cached_property
from types import SimpleNamespace
from typing import Any, Callable, Sequence
from jinja2 import Environment
from gfw.common.jinja2 import EnvironmentLoader
ERROR_DATE = "Dates must be in ISO format. Got: {}."
[docs]
class PipelineConfigError(Exception):
"""Custom exception for pipeline configuration errors."""
pass
[docs]
@dataclass(frozen=True, kw_only=True)
class PipelineConfig:
"""Configuration object for data pipeline execution.
Note:
This class is completely generic and independent of any specific pipeline framework.
"""
date_range: tuple[str, str]
"""Tuple of start and end dates in ISO format (``YYYY-MM-DD``)."""
name: str = ""
"""Name of the pipeline."""
version: str = "0.1.0"
"""Version of the pipeline."""
jinja_folder: str = "assets/queries"
"""The folder that contains the jinja2 templates."""
mock_bq_clients: bool = False
"""If True, all BigQuery interactions will be mocked."""
unknown_parsed_args: dict[str, Any] = field(default_factory=dict)
"""Parsed CLI or config arguments not explicitly defined in self."""
unknown_unparsed_args: tuple[str, ...] = ()
"""Raw unparsed CLI arguments."""
[docs]
@classmethod
def from_namespace(cls, ns: SimpleNamespace, **kwargs: Any) -> PipelineConfig:
"""Creates a :class:`PipelineConfig` instance from a :class:`types.SimpleNamespace`.
Args:
ns:
Namespace containing attributes matching this :class:`PipelineConfig` fields.
**kwargs:
Any additional arguments to be passed to the class constructor.
Returns:
A new :class:`PipelineConfig` instance.
"""
ns_dict = vars(ns)
ns_dict.update(kwargs)
return cls(**ns_dict)
@cached_property
def parsed_date_range(self) -> tuple[date, date]:
"""Returns the parsed start and end dates as :class:`~datetime.date` objects.
Raises:
:class:`PipelineConfigError`:
If any of the dates are not in valid ISO format.
Returns:
A tuple containing parsed start and end dates.
"""
try:
start_str, end_str = self.date_range
return (date.fromisoformat(start_str), date.fromisoformat(end_str))
except ValueError as e:
raise PipelineConfigError(ERROR_DATE.format(self.date_range)) from e
@cached_property
def top_level_package(self) -> str:
"""Returns the top-level package from this module."""
module = self.__class__.__module__
package = module.split(".")[0]
return package
@cached_property
def jinja_env(self) -> Environment:
"""Returns a default jinja2 environment."""
return EnvironmentLoader().from_package(
package=self.top_level_package, path=self.jinja_folder
)
@property
def start_date(self) -> date:
"""Returns the start date of the configured range.
Returns:
A :class:`~datetime.date` object representing the start of the range.
"""
return self.parsed_date_range[0]
@property
def end_date(self) -> date:
"""Returns the end date of the configured range.
Returns:
A :class:`~datetime.date` object representing the end of the range.
"""
return self.parsed_date_range[1]
@property
def pre_hooks(self) -> Sequence[Callable[[Any], None]]:
"""Sequence of callables executed before pipeline run."""
return []
@property
def post_hooks(self) -> Sequence[Callable[[Any], None]]:
"""Sequence of callables executed after successful pipeline run."""
return []
[docs]
def to_dict(self) -> dict[str, Any]:
"""Converts a :class:`PipelineConfig` instance to dictionary.
Returns:
A dictionary representation of the configuration.
"""
return asdict(self)