Source code for gfw.common.beam.transforms.sample_and_log

"""This module defines a reusable Apache Beam PTransform for logging elements of a PCollection."""

import json
import logging

from typing import Any, Optional

import apache_beam as beam

from apache_beam import PTransform
from apache_beam.pvalue import PCollection
from apache_beam.transforms.combiners import Sample
from apache_beam.transforms.window import FixedWindows


logger = logging.getLogger(__name__)


[docs] class SampleAndLogElements(PTransform): """A Beam PTransform that logs elements of a PCollection. Args: sample_size: The number of elements to log. If not provided, logs all elements. window_size: The window duration in seconds used when sampling unbounded sources; only applicable when ``sample_size`` is set. pretty_print: If True, formats each element as pretty-printed JSON when possible. message: A custom string format for the log message. Must contain the placeholder ``{e}``. """ def __init__( self, sample_size: Optional[int] = None, window_size: int = 60, pretty_print: bool = False, message: str = "Element: {e}", ) -> None: self._sample_size = sample_size self._window_size = window_size self._pretty_print = pretty_print self._message = message
[docs] def expand(self, pcoll: PCollection) -> PCollection: """Log elements of a PCollection, optionally sampling a ``sample_size`` elements.""" samples = pcoll if self._sample_size: samples = ( pcoll # Windowing is needed for sampling on unbounded sources. | "Apply Fixed Window" >> beam.WindowInto(FixedWindows(self._window_size)) # Defaults are not supported if you are not using a Global Window. | "Sample" >> Sample.FixedSizeGlobally(self._sample_size).without_defaults() | "Flatten Samples" >> beam.FlatMap(lambda elements: elements) ) _ = samples | "Log Elements" >> beam.Map(self._log_element) return pcoll
def _log_element(self, element: Any) -> Any: formatted = element if self._pretty_print: formatted = json.dumps(element, indent=4) log_message = self._message.format(e=formatted) logger.debug(log_message) return element