Source code for gfw.common.beam.testing.utils

"""General utilities for testing Apache Beam pipelines."""

from itertools import zip_longest
from typing import Any, Callable, Iterable, List, Sequence

from apache_beam.testing.util import BeamAssertException
from rich.console import Console, Group, RenderableType

from gfw.common.diff import compare_items, render_diff_panel
from gfw.common.sorting import sort_dicts


def _default_equals_fn(e: Any, a: Any) -> bool:
    return e == a


def _raise_with_diff(diffs: Sequence[RenderableType]) -> None:
    """Render ``diffs`` with Rich and raise them inside a ``BeamAssertException``.

    Args:
        diffs: Rich renderables (e.g. diff panels) to embed in the exception message.

    Raises:
        BeamAssertException: Always. The message contains the rendered diff,
            exported as text with ANSI style codes.
    """
    import io  # Local import: only needed on the failure path.

    # ``record=True`` alone still writes to stdout. Directing the console to an
    # in-memory buffer keeps the rendering off the screen, so the diff is only
    # shown once, as part of the exception message.
    console = Console(record=True, force_terminal=True, width=130, file=io.StringIO())

    # Render all diffs as a single group into the recording console.
    console.print(Group(*diffs))

    # Export the captured diff as text with ANSI codes.
    diff_text = console.export_text(styles=True)

    # Raise exception with embedded colored diff.
    raise BeamAssertException(f"PCollection contents differ: \n{diff_text}.")


def equal_to(
    expected: List[Any], equals_fn: Callable[[Any, Any], bool] = _default_equals_fn
) -> Callable[[List[Any]], None]:
    """Drop-in replacement for :func:`apache_beam.testing.util.equal_to` with rich diff output.

    This matcher performs unordered comparison of top-level elements in actual
    and expected PCollection outputs, just like Apache Beam's
    :func:`~apache_beam.testing.util.equal_to`. However, it adds a rich diff
    visualization to help debug mismatches by rendering side-by-side differences.

    Use in tests with ``assert_that(pcoll, equal_to(expected))``.

    Note:
        - Only top-level permutations are considered equal:
          ``[1, 2]`` and ``[2, 1]`` are equal, but ``[[1, 2]]`` and ``[[2, 1]]`` are not.
        - If the direct list comparison does not succeed (or raises ``TypeError``),
          a slower element-by-element matching using ``equals_fn`` is used.
          This helps handle:

          - Collections with types that don't have a deterministic sort order
            (e.g., :class:`pyarrow.Table` as of 0.14.1).
          - Collections containing elements of different types.

    Args:
        expected:
            Iterable of expected PCollection elements.

        equals_fn:
            Optional function ``(expected_item, actual_item) -> bool`` to customize equality.

    Returns:
        A matcher function for use with :func:`apache_beam.testing.util.assert_that`.
        The matcher raises ``BeamAssertException`` when the contents differ.
    """

    def _matcher(actual: Iterable[Any]) -> None:
        expected_list = [sort_dicts(e) for e in expected]
        actual_list = [sort_dicts(e) for e in actual]

        # Fast path: identical lists (same elements in the same order).
        try:
            if actual_list == expected_list:
                return
        except TypeError:
            pass

        # Slower method, fallback comparison: match each actual element against
        # the remaining expected elements, regardless of order.
        unmatched_expected = expected_list[:]
        unmatched_actual = []
        for a in actual_list:
            for i, e in enumerate(unmatched_expected):
                if equals_fn(e, a):
                    unmatched_expected.pop(i)
                    break
            else:
                unmatched_actual.append(a)

        if not unmatched_actual and not unmatched_expected:
            return

        # From here on the PCollections are known to differ. Build one diff
        # panel per pair of leftover elements; a missing side is shown as ``{}``.
        diffs = []
        for i, (a, b) in enumerate(
            zip_longest(unmatched_actual, unmatched_expected, fillvalue={}), 1
        ):
            left, right, changed = compare_items(a, b)
            if changed:
                diffs.append(render_diff_panel(left, right, i))

        if diffs:
            # Diffs found. Raise exception with colorized output.
            _raise_with_diff(diffs)

        # No pair was flagged as changed (e.g. ``equals_fn`` is stricter than the
        # diff, or a leftover element equals the ``{}`` fill value). The contents
        # still differ, so fail explicitly instead of silently passing.
        raise BeamAssertException(
            "PCollection contents differ: \n"
            f"unexpected elements {unmatched_actual!r}, "
            f"missing elements {unmatched_expected!r}."
        )

    return _matcher