Source code for gfw.common.beam.transforms.read_from_json

"""Module with reusable source PTransform for reading JSON inputs."""

from pathlib import Path
from typing import Any, Callable, Optional, Union

import apache_beam as beam

from apache_beam.pvalue import PCollection

from gfw.common.io import json_load


[docs] class ReadFromJson(beam.PTransform): """Beam transform to read a PCollection from a JSON file. This transform loads a local JSON or JSONLines file eagerly (outside the pipeline), then injects the resulting records into the pipeline using :class:`beam.Create`. Useful for testing, prototyping, or controlled ingestion. Args: input_file: Path to the local file to read. coder: Callable to apply to each decoded record. Defaults to :class:`dict`. lines: If True, interprets the input as newline-delimited JSON (JSONLines). create_kwargs: Optional dictionary of keyword arguments to pass to :class:`beam.Create`. Use this to control serialization, type hints, etc. **kwargs: Additional keyword arguments passed to base PTransform class. Raises: ValueError: If the input file does not exist at pipeline construction time. Example: .. code-block:: python with beam.Pipeline() as p: pcoll = p | ReadFromJson("data/input.json", lines=True) pcoll | beam.Map(print) """ def __init__( self, input_file: Union[str, Path], coder: Callable = dict, lines: bool = False, create_kwargs: Optional[dict] = None, **kwargs: Any, ) -> None: """Builds a ReadFromJson instance.""" super().__init__(**kwargs) self._input_file = Path(input_file) self._coder = coder self._lines = lines self._create_kwargs = create_kwargs or {}
[docs] def expand(self, p: PCollection) -> PCollection: """Apply transform to pipeline ``p``: create PCollection from loaded JSON data.""" # Why not use beam.io.ReadFromJson instead of (beam.Create + json_load)? # Because ReadFromJson returns BeamSchema objects, not plain dicts, # and requires conversion like: dict(x._asdict()). # inputs = ( # p # | beam.io.ReadFromJson(str(input_file), lines=False, convert_dates=False) # | beam.Map(lambda x: dict(x._asdict())).with_output_types(Message) # ) # In our case, json_load + beam.Create gives us full control over parsing # and works better for small, local test/config files where eager loading is acceptable. if not self._input_file.exists(): raise ValueError(f"Input file does not exist: {self._input_file}") data = json_load(self._input_file, lines=self._lines, coder=self._coder) return p | beam.Create(data, **self._create_kwargs).with_output_types(self._coder)