gfw.common.beam.transforms#

Package for reusable and well-tested Apache Beam PTransforms.

This package provides a collection of reusable PTransform components designed to simplify and standardize data processing patterns in Apache Beam pipelines.

Each transform in this package is developed with an emphasis on clarity, testability, and composability — making it easier to write robust and maintainable pipelines across both batch and streaming modes.

These components aim to serve as building blocks to accelerate development while maintaining high code quality and reducing duplication.

Classes#

ApplySlidingWindows

Applies sliding windows, optionally assigning event timestamps.

GroupBy

Wrapper around beam.GroupBy with automatic labeling.

ReadAndDecodeFromPubSub

Wrapper around ReadFromPubSub with optional decoding.

ReadFromBigQuery

Wrapper around ReadFromBigQuery with optional casting.

ReadFromJson

Beam transform to read a PCollection from a JSON file.

ReadMatchingAvroFiles

Wrapper around ReadAllFromAvro with filtering.

SampleAndLogElements

A Beam PTransform that logs elements of a PCollection.

WriteToBigQueryWrapper

Wrapper around WriteToBigQuery with extended functionality.

WriteToJson

Writes PCollection as JSON.

Extra classes useful for testing#

FakeReadFromPubSub

A fake ReadFromPubSub to simulate Pub/Sub messages in tests.

FakeWriteToBigQuery

A fake WriteToBigQuery transform for testing purposes.

class ApplySlidingWindows(period, offset, assign_timestamps=False, timestamp_field='timestamp', **kwargs)[source]#

Applies sliding windows, optionally assigning event timestamps.

Parameters:
  • period (float) – The window period (interval between window start times), in seconds.

  • offset (float) – The offset to apply to window start times, in seconds.

  • assign_timestamps (bool) – Whether to assign timestamps using a field from each element.

  • timestamp_field (str) – The name of the field containing the timestamp (UNIX time, in seconds).

  • **kwargs (Any) – Additional keyword arguments passed to base PTransform class.

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Apply sliding windows to the input PCollection.

Optionally assigns event-time timestamps using the configured timestamp field.

Parameters:

pcoll (PCollection[Dict[str, Any]]) – A PCollection of dictionaries containing a timestamp field.

Returns:

A windowed PCollection with sliding windows applied.

Return type:

PCollection[Dict[str, Any]]

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class FakeReadFromPubSub(*args, messages=None, **kwargs)[source]#

A fake ReadFromPubSub to simulate Pub/Sub messages in tests.

Parameters:

messages (list[dict] | None) – A list of dictionaries representing Pub/Sub messages. Each dictionary is passed as keyword arguments to apache_beam.io.gcp.pubsub.PubsubMessage.

Note

Any additional *args and **kwargs are accepted for API compatibility with ReadFromPubSub, but are ignored.

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Returns a PCollection created from self.messages list.

Return type:

PCollection

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class FakeWriteToBigQuery(**kwargs)[source]#

A fake WriteToBigQuery transform for testing purposes.

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Overrides the expand method to do nothing.

Return type:

PCollection[Any]

static get_dict_table_schema(schema)#

Transform the table schema into a dictionary instance.

Parameters:

schema (str, dict, TableSchema) – The schema to be used if the BigQuery table to write has to be created. This can either be a dict or string or in the TableSchema format.

Returns:

The schema to be used if the BigQuery table to write has to be created but in the dictionary format.

Return type:

Dict[str, Any]

static get_table_schema_from_string(schema)#

Transform the string table schema into a TableSchema instance.

Parameters:

schema (str) – The string schema to be used if the BigQuery table to write has to be created.

Returns:

The schema to be used if the BigQuery table to write has to be created but in the TableSchema format.

Return type:

TableSchema

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

static table_schema_to_dict(table_schema)#

Create a dictionary representation of table schema for serialization

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class GroupBy(*fields, elements='', dict_fields=True, **kwargs)[source]#

Wrapper around beam.GroupBy with automatic labeling.

This transform wraps Beam’s native beam.GroupBy and adds an automatically generated label based on the grouping keys. For example, grouping by [“user”, “country”] with elements=”Sessions” results in a label like GroupSessionsByUserAndCountry.

