gfw.common.beam.transforms.ReadMatchingAvroFiles

class ReadMatchingAvroFiles(path, start_dt, end_dt, buffer_hours=6, record_time_fn=None, strict=False, date_format='%Y-%m-%d', time_format='%H_%M_%SZ', allow_no_time=False, decode=True, decode_method='utf-8', read_all_from_avro_kwargs=None, **kwargs)

Wrapper around Beam's ReadAllFromAvro that filters the files to read by datetime range.

This transform’s primary function is to filter file names based on a time range. It works in two steps:

  1. Generating Date-based Patterns: It first generates a list of file patterns for each day within the specified start_dt and end_dt. This efficiently prunes the search space for large, time-partitioned datasets.

  2. Precise Datetime Filtering: After matching the daily patterns, it applies a second, more precise filter so that only files whose timestamp falls within the requested datetime range (start_dt to end_dt, extended by buffer_hours) are processed.

This PTransform is a generic and reusable component for any data pipeline that needs to perform historical data backfills on time-partitioned Avro files.
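The first step described above can be illustrated with a minimal sketch using only the standard library. The helper name `daily_patterns` is hypothetical and is not part of the library; the sketch also assumes that the buffer is applied symmetrically before the calendar days are enumerated, which the parameter description suggests but does not spell out.

```python
from datetime import datetime, timedelta


def daily_patterns(path, start_dt, end_dt, buffer_hours=6, date_format="%Y-%m-%d"):
    """Return one glob pattern per calendar day touched by the buffered range.

    Hypothetical helper for illustration only; not the library's implementation.
    """
    buffer = timedelta(hours=buffer_hours)
    start = datetime.fromisoformat(start_dt) - buffer
    end = datetime.fromisoformat(end_dt) + buffer

    patterns = []
    day = start.date()
    while day <= end.date():
        # Substitute the {date} placeholder with the formatted day.
        patterns.append(path.format(date=day.strftime(date_format)))
        day += timedelta(days=1)
    return patterns


patterns = daily_patterns(
    "gs://my-bucket/nmea-{date}/*.avro",
    "2024-01-01T00:00:00",
    "2024-01-02T00:00:00",
)
```

With the default six-hour buffer, the range above reaches back into the previous day and forward into the next one, so three daily patterns are produced instead of a single bucket-wide glob.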

Parameters:
  • path (str) –

    The path to the location of the Avro files. The data is assumed to be date-partitioned, so this parameter must include a {date} placeholder. It can be a local path, a GCS location, or any other Beam-supported filesystem path. For example:

    • gs://my-bucket/nmea-{date}/*.avro

    • gs://my-bucket/*{date}*.avro

    • /path/to/data/{date}/*.avro

  • start_dt (str) – The start datetime of the range, in ISO format (e.g., YYYY-MM-DDTHH:MM:SS).

  • end_dt (str) – The end datetime of the range, in ISO format (e.g., YYYY-MM-DDTHH:MM:SS). Datetimes equal to this value are considered outside the range.

  • buffer_hours (int) – Number of extra hours to include before and after the datetime range. Internally used to derive both the day-level glob patterns and the precise datetime filter. Defaults to 6.

  • record_time_fn (Callable[[dict], datetime] | None) – Function that extracts an event timestamp from a record. It should accept a record dictionary and return a datetime. This allows custom logic such as accessing nested fields, parsing strings, or applying fallback values. The extracted timestamp is used for the last filtering step.

  • strict (bool) – If True, raises an exception when record_time_fn fails to extract the timestamp. If False, the failing record is skipped.

  • date_format (str) – The strftime/strptime format to use when matching dates in Avro file paths. Defaults to %Y-%m-%d.

  • time_format (str) – The strftime/strptime format to use when matching times in Avro file paths. Defaults to %H_%M_%SZ.

  • allow_no_time (bool) – If True, paths may omit time information, in which case a default of 0 is applied. If False, a ValueError is raised for such paths.

  • decode (bool) – Whether to decode the data from bytes to string. Default is True.

  • decode_method (str) – The method used to decode the message data. Supported methods include standard encodings like utf-8, ascii, etc. Default is utf-8.

  • read_all_from_avro_kwargs (dict[str, Any] | None) – Any additional keyword arguments to be passed to Beam’s ReadAllFromAvro class. See the official Apache Beam documentation for the available options.

  • **kwargs (Any) – Additional keyword arguments passed to base PTransform class.
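To make the record_time_fn and strict parameters more concrete, here is a minimal sketch of a record-level filter. Everything in it is an assumption made for illustration: the field names `event_time` and `received_at`, the helper `filter_records`, and the half-open comparison are hypothetical and may differ from what the transform does internally.

```python
from datetime import datetime, timezone


def record_time(record):
    """Hypothetical extractor: nested ISO string first, epoch seconds as fallback."""
    attributes = record.get("attributes") or {}
    if "event_time" in attributes:
        return datetime.fromisoformat(attributes["event_time"])
    return datetime.fromtimestamp(record["received_at"], tz=timezone.utc)


def filter_records(records, start, end, time_fn, strict=False):
    """Keep records whose extracted timestamp lies in [start, end)."""
    kept = []
    for record in records:
        try:
            ts = time_fn(record)
        except Exception as exc:
            if strict:
                # strict=True: surface the extraction failure.
                raise ValueError(f"Failed extracting timestamp: {exc!r}") from exc
            continue  # strict=False: skip the failing record.
        if start <= ts < end:
            kept.append(record)
    return kept


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, tzinfo=timezone.utc)

r1 = {"attributes": {"event_time": "2024-01-01T12:00:00+00:00"}}
r2 = {"received_at": 1704067200}  # 2024-01-01T00:00:00Z, equal to start
r3 = {"attributes": {"event_time": "2024-01-02T00:00:00+00:00"}}  # equal to end
r4 = {}  # no usable timestamp, so extraction fails

kept = filter_records([r1, r2, r3, r4], start, end, record_time)
```

In this sketch the record without a timestamp is dropped silently; passing strict=True would raise instead, which can be preferable when missing timestamps indicate a data-quality problem rather than expected noise.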

Raises:

ValueError – When a path does not contain time information and allow_no_time is False.
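The following sketch shows one way a datetime could be recovered from a file path using the default date_format and time_format, and when the ValueError would be raised. The helper `datetime_from_path`, the regular expressions, and the example paths are hypothetical; reading "a default of 0" as midnight is also an assumption.

```python
import re
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d"   # default date_format
TIME_FORMAT = "%H_%M_%SZ"  # default time_format

# Regular expressions matching the two default formats (illustrative only).
DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")
TIME_RE = re.compile(r"\d{2}_\d{2}_\d{2}Z")


def datetime_from_path(path, allow_no_time=False):
    """Extract a datetime from a path; hypothetical helper, not the library's code."""
    date_match = DATE_RE.search(path)
    if date_match is None:
        raise ValueError(f"No date found in path: {path}")
    day = datetime.strptime(date_match.group(), DATE_FORMAT)

    time_match = TIME_RE.search(path)
    if time_match is None:
        if not allow_no_time:
            raise ValueError(f"No time found in path: {path}")
        return day  # assumed default: midnight

    time_of_day = datetime.strptime(time_match.group(), TIME_FORMAT).time()
    return datetime.combine(day.date(), time_of_day)


with_time = datetime_from_path("gs://my-bucket/nmea-2024-01-01/12_30_00Z.avro")
without_time = datetime_from_path("/path/to/data/2024-01-01/part-0.avro", allow_no_time=True)
```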

Returns:

A PCollection of Avro records from the files within the specified datetime range.

Return type:

PCollection
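A usage sketch based only on the signature documented above. The bucket path and the date range are placeholders, and the imports are kept inside `main` so that the snippet can be read and loaded without Apache Beam or this package installed.

```python
# Keyword arguments taken from the documented signature; the values are placeholders.
READ_KWARGS = {
    "path": "gs://my-bucket/nmea-{date}/*.avro",
    "start_dt": "2024-01-01T00:00:00",
    "end_dt": "2024-01-02T00:00:00",
    "buffer_hours": 6,
    "allow_no_time": False,
}


def main():
    # Imported lazily: both packages must be installed to actually run the pipeline.
    import apache_beam as beam
    from gfw.common.beam.transforms import ReadMatchingAvroFiles

    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "ReadAvro" >> ReadMatchingAvroFiles(**READ_KWARGS)
            | "Print" >> beam.Map(print)
        )


if __name__ == "__main__":
    main()
```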

Methods

  • annotations

  • default_label

  • default_type_hints

  • display_data – Returns the display data associated with a pipeline component.

  • expand – Applies the transform to the pipeline root and returns a PCollection of messages.

  • from_runner_api

  • get_resource_hints

  • get_type_hints – Gets and/or initializes type hints for this object.

  • get_windowing – Returns the window function to be associated with transform's output.

  • infer_output_type

  • is_path_in_range – Checks if a path containing a datetime is within the provided datetime range.

  • register_urn

  • runner_api_requires_keyed_input

  • to_runner_api

  • to_runner_api_parameter

  • to_runner_api_pickled

  • type_check_inputs

  • type_check_inputs_or_outputs

  • type_check_outputs

  • with_input_types – Annotates the input type of a PTransform with a type-hint.

  • with_output_types – Annotates the output type of a PTransform with a type-hint.

  • with_resource_hints – Adds resource hints to the PTransform.

Attributes

  • MSG_FAILED_EXTRACTING_TIMESTAMP

  • label

  • pipeline

  • side_inputs

is_path_in_range(path)

Checks if a path containing a datetime is within the provided datetime range.

Return type:

bool
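The range check itself can be sketched as a half-open interval comparison. This is an illustration under stated assumptions, not the method's actual code: the helper `in_buffered_range` is hypothetical, it takes an already-parsed datetime rather than a path, and it assumes that the buffer widens both ends and that the (buffered) end is excluded, in line with the end_dt description.

```python
from datetime import datetime, timedelta


def in_buffered_range(file_dt, start_dt, end_dt, buffer_hours=6):
    """Return True if file_dt lies in [start - buffer, end + buffer)."""
    buffer = timedelta(hours=buffer_hours)
    start = datetime.fromisoformat(start_dt) - buffer
    end = datetime.fromisoformat(end_dt) + buffer
    return start <= file_dt < end


START = "2024-01-01T00:00:00"
END = "2024-01-02T00:00:00"

inside_lower_edge = in_buffered_range(datetime(2023, 12, 31, 18, 0, 0), START, END)
below_lower_edge = in_buffered_range(datetime(2023, 12, 31, 17, 59, 59), START, END)
at_upper_edge = in_buffered_range(datetime(2024, 1, 2, 6, 0, 0), START, END)
```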

expand(pcoll)

Applies the transform to the pipeline root and returns a PCollection of messages.

Parameters:

pcoll (PCollection) – An input PCollection. This is expected to be a PBegin, since the transform reads from files and therefore begins at the pipeline root.

Returns:

A PCollection of dictionaries where each dictionary contains the following keys:

  • data: The decoded message string (if decoding is enabled).

  • attributes: A dictionary of message attributes (if available).

Return type:

beam.PCollection
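As a rough illustration of the output shape described above, the sketch below decodes the data field of a single record. The helper `decode_record` and the sample record are hypothetical; the sketch assumes that each Avro record carries a bytes data field and an optional attributes mapping.

```python
def decode_record(record, decode=True, decode_method="utf-8"):
    """Return a dict with 'data' and 'attributes' keys (illustrative only)."""
    data = record["data"]
    if decode and isinstance(data, bytes):
        # Mirrors the decode / decode_method parameters of the transform.
        data = data.decode(decode_method)
    return {"data": data, "attributes": record.get("attributes", {})}


raw = {"data": b"!AIVDM,1,1,,A,example,0*00", "attributes": {"source": "station-1"}}

decoded = decode_record(raw)
untouched = decode_record(raw, decode=False)
```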