Source code for gfw.common.beam.pipeline.hooks
"""Pipeline hooks for pre- or post-processing operations.
This module provides helper functions that generate hooks to be executed
during a pipeline's lifecycle. Hooks are callable functions that take a
:class:`~gfw.common.beam.pipeline.Pipeline` object and perform arbitrary operations,
such as creating views, deleting data, or any other custom task.
"""
import logging
from datetime import date
from typing import Callable, Optional
from gfw.common.bigquery.helper import BigQueryHelper
from gfw.common.bigquery.table_config import TableConfig
from .base import Pipeline
logger = logging.getLogger(__name__)
[docs]
def create_view_hook(
    table_config: TableConfig,
    mock: bool = False,
) -> Callable[[Pipeline], None]:
    """Builds a pipeline hook that creates a BigQuery view.

    Args:
        table_config:
            :class:`~gfw.common.bigquery.TableConfig` instance that supplies
            the view id (``view_id``) and the view query (``view_query()``).
        mock:
            If True, the hook asks for a mocked BQ client factory
            instead of one that performs real operations.

    Returns:
        A callable that takes a :class:`~gfw.common.beam.pipeline.Pipeline`
        instance and creates the view in that pipeline's cloud project.
    """

    def _hook(p: Pipeline) -> None:
        # Everything is resolved lazily, when the hook runs, not when it is built.
        view_id = table_config.view_id
        view_query = table_config.view_query()

        logger.info(f"Creating view: {view_id}...")

        # The helper is bound to the project configured in the pipeline's cloud options.
        helper = BigQueryHelper(
            client_factory=BigQueryHelper.get_client_factory(mocked=mock),
            project=p.cloud_options.project,
        )
        helper.create_view(view_id=view_id, view_query=view_query)

        logger.info("Done.")

    return _hook
[docs]
def delete_events_hook(
    table_config: TableConfig,
    start_date: date,
    end_date: Optional[date] = None,
    mock: bool = False,
) -> Callable[[Pipeline], None]:
    """Builds a pipeline hook that deletes events from a BigQuery table.

    Args:
        table_config:
            :class:`~gfw.common.bigquery.TableConfig` object that provides
            the table id and builds the delete query.
        start_date:
            Date after which events should be deleted. Forwarded to
            ``table_config.delete_query``.
        end_date:
            Date up to which events should be deleted (exclusive). Forwarded
            as-is (including ``None``) to ``table_config.delete_query``.
        mock:
            If True, the hook asks for a mocked BQ client factory
            instead of one that performs real operations.

    Returns:
        A callable that takes a :class:`~gfw.common.beam.pipeline.Pipeline`
        instance and runs the delete query in that pipeline's cloud project.
    """

    def _hook(p: Pipeline) -> None:
        table_id = table_config.table_id
        logger.info(f"Deleting events from '{table_id}' after '{start_date}'...")

        # The query text comes from the table configuration; this hook only executes it.
        delete_query = table_config.delete_query(start_date=start_date, end_date=end_date)

        helper = BigQueryHelper(
            client_factory=BigQueryHelper.get_client_factory(mocked=mock),
            project=p.cloud_options.project,
        )
        helper.run_query(query_str=delete_query)

        logger.info("Done.")

    return _hook
[docs]
def create_table_hook(
    table_config: TableConfig,
    mock: bool = False,
) -> Callable[[Pipeline], None]:
    """Returns a hook function to create a BigQuery table.

    Args:
        table_config:
            :class:`~gfw.common.bigquery.TableConfig` instance containing table details.
            Its ``to_bigquery_params()`` output is expanded as keyword arguments
            of the helper's ``create_table`` call.
        mock:
            If True, uses a mocked BQ client instead of performing real operations.

    Returns:
        A callable hook that accepts a :class:`~gfw.common.beam.pipeline.Pipeline`
        instance and creates the table.
    """

    def _hook(p: Pipeline) -> None:
        table_id = table_config.table_id
        logger.info(f"Creating table: {table_id}...")

        client_factory = BigQueryHelper.get_client_factory(mocked=mock)
        bq_client = BigQueryHelper(client_factory=client_factory, project=p.cloud_options.project)

        params = table_config.to_bigquery_params()
        # NOTE(review): exists_ok=True presumably makes the call tolerate an
        # already-existing table -- confirm against BigQueryHelper.create_table.
        bq_client.create_table(**params, exists_ok=True)

        logger.info("Done.")

    return _hook