If dict_fields=True (default), string positional fields are interpreted as dictionary keys and wrapped with operator.itemgetter(). If False, strings are treated as attribute names.

Example

pcoll | GroupBy("user", "country", elements="Sessions")
Parameters:
  • *fields (Any) – Positional key fields to group by. If these are strings and dict_fields=True, they will be interpreted as dictionary keys.

  • elements (str) – A human-readable label describing the grouped elements (e.g., Messages or Sessions). It is used to generate the step label.

  • dict_fields (bool) – If True (default), string fields are interpreted as dictionary keys and wrapped with operator.itemgetter(). Set to False to use Beam’s default behavior (attribute access).

  • **kwargs (Any) – Same as beam.GroupBy interface.

classmethod create_label(keys, elements)[source]#

Generate a descriptive label for the GroupBy transform based on keys and elements.

Constructs a label string combining the human-readable element description and the grouping keys, formatted in a CamelCase style joined by ‘And’.

For example, keys ['user', 'country'] and elements ‘Sessions’ result in GroupSessionsByUserAndCountry.

Parameters:
  • keys (Sequence[str]) – A sequence of key field names used for grouping.

  • elements (str) – A human-readable label describing the grouped elements.

Returns:

A formatted string label for use as the PTransform’s step label.

Return type:

str

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Applies the wrapped Beam GroupBy transform to the input PCollection.

Return type:

PCollection

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class ReadAndDecodeFromPubSub(subscription_id, project, with_attributes=True, decode=True, decode_method='utf-8', read_from_pubsub_factory=<class 'apache_beam.io.gcp.pubsub.ReadFromPubSub'>, **read_from_pubsub_kwargs)[source]#

Wrapper around ReadFromPubSub with optional decoding.

It supports the following features:
  • Reading from a specific Pub/Sub subscription.

  • Optionally including message attributes.

  • Decoding message data using a specified method (default is UTF-8).

  • Allowing the use of a custom or mocked beam.ReadFromPubSub transform for testing purposes.

Parameters:
  • subscription_id (str) – The Pub/Sub subscription id from which to read messages.

  • with_attributes (bool) – Whether to include attributes in the Pub/Sub message. Default is True.

  • decode (bool) – Whether to decode the data from bytes to dictionary. Default is True.

  • decode_method (str) – The method used to decode the message data. Supported methods include standard encodings like utf-8, ascii, etc. Default is utf-8.

  • read_from_pubsub_factory (Callable[[...], ReadFromPubSub]) – A factory function to create a ReadFromPubSub instance. This is useful for testing when a custom or mocked beam.ReadFromPubSub implementation is needed. Default is the Beam beam.ReadFromPubSub class.

  • **read_from_pubsub_kwargs (Any) – Additional keyword arguments passed to the ReadFromPubSub transform. These can be used to specify custom parameters for the reading operation.

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Applies the transform to the pipeline root and returns a PCollection of messages.

Parameters:

pcoll (PCollection) – An input PCollection. This is expected to be a PBegin when used with a real or mocked ReadFromPubSub, since Pub/Sub sources begin from the pipeline root.

Returns:

A PCollection of dictionaries where each dictionary contains: - “data”: the decoded message string (if decoding is enabled), - “attributes”: a dictionary of message attributes (if available).

Return type:

beam.PCollection

classmethod get_client_factory(mocked=False)[source]#

Returns a factory for ReadFromPubSub objects.

Return type:

Callable

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

property subscription: str#

Generates the full subscription path from project and subscription id.

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class ReadFromBigQuery(query, output_type=<class 'dict'>, method='EXPORT', use_standard_sql=True, read_from_bigquery_factory=<class 'apache_beam.io.gcp.bigquery.ReadFromBigQuery'>, read_from_bigquery_kwargs=None, **kwargs)[source]#

Wrapper around ReadFromBigQuery with optional casting.

