Source code for gfw.common.beam.transforms.apply_sliding_windows
"""Transforms for applying sliding windows in Apache Beam."""
from typing import Any, Dict
import apache_beam as beam
from apache_beam.pvalue import PCollection
from apache_beam.transforms.window import SlidingWindows, TimestampedValue
[docs]
class ApplySlidingWindows(beam.PTransform):
    """Applies sliding windows, optionally assigning event timestamps.

    The window size handed to ``SlidingWindows`` is ``period + offset``; the
    same ``period`` and ``offset`` values are also forwarded unchanged.

    Args:
        period:
            The window period (interval between window start times), in seconds.

        offset:
            The offset to apply to window start times, in seconds.
            It is also added to ``period`` to obtain the window size.

        assign_timestamps:
            Whether to assign timestamps using a field from each element.

        timestamp_field:
            The name of the field containing the timestamp (UNIX time, in seconds).

        **kwargs:
            Additional keyword arguments passed to base PTransform class.
    """

    def __init__(
        self,
        period: float,
        offset: float,
        assign_timestamps: bool = False,
        timestamp_field: str = "timestamp",
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self._period = period
        self._offset = offset
        self._assign_timestamps = assign_timestamps
        self._timestamp_field = timestamp_field

    def expand(self, pcoll: PCollection[Dict[str, Any]]) -> PCollection[Dict[str, Any]]:
        """Apply sliding windows to the input PCollection.

        Optionally assigns event-time timestamps using the configured timestamp field.

        Args:
            pcoll:
                A PCollection of dictionaries containing a timestamp field.

        Returns:
            A windowed PCollection with sliding windows applied.
        """
        # Window size is derived from the two configured values.
        size = self._period + self._offset

        if self._assign_timestamps:
            # Bind the field name to a local so the mapped function closes over a
            # plain string instead of ``self``; otherwise the whole transform
            # instance has to be serialized together with the lambda.
            timestamp_field = self._timestamp_field
            pcoll = pcoll | "AddTimestamps" >> beam.Map(
                lambda e: TimestampedValue(e, e[timestamp_field])
            )

        return pcoll | "ApplySlidingWindows" >> beam.WindowInto(
            SlidingWindows(size=size, period=self._period, offset=self._offset)
        )