Source code for gfw.common.beam.transforms.pubsub
"""This module contains a custom Apache Beam `PTransform` called `ReadAndDecodeFromPubSub`.
The `ReadAndDecodeFromPubSub` class is designed to read messages from a Google Cloud Pub/Sub
subscription, decode them (if necessary),
and return the messages in a structured format as a dictionary.
"""
import codecs
import logging
from functools import cached_property
from typing import Any, Callable, Optional
import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage, ReadFromPubSub
from apache_beam.pvalue import PCollection
logger = logging.getLogger(__name__)
[docs]
class FakeReadFromPubSub(beam.PTransform[Any, Any]):
"""A fake ReadFromPubSub to simulate Pub/Sub messages in tests.
Args:
messages:
A list of dictionaries representing Pub/Sub messages. Each
dictionary is passed as keyword arguments to
:class:`apache_beam.io.gcp.pubsub.PubsubMessage`.
Note:
Any additional ``*args`` and ``**kwargs`` are accepted for API
compatibility with :class:`ReadFromPubSub`, but are ignored.
"""
def __init__(
self,
*args: Any,
messages: Optional[list[dict]] = None,
**kwargs: Any,
) -> None:
self.messages = messages or []
[docs]
def expand(self, pcoll: PCollection) -> PCollection:
"""Returns a PCollection created from self.messages list."""
return pcoll | beam.Create([PubsubMessage(**m) for m in self.messages])
[docs]
class ReadAndDecodeFromPubSub(beam.PTransform[Any, Any]):
"""Wrapper around :class:`~beam.ReadFromPubSub` with optional decoding.
It supports the following features:
- Reading from a specific Pub/Sub subscription.
- Optionally including message attributes.
- Decoding message data using a specified method (default is ``UTF-8``).
- Allowing the use of a custom or mocked :class:`beam.ReadFromPubSub`
transform for testing purposes.
Args:
subscription_id:
The Pub/Sub subscription id from which to read messages.
with_attributes:
Whether to include attributes in the Pub/Sub message.
Default is True.
decode:
Whether to decode the data from bytes to dictionary.
Default is True.
decode_method:
The method used to decode the message data.
Supported methods include standard encodings like ``utf-8``, ``ascii``, etc.
Default is ``utf-8``.
read_from_pubsub_factory:
A factory function to create a :class:`~beam.ReadFromPubSub` instance.
This is useful for testing when a custom or mocked :class:`beam.ReadFromPubSub`
implementation is needed. Default is the Beam :class:`beam.ReadFromPubSub` class.
**read_from_pubsub_kwargs:
Additional keyword arguments passed to the :class:`~beam.ReadFromPubSub` transform.
These can be used to specify custom parameters for the reading operation.
"""
SUBSCRIPTION = "projects/{project}/subscriptions/{subscription}"
def __init__(
self,
subscription_id: str,
project: str,
with_attributes: bool = True,
decode: bool = True,
decode_method: str = "utf-8",
read_from_pubsub_factory: Callable[..., ReadFromPubSub] = ReadFromPubSub,
**read_from_pubsub_kwargs: Any,
) -> None:
self._subscription_id = subscription_id
self._project = project
self._with_attributes = with_attributes
self._decode = decode
self._decode_method = decode_method
self._read_from_pubsub_factory = read_from_pubsub_factory
self._read_from_pubsub_kwargs = read_from_pubsub_kwargs
self._validate_decode_method()
[docs]
@classmethod
def get_client_factory(cls, mocked: bool = False) -> Callable:
"""Returns a factory for :class:`~beam.ReadFromPubSub` objects."""
if mocked:
return FakeReadFromPubSub
return ReadFromPubSub
@cached_property
def subscription(self) -> str:
"""Generates the full subscription path from project and subscription id."""
return self.SUBSCRIPTION.format(project=self._project, subscription=self._subscription_id)
[docs]
def expand(self, pcoll: PCollection) -> PCollection:
"""Applies the transform to the pipeline root and returns a PCollection of messages.
Args:
pcoll:
An input PCollection. This is expected to be a ``PBegin`` when used with a real
or mocked :class:`ReadFromPubSub`,
since Pub/Sub sources begin from the pipeline root.
Returns:
beam.PCollection:
A PCollection of dictionaries where each dictionary contains:
- "data": the decoded message string (if decoding is enabled),
- "attributes": a dictionary of message attributes (if available).
"""
messages = pcoll | self._read_from_pubsub_factory(
subscription=self.subscription,
with_attributes=self._with_attributes,
**self._read_from_pubsub_kwargs,
)
return messages | "ToDict" >> beam.Map(self._to_dict)
def _to_dict(self, message: PubsubMessage) -> dict:
data = message.data
if self._decode:
data = message.data.decode(self._decode_method)
return {
"data": data,
"attributes": message.attributes,
"publish_time": message.publish_time,
"message_id": message.message_id,
}
def _validate_decode_method(self) -> None:
try:
codecs.lookup(self._decode_method)
except LookupError as e:
raise ValueError(f"Unsupported decode method: {self._decode_method}") from e
logging.debug(f"Using decode method: {self._decode_method}.")