Parameters:
  • query (str) – The query to execute.

  • output_type (type) – The Beam type hint for the output (e.g., a NamedTuple). If not provided, defaults to dict.

  • method (str) – The method to use to read from BigQuery. It may be EXPORT or DIRECT_READ.

  • use_standard_sql (bool) – Specifies whether to use BigQuery’s standard SQL dialect for this query. Defaults to True.

  • read_from_bigquery_factory (Callable[..., io.ReadFromBigQuery]) – A factory function used to create a ReadFromBigQuery instance. This is primarily useful for testing, where you may want to inject a custom or fake implementation instead of using the real transform. If not provided, the default class will be used.

  • write_to_bigquery_kwargs – Any additional keyword arguments to be passed to ReadFromBigQuery class. Check official Apache Beam documentation.

  • **kwargs (Any) – Additional keyword arguments passed to base PTransform class.

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Applies PCollection to read from BigQuery.

Return type:

PCollection[Any]

classmethod from_query(query, use_type=False, **kwargs)[source]#

Creates a ReadFromBigQuery PTransform from a Query object.

Parameters:
  • query (Query) – An instance of a Query subclass. Its render method is used to produce the SQL query string.

  • use_type (bool) – If True, sets PTransform type to the provided output_type.

  • **kwargs (Any) – Any additional arguments for ReadFromBigQuery constructor.

Returns:

A configured ReadFromBigQuery instance.

Return type:

ReadFromBigQuery

classmethod get_client_factory(mocked=False)[source]#

Returns a factory for ReadFromPubSub objects.

Return type:

Callable

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class ReadFromJson(input_file, coder=<class 'dict'>, lines=False, create_kwargs=None, **kwargs)[source]#

Beam transform to read a PCollection from a JSON file.

This transform loads a local JSON or JSONLines file eagerly (outside the pipeline), then injects the resulting records into the pipeline using beam.Create.

Useful for testing, prototyping, or controlled ingestion.

Parameters:
  • input_file (str | Path) – Path to the local file to read.

  • coder (Callable) – Callable to apply to each decoded record. Defaults to dict.

  • lines (bool) – If True, interprets the input as newline-delimited JSON (JSONLines).

  • create_kwargs (dict | None) – Optional dictionary of keyword arguments to pass to beam.Create. Use this to control serialization, type hints, etc.

  • **kwargs (Any) – Additional keyword arguments passed to base PTransform class.

Raises:

ValueError – If the input file does not exist at pipeline construction time.

Example

with beam.Pipeline() as p:
    pcoll = p | ReadFromJson("data/input.json", lines=True)
    pcoll | beam.Map(print)
display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(p)[source]#

Apply transform to pipeline p: create PCollection from loaded JSON data.

Return type:

PCollection

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class ReadMatchingAvroFiles(path, start_dt, end_dt, buffer_hours=6, record_time_fn=None, strict=False, date_format='%Y-%m-%d', time_format='%H_%M_%SZ', allow_no_time=False, decode=True, decode_method='utf-8', read_all_from_avro_kwargs=None, **kwargs)[source]#

Wrapper around ReadAllFromAvro with filtering.

This transform’s primary function is to intelligently filter filenames based on a time range. It works by:

  1. Generating Date-based Patterns: It first generates a list of file patterns for each day within the specified start_dt and end_dt. This efficiently prunes the search space for large, time-partitioned datasets.

  2. Precise Datetime Filtering: After matching the daily patterns, it applies a second, more precise filter to ensure that only files with a timestamp strictly within the start_dt and end_dt are processed.

This PTransform is a generic and reusable component for any data pipeline that needs to perform historical data backfills on time-partitioned Avro files.

