Pipeline Monitoring¶
This page covers Lakehouse Plumber’s pipeline monitoring capabilities — declarative event log aggregation and analysis across all your pipelines.
Overview¶
Pipeline monitoring in LakehousePlumber provides centralized observability for all your Databricks Lakeflow Declarative Pipelines without manual infrastructure setup. It combines two related capabilities:
- **Event Log Injection** — automatically adds ``event_log`` blocks to every pipeline's Databricks Asset Bundle resource file, directing each pipeline's operational events to a Unity Catalog table.
- **Monitoring Assets** — three coordinated artifacts that aggregate all event logs and produce analytical views:

  - A notebook (``monitoring/{env}/union_event_logs.py``) that runs N independent Structured Streaming queries — one per pipeline event log — into a single Delta table. Each query has its own checkpoint directory.
  - A Lakeflow Declarative Pipeline (``generated/{env}/{pipeline_name}/monitoring.py``) containing only materialized views that read from the union Delta table.
  - A Databricks Workflow job (``resources/lhp/{pipeline_name}.job.yml``) that chains the notebook and the pipeline.
Together, these features give you a single pane of glass for pipeline health, performance,
and event analysis — configured entirely through lhp.yaml.
Note
Pipeline monitoring is entirely optional. Your existing pipelines work unchanged without it. You can enable event log injection on its own, or combine it with the monitoring assets for full centralized observability.
Prerequisites:

- Databricks Asset Bundles integration enabled (``databricks.yml`` exists in the project root)
- Unity Catalog enabled workspace
- A Unity Catalog volume (or other cloud storage path) available for streaming checkpoints
- At least one pipeline generating code via ``lhp generate``
Architecture
flowchart LR
P1["Pipeline A"] --> EL1["Event Log A"]
P2["Pipeline B"] --> EL2["Event Log B"]
P3["Pipeline N"] --> ELN["Event Log N"]
subgraph notebook ["union_event_logs.py (notebook)"]
EL1 --> S1["Stream A<br/>checkpoint_path/A"]
EL2 --> S2["Stream B<br/>checkpoint_path/B"]
ELN --> SN["Stream N<br/>checkpoint_path/N"]
end
S1 --> UT["all_pipelines_event_log<br/>(Delta table)"]
S2 --> UT
SN --> UT
UT --> MV1["events_summary<br/>(Materialized View)"]
UT --> MV2["Custom MVs<br/>(Optional)"]
subgraph job ["Workflow Job"]
NT["notebook_task"] --> PT["pipeline_task<br/>(MVs)"]
end
subgraph opt ["enable_job_monitoring: true"]
PL["Python Load<br/>(Databricks SDK)"] --> JS["jobs_stats<br/>(Materialized View)"]
end
style P1 fill:#e1f5fe
style P2 fill:#e1f5fe
style P3 fill:#e1f5fe
style EL1 fill:#fff3e0
style EL2 fill:#fff3e0
style ELN fill:#fff3e0
style S1 fill:#e3f2fd
style S2 fill:#e3f2fd
style SN fill:#e3f2fd
style UT fill:#e8f5e8
style MV1 fill:#fce4ec
style MV2 fill:#fce4ec
style NT fill:#f3e5f5
style PT fill:#f3e5f5
style PL fill:#e0f2f1
style JS fill:#fce4ec
style opt fill:none,stroke:#999,stroke-dasharray: 5 5
style job fill:none,stroke:#999,stroke-dasharray: 5 5
style notebook fill:none,stroke:#999,stroke-dasharray: 5 5
Note
Each pipeline’s stream owns an independent checkpoint directory
({checkpoint_path}/{pipeline_name}/). Adding or removing a pipeline only creates or
leaves a checkpoint directory — it does not invalidate any existing stream’s
checkpoint. Streams run in a ThreadPoolExecutor and use
trigger(availableNow=True) so the notebook terminates once all data has been
processed.
Quick Start¶
Get centralized pipeline monitoring in three steps:
Step 1: Add event log and monitoring to lhp.yaml
name: my_project
version: "1.0"

event_log:
  catalog: "${catalog}"
  schema: _meta
  name_suffix: "_event_log"

monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
Tip
checkpoint_path and job_config_path are the only required fields under
monitoring. Everything else has sensible defaults: the pipeline is named
${project_name}_event_log_monitoring, uses the same catalog/schema as
event_log, creates a Delta table called
all_pipelines_event_log, and exposes a default events_summary materialized view
that summarizes pipeline run status, duration, and row metrics.
Step 2: Generate code and resources
lhp generate -e dev
You will see output indicating the monitoring artifacts were generated:
✅ Generated: my_project_event_log_monitoring/monitoring.py
✅ Generated monitoring notebook: monitoring/dev/union_event_logs.py
✅ Generated monitoring job resource: resources/lhp/my_project_event_log_monitoring.job.yml
Step 3: Inspect the generated output
generated/
└── dev/
    ├── my_pipeline_a/
    │   └── ...
    ├── my_pipeline_b/
    │   └── ...
    └── my_project_event_log_monitoring/    ← MVs-only DLT pipeline
        └── monitoring.py

monitoring/
└── dev/
    └── union_event_logs.py                 ← Streaming union notebook

resources/
└── lhp/
    ├── my_pipeline_a.pipeline.yml                    ← Now includes event_log block
    ├── my_pipeline_b.pipeline.yml                    ← Now includes event_log block
    ├── my_project_event_log_monitoring.pipeline.yml  ← MVs pipeline resource
    └── my_project_event_log_monitoring.job.yml       ← Workflow job
Event Log Configuration¶
Event log configuration controls how Databricks pipeline event logs are stored. When
defined in lhp.yaml, event log blocks are automatically injected into all pipeline
resource files during lhp generate — no -pc flag or pipeline_config.yaml required.
Configuration Reference¶
| Option | Type | Default | Description |
|---|---|---|---|
| ``enabled`` | boolean | ``true`` | Enable/disable event log injection. Set to ``false`` to turn off injection project-wide. |
| ``catalog`` | string | (required) | Unity Catalog name for the event log table. Supports LHP token substitution. |
| ``schema`` | string | (required) | Schema name for the event log table. Supports LHP token substitution. |
| ``name_prefix`` | string | ``""`` | Prefix prepended to the generated event log table name. |
| ``name_suffix`` | string | ``""`` | Suffix appended to the generated event log table name. |
Note
All event_log fields support LHP token substitution. Tokens like ${catalog}
are resolved from your substitutions/{env}.yaml files, just like all other
configuration fields.
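Tokens are simple ``${name}`` placeholders. A minimal sketch of the substitution behavior (hypothetical helper, not LHP's actual code):

import re

def substitute(value: str, tokens: dict) -> str:
    # Replace each ${token} with its value from substitutions/{env}.yaml.
    return re.sub(r"\$\{(\w+)\}", lambda m: str(tokens[m.group(1)]), value)

assert substitute("${catalog}._meta", {"catalog": "acme_edw_dev"}) == "acme_edw_dev._meta"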
Event Log Table Naming¶
The event log table name for each pipeline is generated using the formula:
{name_prefix}{pipeline_name}{name_suffix}
Examples:

| Pipeline Name | name_prefix | name_suffix | Generated Event Log Table Name |
|---|---|---|---|
| ``bronze_load`` | ``""`` | ``"_event_log"`` | ``bronze_load_event_log`` |
| ``silver_transform`` | ``"log_"`` | ``""`` | ``log_silver_transform`` |
| ``gold_analytics`` | ``"log_"`` | ``"_events"`` | ``log_gold_analytics_events`` |
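Since the formula is pure string concatenation, expected table names are easy to sanity-check. A one-line sketch (hypothetical helper):

def event_log_table_name(pipeline_name: str, name_prefix: str = "", name_suffix: str = "") -> str:
    # {name_prefix}{pipeline_name}{name_suffix}
    return f"{name_prefix}{pipeline_name}{name_suffix}"

assert event_log_table_name("bronze_load", name_suffix="_event_log") == "bronze_load_event_log"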
Pipeline-Level Overrides¶
Individual pipelines can override or opt out of project-level event logging through
pipeline_config.yaml.
Full replace: A pipeline-specific event_log in pipeline_config.yaml completely
replaces the project-level configuration for that pipeline:
---
pipeline: silver_analytics
event_log:
  name: custom_event_log
  catalog: analytics_catalog
  schema: monitoring
Important
Override is a full replace, not a merge. When a pipeline defines its own event_log
dict in pipeline_config.yaml, the entire project-level event_log is ignored for that
pipeline.
Pipeline-level opt-out: Set event_log: false to disable event logging for a
specific pipeline, even when project-level event logging is enabled:
---
pipeline: temp_debug_pipeline
event_log: false
Note
Project-level event logging does not require the -pc flag. It is applied
automatically during lhp generate. The -pc flag is only needed if you want
to use pipeline_config.yaml for pipeline-specific overrides or other settings.
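Taken together, the override semantics can be sketched as follows (hypothetical helper, not LHP's actual code):

def resolve_event_log(project_cfg: dict, pipeline_cfg: dict):
    # Pipeline-level event_log fully replaces the project-level one;
    # event_log: false opts the pipeline out entirely.
    override = pipeline_cfg.get("event_log")
    if override is False:
        return None                      # opt-out: no event_log block injected
    if isinstance(override, dict):
        return override                  # full replace, never a merge
    return project_cfg.get("event_log")  # fall back to the project default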
Generated Resource Output¶
Here is a concrete example showing how lhp.yaml event log configuration translates to
a generated pipeline resource file.
Input:
event_log:
  catalog: acme_edw_dev
  schema: _meta
  name_suffix: "_event_log"
Generated output for a pipeline named event_log_basic:
# ...pipeline configuration...
channel: CURRENT
event_log:
  name: event_log_basic_event_log
  schema: _meta
  catalog: acme_edw_dev
Monitoring Configuration¶
The monitoring assets are configured in lhp.yaml under the monitoring key. LHP
generates a notebook, an MVs-only Lakeflow Declarative Pipeline, and a Databricks
Workflow job that chains them.
Warning
Monitoring requires event_log to be enabled. If monitoring is configured but
event_log is missing or disabled, LHP raises error LHP-CFG-008.
Warning
checkpoint_path is required when monitoring.enabled is true. LHP raises
LHP-CFG-008 if it is missing. Prior releases accepted monitoring: {} — that
form is no longer valid.
Warning
job_config_path is required when monitoring.enabled is true. It must
point (relative to the project root) to a flat single-document YAML file describing
the monitoring Workflow job (cluster, tags, notifications, schedule, etc.). LHP
raises LHP-CFG-008 if the setting is missing and LHP-IO-001 if the configured
path does not resolve to an existing file. Token substitution (${...}) applies to
the file contents, resolved from the active environment’s
substitutions/<env>.yaml.
Note
The legacy auto-pickup of templates/bundle/job_config.yaml for the monitoring
job — and the reserved __eventlog_monitoring alias inside it — has been
removed. The __eventlog_monitoring alias still works in the generic
config/job_config.yaml used by lhp deps to customize orchestration jobs;
that surface is unchanged.
Configuration Reference¶
| Option | Type | Default | Description |
|---|---|---|---|
| ``enabled`` | boolean | ``true`` | Enable/disable monitoring. When ``false``, no monitoring assets are generated. |
| ``checkpoint_path`` | string | required | Base path for streaming checkpoints. Each monitored pipeline gets a subdirectory: ``{checkpoint_path}/{pipeline_name}/``. |
| ``job_config_path`` | string | required | Relative path (from the project root) to a dedicated YAML file describing the generated monitoring Workflow job (cluster, tags, notifications, schedule, permissions, …). Flat single-document mapping — no ``project_defaults:`` or ``job_name:`` keys. |
| ``max_concurrent_streams`` | integer | ``10`` | Maximum number of concurrent streaming queries inside the notebook (``ThreadPoolExecutor`` worker count). |
| ``pipeline_name`` | string | ``${project_name}_event_log_monitoring`` | Custom name for the MVs-only DLT pipeline and the Workflow job. Also used as the directory name under ``generated/{env}/``. |
| ``catalog`` | string | Inherits from ``event_log`` | Unity Catalog for monitoring tables. Overrides the event_log default. |
| ``schema`` | string | Inherits from ``event_log`` | Schema for monitoring tables. Overrides the event_log default. |
| ``streaming_table`` | string | ``all_pipelines_event_log`` | Name of the Delta table that the notebook writes into. The table is created implicitly by Structured Streaming on first run. It is not a DLT streaming table. |
| ``materialized_views`` | list | One default ``events_summary`` MV | List of materialized view definitions. Set to ``[]`` to disable MV generation entirely. |
| ``enable_job_monitoring`` | boolean | ``false`` | When enabled, adds a Python load and a ``jobs_stats`` materialized view that correlate jobs with pipeline runs via the Databricks SDK. |
Note
The ``streaming_table`` value no longer refers to a DLT streaming table. Under the
current implementation, the notebook uses ``writeStream.toTable(...)`` to write the
union into a regular Delta table. The catalog and schema must exist; the table itself
is created on first write. The identity that runs the notebook only needs write
permissions on the target catalog/schema and on the checkpoint volume.
Minimal Configuration¶
The simplest valid monitoring configuration specifies the checkpoint path and the path to the dedicated monitoring-job config file:
event_log:
  catalog: "${catalog}"
  schema: _meta
  name_suffix: "_event_log"

monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
This creates:

- A notebook named ``union_event_logs.py`` under ``monitoring/${env}/``
- A DLT pipeline named ``${project_name}_event_log_monitoring``
- A Delta table ``all_pipelines_event_log`` in the same catalog/schema as ``event_log``
- A default ``events_summary`` materialized view (pipeline run summary with status, duration, and row metrics)
- A Workflow job ``${project_name}_event_log_monitoring_job`` chaining the notebook and the pipeline
Custom Pipeline Name¶
monitoring:
  pipeline_name: "my_custom_monitor"
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
The pipeline name affects:

- The directory name under ``generated/`` (e.g., ``generated/dev/my_custom_monitor/``)
- The DLT pipeline resource file (e.g., ``resources/lhp/my_custom_monitor.pipeline.yml``)
- The Workflow job resource file (``resources/lhp/my_custom_monitor.job.yml``)
- The job's ``name`` field (``my_custom_monitor_job``)
- The pipeline identifier in Databricks
Custom Catalog and Schema¶
By default, the monitoring assets write to the same catalog and schema as configured
in event_log. You can override either or both:
event_log:
  catalog: "${catalog}"
  schema: _meta
  name_suffix: "_event_log"

monitoring:
  catalog: "analytics_cat"
  schema: "_analytics"
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
Override priority:

1. Monitoring-level ``catalog``/``schema`` (highest — if specified)
2. Event log ``catalog``/``schema`` (default fallback)
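A sketch of that fallback (hypothetical helper, not LHP's actual code):

def monitoring_target(event_log_cfg: dict, monitoring_cfg: dict) -> tuple:
    # Monitoring-level catalog/schema win; otherwise inherit from event_log.
    catalog = monitoring_cfg.get("catalog") or event_log_cfg["catalog"]
    schema = monitoring_cfg.get("schema") or event_log_cfg["schema"]
    return catalog, schema

monitoring_target({"catalog": "acme_edw_dev", "schema": "_meta"},
                  {"catalog": "analytics_cat", "schema": "_analytics"})
# -> ("analytics_cat", "_analytics")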
Generated Artifacts¶
Enabling monitoring produces up to three artifacts per environment. Each is described below.
Union Notebook¶
The notebook aggregates all eligible pipeline event logs into the Delta table named by
streaming_table. It runs N independent streaming queries — one per pipeline — each
with its own checkpoint directory.
monitoring/{env}/union_event_logs.py
# Imports required by the code below; `spark` is provided by the Databricks runtime.
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

TARGET_TABLE = "analytics_cat._analytics.all_pipelines_event_log"
CHECKPOINT_BASE = "/Volumes/acme_edw_dev/_meta/checkpoints/event_logs"
MAX_WORKERS = 10

SOURCES = [
    ("bronze_load", "acme_edw_dev._meta.bronze_load_event_log"),
    ("silver_transform", "acme_edw_dev._meta.silver_transform_event_log"),
    ("gold_analytics", "acme_edw_dev._meta.gold_analytics_event_log"),
]

def _ensure_target_exists() -> None:
    # Pre-create TARGET_TABLE (idempotent) so parallel streams do not race
    # on CREATE during a cold run.
    if spark.catalog.tableExists(TARGET_TABLE):
        return
    for sample_name, sample_ref in SOURCES:
        try:
            empty = (
                spark.read.format("delta").table(sample_ref).limit(0)
                .withColumn("_source_pipeline", F.lit(sample_name))
            )
            empty.write.format("delta").saveAsTable(TARGET_TABLE)
            return
        except AnalysisException:
            continue
    raise RuntimeError("no readable source event log")

if SOURCES:
    _ensure_target_exists()

def process_source(pipeline_name: str, table_ref: str) -> str:
    checkpoint = f"{CHECKPOINT_BASE}/{pipeline_name}"
    query = (
        spark.readStream
        .format("delta")
        .table(table_ref)
        .withColumn("_source_pipeline", F.lit(pipeline_name))
        .writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(TARGET_TABLE)
    )
    query.awaitTermination()
    return pipeline_name

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {executor.submit(process_source, n, r): n for n, r in SOURCES}
    ...
Key aspects:

- **Pre-created target table.** Before launching the executor pool, ``_ensure_target_exists()`` pre-creates the target Delta table using the schema of the first readable source event log (event-log schemas are uniform across Lakeflow Declarative Pipelines). Without this prologue, N parallel streams all race to ``CREATE`` the target via ``.toTable()`` on a cold run; one wins, the rest fail with ``TABLE_OR_VIEW_ALREADY_EXISTS``. The function is idempotent — warm runs short-circuit via ``spark.catalog.tableExists`` — and iterates through ``SOURCES`` until one event log is readable, so partially-deployed environments still bootstrap cleanly.
- **Per-pipeline checkpoints.** Each source has its own directory at ``{CHECKPOINT_BASE}/{pipeline_name}/``. Changing the set of sources never invalidates existing checkpoints.
- **Append-only.** Streams use ``outputMode("append")`` with ``mergeSchema=true`` so the target Delta table can absorb schema evolution across event log sources.
- **Finite batches.** ``trigger(availableNow=True)`` processes all available data and then terminates — ideal for scheduled job execution rather than always-on streaming.
- **Parallel execution.** Sources run concurrently via ``ThreadPoolExecutor``, bounded by ``max_concurrent_streams``.
- **Error aggregation.** Per-source failures are collected and reported together. The notebook exits with a ``RuntimeError`` if any source failed.
- **Alphabetical source order.** Pipeline names are sorted for deterministic output.
MVs-Only Lakeflow Declarative Pipeline¶
The Lakeflow Declarative Pipeline generated by LHP for monitoring contains only materialized views that read from the Delta table populated by the notebook. There is no DLT streaming table, no source view, and no append flow — those were removed in favor of the notebook-based union.
generated/{env}/{pipeline_name}/monitoring.py
PIPELINE_ID = "my_project_event_log_monitoring"
FLOWGROUP_ID = "monitoring"

@dp.materialized_view(
    name="acme_edw_dev._meta.events_summary",
    comment="Materialized view: events_summary",
    table_properties={},
)
def events_summary():
    df = spark.sql("""WITH run_info AS (
        SELECT origin.pipeline_name, origin.pipeline_id, origin.update_id,
               MIN(`timestamp`) AS run_start_time,
               MAX(`timestamp`) AS run_end_time,
               ...
        FROM acme_edw_dev._meta.all_pipelines_event_log
        GROUP BY origin.pipeline_name, origin.pipeline_id, origin.update_id
    ),
    run_metrics AS (...),
    run_config AS (...)
    SELECT ri.pipeline_name, ri.pipeline_id, ri.update_id, ri.run_status, ...
    FROM run_info ri LEFT JOIN run_metrics rm ON ... LEFT JOIN run_config rc ON ...
    ORDER BY ri.run_start_time DESC""")
    return df
Important
When materialized_views: [] and enable_job_monitoring is false, the DLT
pipeline has no actions — LHP omits the pipeline entirely (no monitoring.py, no
pipeline resource, no pipeline_task in the job). Only the notebook and the
notebook task of the job are generated.
Workflow Job¶
LHP generates a Databricks Workflow job resource that orchestrates the notebook and the
DLT pipeline. The ``union_event_logs`` task runs the notebook first; the DLT pipeline
task is then triggered via ``depends_on``.
resources/lhp/{pipeline_name}.job.yml
# Generated by LakehousePlumber - Monitoring Job for my_project_event_log_monitoring
resources:
  jobs:
    my_project_event_log_monitoring_job:
      name: my_project_event_log_monitoring_job
      max_concurrent_runs: 1
      tasks:
        - task_key: union_event_logs
          notebook_task:
            notebook_path: ${workspace.file_path}/monitoring/${bundle.target}/union_event_logs
            source: WORKSPACE
        - task_key: my_project_event_log_monitoring_pipeline
          depends_on:
            - task_key: union_event_logs
          pipeline_task:
            pipeline_id: ${resources.pipelines.my_project_event_log_monitoring_pipeline.id}
            full_refresh: false
      queue:
        enabled: true
      performance_target: STANDARD
Customizing the job (cluster, schedule, permissions, notifications, tags, etc.) is described in Customizing the Workflow Job.
Materialized Views¶
Default events_summary MV¶
When materialized_views is omitted from monitoring, LHP creates a default
events_summary materialized view — a pipeline run summary that extracts run status,
duration, row metrics, and configuration from the union Delta table:
@dp.materialized_view(
    name="acme_edw_dev._meta.events_summary",
    comment="Materialized view: events_summary",
    table_properties={},
)
def events_summary():
    df = spark.sql("""WITH run_info AS (
        SELECT
            origin.pipeline_name,
            origin.pipeline_id,
            origin.update_id,
            MIN(`timestamp`) AS run_start_time,
            MAX(`timestamp`) AS run_end_time,
            MAX_BY(
                CASE WHEN event_type = 'update_progress'
                     THEN details:update_progress:state::STRING END,
                CASE WHEN event_type = 'update_progress'
                     THEN `timestamp` END
            ) AS run_status
        FROM acme_edw_dev._meta.all_pipelines_event_log
        GROUP BY origin.pipeline_name, origin.pipeline_id, origin.update_id
    ),
    ...
    SELECT
        ri.pipeline_name, ri.pipeline_id, ri.update_id, ri.run_status,
        rc.trigger_cause, rc.is_full_refresh, rc.dbr_version, rc.compute_type,
        ri.run_start_time, ri.run_end_time,
        ROUND((...) / 60, 2) AS duration_minutes,
        COALESCE(rm.tables_processed, 0) AS tables_processed,
        COALESCE(rm.total_upserted_rows, 0) AS total_upserted_rows,
        ...
    FROM run_info ri
    LEFT JOIN run_metrics rm ON ...
    LEFT JOIN run_config rc ON ...
    ORDER BY ri.run_start_time DESC""")
    return df
The default SQL joins three CTEs from the event log:

- ``run_info`` — pipeline name, update ID, start/end time, final run status
- ``run_metrics`` — upserted rows, deleted rows, dropped records, tables processed
- ``run_config`` — DBR version, compute type (Serverless/Classic), trigger cause, full refresh flag
``events_summary`` schema:

| Column | Type | Description |
|---|---|---|
| ``pipeline_name`` | STRING | Name of the Lakeflow pipeline |
| ``pipeline_id`` | STRING | Unique pipeline identifier |
| ``update_id`` | STRING | Unique identifier for this pipeline run (update) |
| ``run_status`` | STRING | Final status of the run (e.g., ``COMPLETED``) |
| ``trigger_cause`` | STRING | What triggered the run |
| ``is_full_refresh`` | BOOLEAN | Whether this was a full refresh or incremental update |
| ``dbr_version`` | STRING | Databricks Runtime version used for the run |
| ``compute_type`` | STRING | ``Serverless`` or ``Classic`` compute |
| ``run_start_time`` | TIMESTAMP | When the pipeline run started |
| ``run_end_time`` | TIMESTAMP | When the pipeline run ended |
| ``duration_minutes`` | DOUBLE | Run duration in minutes (rounded to 2 decimal places) |
| ``tables_processed`` | BIGINT | Number of distinct tables (flows) processed in this run |
| ``total_upserted_rows`` | BIGINT | Total rows upserted across all tables |
| ``total_deleted_rows`` | BIGINT | Total rows deleted across all tables |
| ``total_processed_rows`` | BIGINT | Sum of upserted + deleted rows |
| ``total_dropped_records`` | BIGINT | Total records dropped by data quality expectations |
At generation time, LHP substitutes the {streaming_table} placeholder in the default
SQL with the fully-qualified Delta table name (e.g.,
acme_edw_dev._meta.all_pipelines_event_log).
Custom Materialized Views¶
You can fully customize the materialized views created by the monitoring pipeline, from inline SQL to external files, or disable them entirely.
Inline SQL¶
Define materialized views with inline SQL using the sql property:
monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
  materialized_views:
    - name: "error_events"
      sql: "SELECT * FROM all_pipelines_event_log WHERE event_type = 'error'"
    - name: "pipeline_latency"
      sql: >-
        SELECT _source_pipeline, avg(duration_ms) as avg_duration
        FROM all_pipelines_event_log GROUP BY _source_pipeline
This generates two materialized view functions instead of the default events_summary:
@dp.materialized_view(
    name="acme_edw_dev._meta.error_events",
    comment="Materialized view: error_events",
    table_properties={},
)
def error_events():
    df = spark.sql(
        """SELECT * FROM all_pipelines_event_log WHERE event_type = 'error'"""
    )
    return df

@dp.materialized_view(
    name="acme_edw_dev._meta.pipeline_latency",
    comment="Materialized view: pipeline_latency",
    table_properties={},
)
def pipeline_latency():
    df = spark.sql(
        """SELECT _source_pipeline, avg(duration_ms) as avg_duration FROM all_pipelines_event_log GROUP BY _source_pipeline"""
    )
    return df
External SQL Files¶
For complex queries, use sql_path to reference an external SQL file:
monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
  materialized_views:
    - name: "custom_analysis"
      sql_path: "sql/monitoring_custom_analysis.sql"

``sql/monitoring_custom_analysis.sql``:

SELECT
    _source_pipeline,
    event_type,
    date_trunc('DAY', timestamp) AS event_day,
    count(*) AS daily_event_count
FROM all_pipelines_event_log
WHERE event_type IN ('FLOW_PROGRESS', 'DATASET_CREATED')
GROUP BY _source_pipeline, event_type, date_trunc('DAY', timestamp)
Note
sql_path is resolved relative to the project root directory (where lhp.yaml lives).
Disabling Materialized Views¶
To generate only the notebook and the notebook task of the job, set
materialized_views to an empty list:
monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
  materialized_views: []
When omitted entirely (or set to null), the default events_summary MV is created.
This means there are three behaviors:
| Setting | Behavior |
|---|---|
| Omitted / ``null`` | Default ``events_summary`` MV is created |
| ``[]`` | No MVs and no DLT pipeline — only the notebook and a notebook-only job |
| Explicit list | Only the specified MVs are created |
Note
When materialized_views: [] and enable_job_monitoring is false, LHP does
not emit a DLT pipeline file, a pipeline resource file, or a pipeline_task in
the Workflow job. The job contains only the union_event_logs notebook task.
Validation Rules¶
LHP validates materialized view definitions at configuration load time:

- **Name required** — each MV must have a ``name`` field
- **Unique names** — MV names must not repeat within the ``materialized_views`` list
- **Mutual exclusion** — each MV must specify either ``sql`` or ``sql_path``, not both

Violations raise ``LHP-CFG-008`` with a descriptive error message.
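The three rules are simple enough to sketch (hypothetical code mirroring the documented checks, not LHP's implementation):

def validate_materialized_views(mvs: list) -> None:
    seen = set()
    for mv in mvs:
        name = mv.get("name")
        if not name:
            raise ValueError("each materialized view requires a 'name'")   # rule 1
        if name in seen:
            raise ValueError(f"duplicate materialized view name: {name}")  # rule 2
        seen.add(name)
        if bool(mv.get("sql")) == bool(mv.get("sql_path")):                # rule 3
            raise ValueError(f"{name}: set exactly one of 'sql' or 'sql_path'")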
Customizing the Workflow Job¶
The generated Workflow job is configured from a dedicated YAML file referenced by
monitoring.job_config_path in lhp.yaml. This file is required when monitoring
is enabled.
``config/monitoring_job_config.yaml``:

max_concurrent_runs: 1
performance_target: PERFORMANCE_OPTIMIZED
queue:
  enabled: true
timeout_seconds: 3600
notebook_cluster:
  new_cluster:
    spark_version: "15.4.x-scala2.12"
    node_type_id: "Standard_D4ds_v5"
    num_workers: 2
schedule:
  quartz_cron_expression: "0 0 * * * ?"
  timezone_id: UTC
  pause_status: UNPAUSED
tags:
  purpose: event_log_monitoring
  team: data-platform
  environment: ${bundle_target}
email_notifications:
  on_failure:
    - monitoring-alerts@company.com
How it is resolved:

1. LHP reads the file at ``project_root / monitoring.job_config_path``.
2. Token substitution (``${...}``) is applied using the active environment's ``substitutions/<env>.yaml``.
3. The result is deep-merged over LHP's default job config (``max_concurrent_runs=1``, ``performance_target=STANDARD``, ``queue.enabled=true``) — nested dicts like ``tags`` and ``queue`` merge recursively; lists are replaced wholesale.
4. The monitoring job name is always ``${pipeline_name}_job``. Do not add a ``job_name`` key to this file.
Available job fields (all optional):
- ``notebook_cluster.new_cluster`` or ``notebook_cluster.existing_cluster_id`` — cluster for the notebook task (Serverless is used when neither is set)
- ``queue.enabled`` — job-run queueing
- ``performance_target``, ``timeout_seconds``, ``max_concurrent_runs``
- ``schedule`` — Quartz cron schedule
- ``tags`` — free-form tags
- ``email_notifications`` — ``on_start``/``on_success``/``on_failure``
- ``webhook_notifications`` — same events as email
- ``permissions`` — user/group permission entries
Rules¶
- The file must be a flat single-document YAML mapping. Do not wrap it in a ``project_defaults:`` key and do not add a ``job_name:`` key.
- The ``pipeline_task`` in the job is always generated automatically — do not redefine it in this file.
- If the file is missing at generation time, LHP raises ``LHP-IO-001`` pointing to the resolved absolute path.
Note
The legacy auto-pickup of templates/bundle/job_config.yaml for the monitoring
job — and the reserved __eventlog_monitoring alias inside it — has been
removed. Use monitoring.job_config_path instead. The __eventlog_monitoring
alias continues to work in the generic config/job_config.yaml used by
lhp deps for orchestration jobs; that surface is unchanged.
Pipeline Configuration for Monitoring¶
The monitoring DLT pipeline (materialized views) can also be configured through
pipeline_config.yaml. Because the pipeline name is dynamic, LHP provides a reserved
alias to avoid hardcoding.
Using the __eventlog_monitoring Alias in pipeline_config.yaml¶
Use the __eventlog_monitoring reserved keyword in pipeline_config.yaml to target
the monitoring pipeline without knowing its exact name:
---
pipeline: __eventlog_monitoring
serverless: false
edition: ADVANCED
clusters:
  - label: default
    node_type_id: Standard_D4ds_v5
    autoscale:
      min_workers: 1
      max_workers: 4
notifications:
  - email_recipients:
      - monitoring-alerts@company.com
    alerts:
      - on-update-failure
      - on-update-fatal-failure
tags:
  purpose: event_log_monitoring
At generation time, __eventlog_monitoring automatically resolves to the actual monitoring
pipeline name defined in lhp.yaml. The project_defaults section still applies and
merges as usual.
Behavior and Rules¶
- If monitoring is not configured or is disabled in ``lhp.yaml``, the alias entry is ignored and a warning is logged
- If both the alias and the actual monitoring pipeline name appear in the config, an error is raised (``LHP-VAL-010``)
- The alias must be used as a standalone pipeline entry, not in a pipeline list (``LHP-VAL-011``)
Invalid — the alias cannot appear in a pipeline list (``LHP-VAL-011``):

---
pipeline:
  - bronze_pipeline
  - __eventlog_monitoring

Valid — the alias as a standalone pipeline entry:

---
pipeline: bronze_pipeline
serverless: false
---
pipeline: __eventlog_monitoring
serverless: false
Job Monitoring¶
When enable_job_monitoring: true is set, LHP adds an additional Python load chain to
the monitoring DLT pipeline that correlates Databricks Jobs with their associated
pipeline runs using the Databricks SDK. The results are written to a separate
jobs_stats materialized view alongside the user-specified MVs.
monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
  enable_job_monitoring: true
What it generates:
In addition to the notebook and the default/user MVs, the DLT pipeline adds:
- **Python Load → ``v_jobs_stats``** — calls a ``get_jobs_stats`` function from a generated ``jobs_stats_loader.py`` module to fetch job run statistics via the Databricks SDK.
- **Write → ``jobs_stats``** — a materialized view in the same catalog/schema as the monitoring pipeline, populated from the ``v_jobs_stats`` view. A materialized view is used (rather than a streaming table) because the Python SDK source returns batch data, not a streaming DataFrame.
Generated files:
generated/
└── dev/
    └── my_project_event_log_monitoring/
        ├── monitoring.py            ← includes Python load + jobs_stats write
        └── jobs_stats_loader.py     ← shipped alongside the pipeline
The ``jobs_stats_loader.py`` module uses the Databricks SDK to scan recent job runs
(default lookback: 7 days), find pipeline tasks, and correlate each pipeline update to its
triggering job. It also enriches rows with pipeline tags (from ``spec.tags``) and job tags
(from ``settings.tags``). The lookback window is configurable via the ``lookback_hours``
pipeline parameter.
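The correlation can be approximated with the public Databricks SDK. A simplified sketch of the approach (illustrative only; the field names and enrichment logic in the generated module differ):

import time
from databricks.sdk import WorkspaceClient

LOOKBACK_HOURS = 7 * 24  # documented default lookback: 7 days

def pipeline_job_correlations(lookback_hours: int = LOOKBACK_HOURS) -> list:
    w = WorkspaceClient()
    cutoff_ms = int((time.time() - lookback_hours * 3600) * 1000)
    rows = []
    # expand_tasks=True includes each run's task list, exposing pipeline tasks.
    for run in w.jobs.list_runs(expand_tasks=True, start_time_from=cutoff_ms):
        for task in run.tasks or []:
            if task.pipeline_task is not None:
                rows.append({
                    "pipeline_id": task.pipeline_task.pipeline_id,
                    "job_id": run.job_id,
                    "job_run_id": run.run_id,
                    "job_start_time": run.start_time,  # epoch milliseconds
                    "job_end_time": run.end_time,
                })
    return rows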
Note
The jobs_stats materialized view inherits its catalog and schema from the monitoring
pipeline configuration (which itself defaults to the event_log catalog/schema).
``jobs_stats`` schema:

| Column | Type | Description |
|---|---|---|
| ``pipeline_id`` | STRING | Unique pipeline identifier |
| ``pipeline_name`` | STRING | Name of the Lakeflow pipeline |
| ``update_id`` | STRING | Pipeline update (run) identifier correlated to the job run |
| ``job_id`` | STRING | Databricks Job ID that triggered this pipeline run |
| ``job_run_id`` | STRING | Specific job run identifier |
| ``job_name`` | STRING | Name of the triggering job |
| ``job_start_time`` | TIMESTAMP | When the job run started |
| ``job_end_time`` | TIMESTAMP | When the job run ended |
| ``job_status`` | STRING | Final job run status (e.g., ``SUCCESS``) |
| ``pipeline_tags`` | STRING | JSON map of pipeline ``spec.tags`` |
| ``job_tags`` | STRING | JSON map of job ``settings.tags`` |
Automatic Cleanup¶
LHP automatically reconciles monitoring artifacts on every ``lhp generate``:

- **Notebook directory** — ``monitoring/{env}/`` is cleared before the new notebook is written. An empty ``monitoring/`` directory is removed.
- **Job resources** — any ``resources/*.job.yml`` whose header matches the monitoring job comment is removed before a new job is written (so renaming ``pipeline_name`` cleans up the old file).
- **DLT pipeline directory** — when monitoring is disabled or removed, LHP scans ``generated/{env}/*`` for ``monitoring.py`` files with ``FLOWGROUP_ID = "monitoring"`` and removes the directory.
This means toggling monitoring on/off, renaming the pipeline, or switching
materialized_views between populated and empty forms never leaves stale files behind.
Common Patterns¶
Minimal Setup¶
The simplest possible monitoring configuration:
name: my_project
version: "1.0"

event_log:
  catalog: "${catalog}"
  schema: _meta
  name_suffix: "_event_log"

monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
This gives you:

- Event log injection on all pipelines
- A notebook at ``monitoring/${env}/union_event_logs.py``
- A DLT pipeline named ``my_project_event_log_monitoring`` with a default ``events_summary`` materialized view
- A Workflow job ``my_project_event_log_monitoring_job`` chaining the two
Full Customization¶
A fully customized monitoring setup:
name: acme_edw
version: "1.0"

event_log:
  catalog: "${catalog}"
  schema: _meta
  name_prefix: ""
  name_suffix: "_event_log"

monitoring:
  pipeline_name: "central_observability"
  catalog: "analytics_catalog"
  schema: "_monitoring"
  streaming_table: "unified_event_stream"
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
  max_concurrent_streams: 20
  materialized_views:
    - name: "error_events"
      sql: "SELECT * FROM unified_event_stream WHERE event_type = 'error'"
    - name: "hourly_summary"
      sql: >-
        SELECT _source_pipeline, date_trunc('HOUR', timestamp) AS hour,
        count(*) AS cnt FROM unified_event_stream
        GROUP BY _source_pipeline, date_trunc('HOUR', timestamp)
    - name: "daily_analysis"
      sql_path: "sql/monitoring_custom_analysis.sql"
Selective Pipeline Monitoring¶
To exclude specific pipelines from event log monitoring, use pipeline_config.yaml
to opt individual pipelines out:
event_log:
  catalog: "${catalog}"
  schema: _meta
  name_suffix: "_event_log"

monitoring:
  checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"
---
pipeline: temp_debug_pipeline
event_log: false
---
pipeline: experimental_pipeline
event_log: false
Pipelines that opt out with event_log: false are excluded from the notebook’s
SOURCES list and therefore do not contribute rows to the union Delta table.
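The effect on the notebook can be pictured as a filter applied while ``SOURCES`` is built (hypothetical helper; names are illustrative):

def build_sources(pipeline_configs: dict, catalog: str, schema: str,
                  prefix: str = "", suffix: str = "") -> list:
    sources = []
    for name, cfg in sorted(pipeline_configs.items()):  # sorted for determinism
        if cfg.get("event_log") is False:               # pipeline-level opt-out
            continue
        sources.append((name, f"{catalog}.{schema}.{prefix}{name}{suffix}"))
    return sources

build_sources({"bronze_load": {}, "temp_debug_pipeline": {"event_log": False}},
              "acme_edw_dev", "_meta", suffix="_event_log")
# -> [("bronze_load", "acme_edw_dev._meta.bronze_load_event_log")]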
Environment-Specific Configuration¶
Use LHP substitution tokens for environment-aware monitoring:
event_log:
  catalog: "${catalog}"
  schema: "${monitoring_schema}"
  name_suffix: "_event_log"

monitoring:
  checkpoint_path: "/Volumes/${catalog}/${monitoring_schema}/checkpoints/event_logs"
  job_config_path: "config/monitoring_job_config.yaml"

With environment substitutions:

dev:
  catalog: acme_edw_dev
  monitoring_schema: _meta

prod:
  catalog: acme_edw_prod
  monitoring_schema: _monitoring
This produces environment-specific event log table and checkpoint references at generation time:
- Dev: ``acme_edw_dev._meta.bronze_load_event_log`` → checkpoint ``/Volumes/acme_edw_dev/_meta/checkpoints/event_logs/bronze_load/``
- Prod: ``acme_edw_prod._monitoring.bronze_load_event_log`` → checkpoint ``/Volumes/acme_edw_prod/_monitoring/checkpoints/event_logs/bronze_load/``
Migrating From the Pre-V0.8.2 Monitoring Pipeline¶
Before V0.8.2, LHP generated a single Lakeflow Declarative Pipeline containing a
UNION ALL SQL source view, a DLT streaming table, and the materialized views. That
architecture had a structural limitation: adding or removing a monitored pipeline
changed the UNION schema and could invalidate the checkpoint, forcing a full reload.
V0.8.2 replaces that with the notebook + MVs-only pipeline + job design described above. Migration steps:
1. **Add ``checkpoint_path``** to your ``monitoring`` block (now required — typically a Unity Catalog volume path).
2. **Add ``job_config_path``** to your ``monitoring`` block (now required). Create the file it points to — ``lhp init`` scaffolds ``config/monitoring_job_config_env.yaml.tmpl`` as a starting point. If you previously used the ``__eventlog_monitoring`` alias inside ``templates/bundle/job_config.yaml`` for the monitoring job, move those settings into the new dedicated file and drop the ``job_name: __eventlog_monitoring`` wrapper plus the ``project_defaults:`` wrapper — the monitoring config is a flat single-document mapping.
3. **Re-run ``lhp generate``.** LHP will clean up the old single-pipeline artifacts and write the new notebook, MVs-only pipeline, and job resource.
4. **Redeploy via Databricks Asset Bundles.** The old pipeline is removed by the bundle deploy; the new job and (MVs-only) pipeline are deployed in its place.
5. **Backfill the union Delta table if needed.** Historical event log rows written before the new notebook's first run are not replayed into the union table unless you manually run a one-off backfill from each event log table.
Troubleshooting¶
Monitoring-related errors use codes LHP-CFG-006 through LHP-CFG-008 (event log
and monitoring configuration) and LHP-VAL-010/LHP-VAL-011 (pipeline config alias
issues).
Common issues:

- ``LHP-CFG-008`` — "Monitoring checkpoint_path is required" — add ``checkpoint_path`` under ``monitoring`` or set ``monitoring.enabled: false``.
- ``LHP-CFG-008`` — "Monitoring job_config_path is required" — add ``job_config_path`` under ``monitoring`` pointing to your dedicated monitoring job config file, or set ``monitoring.enabled: false``.
- ``LHP-CFG-008`` — "Monitoring job_config file not found" — the file referenced by ``job_config_path`` does not exist. Check the path (resolved relative to the project root) and create the file if needed. ``lhp init`` provides ``config/monitoring_job_config_env.yaml.tmpl`` as a starter.
- ``LHP-IO-001`` — "Monitoring job_config file not found" — raised at generate time when the configured ``job_config_path`` cannot be opened. The error message includes the resolved absolute path.
- **No rows in ``all_pipelines_event_log``** — check that the Workflow job has run successfully. The notebook writes via Structured Streaming; the Delta table is created on first successful write.
- **``mergeSchema`` errors** — the notebook enables ``mergeSchema`` automatically on write. If you receive schema mismatch errors, ensure your checkpoint directories are fresh (each per-pipeline directory under ``checkpoint_path`` can be deleted independently without affecting the others).
- **Missing pipelines in the union** — pipelines that set ``event_log: false`` in ``pipeline_config.yaml`` are excluded by design.
See also
Error Reference for detailed before/after examples and resolution steps for each error code.