Test Actions (Data Quality Unit Tests)¶
Test actions let you validate data pipelines using Databricks Lakeflow Declarative Pipelines expectations. They generate lightweight DLT temporary tables that read from existing tables/views and attach expectations that either fail the pipeline or warn on violations.
Note
CLI Flag Required: By default, both lhp generate and lhp validate skip test actions
during pipeline processing. Use --include-tests to include test action
validation and code generation:
# Skip tests (default) - faster builds
lhp generate -e dev
lhp validate -e dev
# Include tests - for development and testing
lhp generate -e dev --include-tests
lhp validate -e dev --include-tests
See also
Publishing Test Results — To publish DQ test results to external systems like Azure DevOps or a Delta audit table, see Test Result Reporting (Publishing).
Test Types Overview¶
Test actions come in the following types:
| Test Type | Purpose |
|---|---|
| `row_count` | Compare row counts between two sources with tolerance. |
| `uniqueness` | Validate unique constraints (with optional filter). |
| `referential_integrity` | Check foreign-key relationships across tables. |
| `completeness` | Ensure specific columns are not null. |
| `range` | Validate a column is within min/max bounds. |
| `schema_match` | Compare schemas between two tables via `information_schema`. |
| `all_lookups_found` | Validate dimension lookups succeed. |
| `custom_sql` | Provide your own SQL plus expectations. |
| `custom_expectations` | Provide expectations only on a source/table. |
Note
Default target naming: tmp_test_<action_name> (temporary tables)
Default execution: batch (sufficient for aggregate checks); use streaming only when testing streaming sources explicitly
Expectation decorators use the aggregated style: @dp.expect_all_or_fail, @dp.expect_all (warn), @dp.expect_all_or_drop
on_violation supports fail and warn. Using drop is possible but generally discouraged for tests
Row Count¶
Compare record counts between two sources, with optional tolerance.
actions:
- name: test_raw_to_bronze_count
type: test
test_type: row_count
source: [raw.orders, bronze.orders]
tolerance: 0
on_violation: fail
description: "Ensure no data loss from raw to bronze"
Generated PySpark (excerpt):
@dp.expect_all_or_fail({"row_count_match": "abs(source_count - target_count) <= 0"})
@dp.materialized_view(
    name="tmp_test_test_raw_to_bronze_count",
    comment="Ensure no data loss from raw to bronze",
    temporary=True
)
def tmp_test_test_raw_to_bronze_count():
    return spark.sql("""
        SELECT * FROM
        (SELECT COUNT(*) AS source_count FROM raw.orders),
        (SELECT COUNT(*) AS target_count FROM bronze.orders)
    """)
Uniqueness (with optional filter)¶
Validate unique constraints on one or more columns. For Type 2 SCD dimensions, use filter to restrict to active rows.
# Global uniqueness
- name: test_order_id_unique
type: test
test_type: uniqueness
source: silver.orders
columns: [order_id]
on_violation: fail
# Type 2 SCD: only one active record per natural key
- name: test_customer_active_unique
type: test
test_type: uniqueness
source: silver.customer_dim
columns: [customer_id]
filter: "__END_AT IS NULL" # Only check active rows
on_violation: fail
Generated SQL (with filter):
SELECT customer_id, COUNT(*) as duplicate_count
FROM silver.customer_dim
WHERE __END_AT IS NULL
GROUP BY customer_id
HAVING COUNT(*) > 1
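The generated query follows a fixed GROUP BY/HAVING pattern. A sketch of how the YAML fields could be assembled into that SQL (`build_uniqueness_sql` is a hypothetical helper for illustration; `filter_expr` corresponds to the YAML `filter` field):

```python
def build_uniqueness_sql(source: str, columns: list, filter_expr: str = None) -> str:
    """Assemble the duplicate-detection query for a uniqueness test.

    Any row returned represents a violated unique constraint, so the
    attached expectation fails (or warns) per duplicate group.
    """
    cols = ", ".join(columns)
    where = f"\nWHERE {filter_expr}" if filter_expr else ""
    return (
        f"SELECT {cols}, COUNT(*) as duplicate_count\n"
        f"FROM {source}{where}\n"
        f"GROUP BY {cols}\n"
        f"HAVING COUNT(*) > 1"
    )

print(build_uniqueness_sql("silver.customer_dim", ["customer_id"], "__END_AT IS NULL"))
```

Omitting `filter_expr` drops the WHERE clause, giving the global-uniqueness form of the query.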
Referential Integrity¶
Ensure that foreign-key values in the source table exist in the reference table.
- name: test_orders_customer_fk
type: test
test_type: referential_integrity
source: silver.fact_orders
reference: silver.dim_customer
source_columns: [customer_id]
reference_columns: [customer_id]
on_violation: fail
Generated SQL (excerpt):
SELECT s.*, r.customer_id as ref_customer_id
FROM silver.fact_orders s
LEFT JOIN silver.dim_customer r ON s.customer_id = r.customer_id
@dp.expect_all_or_fail({"valid_fk": "ref_customer_id IS NOT NULL"})
@dp.materialized_view(name="tmp_test_orders_customer_fk", comment="Test description", temporary=True)
Completeness¶
Ensure required columns are populated. The generator selects only required columns for efficiency.
- name: test_customer_required_fields
type: test
test_type: completeness
source: silver.dim_customer
required_columns: [customer_key, customer_id, name, nation_id]
on_violation: fail
Generated SQL (optimized):
SELECT customer_key, customer_id, name, nation_id
FROM silver.dim_customer
@dp.expect_all_or_fail({
"required_fields_complete": "customer_key IS NOT NULL AND customer_id IS NOT NULL AND name IS NOT NULL AND nation_id IS NOT NULL"
})
@dp.materialized_view(name="tmp_test_customer_required_fields", comment="Test description", temporary=True)
Range¶
Validate that a column falls within bounds. The generator selects only the tested column.
- name: test_order_date_range
type: test
test_type: range
source: silver.orders
column: order_date
min_value: '2020-01-01'
max_value: 'current_date()'
on_violation: fail
Generated expectation: order_date >= '2020-01-01' AND order_date <= 'current_date()'
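The expectation is built from whichever bounds are present; a missing `min_value` or `max_value` simply drops that side of the predicate. A hypothetical sketch of that assembly (not lhp's actual generator code):

```python
def range_expectation(column: str, min_value=None, max_value=None) -> str:
    """Build the range predicate from the optional min/max bounds."""
    parts = []
    if min_value is not None:
        parts.append(f"{column} >= '{min_value}'")
    if max_value is not None:
        parts.append(f"{column} <= '{max_value}'")
    return " AND ".join(parts)

# Reproduces the expectation shown above for test_order_date_range
print(range_expectation("order_date", "2020-01-01", "current_date()"))
```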
All Lookups Found¶
Validate that dimension lookups succeed (e.g., surrogate keys are present after joins).
- name: test_order_date_lookup
type: test
test_type: all_lookups_found
source: silver.fact_orders
lookup_table: silver.dim_date
lookup_columns: [order_date]
lookup_result_columns: [date_key]
on_violation: fail
Generated (excerpt):
SELECT s.*, l.date_key as lookup_date_key
FROM silver.fact_orders s
LEFT JOIN silver.dim_date l ON s.order_date = l.order_date
@dp.expect_all_or_fail({"all_lookups_found": "lookup_date_key IS NOT NULL"})
@dp.materialized_view(name="tmp_test_order_date_lookup", comment="Test description", temporary=True)
Schema Match¶
Compare schemas between two tables using information_schema.columns.
- name: test_orders_schema_match
type: test
test_type: schema_match
source: silver.fact_orders
reference: gold.fact_orders_expected
on_violation: fail
Generated (excerpt):
WITH source_schema AS (
SELECT column_name, data_type, ordinal_position
FROM information_schema.columns WHERE table_name = 'silver.fact_orders'
), reference_schema AS (
SELECT column_name, data_type, ordinal_position
FROM information_schema.columns WHERE table_name = 'gold.fact_orders_expected'
)
SELECT ... -- schema diff rows
@dp.expect_all_or_fail({"schemas_match": "diff_count = 0"})
@dp.materialized_view(name="tmp_test_orders_schema_match", comment="Test description", temporary=True)
Custom SQL¶
Provide a custom SQL statement and attach expectations.
- name: test_revenue_reconciliation
type: test
test_type: custom_sql
source: gold.monthly_revenue
sql: |
SELECT month, gold_revenue, silver_revenue,
(ABS(gold_revenue - silver_revenue) / silver_revenue) * 100 as pct_difference
FROM ...
expectations:
- name: revenue_matches
expression: "pct_difference < 0.5"
on_violation: fail
Custom Expectations¶
Attach arbitrary expectations to an existing table/view without custom SQL.
- name: test_orders_business_rules
type: test
test_type: custom_expectations
source: silver.fact_orders
expectations:
- name: positive_amount
expression: "total_price > 0"
on_violation: fail
- name: reasonable_discount
expression: "discount_percent <= 50"
on_violation: warn
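When a test mixes `fail` and `warn` expectations, they are grouped by severity so each group gets its own aggregated decorator. A sketch of that grouping (hypothetical helper; the decorator mapping follows the aggregated style shown earlier):

```python
def group_expectations(expectations: list) -> dict:
    """Split expectations into fail/warn groups for aggregated decorators.

    fail -> @dp.expect_all_or_fail({...}); warn -> @dp.expect_all({...})
    """
    groups = {"fail": {}, "warn": {}}
    for exp in expectations:
        groups[exp["on_violation"]][exp["name"]] = exp["expression"]
    return groups

# The two expectations from test_orders_business_rules above
rules = [
    {"name": "positive_amount", "expression": "total_price > 0", "on_violation": "fail"},
    {"name": "reasonable_discount", "expression": "discount_percent <= 50", "on_violation": "warn"},
]
print(group_expectations(rules))
```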
Test Actions Configuration Reference¶
Common fields across test actions:
name: Unique name of the action within the FlowGroup
type: Must be test
test_type: One of the supported test types listed above
source: Source table/view; for row_count use a list of two sources
target: Optional table name; defaults to tmp_test_<name>
description: Optional documentation
on_violation: fail or warn
Important
Choosing fail vs warn:
Without test reporting — use fail. This is the only way to get immediate visibility: the pipeline stops on a failed expectation, surfacing the issue.
With test reporting — use warn. When on_violation: fail triggers, the pipeline aborts the flow before recording DQ metrics to the event log, so the reporting hook never receives results for failed expectations. Using warn ensures all outcomes (pass and fail) flow through to your provider.
See Test Result Reporting (Publishing) for the full test result reporting setup.
Type-specific fields:
row_count: source (list of two), tolerance (int)
uniqueness: columns (list), optional filter (SQL WHERE clause)
referential_integrity: reference, source_columns (list), reference_columns (list)
completeness: required_columns (list)
range: column, min_value (optional), max_value (optional)
schema_match: reference
all_lookups_found: lookup_table, lookup_columns (list), lookup_result_columns (list)
custom_sql: sql (string), optional expectations (list)
custom_expectations: expectations (list)
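Putting the common and type-specific fields together, an illustrative FlowGroup snippet (table, action, and target names are examples, not defaults):

```yaml
actions:
  - name: test_orders_amount_range
    type: test
    test_type: range
    source: silver.orders            # source table/view
    target: tmp_test_orders_range    # optional; defaults to tmp_test_<name>
    column: total_price
    min_value: '0'
    on_violation: warn               # fail stops the pipeline; warn only records
    description: "Order amounts must be non-negative"
```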
Test Actions Best Practices¶
Without test reporting: use on_violation: fail for hard-stop visibility on failures
With test reporting: use on_violation: warn so all results reach the reporting provider (fail aborts the flow before metrics are recorded)
Scope uniqueness to active/current records in SCD Type 2 dimensions using filter
Keep SQL minimal – expectations should express the rule; queries should project only required columns
Group expectations by severity to get consolidated reporting in the DLT UI
Use reference templates in Reference_Templates/ as starting points