Parameters:
  • path (str) –

    The path to the location of the Avro files. It is assumed that the data is date-partitioned, so this parameter must include a date placeholder. It can be local path, a GCS location, or any other Beam-supported filesystem path. For example:

    • gs://my-bucket/nmea-{date}/*.avro

    • gs://my-bucket/*{date}*.avro

    • /path/to/data/{date}/*.avro

  • start_dt (str) – The start datetime of the range, in ISO format (e.g., YYYY-MM-DDTHH:MM:SS).

  • end_dt (str) – The end datetime of the range, in ISO format (e.g., YYYY-MM-DDTHH:MM:SS). Datetimes equal to this value are considered outside the range.

  • buffer_hours (int) – Number of extra hours to include before and after the datetime range. Internally used to derive both the day-level glob patterns and the precise datetime filter. Defaults to 6.

  • record_time_fn (Callable[[dict], datetime] | None) – Function that extracts a event timestamp from a record. It should accept a record dictionary and return a datetime. This allows custom logic such as accessing nested fields, parsing strings, or applying fallback values. The extracted timestamp is used for the last filtering step.

  • strict (bool) – If True, raises an exception if the record_time_fn failed to extract the timestamp. If False, will skip the failing record.

  • date_format (str) – The strftime/strptime format to use when matching dates in avro files. Defaults to %Y-%m-%d.

  • time_format (str) – The strftime/strptime format to use when matching times in avro files. Defaults to %H_%M_%SZ.

  • allow_no_time (bool) – If True, allows paths to not contain time information, and a default of 0 will be applied. If False, it will raise a ValueError.

  • decode (bool) – Whether to decode the data from bytes to string. Default is True.

  • decode_method (str) – The method used to decode the message data. Supported methods include standard encodings like utf-8, ascii, etc. Default is utf-8.

  • read_all_from_avro_kwargs (dict[str, Any] | None) –

    Any additional keyword arguments to be passed to Beam’s ReadAllFromAvro class. Check official Apache Beam documentation.

  • **kwargs (Any) – Additional keyword arguments passed to base PTransform class.

Raises:

ValueError – When a path does not contain time information and allow_no_time is False.

Returns:

A PCollection of Avro records from the files within the specified datetime range.

Return type:

PCollection

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Applies the transform to the pipeline root and returns a PCollection of messages.

Parameters:

pcoll (PCollection) – An input PCollection. This is expected to be a PBegin when used with a real or mocked ReadFromPubSub, since Pub/Sub sources begin from the pipeline root.

Returns:

A PCollection of dictionaries where each dictionary contains the following keys:

  • data: The decoded message string (if decoding is enabled).

  • attributes: A dictionary of message attributes (if available).

Return type:

beam.PCollection

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

is_path_in_range(path)[source]#

Checks if a path containing a datetime is within the provided datetime range.

Return type:

bool

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class SampleAndLogElements(sample_size=None, window_size=60, pretty_print=False, message='Element: {e}')[source]#

A Beam PTransform that logs elements of a PCollection.

Parameters:
  • sample_size (int | None) – The number of elements to log. If not provided, logs all elements.

  • window_size (int) – The window duration in seconds used when sampling unbounded sources; only applicable when sample_size is set.

  • pretty_print (bool) – If True, formats each element as pretty-printed JSON when possible.

  • message (str) – A custom string format for the log message. Must contain the placeholder {e}.

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Log elements of a PCollection, optionally sampling a sample_size elements.

Return type:

PCollection

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class WriteToBigQueryWrapper(table, schema=None, label=None, convert_timestamps=False, write_to_bigquery_factory=<class 'apache_beam.io.gcp.bigquery.WriteToBigQuery'>, **write_to_bigquery_kwargs)[source]#

Wrapper around WriteToBigQuery with extended functionality.

Key Features: - Provides a simpler interface define a schema using a list of dictionaries. - Automatically selects writing method based on pipeline mode (streaming vs. batch) and runner. - Automatically converts TIMESTAMP fields to Timestamp objects when needed

(streaming with STORAGE_WRITE_API).

Parameters:
  • table (str) – The BigQuery table to write to (in the format project:dataset.table).

  • schema (list[dict[str, str]] | None) – The schema for the BigQuery table.

  • convert_timestamps (bool) – If True, converts TIMESTAMP fields to Timestamp objects when a streaming pipeline is using STORAGE_WRITE_API method, which requires Apache Beam Timestamp objects. See https://beam.apache.org/documentation/io/built-in/google-bigquery/.

  • write_to_big_query_factory – A factory function used to create a beam.io.WriteToBigQuery instance. This is primarily useful for testing, where you may want to inject a custom or fake implementation instead of using the real transform. If not provided, the default class will be used.

  • **write_to_bigquery_kwargs (Any) –

    Any additional keyword arguments to be passed to beam.io.WriteToBigQuery class. Check official Apache Beam documentation.

Example

from pipe_nmea.common.beam.transforms import bigquery

pcoll | "Write" >> bigquery.WriteToPartitionedBigQuery(
    table="project:dataset.table",
    schema=[{"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, ...],
)
display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Writes the input PCollection to BigQuery, creating the table if it does not exist.

Before applying the WriteToBigQuery transform, this method ensures that the target table is created with the specified schema, partitioning, and clustering configurations.

Parameters:

pcoll (PCollection[dict[str, Any]]) – The input PCollection to write to BigQuery.

Returns:

An empty PCollection that acts as a signal for the completion of the write step. It can be used to chain additional transforms (e.g., logging or monitoring), but typically it contains no elements and exists primarily to signal that the write step has occurred within the pipeline.

Return type:

PCollection[dict[str, Any]]

static float_to_beam_timestamp(row, fields)[source]#

Converts in-place specified fields in a dictionary from float to Beam Timestamp objects.

Parameters:
  • row (dict[str, Any]) – A dictionary containing data with potential float values.

  • fields (list[str]) – A tuple of field names to be converted to Timestamp.

Returns:

The input dictionary with specified fields converted to Timestamp objects.

Return type:

dict[str, Any]

classmethod get_client_factory(mocked=False)[source]#

Returns a factory for beam.WriteToBigQuery objects.

Return type:

Callable

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

static resolve_write_method(standard_options)[source]#

Resolves the appropriate write method to use to write to BigQuery.

The selection logic is based on the StandardOptions of the pipeline in which beam.WriteToBigQuery transform is used.

The default behavior differs from the one in beam.WriteToBigQuery, where STREAMING_INSERTS is used for streaming pipelines. Here, we prefer STORAGE_WRITE_API for streaming pipelines, which is Google’s recommended method for high-throughput, low-latency streaming writes.

As of Apache Beam 2.64, STORAGE_API_AT_LEAST_ONCE is not available in python, but STORAGE_WRITE_API can be used for at-least-once semantics.

See https://cloud.google.com/dataflow/docs/guides/write-to-bigquery.

Parameters:

standard_options (StandardOptions) – The StandardOptions of the pipeline in which WriteToBigQuery transform is used.

Returns:

A string representing the selected write method. One of ("STREAMING_INSERTS", "FILE_LOADS", "STORAGE_WRITE_API").

Return type:

str

property schema: dict[str, Any] | None#

Returns the BigQuery schema in the format expected by beam.WriteToBigQuery.

The provided schema as a list of dictionaries (e.g., [{"name": ..., "type": ..., ...}]), is wrapped in a dictionary under the “fields” key.

Returns:

A dictionary of the form {"fields": [...]}.

property timestamp_fields: List[str]#

Extract the field names of type TIMESTAMP from the schema.

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform

class WriteToJson(output_dir='workdir', output_prefix='', **kwargs)[source]#

Writes PCollection as JSON.

Parameters:
  • output_dir (str) – Output directory.

  • output_prefix (str) – Prefix to use in filename/s.

  • **kwargs (Any) – Additional keyword arguments passed to base PTransform class.

display_data()#

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns:

A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Return type:

Dict[str, Any]

expand(pcoll)[source]#

Writes the input PCollection to a JSON file.

Return type:

PCollection

get_type_hints()#

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

get_windowing(inputs)#

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Return type:

Windowing

with_input_types(input_type_hint)#

Annotates the input type of a PTransform with a type-hint.

Parameters:

input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.

Raises:

TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

with_output_types(type_hint, **tagged_type_hints)#

Annotates the output type of a PTransform with a type-hint.

Parameters:
  • type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.

  • **tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output e.g., errors=str.

Raises:

TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.

Returns:

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Return type:

PTransform

Example::
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(

int, # main output type errors=str, # ‘errors’ tagged output type warnings=str # ‘warnings’ tagged output type

).with_outputs(‘errors’, ‘warnings’, main=’main’)

with_resource_hints(**kwargs)#

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

Parameters:

**kwargs – key-value pairs describing hints and their values.

Raises:

ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

Returns:

A reference to the instance of this particular PTransform object.

Return type:

PTransform