# Source code for gfw.common.beam.transforms.write_to_json

"""Module with reusable PTransforms for writing output PCollections."""

import json

from datetime import datetime
from pathlib import Path
from typing import Any

import apache_beam as beam

from apache_beam.pvalue import PCollection


class WriteToJson(beam.PTransform):
    """Writes a PCollection as newline-delimited JSON.

    Each element is serialized with :func:`json.dumps` and written as one line
    of a single, unsharded file named
    ``<output_dir>/beam-<output_prefix>-<YYYYMMDDTHHMMSS>.json``.

    Args:
        output_dir:
            Output directory. Either a local path or a URI with a scheme
            (e.g. ``gs://bucket/dir``).
        output_prefix:
            Prefix to use in the filename.
        **kwargs:
            Additional keyword arguments passed to base PTransform class.

    Note:
        Why not use :class:`beam.io.WriteToJson`?

        ``WriteToJson`` has issues writing to local files: it raises a
        ``ValueError`` when the path does not point to a GCS location. It
        works when used together with ``ReadFromBigQuery`` and a GCS location
        is specified there. This makes it unreliable for local development or
        testing. Additionally, it internally relies on
        :meth:`pandas.DataFrame.to_json`, which introduces extra dependencies
        and may not preserve the original structure of dict-like records.

        https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToJson

        Example usage of :class:`beam.io.WriteToJson`:

        .. code-block:: python

            from apache_beam.io.fileio import default_file_naming

            file_naming = default_file_naming(prefix=self._output_prefix, suffix=".json")
            return pcoll | beam.io.WriteToJson(
                self._output_dir.as_posix(),
                file_naming=file_naming,
                lines=True,
                indent=4,
            )

        For these reasons, we use :class:`beam.io.WriteToText` +
        :func:`json.dumps`, which is lightweight, predictable, and preserves
        control over formatting and encoding.

        https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
    """

    WORKDIR_DEFAULT = "workdir"

    def __init__(
        self, output_dir: str = WORKDIR_DEFAULT, output_prefix: str = "", **kwargs: Any
    ) -> None:
        super().__init__(**kwargs)
        self._output_dir = Path(output_dir)

        # Compact local-time ISO-8601 stamp: "2024-01-31T15:45:02" -> "20240131T154502".
        timestamp = (
            datetime.now().isoformat(timespec="seconds").replace("-", "").replace(":", "")
        )
        self._output_prefix = f"beam-{output_prefix}-{timestamp}"
        self._prefix = self._join_output_path(output_dir, self._output_prefix)
        # An empty template makes WriteToText behave as if num_shards=1,
        # so exactly one output file is produced.
        self._shard_name_template = ""
        self._suffix = ".json"

        # This is what beam.io.WriteToText does to construct the path.
        # NOTE: pathlib collapses the "//" of URIs such as "gs://...", so this
        # attribute is only a faithful location for local outputs; the string
        # actually handed to WriteToText is self._prefix.
        self.path = Path("".join([self._prefix, self._shard_name_template, self._suffix]))

    @staticmethod
    def _join_output_path(output_dir: str, filename: str) -> str:
        """Joins ``output_dir`` and ``filename`` without mangling URI schemes.

        Args:
            output_dir: Local directory or scheme-qualified URI (``gs://...``).
            filename: Name (without directory) to append.

        Returns:
            The joined path as a string with forward slashes.
        """
        if "://" in output_dir:
            # pathlib would turn "gs://bucket" into "gs:/bucket", which Beam then
            # resolves as a *local* path. Join remote URIs as plain strings.
            return f"{output_dir.rstrip('/')}/{filename}"
        return Path(output_dir).joinpath(filename).as_posix()

    def expand(self, pcoll: PCollection) -> PCollection:
        """Writes the input PCollection to a JSON file.

        Args:
            pcoll: Elements to write; each must be serializable by :func:`json.dumps`.

        Returns:
            The PCollection produced by :class:`beam.io.WriteToText`.
        """
        return pcoll | "WriteToJson" >> (
            beam.Map(json.dumps)
            | beam.io.WriteToText(
                self._prefix,
                shard_name_template=self._shard_name_template,
                file_name_suffix=self._suffix,
            )
        )