gfw.common.beam.transforms#
Package for reusable and well-tested Apache Beam PTransforms.
This package provides a collection of reusable PTransform components designed to simplify and standardize data processing patterns in Apache Beam pipelines.
Each transform in this package is developed with an emphasis on clarity, testability, and composability, making it easier to write robust and maintainable pipelines across both batch and streaming modes.
These components aim to serve as building blocks to accelerate development while maintaining high code quality and reducing duplication.
Classes#
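ApplySlidingWindows | Applies sliding windows, optionally assigning event timestamps.
GroupBy | Wrapper around beam.GroupBy with automatic labeling.
ReadAndDecodeFromPubSub | Wrapper around ReadFromPubSub with optional decoding.
ReadFromBigQuery | Wrapper around ReadFromBigQuery with optional casting.
ReadFromJson | Beam transform to read a PCollection from a JSON file.
ReadMatchingAvroFiles | Wrapper around ReadAllFromAvro with filtering.
A Beam PTransform that logs elements of a PCollection.
Wrapper around
Writes PCollection as JSON.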
Extra classes useful for testing#
FakeReadFromPubSub | A fake ReadFromPubSub to simulate Pub/Sub messages in tests.
FakeWriteToBigQuery | A fake WriteToBigQuery transform for testing purposes.
- class ApplySlidingWindows(period, offset, assign_timestamps=False, timestamp_field='timestamp', **kwargs)[source]#
Applies sliding windows, optionally assigning event timestamps.
- Parameters:
period (float) – The window period (interval between window start times), in seconds.
offset (float) – The offset to apply to window start times, in seconds.
assign_timestamps (bool) – Whether to assign timestamps using a field from each element.
timestamp_field (str) – The name of the field containing the timestamp (UNIX time, in seconds).
**kwargs (Any) – Additional keyword arguments passed to base PTransform class.
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
    {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
    }
- Return type:
Dict[str, Any]
- expand(pcoll)[source]#
Apply sliding windows to the input PCollection.
Optionally assigns event-time timestamps using the configured timestamp field.
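As a rough illustration of what sliding-window assignment means, the sketch below computes, in plain Python, which windows a timestamp falls into. It mirrors the usual sliding-window rule (window starts are aligned to `offset` and spaced `period` seconds apart). The `size` argument and the function name `sliding_windows_for` are assumptions made for this example; this page does not state which window size ApplySlidingWindows uses internally.

```python
def sliding_windows_for(timestamp, size, period, offset=0):
    """Return the (start, end) windows, end exclusive, that contain `timestamp`.

    Hypothetical helper for illustration only; all values are in seconds.
    """
    # Latest window start that is aligned to `offset` and not after `timestamp`.
    last_start = timestamp - ((timestamp - offset) % period)
    windows = []
    start = last_start
    # Walk backwards one period at a time while the window still covers `timestamp`.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


# An element carrying its event time in the default 'timestamp' field (UNIX seconds).
element = {"timestamp": 125, "value": "a"}
windows = sliding_windows_for(element["timestamp"], size=60, period=30)
print(windows)
```

With a size larger than the period, each element belongs to more than one window, which is the defining property of sliding windows.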
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
    result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
        int,           # main output type
        errors=str,    # 'errors' tagged output type
        warnings=str   # 'warnings' tagged output type
    ).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class FakeReadFromPubSub(*args, messages=None, **kwargs)[source]#
A fake ReadFromPubSub to simulate Pub/Sub messages in tests.
- Parameters:
messages (list[dict] | None) – A list of dictionaries representing Pub/Sub messages. Each dictionary is passed as keyword arguments to apache_beam.io.gcp.pubsub.PubsubMessage.
Note
Any additional *args and **kwargs are accepted for API compatibility with ReadFromPubSub, but are ignored.
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
    {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
    }
- Return type:
Dict[str, Any]
- expand(pcoll)[source]#
Returns a PCollection created from self.messages list.
- Return type:
PCollection
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
    result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
        int,           # main output type
        errors=str,    # 'errors' tagged output type
        warnings=str   # 'warnings' tagged output type
    ).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class FakeWriteToBigQuery(**kwargs)[source]#
A fake WriteToBigQuery transform for testing purposes.
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
    {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
    }
- Return type:
Dict[str, Any]
- static get_dict_table_schema(schema)#
Transform the table schema into a dictionary instance.
- Parameters:
schema (str, dict, TableSchema) – The schema to be used if the BigQuery table to write has to be created. This can either be a dict or string or in the TableSchema format.
- Returns:
The schema to be used if the BigQuery table to write has to be created but in the dictionary format.
- Return type:
Dict[str, Any]
- static get_table_schema_from_string(schema)#
Transform the string table schema into a TableSchema instance.
- Parameters:
schema (str) – The string schema to be used if the BigQuery table to write has to be created.
- Returns:
The schema to be used if the BigQuery table to write has to be created but in the TableSchema format.
- Return type:
TableSchema
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- static table_schema_to_dict(table_schema)#
Create a dictionary representation of the table schema for serialization.
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
    result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
        int,           # main output type
        errors=str,    # 'errors' tagged output type
        warnings=str   # 'warnings' tagged output type
    ).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class GroupBy(*fields, elements='', dict_fields=True, **kwargs)[source]#
Wrapper around beam.GroupBy with automatic labeling.
This transform wraps Beam's native beam.GroupBy and adds an automatically generated label based on the grouping keys. For example, grouping by ["user", "country"] with elements="Sessions" results in a label like GroupSessionsByUserAndCountry.
If dict_fields=True (default), string positional fields are interpreted as dictionary keys and wrapped with operator.itemgetter(). If False, strings are treated as attribute names.
Example
    pcoll | GroupBy("user", "country", elements="Sessions")
- Parameters:
*fields (Any) – Positional key fields to group by. If these are strings and dict_fields=True, they will be interpreted as dictionary keys.
elements (str) – A human-readable label describing the grouped elements (e.g., Messages or Sessions). It is used to generate the step label.
dict_fields (bool) – If True (default), string fields are interpreted as dictionary keys and wrapped with operator.itemgetter(). Set to False to use Beam's default behavior (attribute access).
**kwargs (Any) – Keyword arguments accepted by the beam.GroupBy interface.
- classmethod create_label(keys, elements)[source]#
Generate a descriptive label for the GroupBy transform based on keys and elements.
Constructs a label string combining the human-readable element description and the grouping keys, formatted in a CamelCase style joined by ‘And’.
For example, keys ['user', 'country'] and elements 'Sessions' result in GroupSessionsByUserAndCountry.
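The labeling rule and the dictionary-key lookup described above can be sketched in plain Python. This is only an approximation under stated assumptions: `sketch_create_label` and `sketch_key_for` are hypothetical names, and the package's own capitalization rules for keys may differ from the simple first-letter uppercasing used here.

```python
from operator import itemgetter


def sketch_create_label(keys, elements=""):
    """Build a label such as 'GroupSessionsByUserAndCountry' (illustrative only)."""
    # Uppercase the first character of each key and join the keys with 'And'.
    keys_part = "And".join(key[:1].upper() + key[1:] for key in keys)
    return f"Group{elements}By{keys_part}"


def sketch_key_for(record, fields):
    """Extract the grouping key from a dict record, as dict_fields=True implies."""
    # Each string field is wrapped with operator.itemgetter, i.e. record[field].
    getters = [itemgetter(field) for field in fields]
    return tuple(getter(record) for getter in getters)


label = sketch_create_label(["user", "country"], "Sessions")
key = sketch_key_for({"user": "alice", "country": "AR", "n": 3}, ["user", "country"])
print(label, key)
```

Wrapping string fields with `operator.itemgetter` is what lets dictionary records be grouped by key name rather than by attribute.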
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
    {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
    }
- Return type:
Dict[str, Any]
- expand(pcoll)[source]#
Applies the wrapped Beam GroupBy transform to the input PCollection.
- Return type:
PCollection
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
    result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
        int,           # main output type
        errors=str,    # 'errors' tagged output type
        warnings=str   # 'warnings' tagged output type
    ).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class ReadAndDecodeFromPubSub(subscription_id, project, with_attributes=True, decode=True, decode_method='utf-8', read_from_pubsub_factory=<class 'apache_beam.io.gcp.pubsub.ReadFromPubSub'>, **read_from_pubsub_kwargs)[source]#
Wrapper around ReadFromPubSub with optional decoding.
- It supports the following features:
Reading from a specific Pub/Sub subscription.
Optionally including message attributes.
Decoding message data using a specified method (default is UTF-8).
Allowing the use of a custom or mocked beam.ReadFromPubSub transform for testing purposes.
- Parameters:
subscription_id (str) – The Pub/Sub subscription id from which to read messages.
with_attributes (bool) – Whether to include attributes in the Pub/Sub message. Default is True.
decode (bool) – Whether to decode the data from bytes to dictionary. Default is True.
decode_method (str) – The method used to decode the message data. Supported methods include standard encodings like utf-8, ascii, etc. Default is utf-8.
read_from_pubsub_factory (Callable[[...], ReadFromPubSub]) – A factory function to create a ReadFromPubSub instance. This is useful for testing when a custom or mocked beam.ReadFromPubSub implementation is needed. Default is the Beam beam.ReadFromPubSub class.
**read_from_pubsub_kwargs (Any) – Additional keyword arguments passed to the ReadFromPubSub transform. These can be used to specify custom parameters for the reading operation.
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
    {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
    }
- Return type:
Dict[str, Any]
- expand(pcoll)[source]#
Applies the transform to the pipeline root and returns a PCollection of messages.
- Parameters:
pcoll (PCollection) – An input PCollection. This is expected to be a PBegin when used with a real or mocked ReadFromPubSub, since Pub/Sub sources begin from the pipeline root.
- Returns:
A PCollection of dictionaries where each dictionary contains:
- "data": the decoded message string (if decoding is enabled),
- "attributes": a dictionary of message attributes (if available).
- Return type:
beam.PCollection
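The output shape described above can be illustrated with a small plain-Python sketch that turns one raw message into such a dictionary. The function name `sketch_decode_message` and its signature are assumptions for this example and not part of the package; the real transform operates on Pub/Sub message objects inside a pipeline.

```python
def sketch_decode_message(data, attributes=None, decode=True, decode_method="utf-8"):
    """Turn raw message bytes plus attributes into a {'data', 'attributes'} dict.

    Hypothetical helper for illustration only.
    """
    # Decode the payload only when requested; otherwise keep the raw bytes.
    payload = data.decode(decode_method) if decode else data
    # Missing attributes are represented as an empty dictionary.
    return {"data": payload, "attributes": dict(attributes or {})}


decoded = sketch_decode_message(b'{"id": 1}', {"source": "test"})
raw = sketch_decode_message(b'{"id": 1}', decode=False)
print(decoded, raw)
```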
- classmethod get_client_factory(mocked=False)[source]#
Returns a factory for ReadFromPubSub objects.
- Return type:
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
    result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
        int,           # main output type
        errors=str,    # 'errors' tagged output type
        warnings=str   # 'warnings' tagged output type
    ).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class ReadFromBigQuery(query, output_type=<class 'dict'>, method='EXPORT', use_standard_sql=True, read_from_bigquery_factory=<class 'apache_beam.io.gcp.bigquery.ReadFromBigQuery'>, read_from_bigquery_kwargs=None, **kwargs)[source]#
Wrapper around ReadFromBigQuery with optional casting.
- Parameters:
query (str) – The query to execute.
output_type (type) – The Beam type hint for the output (e.g., a NamedTuple). If not provided, defaults to dict.
method (str) – The method to use to read from BigQuery. It may be EXPORT or DIRECT_READ.
use_standard_sql (bool) – Specifies whether to use BigQuery's standard SQL dialect for this query. Defaults to True.
read_from_bigquery_factory (Callable[..., io.ReadFromBigQuery]) – A factory function used to create a ReadFromBigQuery instance. This is primarily useful for testing, where you may want to inject a custom or fake implementation instead of using the real transform. If not provided, the default class will be used.
read_from_bigquery_kwargs – Any additional keyword arguments to be passed to the ReadFromBigQuery class. Check the official Apache Beam documentation.
**kwargs (Any) – Additional keyword arguments passed to base PTransform class.
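To make the idea of casting rows to an output type concrete, here is a minimal plain-Python sketch. It assumes rows arrive as dictionaries and that casting means constructing the output type from the row's fields; this page does not describe the actual mechanism, and `Position` and `sketch_cast_row` are hypothetical names.

```python
from typing import NamedTuple


class Position(NamedTuple):
    """Hypothetical row schema used only for this example."""
    ssvid: str
    lat: float
    lon: float


def sketch_cast_row(row, output_type=dict):
    """Cast a dict row to `output_type`; rows stay dicts by default."""
    if output_type is dict:
        return row
    # Assumes the row's keys match the field names of the output type.
    return output_type(**row)


row = {"ssvid": "123", "lat": 1.0, "lon": 2.0}
typed = sketch_cast_row(row, Position)
untyped = sketch_cast_row(row)
print(typed, untyped)
```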
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
    {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
    }
- Return type:
Dict[str, Any]
- classmethod from_query(query, use_type=False, **kwargs)[source]#
Creates a ReadFromBigQuery PTransform from a Query object.
- Parameters:
- Returns:
A configured ReadFromBigQuery instance.
- Return type:
- classmethod get_client_factory(mocked=False)[source]#
Returns a factory for ReadFromBigQuery objects.
- Return type:
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
    result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
        int,           # main output type
        errors=str,    # 'errors' tagged output type
        warnings=str   # 'warnings' tagged output type
    ).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class ReadFromJson(input_file, coder=<class 'dict'>, lines=False, create_kwargs=None, **kwargs)[source]#
Beam transform to read a PCollection from a JSON file.
This transform loads a local JSON or JSONLines file eagerly (outside the pipeline), then injects the resulting records into the pipeline using beam.Create.
Useful for testing, prototyping, or controlled ingestion.
- Parameters:
coder (Callable) – Callable to apply to each decoded record. Defaults to dict.
lines (bool) – If True, interprets the input as newline-delimited JSON (JSONLines).
create_kwargs (dict | None) – Optional dictionary of keyword arguments to pass to beam.Create. Use this to control serialization, type hints, etc.
**kwargs (Any) – Additional keyword arguments passed to base PTransform class.
- Raises:
ValueError – If the input file does not exist at pipeline construction time.
Example
    with beam.Pipeline() as p:
        pcoll = p | ReadFromJson("data/input.json", lines=True)
        pcoll | beam.Map(print)
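The eager loading step that happens before the pipeline runs might look roughly like the plain-Python sketch below. `sketch_load_records` is a hypothetical helper, and wrapping a single top-level JSON object into a one-element list is an assumption of this example rather than documented behavior.

```python
import json
import tempfile
from pathlib import Path


def sketch_load_records(input_file, coder=dict, lines=False):
    """Load a JSON or JSONLines file into a list of records (illustrative only)."""
    path = Path(input_file)
    if not path.is_file():
        # Mirrors the documented ValueError for a missing input file.
        raise ValueError(f"Input file does not exist: {input_file}")
    text = path.read_text(encoding="utf-8")
    if lines:
        # One JSON document per non-empty line.
        raw = [json.loads(line) for line in text.splitlines() if line.strip()]
    else:
        raw = json.loads(text)
        if isinstance(raw, dict):
            raw = [raw]  # assumption: a single object becomes one record
    return [coder(record) for record in raw]


# Write a small JSONLines file and load it back.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "input.json"
    sample.write_text('{"id": 1}\n{"id": 2}\n', encoding="utf-8")
    records = sketch_load_records(sample, lines=True)
print(records)
```

Because the file is read at construction time, this approach suits small fixtures rather than large datasets.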
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
    {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
    }
- Return type:
Dict[str, Any]
- expand(p)[source]#
Apply transform to pipeline p: create PCollection from loaded JSON data.
- Return type:
PCollection
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
    result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
        int,           # main output type
        errors=str,    # 'errors' tagged output type
        warnings=str   # 'warnings' tagged output type
    ).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class ReadMatchingAvroFiles(path, start_dt, end_dt, buffer_hours=6, record_time_fn=None, strict=False, date_format='%Y-%m-%d', time_format='%H_%M_%SZ', allow_no_time=False, decode=True, decode_method='utf-8', read_all_from_avro_kwargs=None, **kwargs)[source]#
Wrapper around ReadAllFromAvro with filtering.
This transform's primary function is to intelligently filter filenames based on a time range. It works by:
Generating Date-based Patterns: It first generates a list of file patterns for each day within the specified start_dt and end_dt. This efficiently prunes the search space for large, time-partitioned datasets.
Precise Datetime Filtering: After matching the daily patterns, it applies a second, more precise filter to ensure that only files with a timestamp strictly within the start_dt and end_dt are processed.
This PTransform is a generic and reusable component for any data pipeline that needs to perform historical data backfills on time-partitioned Avro files.
- Parameters:
path (str) – The path to the location of the Avro files. It is assumed that the data is date-partitioned, so this parameter must include a date placeholder. It can be a local path, a GCS location, or any other Beam-supported filesystem path. For example:
gs://my-bucket/nmea-{date}/*.avro
gs://my-bucket/*{date}*.avro
/path/to/data/{date}/*.avro
start_dt (str) – The start datetime of the range, in ISO format (e.g., YYYY-MM-DDTHH:MM:SS).
end_dt (str) – The end datetime of the range, in ISO format (e.g., YYYY-MM-DDTHH:MM:SS). Datetimes equal to this value are considered outside the range.
buffer_hours (int) – Number of extra hours to include before and after the datetime range. Internally used to derive both the day-level glob patterns and the precise datetime filter. Defaults to 6.
record_time_fn (Callable[[dict], datetime] | None) – Function that extracts an event timestamp from a record. It should accept a record dictionary and return a datetime. This allows custom logic such as accessing nested fields, parsing strings, or applying fallback values. The extracted timestamp is used for the last filtering step.
strict (bool) – If True, raises an exception if record_time_fn fails to extract the timestamp. If False, skips the failing record.
date_format (str) – The strftime/strptime format to use when matching dates in Avro files. Defaults to %Y-%m-%d.
time_format (str) – The strftime/strptime format to use when matching times in Avro files. Defaults to %H_%M_%SZ.
allow_no_time (bool) – If True, paths may omit time information, in which case a default of 0 is applied. If False, such paths raise a ValueError.
decode (bool) – Whether to decode the data from bytes to string. Default is True.
decode_method (str) – The method used to decode the message data. Supported methods include standard encodings like utf-8, ascii, etc. Default is utf-8.
read_all_from_avro_kwargs (dict[str, Any] | None) – Any additional keyword arguments to be passed to Beam’s ReadAllFromAvro class. Check official Apache Beam documentation.
**kwargs (Any) – Additional keyword arguments passed to base PTransform class.
- Raises:
ValueError – When a path does not contain time information and allow_no_time is False.
- Returns:
A PCollection of Avro records from the files within the specified datetime range.
- Return type:
PCollection
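The two filtering stages described for this class can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the transform's actual implementation: the helper names buffered_range, daily_patterns and in_buffered_range are hypothetical, and treating the buffered start as inclusive is an assumption (the documentation only states that the end is exclusive).

```python
from datetime import datetime, timedelta


def buffered_range(start_dt, end_dt, buffer_hours=6):
    """Parses ISO datetimes and widens the range by buffer_hours on both sides."""
    buffer = timedelta(hours=buffer_hours)
    return (
        datetime.fromisoformat(start_dt) - buffer,
        datetime.fromisoformat(end_dt) + buffer,
    )


def daily_patterns(path, start_dt, end_dt, buffer_hours=6, date_format="%Y-%m-%d"):
    """Stage 1: one glob pattern per calendar day touched by the buffered range."""
    start, end = buffered_range(start_dt, end_dt, buffer_hours)
    patterns = []
    day = start.date()
    while day <= end.date():
        # Fill the {date} placeholder of the path with the formatted day.
        patterns.append(path.format(date=day.strftime(date_format)))
        day += timedelta(days=1)
    return patterns


def in_buffered_range(file_dt, start_dt, end_dt, buffer_hours=6):
    """Stage 2: precise check; start inclusive (assumed), end exclusive."""
    start, end = buffered_range(start_dt, end_dt, buffer_hours)
    return start <= file_dt < end


patterns = daily_patterns(
    "gs://my-bucket/nmea-{date}/*.avro",
    "2024-01-01T00:00:00",
    "2024-01-02T00:00:00",
)
```

With the default six-hour buffer, this one-day range touches three calendar days, so three daily patterns are produced; the precise check then discards any file whose timestamp falls outside the buffered range.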
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
{
    'key1': 'string_value',
    'key2': 1234,
    'key3': 3.14159265,
    'key4': DisplayDataItem('apache.org', url='http://apache.org'),
    'key5': subComponent
}
- Return type:
Dict[str, Any]
- expand(pcoll)[source]#
Applies the transform to the pipeline root and returns a PCollection of messages.
- Parameters:
pcoll (PCollection) – An input PCollection. This is expected to be a PBegin when used with a real or mocked ReadFromPubSub, since Pub/Sub sources begin from the pipeline root.
- Returns:
A PCollection of dictionaries where each dictionary contains the following keys:
data: The decoded message string (if decoding is enabled).
attributes: A dictionary of message attributes (if available).
- Return type:
beam.PCollection
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- is_path_in_range(path)[source]#
Checks if a path containing a datetime is within the provided datetime range.
- Return type:
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
    int,           # main output type
    errors=str,    # 'errors' tagged output type
    warnings=str   # 'warnings' tagged output type
).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class SampleAndLogElements(sample_size=None, window_size=60, pretty_print=False, message='Element: {e}')[source]#
A Beam PTransform that logs elements of a PCollection.
- Parameters:
sample_size (int | None) – The number of elements to log. If not provided, logs all elements.
window_size (int) – The window duration in seconds used when sampling unbounded sources; only applicable when sample_size is set.
pretty_print (bool) – If True, formats each element as pretty-printed JSON when possible.
message (str) – A custom string format for the log message. Must contain the placeholder {e}.
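How the message template and the pretty_print option might combine can be sketched as follows. The helper format_log_message is hypothetical, and falling back to the raw element when JSON serialization fails is an assumption based on the phrase "when possible"; the transform's real logging code may differ.

```python
import json


def format_log_message(element, message="Element: {e}", pretty_print=False):
    """Builds the log line for one element using the {e} placeholder."""
    if pretty_print:
        try:
            # Pretty-print as indented JSON when the element is serializable.
            element = json.dumps(element, indent=4)
        except (TypeError, ValueError):
            # Assumed fallback: keep the element as-is if it is not JSON-serializable.
            pass
    return message.format(e=element)


plain = format_log_message({"a": 1})
pretty = format_log_message({"a": 1}, pretty_print=True)
```

A template without the {e} placeholder would format without error but drop the element from the log line, which is presumably why the placeholder is required.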
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
{
    'key1': 'string_value',
    'key2': 1234,
    'key3': 3.14159265,
    'key4': DisplayDataItem('apache.org', url='http://apache.org'),
    'key5': subComponent
}
- Return type:
Dict[str, Any]
- expand(pcoll)[source]#
Logs elements of a PCollection, optionally sampling sample_size elements.
- Return type:
PCollection
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
    int,           # main output type
    errors=str,    # 'errors' tagged output type
    warnings=str   # 'warnings' tagged output type
).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class WriteToBigQueryWrapper(table, schema=None, label=None, convert_timestamps=False, write_to_bigquery_factory=<class 'apache_beam.io.gcp.bigquery.WriteToBigQuery'>, **write_to_bigquery_kwargs)[source]#
Wrapper around WriteToBigQuery with extended functionality.
Key Features:
- Provides a simpler interface to define a schema using a list of dictionaries.
- Automatically selects the write method based on pipeline mode (streaming vs. batch) and runner.
- Automatically converts TIMESTAMP fields to Timestamp objects when needed (streaming with STORAGE_WRITE_API).
- Parameters:
table (str) – The BigQuery table to write to (in the format project:dataset.table).
schema (list[dict[str, str]] | None) – The schema for the BigQuery table.
convert_timestamps (bool) – If True, converts TIMESTAMP fields to Timestamp objects when a streaming pipeline is using the STORAGE_WRITE_API method, which requires Apache Beam Timestamp objects. See https://beam.apache.org/documentation/io/built-in/google-bigquery/.
write_to_bigquery_factory – A factory function used to create a beam.io.WriteToBigQuery instance. This is primarily useful for testing, where you may want to inject a custom or fake implementation instead of using the real transform. If not provided, the default class will be used.
**write_to_bigquery_kwargs (Any) – Any additional keyword arguments to be passed to the beam.io.WriteToBigQuery class. Check official Apache Beam documentation.
Example
from gfw.common.beam.transforms import WriteToBigQueryWrapper

pcoll | "Write" >> WriteToBigQueryWrapper(
    table="project:dataset.table",
    schema=[{"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, ...],
)
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
{
    'key1': 'string_value',
    'key2': 1234,
    'key3': 3.14159265,
    'key4': DisplayDataItem('apache.org', url='http://apache.org'),
    'key5': subComponent
}
- Return type:
Dict[str, Any]
- expand(pcoll)[source]#
Writes the input PCollection to BigQuery, creating the table if it does not exist.
Before applying the WriteToBigQuery transform, this method ensures that the target table is created with the specified schema, partitioning, and clustering configurations.
- Parameters:
pcoll (PCollection[dict[str, Any]]) – The input PCollection to write to BigQuery.
- Returns:
An empty PCollection that acts as a signal for the completion of the write step. It can be used to chain additional transforms (e.g., logging or monitoring), but typically it contains no elements and exists primarily to signal that the write step has occurred within the pipeline.
- Return type:
- static float_to_beam_timestamp(row, fields)[source]#
Converts the specified fields of a dictionary, in place, from float to Beam Timestamp objects.
- classmethod get_client_factory(mocked=False)[source]#
Returns a factory for beam.WriteToBigQuery objects.
- Return type:
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- static resolve_write_method(standard_options)[source]#
Resolves the appropriate write method to use to write to BigQuery.
The selection logic is based on the StandardOptions of the pipeline in which the beam.WriteToBigQuery transform is used.
The default behavior differs from the one in beam.WriteToBigQuery, where STREAMING_INSERTS is used for streaming pipelines. Here, we prefer STORAGE_WRITE_API for streaming pipelines, which is Google’s recommended method for high-throughput, low-latency streaming writes.
As of Apache Beam 2.64, STORAGE_API_AT_LEAST_ONCE is not available in Python, but STORAGE_WRITE_API can be used for at-least-once semantics.
See https://cloud.google.com/dataflow/docs/guides/write-to-bigquery.
- Parameters:
standard_options (StandardOptions) – The StandardOptions of the pipeline in which WriteToBigQuery transform is used.
- Returns:
A string representing the selected write method. One of ("STREAMING_INSERTS", "FILE_LOADS", "STORAGE_WRITE_API").
- Return type:
- property schema: dict[str, Any] | None#
Returns the BigQuery schema in the format expected by beam.WriteToBigQuery.
The provided schema, a list of dictionaries (e.g., [{"name": ..., "type": ..., ...}]), is wrapped in a dictionary under the “fields” key.
- Returns:
A dictionary of the form {"fields": [...]}.
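The wrapping performed by this property can be illustrated with a small standalone function. This is a sketch only: the name to_beam_schema is hypothetical, and returning None when no schema was provided is an assumption inferred from the property's dict[str, Any] | None type.

```python
def to_beam_schema(fields):
    """Wraps a list of field dictionaries under the "fields" key."""
    if fields is None:
        # Assumed behavior: no schema in, no schema out.
        return None
    return {"fields": list(fields)}


fields = [
    {"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "value", "type": "FLOAT", "mode": "NULLABLE"},
]
schema = to_beam_schema(fields)
```

Accepting the flat list keeps call sites short while still producing the dictionary shape described for beam.WriteToBigQuery.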
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
    int,           # main output type
    errors=str,    # 'errors' tagged output type
    warnings=str   # 'warnings' tagged output type
).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform
- class WriteToJson(output_dir='workdir', output_prefix='', **kwargs)[source]#
Writes PCollection as JSON.
- Parameters:
- display_data()#
Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
- Returns:
A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
{
    'key1': 'string_value',
    'key2': 1234,
    'key3': 3.14159265,
    'key4': DisplayDataItem('apache.org', url='http://apache.org'),
    'key5': subComponent
}
- Return type:
Dict[str, Any]
- get_type_hints()#
Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order:
- Using self.default_type_hints().
- Using self.__class__ type hints.
- get_windowing(inputs)#
Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
- Return type:
Windowing
- with_input_types(input_type_hint)#
Annotates the input type of a PTransform with a type-hint.
- Parameters:
input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
- Raises:
TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- with_output_types(type_hint, **tagged_type_hints)#
Annotates the output type of a PTransform with a type-hint.
- Parameters:
type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint. This is the type hint for the main output.
**tagged_type_hints – Type hints for tagged outputs. Each keyword argument specifies the type for a tagged output, e.g., errors=str.
- Raises:
TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
- Returns:
A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
- Return type:
PTransform
- Example:
result = pcoll | beam.ParDo(MyDoFn()).with_output_types(
    int,           # main output type
    errors=str,    # 'errors' tagged output type
    warnings=str   # 'warnings' tagged output type
).with_outputs('errors', 'warnings', main='main')
- with_resource_hints(**kwargs)#
Adds resource hints to the PTransform.
Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.
- Parameters:
**kwargs – key-value pairs describing hints and their values.
- Raises:
ValueError – if provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.
- Returns:
A reference to the instance of this particular PTransform object.
- Return type:
PTransform