Test Result Reporting (Publishing) ==================================== .. meta:: :description: Publish DQ test results from Lakehouse Plumber pipelines to external systems like Azure DevOps, Delta audit tables, or custom providers. Test result reporting extends :doc:`test_actions` by **publishing DQ expectation results to external systems** after each pipeline run. Lakehouse Plumber generates a ``@dp.on_event_hook()`` per pipeline that listens for DQ metrics in the Databricks event stream, accumulates pass/fail results, and calls a user-supplied **provider function** at pipeline completion. The design is pluggable: LHP generates the hook and wiring; you supply the provider module that decides *where* results go — Azure DevOps Test Plans, a Delta audit table, a REST API, or anything reachable from the Databricks driver. .. note:: **Prerequisites:** * ``test_reporting`` section configured in ``lhp.yaml`` * ``test_id`` set on each test action you want to report * ``--include-tests`` flag passed to ``lhp generate`` Architecture -------------------------------------------- The generated hook runs inside the Databricks pipeline process on the driver node: .. mermaid:: flowchart LR A["Pipeline Execution"] --> B["flow_progress events
(DQ expectations)"] B --> C["Hook accumulates
results per test_id"] C --> D{"Terminal state?
COMPLETED / FAILED
STOPPING / CANCELED"} D -->|Yes| E["Provider function
publish_results()"] E --> F["External System
(ADO / Delta / Custom)"] 1. As each test action's temporary table materializes, Databricks emits ``flow_progress`` events containing expectation pass/fail counts. 2. The generated hook filters events to only those tables mapped via ``test_id``, building an in-memory results list. 3. When the pipeline reaches a terminal state, the hook calls your provider function exactly once with all accumulated results. Quick Start -------------------------------------------- **Step 1 — Add** ``test_reporting`` **to** ``lhp.yaml`` .. code-block:: yaml # lhp.yaml test_reporting: module_path: py_functions/test_reporting_publisher.py function_name: publish_results **Step 2 — Add** ``test_id`` **to test actions** .. code-block:: yaml # pipelines/02_bronze/tst_customer_dq.yaml actions: - name: tst_customer_pk_uniqueness type: test test_type: uniqueness source: v_customer_bronze_DQE columns: [customer_id] on_violation: warn test_id: "SIT-G01" **Step 3 — Create a provider module** .. code-block:: python # py_functions/test_reporting_publisher.py def publish_results(results, config, context, spark): for r in results: print(f"[{r['test_id']}] {r['status']}: {r['expectation_name']}") return {"published": len(results), "failed": 0} **Step 4 — Generate with** ``--include-tests`` .. code-block:: bash lhp generate --env dev --include-tests **Step 5 — Inspect the generated output** .. code-block:: text generated/dev/my_pipeline/ ├── _test_reporting_hook.py # Event hook (generated) ├── test_reporting_providers/ │ ├── __init__.py │ └── test_reporting_publisher.py # Your provider module (copied) └── tst_customer_dq.py # Test action code Configuration Reference -------------------------------------------- ``lhp.yaml`` test_reporting Section ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. list-table:: :header-rows: 1 :widths: 25 15 60 * - Field - Required - Description * - ``module_path`` - Yes - Path to the provider Python module, relative to project root. * - ``function_name`` - Yes - Name of the callable inside the module (e.g., ``publish_results``). * - ``config_file`` - No - Path to a YAML file loaded as a dict and passed to the provider as ``config``. Useful for connection strings, API endpoints, or test-case mappings. ``test_id`` Field on Test Actions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``test_id`` is an **opt-in** field on any test action. Only actions with ``test_id`` set are included in reporting — this lets you control exactly which tests publish results while keeping other tests as internal validation. .. code-block:: yaml :caption: Actions with and without test_id actions: - name: tst_customer_pk_uniqueness type: test test_type: uniqueness source: v_customer_bronze_DQE columns: [customer_id] on_violation: warn test_id: "SIT-G01" # ← reported to external system - name: tst_customer_balance_range type: test test_type: range source: v_customer_bronze_DQE column: account_balance min_value: -1000 max_value: 10000 on_violation: warn # No test_id — runs normally but is NOT reported **Table name mapping rules:** The hook maps each test action's materialized table name to its ``test_id``. The table name used depends on whether the action has an explicit ``target``: * **Explicit target** — uses the ``target`` value directly (e.g., ``target: my_custom_name``). * **No target** — defaults to ``tmp_test_`` (e.g., action ``tst_customer_completeness`` → ``tmp_test_tst_customer_completeness``). .. _provider-interface: Provider Interface -------------------------------------------- Function Contract ~~~~~~~~~~~~~~~~~~ The provider function is called once per pipeline run with this signature: .. code-block:: python def publish_results(results, config, context, spark): """ Args: results: list[dict] — accumulated DQ results config: dict — from config_file (or empty dict) context: dict — pipeline execution context spark: SparkSession Returns: dict with "published" (int) and "failed" (int) counts """ **results** — Each entry is a dict with these keys: .. code-block:: python { "test_id": "SIT-G01", # From test action YAML "flow_name": "tst_customer_pk_uniqueness", # Unqualified table name "expectation_name": "uniqueness_check", # DQ expectation name "passed_records": 1000, # Records passing "failed_records": 0, # Records failing "status": "PASS", # "PASS" if failed == 0, else "FAIL" "collected_at": "2026-01-15T10:30:00+00:00", # ISO 8601 UTC timestamp } **config** — The parsed content of ``config_file`` if specified, otherwise ``{}``. **context** — Pipeline execution metadata: .. code-block:: python { "pipeline_id": "abc123-...", "update_id": "def456-...", "pipeline_name": "acmi_edw_bronze", "terminal_state": "COMPLETED", # or FAILED, STOPPING, CANCELED } **Return value** — A dict with counts: .. code-block:: python {"published": 5, "failed": 0} See `Writing a Custom Provider`_ below for a skeleton, error handling rules, and how substitution tokens work in provider modules. Generated Output -------------------------------------------- When ``test_reporting`` is configured and at least one test action has ``test_id``, ``lhp generate --include-tests`` produces these additional files per pipeline: .. code-block:: text generated/// ├── _test_reporting_hook.py # Generated event hook └── test_reporting_providers/ ├── __init__.py # Package init └── .py # Copy of your provider module The hook file is fully generated from a template — **do not edit it**. Key sections: .. code-block:: python :caption: _test_reporting_hook.py (annotated excerpt) from pyspark import pipelines as dp # Maps unqualified table names to external test IDs _TEST_ID_MAP = { "tst_customer_pk_uniqueness": "SIT-G01", "tmp_test_tst_customer_completeness": "SIT-G02", } _PROVIDER_CONFIG = {} # Populated from config_file if set _collected_results = [] # Accumulates results across events _TERMINAL_STATES = frozenset({"STOPPING", "FAILED", "CANCELED", "COMPLETED"}) @dp.on_event_hook(max_allowable_consecutive_failures=5) def test_reporting_hook(event): """Listens to pipeline events, collects DQ results, publishes at end.""" ... The hook only processes events for tables present in ``_TEST_ID_MAP`` — other test actions and pipeline tables are ignored. Validation -------------------------------------------- ``lhp validate`` performs test reporting checks when the ``test_reporting`` section exists in ``lhp.yaml``: **Always checked:** * ``module_path`` — the provider module file must exist at the specified path. * ``config_file`` — if specified, the file must exist. **With** ``--include-tests``: * At least one test action across the validated pipelines must have ``test_id`` set. A configuration with no ``test_id`` on any action triggers a validation error. .. code-block:: bash # Basic validation (checks file existence) lhp validate --env dev # Extended validation (also checks test_id presence) lhp validate --env dev --include-tests Built-in Providers -------------------------------------------- LHP ships three ready-to-use provider modules in the ``providers/`` directory of the repository. To use one, **copy it into your LHP project** (e.g., into ``py_functions/``) and reference it in ``lhp.yaml``. .. list-table:: :header-rows: 1 :widths: 30 70 * - Provider - Use Case * - ``delta_test_reporter.py`` - Write results to a Delta audit table. Recommended default — always have a queryable history of test outcomes. * - ``ado_test_reporter.py`` - Publish to ADO Test Plans using a **config mapping** file that translates friendly ``test_id`` labels (e.g., ``"SIT-G01"``) to ADO Test Case IDs. * - ``ado_test_reporter_inline.py`` - Publish to ADO Test Plans where ``test_id`` values in the YAML **are the ADO Test Case IDs directly** (e.g., ``test_id: "2272983"``). No mapping file needed. Delta Audit Table (Recommended Default) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ For teams that want a persistent record of all DQ results without an external test management system. This is the **recommended starting point**. **1. Create the audit table** (run once in your catalog): .. code-block:: sql :force: CREATE TABLE IF NOT EXISTS ..lhp_test_results ( pipeline_name STRING NOT NULL, pipeline_id STRING NOT NULL, update_id STRING NOT NULL, test_id STRING NOT NULL, flow_name STRING NOT NULL, expectation_name STRING NOT NULL, passed_records BIGINT NOT NULL, failed_records BIGINT NOT NULL, status STRING NOT NULL, terminal_state STRING NOT NULL, collected_at STRING NOT NULL, published_at STRING NOT NULL ); **2. Copy the provider and configure** ``lhp.yaml``: .. code-block:: bash cp providers/delta_test_reporter.py py_functions/ .. code-block:: yaml # lhp.yaml test_reporting: module_path: py_functions/delta_test_reporter.py function_name: publish_results The provider uses ``${catalog}.${audit_schema}.lhp_test_results`` as the default table name. The ``${catalog}`` and ``${audit_schema}`` tokens are resolved by LHP's :doc:`/substitutions` system at generate time, so the same module works across environments. The provider also supports ``dry_run`` and ``log_level`` options via ``config_file``, and verifies the target table exists before writing. Azure DevOps — Config Mapping (``ado_test_reporter.py``) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ For enterprise teams using ADO Test Plans where ``test_id`` values in flowgroup YAML are **friendly labels** (e.g., ``"SIT-G01"``) translated to ADO Test Case IDs via a config mapping file. **1. Copy the provider:** .. code-block:: bash cp providers/ado_test_reporter.py py_functions/ **2. Create a config file** (``config/ado_config.yaml``): .. code-block:: yaml ado: organization: my-org project: my-project pat_secret_scope: my-scope # Databricks secret scope pat_secret_key: ado-pat # Key within that scope api_version: "7.1" # Optional, defaults to 7.1 test_plan: plan_id: 12345 suite_id: 67890 # Translates friendly test_id → ADO Test Case ID test_case_mapping: SIT-G01: 2272983 SIT-G02: 2272984 SIT-G03: 2272985 **3. Configure** ``lhp.yaml``: .. code-block:: yaml test_reporting: module_path: py_functions/ado_test_reporter.py function_name: publish_results config_file: config/ado_config.yaml **4. Use friendly labels in flowgroup YAML:** .. code-block:: yaml actions: - name: tst_billing_pk_check type: test test_type: uniqueness source: v_billing_bronze columns: [transaction_id] on_violation: warn test_id: "SIT-G01" # Mapped to ADO Test Case 2272983 The provider creates one ADO Test Run per pipeline execution, batches all results, and completes the run (4 API calls total). PAT authentication uses Databricks secrets — no credentials in config files. Azure DevOps — Inline (``ado_test_reporter_inline.py``) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ For teams that prefer to put the **ADO Test Case ID directly** as the ``test_id`` value — no mapping file needed. Simpler setup, but less readable YAML. **1. Copy the provider:** .. code-block:: bash cp providers/ado_test_reporter_inline.py py_functions/ **2. Create a config file** (``config/ado_config.yaml``) — same as above but **without** ``test_case_mapping``: .. code-block:: yaml ado: organization: my-org project: my-project pat_secret_scope: my-scope pat_secret_key: ado-pat test_plan: plan_id: 12345 suite_id: 67890 **3. Configure** ``lhp.yaml``: .. code-block:: yaml test_reporting: module_path: py_functions/ado_test_reporter_inline.py function_name: publish_results config_file: config/ado_config.yaml **4. Use ADO Test Case IDs directly in flowgroup YAML:** .. code-block:: yaml actions: - name: tst_billing_pk_check type: test test_type: uniqueness source: v_billing_bronze columns: [transaction_id] on_violation: warn test_id: "2272983" # This IS the ADO Test Case ID .. tip:: **Which ADO variant to choose?** * Use **config mapping** (``ado_test_reporter.py``) when you want readable, project-specific test IDs in your YAML and a centralized mapping file. * Use **inline** (``ado_test_reporter_inline.py``) when you want the simplest setup and don't mind numeric ADO IDs in your YAML. Writing a Custom Provider ~~~~~~~~~~~~~~~~~~~~~~~~~~ If the built-in providers don't fit your needs, write your own. Your module must define a function matching the :ref:`provider interface contract `: .. code-block:: python def publish_results(results, config, context, spark): """ Args: results: list[dict] — one dict per DQ expectation, with keys: test_id, flow_name, expectation_name, passed_records, failed_records, status, collected_at config: dict — parsed from config_file (empty dict if not set) context: dict — pipeline_id, update_id, pipeline_name, terminal_state spark: SparkSession Returns: dict with "published" (int) and "failed" (int) counts """ published, failed = 0, 0 for result in results: try: # Your logic here — API call, Delta write, webhook, etc. published += 1 except Exception: failed += 1 return {"published": published, "failed": failed} Key rules: * The function name must match ``function_name`` in ``lhp.yaml``. * **Return** ``{"published": N, "failed": M}`` — the hook logs these counts. * **Fatal errors** — raise an exception. The hook catches it and prints a diagnostic; the pipeline itself is not affected. * **Partial failures** — return the counts (e.g., ``{"published": 3, "failed": 2}``). * The module is **copied** into ``test_reporting_providers/`` in the generated output. :doc:`/substitutions` tokens (``${catalog}``, ``${env_token}``, etc.) in the source are resolved at generate time. The built-in providers in ``providers/`` are good reference implementations showing error handling, dry-run support, table existence checks, and logging patterns. Important Considerations -------------------------------------------- .. warning:: **on_violation: fail and expect_or_fail** — When a test action uses ``on_violation: fail``, the generated expectation uses ``@dp.expect_all_or_fail``. If the expectation fails, the pipeline aborts the flow *before* recording metrics in the event log. This means the hook **will not receive** DQ results for failed ``fail``-mode expectations. Use ``on_violation: warn`` for test actions whose results you want to report. .. note:: **Event hooks are Public Preview** — The ``@dp.on_event_hook()`` decorator was introduced in Databricks in January 2026 and is currently in Public Preview. Refer to Databricks documentation for the latest status. .. note:: **Best-effort delivery** — Databricks documentation notes approximately a 10% event miss rate for event hooks. The hook is best-effort, not guaranteed delivery. For critical audit requirements, consider supplementing with a post-run query against the pipeline event log. * **Substitution tokens in provider modules** are resolved at generate time, not at runtime. Tokens like ``${catalog}`` or ``${secret:scope/key}`` in your provider source code are replaced when ``lhp generate`` runs. * **One hook per pipeline** — LHP generates a single ``_test_reporting_hook.py`` per pipeline. If a pipeline has test actions across multiple flowgroups, all ``test_id`` mappings are merged into one hook. Related Documentation -------------------------------------------- * :doc:`test_actions` — test action types, configuration, and best practices * :doc:`/monitoring` — centralized pipeline event log monitoring * :doc:`/cli` — command-line reference (``--include-tests`` flag) * :doc:`/substitutions` — environment tokens and secret references in provider modules * :doc:`/errors_reference` — error codes and resolution steps