====================================
Pipeline Monitoring
====================================
.. meta::
:description: Centralized event log monitoring and analysis across all Databricks Lakeflow Declarative Pipelines managed by Lakehouse Plumber.
This page covers Lakehouse Plumber's pipeline monitoring capabilities — declarative
event log aggregation and analysis across all your pipelines.
Overview
--------
Pipeline monitoring in LakehousePlumber provides centralized observability for all your
Databricks Lakeflow Declarative Pipelines without manual infrastructure setup. It combines
two related capabilities:
1. **Event Log Injection** — Automatically adds ``event_log`` blocks to every pipeline's
Databricks Asset Bundle resource file, directing each pipeline's operational events to
a Unity Catalog table.
2. **Monitoring Assets** — Three coordinated artifacts that aggregate all event logs
and produce analytical views:
* A **notebook** (``monitoring/{env}/union_event_logs.py``) that runs N independent
Structured Streaming queries — one per pipeline event log — into a single Delta
table. Each query has its own checkpoint directory.
* A **Lakeflow Declarative Pipeline** (``generated/{env}/{pipeline_name}/monitoring.py``)
containing only materialized views that read from the union Delta table.
* A **Databricks Workflow job** (``resources/lhp/{pipeline_name}.job.yml``) that chains
the notebook and the pipeline.
Together, these features give you a single pane of glass for pipeline health, performance,
and event analysis — configured entirely through ``lhp.yaml``.
.. note::
Pipeline monitoring is entirely optional. Your existing pipelines work unchanged without
it. You can enable event log injection on its own, or combine it with the monitoring
assets for full centralized observability.
**Prerequisites:**
* Databricks Asset Bundles integration enabled (``databricks.yml`` exists in project root)
* Unity Catalog enabled workspace
* A Unity Catalog volume (or other cloud storage path) available for streaming checkpoints
* At least one pipeline generating code via ``lhp generate``
.. _monitoring-architecture:
**Architecture**
.. mermaid::
flowchart LR
P1["Pipeline A"] --> EL1["Event Log A"]
P2["Pipeline B"] --> EL2["Event Log B"]
P3["Pipeline N"] --> ELN["Event Log N"]
subgraph notebook ["union_event_logs.py (notebook)"]
EL1 --> S1["Stream A
checkpoint_path/A"]
EL2 --> S2["Stream B
checkpoint_path/B"]
ELN --> SN["Stream N
checkpoint_path/N"]
end
S1 --> UT["all_pipelines_event_log
(Delta table)"]
S2 --> UT
SN --> UT
UT --> MV1["events_summary
(Materialized View)"]
UT --> MV2["Custom MVs
(Optional)"]
subgraph job ["Workflow Job"]
NT["notebook_task"] --> PT["pipeline_task
(MVs)"]
end
subgraph opt ["enable_job_monitoring: true"]
PL["Python Load
(Databricks SDK)"] --> JS["jobs_stats
(Materialized View)"]
end
style P1 fill:#e1f5fe
style P2 fill:#e1f5fe
style P3 fill:#e1f5fe
style EL1 fill:#fff3e0
style EL2 fill:#fff3e0
style ELN fill:#fff3e0
style S1 fill:#e3f2fd
style S2 fill:#e3f2fd
style SN fill:#e3f2fd
style UT fill:#e8f5e8
style MV1 fill:#fce4ec
style MV2 fill:#fce4ec
style NT fill:#f3e5f5
style PT fill:#f3e5f5
style PL fill:#e0f2f1
style JS fill:#fce4ec
style opt fill:none,stroke:#999,stroke-dasharray: 5 5
style job fill:none,stroke:#999,stroke-dasharray: 5 5
style notebook fill:none,stroke:#999,stroke-dasharray: 5 5
.. note::
Each pipeline's stream owns an independent checkpoint directory
(``{checkpoint_path}/{pipeline_name}/``). Adding or removing a pipeline only creates or
leaves a checkpoint directory — it does **not** invalidate any existing stream's
checkpoint. Streams run in a ``ThreadPoolExecutor`` and use
``trigger(availableNow=True)`` so the notebook terminates once all data has been
processed.
Quick Start
-----------
Get centralized pipeline monitoring in three steps:
**Step 1: Add event log and monitoring to lhp.yaml**
.. code-block:: yaml
:caption: lhp.yaml
:emphasize-lines: 4-7,9-10
name: my_project
version: "1.0"
event_log:
catalog: "${catalog}"
schema: _meta
name_suffix: "_event_log"
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
.. tip::
``checkpoint_path`` and ``job_config_path`` are the only required fields under
``monitoring``. Everything else has sensible defaults: the pipeline is named
``${project_name}_event_log_monitoring``, uses the same catalog/schema as
``event_log``, creates a Delta table called
``all_pipelines_event_log``, and exposes a default ``events_summary`` materialized view
that summarizes pipeline run status, duration, and row metrics.
**Step 2: Generate code and resources**
.. code-block:: bash
lhp generate -e dev
You will see output indicating the monitoring artifacts were generated:
.. code-block:: text
✅ Generated: my_project_event_log_monitoring/monitoring.py
✅ Generated monitoring notebook: monitoring/dev/union_event_logs.py
✅ Generated monitoring job resource: resources/lhp/my_project_event_log_monitoring.job.yml
**Step 3: Inspect the generated output**
.. code-block:: text
generated/
└── dev/
├── my_pipeline_a/
│ └── ...
├── my_pipeline_b/
│ └── ...
└── my_project_event_log_monitoring/ ← MVs-only DLT pipeline
└── monitoring.py
monitoring/
└── dev/
└── union_event_logs.py ← Streaming union notebook
resources/
└── lhp/
├── my_pipeline_a.pipeline.yml ← Now includes event_log block
├── my_pipeline_b.pipeline.yml ← Now includes event_log block
├── my_project_event_log_monitoring.pipeline.yml ← MVs pipeline resource
└── my_project_event_log_monitoring.job.yml ← Workflow job
Event Log Configuration
-----------------------
Event log configuration controls how Databricks pipeline event logs are stored. When
defined in ``lhp.yaml``, event log blocks are automatically injected into all pipeline
resource files during ``lhp generate`` — no ``-pc`` flag or ``pipeline_config.yaml`` required.
Configuration Reference
~~~~~~~~~~~~~~~~~~~~~~~
.. list-table::
:header-rows: 1
:widths: 20 10 15 55
* - Option
- Type
- Default
- Description
* - ``enabled``
- boolean
- ``true``
- Enable/disable event log injection. Set to ``false`` to define the section without activating it.
* - ``catalog``
- string
- (required)
- Unity Catalog name for the event log table. Supports LHP token substitution.
* - ``schema``
- string
- (required)
- Schema name for the event log table. Supports LHP token substitution.
* - ``name_prefix``
- string
- ``""``
- Prefix prepended to the generated event log table name.
* - ``name_suffix``
- string
- ``""``
- Suffix appended to the generated event log table name.
.. note::
All ``event_log`` fields support LHP token substitution. Tokens like ``${catalog}``
are resolved from your ``substitutions/{env}.yaml`` files, just like all other
configuration fields.
Event Log Table Naming
~~~~~~~~~~~~~~~~~~~~~~
The event log table name for each pipeline is generated using the formula:
``{name_prefix}{pipeline_name}{name_suffix}``
**Examples:**
.. list-table::
:header-rows: 1
:widths: 25 15 15 45
* - Pipeline Name
- name_prefix
- name_suffix
- Generated Event Log Table Name
* - ``bronze_load``
- ``""``
- ``_event_log``
- ``bronze_load_event_log``
* - ``silver_transform``
- ``el_``
- ``""``
- ``el_silver_transform``
* - ``gold_analytics``
- ``""``
- ``_events``
- ``gold_analytics_events``
Pipeline-Level Overrides
~~~~~~~~~~~~~~~~~~~~~~~~
Individual pipelines can override or opt out of project-level event logging through
``pipeline_config.yaml``.
**Full replace:** A pipeline-specific ``event_log`` in ``pipeline_config.yaml`` **completely
replaces** the project-level configuration for that pipeline:
.. code-block:: yaml
:caption: config/pipeline_config.yaml
---
pipeline: silver_analytics
event_log:
name: custom_event_log
catalog: analytics_catalog
schema: monitoring
.. important::
Override is a **full replace**, not a merge. When a pipeline defines its own ``event_log``
dict in ``pipeline_config.yaml``, the entire project-level event_log is ignored for that
pipeline.
**Pipeline-level opt-out:** Set ``event_log: false`` to disable event logging for a
specific pipeline, even when project-level event logging is enabled:
.. code-block:: yaml
:caption: config/pipeline_config.yaml
---
pipeline: temp_debug_pipeline
event_log: false
.. note::
Project-level event logging does **not** require the ``-pc`` flag. It is applied
automatically during ``lhp generate``. The ``-pc`` flag is only needed if you want
to use ``pipeline_config.yaml`` for pipeline-specific overrides or other settings.
Generated Resource Output
~~~~~~~~~~~~~~~~~~~~~~~~~
Here is a concrete example showing how ``lhp.yaml`` event log configuration translates to
a generated pipeline resource file.
**Input:**
.. code-block:: yaml
:caption: lhp.yaml (excerpt)
event_log:
catalog: acme_edw_dev
schema: _meta
name_suffix: "_event_log"
**Generated output** for a pipeline named ``event_log_basic``:
.. code-block:: yaml
:caption: resources/lhp/event_log_basic.pipeline.yml (excerpt)
:emphasize-lines: 4-6
# ...pipeline configuration...
channel: CURRENT
event_log:
name: event_log_basic_event_log
schema: _meta
catalog: acme_edw_dev
Monitoring Configuration
-------------------------
The monitoring assets are configured in ``lhp.yaml`` under the ``monitoring`` key. LHP
generates a notebook, an MVs-only Lakeflow Declarative Pipeline, and a Databricks
Workflow job that chains them.
.. warning::
Monitoring requires ``event_log`` to be enabled. If ``monitoring`` is configured but
``event_log`` is missing or disabled, LHP raises error ``LHP-CFG-008``.
.. warning::
``checkpoint_path`` is **required** when ``monitoring.enabled`` is ``true``. LHP raises
``LHP-CFG-008`` if it is missing. Prior releases accepted ``monitoring: {}`` — that
form is no longer valid.
.. warning::
``job_config_path`` is **required** when ``monitoring.enabled`` is ``true``. It must
point (relative to the project root) to a flat single-document YAML file describing
the monitoring Workflow job (cluster, tags, notifications, schedule, etc.). LHP
raises ``LHP-CFG-008`` if the setting is missing and ``LHP-IO-001`` if the configured
path does not resolve to an existing file. Token substitution (``${...}``) applies to
the file contents, resolved from the active environment's
``substitutions/.yaml``.
.. note::
The legacy auto-pickup of ``templates/bundle/job_config.yaml`` for the monitoring
job — and the reserved ``__eventlog_monitoring`` alias inside it — has been
**removed**. The ``__eventlog_monitoring`` alias still works in the generic
``config/job_config.yaml`` used by ``lhp deps`` to customize *orchestration* jobs;
that surface is unchanged.
Configuration Reference
~~~~~~~~~~~~~~~~~~~~~~~
.. list-table::
:header-rows: 1
:widths: 25 10 25 40
* - Option
- Type
- Default
- Description
* - ``enabled``
- boolean
- ``true``
- Enable/disable monitoring. When ``false``, no notebook, pipeline, or job is generated
and any previously generated monitoring artifacts are cleaned up on the next
``lhp generate``.
* - ``checkpoint_path``
- string
- **required**
- Base path for streaming checkpoints. Each monitored pipeline gets a subdirectory:
``{checkpoint_path}/{pipeline_name}/``. Typically a Unity Catalog volume path
(e.g., ``/Volumes/my_catalog/_meta/checkpoints/event_logs``). Supports LHP token
substitution.
* - ``job_config_path``
- string
- **required**
- Relative path (from the project root) to a dedicated YAML file describing the
generated monitoring Workflow job (cluster, tags, notifications, schedule,
permissions, …). Flat single-document mapping — no ``project_defaults:`` wrapper
and no ``job_name:`` key. Contents are deep-merged over LHP's default job config
(``max_concurrent_runs=1``, ``performance_target=STANDARD``, ``queue.enabled=true``)
and then token-substituted via the active env's ``substitutions/.yaml``.
``lhp init`` scaffolds ``config/monitoring_job_config_env.yaml.tmpl`` — rename it
to ``config/monitoring_job_config.yaml`` (or your preferred path) and point
``job_config_path`` at it.
* - ``max_concurrent_streams``
- integer
- ``10``
- Maximum number of concurrent streaming queries inside the notebook
(``ThreadPoolExecutor`` ``max_workers``). Must be at least 1.
* - ``pipeline_name``
- string
- ``${project_name}_event_log_monitoring``
- Custom name for the MVs-only DLT pipeline and the Workflow job. Also used as the
directory name under ``generated/{env}/``.
* - ``catalog``
- string
- Inherits from ``event_log.catalog``
- Unity Catalog for monitoring tables. Overrides the event_log default.
* - ``schema``
- string
- Inherits from ``event_log.schema``
- Schema for monitoring tables. Overrides the event_log default.
* - ``streaming_table``
- string
- ``all_pipelines_event_log``
- Name of the **Delta table** that the notebook writes into. The table is created
implicitly by Structured Streaming on first run. It is *not* a DLT streaming table.
* - ``materialized_views``
- list
- One default ``events_summary`` MV
- List of materialized view definitions. Set to ``[]`` to suppress both the default MV
and the entire DLT pipeline (notebook and job are still generated).
* - ``enable_job_monitoring``
- boolean
- ``false``
- When enabled, adds a Python load + ``jobs_stats`` materialized view to the DLT
pipeline. See :ref:`monitoring-jobs`.
.. note::
The ``streaming_table`` value no longer refers to a DLT streaming table. Under the
current implementation, the notebook uses ``writeStream.toTable(...)`` to write the
union into a regular Delta table. The catalog and schema must exist; the table itself
is created on first write. Users only need write and checkpoint permissions on the
target catalog/schema/volume for the identity that runs the notebook.
Minimal Configuration
~~~~~~~~~~~~~~~~~~~~~
The simplest valid monitoring configuration specifies the checkpoint path and the
path to the dedicated monitoring-job config file:
.. code-block:: yaml
:caption: lhp.yaml
event_log:
catalog: "${catalog}"
schema: _meta
name_suffix: "_event_log"
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
This creates:
* Notebook named ``union_event_logs.py`` under ``monitoring/${env}/``
* DLT pipeline named ``${project_name}_event_log_monitoring``
* Delta table ``all_pipelines_event_log`` in the same catalog/schema as event_log
* Default ``events_summary`` materialized view (pipeline run summary with status,
duration, and row metrics)
* Workflow job ``${project_name}_event_log_monitoring_job`` chaining the notebook and the
pipeline
Custom Pipeline Name
~~~~~~~~~~~~~~~~~~~~
.. code-block:: yaml
:caption: lhp.yaml
monitoring:
pipeline_name: "my_custom_monitor"
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
The pipeline name affects:
* The directory name under ``generated/`` (e.g., ``generated/dev/my_custom_monitor/``)
* The DLT pipeline resource file (e.g., ``resources/lhp/my_custom_monitor.pipeline.yml``)
* The Workflow job resource file (``resources/lhp/my_custom_monitor.job.yml``)
* The job's ``name`` field (``my_custom_monitor_job``)
* The pipeline identifier in Databricks
Custom Catalog and Schema
~~~~~~~~~~~~~~~~~~~~~~~~~
By default, the monitoring assets write to the same catalog and schema as configured
in ``event_log``. You can override either or both:
.. code-block:: yaml
:caption: lhp.yaml
event_log:
catalog: "${catalog}"
schema: _meta
name_suffix: "_event_log"
monitoring:
catalog: "analytics_cat"
schema: "_analytics"
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
**Override priority:**
1. Monitoring-level ``catalog``/``schema`` (highest — if specified)
2. Event log ``catalog``/``schema`` (default fallback)
Generated Artifacts
-------------------
Enabling monitoring produces up to three artifacts per environment. Each is described
below.
Union Notebook
~~~~~~~~~~~~~~
The notebook aggregates all eligible pipeline event logs into the Delta table named by
``streaming_table``. It runs N independent streaming queries — one per pipeline — each
with its own checkpoint directory.
.. code-block:: text
:caption: Path
monitoring/{env}/union_event_logs.py
.. code-block:: python
:caption: monitoring/dev/union_event_logs.py (excerpt)
TARGET_TABLE = "analytics_cat._analytics.all_pipelines_event_log"
CHECKPOINT_BASE = "/Volumes/acme_edw_dev/_meta/checkpoints/event_logs"
MAX_WORKERS = 10
SOURCES = [
("bronze_load", "acme_edw_dev._meta.bronze_load_event_log"),
("silver_transform","acme_edw_dev._meta.silver_transform_event_log"),
("gold_analytics", "acme_edw_dev._meta.gold_analytics_event_log"),
]
def _ensure_target_exists() -> None:
# Pre-create TARGET_TABLE (idempotent) so parallel streams do not race
# on CREATE during a cold run.
if spark.catalog.tableExists(TARGET_TABLE):
return
for sample_name, sample_ref in SOURCES:
try:
empty = (
spark.read.format("delta").table(sample_ref).limit(0)
.withColumn("_source_pipeline", F.lit(sample_name))
)
empty.write.format("delta").saveAsTable(TARGET_TABLE)
return
except AnalysisException:
continue
raise RuntimeError("no readable source event log")
if SOURCES:
_ensure_target_exists()
def process_source(pipeline_name: str, table_ref: str) -> str:
checkpoint = f"{CHECKPOINT_BASE}/{pipeline_name}"
query = (
spark.readStream
.format("delta")
.table(table_ref)
.withColumn("_source_pipeline", F.lit(pipeline_name))
.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint)
.option("mergeSchema", "true")
.trigger(availableNow=True)
.toTable(TARGET_TABLE)
)
query.awaitTermination()
return pipeline_name
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {executor.submit(process_source, n, r): n for n, r in SOURCES}
...
Key aspects:
* **Pre-created target table.** Before launching the executor pool,
``_ensure_target_exists()`` pre-creates the target Delta table using the schema
of the first readable source event log (event-log schemas are uniform across
Lakeflow Declarative Pipelines). Without this prologue, N parallel streams all
race to ``CREATE`` the target via ``.toTable()`` on a cold run; one wins, the
rest fail with ``TABLE_OR_VIEW_ALREADY_EXISTS``. The function is idempotent —
warm runs short-circuit via ``spark.catalog.tableExists`` — and iterates through
``SOURCES`` until one event log is readable, so partially-deployed environments
still bootstrap cleanly.
* **Per-pipeline checkpoints.** Each source has its own directory at
``{CHECKPOINT_BASE}/{pipeline_name}/``. Changing the set of sources never invalidates
existing checkpoints.
* **Append-only.** Streams use ``outputMode("append")`` with ``mergeSchema=true`` so the
target Delta table can absorb schema evolution across event log sources.
* **Finite batches.** ``trigger(availableNow=True)`` processes all available data and
then terminates — ideal for scheduled job execution rather than always-on streaming.
* **Parallel execution.** Sources run concurrently via ``ThreadPoolExecutor`` bounded by
``max_concurrent_streams``.
* **Error aggregation.** Per-source failures are collected and reported together. The
notebook exits with a ``RuntimeError`` if any source failed.
* **Alphabetical source order.** Pipeline names are sorted for deterministic output.
MVs-Only Lakeflow Declarative Pipeline
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The Lakeflow Declarative Pipeline generated by LHP for monitoring contains only
materialized views that read from the Delta table populated by the notebook. There is
**no** DLT streaming table, no source view, and no append flow — those were removed in
favor of the notebook-based union.
.. code-block:: text
:caption: Path
generated/{env}/{pipeline_name}/monitoring.py
.. code-block:: python
:caption: monitoring.py (excerpt, default MV)
PIPELINE_ID = "my_project_event_log_monitoring"
FLOWGROUP_ID = "monitoring"
@dp.materialized_view(
name="acme_edw_dev._meta.events_summary",
comment="Materialized view: events_summary",
table_properties={},
)
def events_summary():
df = spark.sql("""WITH run_info AS (
SELECT origin.pipeline_name, origin.pipeline_id, origin.update_id,
MIN(`timestamp`) AS run_start_time,
MAX(`timestamp`) AS run_end_time,
...
FROM acme_edw_dev._meta.all_pipelines_event_log
GROUP BY origin.pipeline_name, origin.pipeline_id, origin.update_id
),
run_metrics AS (...),
run_config AS (...)
SELECT ri.pipeline_name, ri.pipeline_id, ri.update_id, ri.run_status, ...
FROM run_info ri LEFT JOIN run_metrics rm ON ... LEFT JOIN run_config rc ON ...
ORDER BY ri.run_start_time DESC""")
return df
.. important::
When ``materialized_views: []`` and ``enable_job_monitoring`` is ``false``, the DLT
pipeline has no actions — LHP omits the pipeline entirely (no ``monitoring.py``, no
pipeline resource, no ``pipeline_task`` in the job). Only the notebook and the
notebook task of the job are generated.
Workflow Job
~~~~~~~~~~~~
LHP generates a Databricks Workflow job resource that orchestrates the notebook and the
DLT pipeline. Task ``union_event_logs`` runs the notebook, then the DLT pipeline task is
triggered via ``depends_on``.
.. code-block:: text
:caption: Path
resources/lhp/{pipeline_name}.job.yml
.. code-block:: yaml
:caption: resources/lhp/my_project_event_log_monitoring.job.yml
# Generated by LakehousePlumber - Monitoring Job for my_project_event_log_monitoring
resources:
jobs:
my_project_event_log_monitoring_job:
name: my_project_event_log_monitoring_job
max_concurrent_runs: 1
tasks:
- task_key: union_event_logs
notebook_task:
notebook_path: ${workspace.file_path}/monitoring/${bundle.target}/union_event_logs
source: WORKSPACE
- task_key: my_project_event_log_monitoring_pipeline
depends_on:
- task_key: union_event_logs
pipeline_task:
pipeline_id: ${resources.pipelines.my_project_event_log_monitoring_pipeline.id}
full_refresh: false
queue:
enabled: true
performance_target: STANDARD
Customizing the job (cluster, schedule, permissions, notifications, tags, etc.) is
described in :ref:`monitoring-job-config`.
Materialized Views
-------------------
Default events_summary MV
~~~~~~~~~~~~~~~~~~~~~~~~~
When ``materialized_views`` is omitted from ``monitoring``, LHP creates a default
``events_summary`` materialized view — a pipeline run summary that extracts run status,
duration, row metrics, and configuration from the union Delta table:
.. code-block:: python
:caption: monitoring.py (excerpt) — default events_summary MV
@dp.materialized_view(
name="acme_edw_dev._meta.events_summary",
comment="Materialized view: events_summary",
table_properties={},
)
def events_summary():
df = spark.sql("""WITH run_info AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.update_id,
MIN(`timestamp`) AS run_start_time,
MAX(`timestamp`) AS run_end_time,
MAX_BY(
CASE WHEN event_type = 'update_progress'
THEN details:update_progress:state::STRING END,
CASE WHEN event_type = 'update_progress'
THEN `timestamp` END
) AS run_status
FROM acme_edw_dev._meta.all_pipelines_event_log
GROUP BY origin.pipeline_name, origin.pipeline_id, origin.update_id
),
...
SELECT
ri.pipeline_name, ri.pipeline_id, ri.update_id, ri.run_status,
rc.trigger_cause, rc.is_full_refresh, rc.dbr_version, rc.compute_type,
ri.run_start_time, ri.run_end_time,
ROUND((...) / 60, 2) AS duration_minutes,
COALESCE(rm.tables_processed, 0) AS tables_processed,
COALESCE(rm.total_upserted_rows, 0) AS total_upserted_rows,
...
FROM run_info ri
LEFT JOIN run_metrics rm ON ...
LEFT JOIN run_config rc ON ...
ORDER BY ri.run_start_time DESC""")
return df
The default SQL joins three CTEs from the event log:
* **run_info** — pipeline name, update ID, start/end time, final run status
* **run_metrics** — upserted rows, deleted rows, dropped records, tables processed
* **run_config** — DBR version, compute type (Serverless/Classic), trigger cause, full refresh flag
**``events_summary`` schema:**
.. list-table::
:header-rows: 1
:widths: 25 15 60
* - Column
- Type
- Description
* - ``pipeline_name``
- STRING
- Name of the Lakeflow pipeline
* - ``pipeline_id``
- STRING
- Unique pipeline identifier
* - ``update_id``
- STRING
- Unique identifier for this pipeline run (update)
* - ``run_status``
- STRING
- Final status of the run (e.g., ``COMPLETED``, ``FAILED``, ``CANCELED``)
* - ``trigger_cause``
- STRING
- What triggered the run (e.g., ``USER_ACTION``, ``SCHEDULED``, ``API_CALL``)
* - ``is_full_refresh``
- BOOLEAN
- Whether this was a full refresh or incremental update
* - ``dbr_version``
- STRING
- Databricks Runtime version used for the run
* - ``compute_type``
- STRING
- ``Serverless`` or ``Classic``
* - ``run_start_time``
- TIMESTAMP
- When the pipeline run started
* - ``run_end_time``
- TIMESTAMP
- When the pipeline run ended
* - ``duration_minutes``
- DOUBLE
- Run duration in minutes (rounded to 2 decimal places)
* - ``tables_processed``
- BIGINT
- Number of distinct tables (flows) processed in this run
* - ``total_upserted_rows``
- BIGINT
- Total rows upserted across all tables
* - ``total_deleted_rows``
- BIGINT
- Total rows deleted across all tables
* - ``total_rows_affected``
- BIGINT
- Sum of upserted + deleted rows
* - ``total_dropped_records``
- BIGINT
- Total records dropped by data quality expectations
At generation time, LHP substitutes the ``{streaming_table}`` placeholder in the default
SQL with the fully-qualified Delta table name (e.g.,
``acme_edw_dev._meta.all_pipelines_event_log``).
Custom Materialized Views
~~~~~~~~~~~~~~~~~~~~~~~~~
You can fully customize the materialized views created by the monitoring pipeline, from
inline SQL to external files, or disable them entirely.
Inline SQL
^^^^^^^^^^
Define materialized views with inline SQL using the ``sql`` property:
.. code-block:: yaml
:caption: lhp.yaml
:emphasize-lines: 3-8
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
materialized_views:
- name: "error_events"
sql: "SELECT * FROM all_pipelines_event_log WHERE event_type = 'error'"
- name: "pipeline_latency"
sql: >-
SELECT _source_pipeline, avg(duration_ms) as avg_duration
FROM all_pipelines_event_log GROUP BY _source_pipeline
This generates two materialized view functions instead of the default ``events_summary``:
.. code-block:: python
:caption: monitoring.py (excerpt) — custom inline MVs
@dp.materialized_view(
name="acme_edw_dev._meta.error_events",
comment="Materialized view: error_events",
table_properties={},
)
def error_events():
df = spark.sql(
"""SELECT * FROM all_pipelines_event_log WHERE event_type = 'error'"""
)
return df
@dp.materialized_view(
name="acme_edw_dev._meta.pipeline_latency",
comment="Materialized view: pipeline_latency",
table_properties={},
)
def pipeline_latency():
df = spark.sql(
"""SELECT _source_pipeline, avg(duration_ms) as avg_duration FROM all_pipelines_event_log GROUP BY _source_pipeline"""
)
return df
External SQL Files
^^^^^^^^^^^^^^^^^^
For complex queries, use ``sql_path`` to reference an external SQL file:
.. code-block:: yaml
:caption: lhp.yaml
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
materialized_views:
- name: "custom_analysis"
sql_path: "sql/monitoring_custom_analysis.sql"
.. code-block:: sql
:caption: sql/monitoring_custom_analysis.sql
SELECT
_source_pipeline,
event_type,
date_trunc('DAY', timestamp) AS event_day,
count(*) AS daily_event_count
FROM all_pipelines_event_log
WHERE event_type IN ('FLOW_PROGRESS', 'DATASET_CREATED')
GROUP BY _source_pipeline, event_type, date_trunc('DAY', timestamp)
.. note::
``sql_path`` is resolved relative to the project root directory (where ``lhp.yaml`` lives).
Disabling Materialized Views
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To generate only the notebook and the notebook task of the job, set
``materialized_views`` to an empty list:
.. code-block:: yaml
:caption: lhp.yaml
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
materialized_views: []
When omitted entirely (or set to ``null``), the default ``events_summary`` MV is created.
This means there are three behaviors:
======================= ================================================================
Setting Behavior
======================= ================================================================
Omitted / ``null`` Default ``events_summary`` MV is created
``[]`` (empty list) No MVs and no DLT pipeline — only notebook + notebook-only job
Explicit list Only the specified MVs are created
======================= ================================================================
.. note::
When ``materialized_views: []`` and ``enable_job_monitoring`` is ``false``, LHP does
**not** emit a DLT pipeline file, a pipeline resource file, or a ``pipeline_task`` in
the Workflow job. The job contains only the ``union_event_logs`` notebook task.
Validation Rules
^^^^^^^^^^^^^^^^
LHP validates materialized view definitions at configuration load time:
* **Name required** — Each MV must have a ``name`` field
* **Unique names** — MV names must not repeat within the ``materialized_views`` list
* **Mutual exclusion** — Each MV must specify either ``sql`` or ``sql_path``, not both
Violations raise ``LHP-CFG-008`` with a descriptive error message.
.. _monitoring-job-config:
Customizing the Workflow Job
----------------------------
The generated Workflow job is configured from a **dedicated** YAML file referenced by
``monitoring.job_config_path`` in ``lhp.yaml``. This file is required when monitoring
is enabled.
.. code-block:: yaml
:caption: config/monitoring_job_config.yaml
max_concurrent_runs: 1
performance_target: PERFORMANCE_OPTIMIZED
queue:
enabled: true
timeout_seconds: 3600
notebook_cluster:
new_cluster:
spark_version: "15.4.x-scala2.12"
node_type_id: "Standard_D4ds_v5"
num_workers: 2
schedule:
quartz_cron_expression: "0 0 * * * ?"
timezone_id: UTC
pause_status: UNPAUSED
tags:
purpose: event_log_monitoring
team: data-platform
environment: ${bundle_target}
email_notifications:
on_failure:
- monitoring-alerts@company.com
How it is resolved:
1. LHP reads the file at ``project_root / monitoring.job_config_path``.
2. Token substitution (``${...}``) is applied using the active environment's
``substitutions/.yaml``.
3. The result is deep-merged over LHP's default job config
(``max_concurrent_runs=1``, ``performance_target=STANDARD``, ``queue.enabled=true``)
— nested dicts like ``tags`` and ``queue`` merge recursively; lists are replaced
wholesale.
4. The monitoring job name is always ``${pipeline_name}_job``. Do **not** add a
``job_name`` key to this file.
Available job fields (all optional):
* ``notebook_cluster.new_cluster`` or ``notebook_cluster.existing_cluster_id`` — cluster
for the notebook task (Serverless used when neither is set)
* ``queue.enabled`` — job-run queueing
* ``performance_target``, ``timeout_seconds``, ``max_concurrent_runs``
* ``schedule`` — Quartz cron schedule
* ``tags`` — free-form tags
* ``email_notifications`` — ``on_start``/``on_success``/``on_failure``
* ``webhook_notifications`` — same events as email
* ``permissions`` — user/group permission entries
Rules
~~~~~
* The file must be a **flat single-document YAML mapping**. Do not wrap it in a
``project_defaults:`` key and do not add a ``job_name:`` key.
* The ``pipeline_task`` in the job is always generated automatically — do not redefine
it in this file.
* If the file is missing at generation time, LHP raises ``LHP-IO-001`` pointing to the
resolved absolute path.
.. note::
The legacy auto-pickup of ``templates/bundle/job_config.yaml`` for the monitoring
job — and the reserved ``__eventlog_monitoring`` alias inside it — has been
**removed**. Use ``monitoring.job_config_path`` instead. The ``__eventlog_monitoring``
alias continues to work in the generic ``config/job_config.yaml`` used by
``lhp deps`` for *orchestration* jobs; that surface is unchanged.
Pipeline Configuration for Monitoring
--------------------------------------
The monitoring DLT pipeline (materialized views) can also be configured through
``pipeline_config.yaml``. Because the pipeline name is dynamic, LHP provides a reserved
alias to avoid hardcoding.
Using the __eventlog_monitoring Alias in pipeline_config.yaml
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use the ``__eventlog_monitoring`` reserved keyword in ``pipeline_config.yaml`` to target
the monitoring pipeline without knowing its exact name:
.. code-block:: yaml
:caption: config/pipeline_config.yaml
---
pipeline: __eventlog_monitoring
serverless: false
edition: ADVANCED
clusters:
- label: default
node_type_id: Standard_D4ds_v5
autoscale:
min_workers: 1
max_workers: 4
notifications:
- email_recipients:
- monitoring-alerts@company.com
alerts:
- on-update-failure
- on-update-fatal-failure
tags:
purpose: event_log_monitoring
At generation time, ``__eventlog_monitoring`` automatically resolves to the actual monitoring
pipeline name defined in ``lhp.yaml``. The ``project_defaults`` section still applies and
merges as usual.
Behavior and Rules
~~~~~~~~~~~~~~~~~~
- If monitoring is **not configured or disabled** in ``lhp.yaml``, the alias entry is
silently ignored with a warning
- If **both** the alias and the actual monitoring pipeline name appear in the config,
an error is raised (``LHP-VAL-010``)
- The alias must be used as a **standalone** pipeline entry, not in a pipeline list
(``LHP-VAL-011``)
.. code-block:: yaml
:caption: Incorrect — alias in a list (triggers LHP-VAL-011)
---
pipeline:
- bronze_pipeline
- __eventlog_monitoring
.. code-block:: yaml
:caption: Correct — separate documents
---
pipeline: bronze_pipeline
serverless: false
---
pipeline: __eventlog_monitoring
serverless: false
.. _monitoring-jobs:
Job Monitoring
--------------
When ``enable_job_monitoring: true`` is set, LHP adds an additional Python load chain to
the monitoring DLT pipeline that correlates Databricks Jobs with their associated
pipeline runs using the Databricks SDK. The results are written to a separate
``jobs_stats`` materialized view alongside the user-specified MVs.
.. code-block:: yaml
:caption: lhp.yaml
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
enable_job_monitoring: true
**What it generates:**
In addition to the notebook and the default/user MVs, the DLT pipeline adds:
1. **Python Load → ``v_jobs_stats``** — calls a ``get_jobs_stats`` function from a
generated ``jobs_stats_loader.py`` module to fetch job run statistics via the
Databricks SDK.
2. **Write → ``jobs_stats``** — a materialized view in the same catalog/schema as the
monitoring pipeline, populated from the ``v_jobs_stats`` view. A materialized view
is used (rather than a streaming table) because the Python SDK source returns batch
data, not a streaming DataFrame.
**Generated files:**
.. code-block:: text
generated/
└── dev/
└── my_project_event_log_monitoring/
├── monitoring.py ← includes Python load + jobs_stats write
└── jobs_stats_loader.py ← shipped alongside the pipeline
The ``jobs_stats_loader.py`` module uses the Databricks SDK to scan recent job runs
(default lookback: 7 days), find pipeline tasks, and correlate each pipeline update to its
triggering job. It also enriches rows with pipeline tags (from ``spec.tags``) and job tags
(from ``settings.tags``). The lookback window is configurable via the ``lookback_hours``
pipeline parameter.
.. note::
The ``jobs_stats`` materialized view inherits its catalog and schema from the monitoring
pipeline configuration (which itself defaults to the ``event_log`` catalog/schema).
**``jobs_stats`` schema:**
.. list-table::
:header-rows: 1
:widths: 25 15 60
* - Column
- Type
- Description
* - ``pipeline_id``
- STRING
- Unique pipeline identifier
* - ``pipeline_name``
- STRING
- Name of the Lakeflow pipeline
* - ``update_id``
- STRING
- Pipeline update (run) identifier correlated to the job run
* - ``job_id``
- STRING
- Databricks Job ID that triggered this pipeline run
* - ``job_run_id``
- STRING
- Specific job run identifier
* - ``job_name``
- STRING
- Name of the triggering job
* - ``job_run_start_time``
- TIMESTAMP
- When the job run started
* - ``job_run_end_time``
- TIMESTAMP
- When the job run ended
* - ``job_run_status``
- STRING
- Final job run status (e.g., ``SUCCESS``, ``FAILED``, ``UNKNOWN``)
* - ``pipeline_tags``
- STRING
- JSON map of pipeline ``spec.tags`` (e.g., ``{"team": "data-platform"}``)
* - ``job_tags``
- STRING
- JSON map of job ``settings.tags`` (e.g., ``{"environment": "production"}``)
Automatic Cleanup
-----------------
LHP automatically reconciles monitoring artifacts on every ``lhp generate``:
* **Notebook directory** — ``monitoring/{env}/`` is cleared before the new notebook is
written. Empty ``monitoring/`` is removed.
* **Job resources** — any ``resources/*.job.yml`` whose header matches the monitoring
job comment is removed before a new job is written (so renaming ``pipeline_name``
cleans up the old file).
* **DLT pipeline directory** — when monitoring is *disabled or removed*, LHP scans
``generated/{env}/*`` for ``monitoring.py`` files with ``FLOWGROUP_ID = "monitoring"``
and removes the directory.
This means toggling monitoring on/off, renaming the pipeline, or switching
``materialized_views`` between populated and empty forms never leaves stale files behind.
Common Patterns
---------------
Minimal Setup
~~~~~~~~~~~~~
The simplest possible monitoring configuration:
.. code-block:: yaml
:caption: lhp.yaml
name: my_project
version: "1.0"
event_log:
catalog: "${catalog}"
schema: _meta
name_suffix: "_event_log"
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
This gives you:
* Event log injection on all pipelines
* A notebook at ``monitoring/${env}/union_event_logs.py``
* A DLT pipeline named ``my_project_event_log_monitoring`` with a default
``events_summary`` materialized view
* A Workflow job ``my_project_event_log_monitoring_job`` chaining the two
Full Customization
~~~~~~~~~~~~~~~~~~
A fully customized monitoring setup:
.. code-block:: yaml
:caption: lhp.yaml
name: acme_edw
version: "1.0"
event_log:
catalog: "${catalog}"
schema: _meta
name_prefix: ""
name_suffix: "_event_log"
monitoring:
pipeline_name: "central_observability"
catalog: "analytics_catalog"
schema: "_monitoring"
streaming_table: "unified_event_stream"
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
max_concurrent_streams: 20
materialized_views:
- name: "error_events"
sql: "SELECT * FROM unified_event_stream WHERE event_type = 'error'"
- name: "hourly_summary"
sql: >-
SELECT _source_pipeline, date_trunc('HOUR', timestamp) AS hour,
count(*) AS cnt FROM unified_event_stream
GROUP BY _source_pipeline, date_trunc('HOUR', timestamp)
- name: "daily_analysis"
sql_path: "sql/monitoring_custom_analysis.sql"
Selective Pipeline Monitoring
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To exclude specific pipelines from event log monitoring, use ``pipeline_config.yaml``
to opt individual pipelines out:
.. code-block:: yaml
:caption: lhp.yaml — event log enabled for all by default
event_log:
catalog: "${catalog}"
schema: _meta
name_suffix: "_event_log"
monitoring:
checkpoint_path: "/Volumes/${catalog}/_meta/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
.. code-block:: yaml
:caption: config/pipeline_config.yaml — opt out specific pipelines
---
pipeline: temp_debug_pipeline
event_log: false
---
pipeline: experimental_pipeline
event_log: false
Pipelines that opt out with ``event_log: false`` are excluded from the notebook's
``SOURCES`` list and therefore do not contribute rows to the union Delta table.
Environment-Specific Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use LHP substitution tokens for environment-aware monitoring:
.. code-block:: yaml
:caption: lhp.yaml
event_log:
catalog: "${catalog}"
schema: "${monitoring_schema}"
name_suffix: "_event_log"
monitoring:
checkpoint_path: "/Volumes/${catalog}/${monitoring_schema}/checkpoints/event_logs"
job_config_path: "config/monitoring_job_config.yaml"
.. code-block:: yaml
:caption: substitutions/dev.yaml
dev:
catalog: acme_edw_dev
monitoring_schema: _meta
.. code-block:: yaml
:caption: substitutions/prod.yaml
prod:
catalog: acme_edw_prod
monitoring_schema: _monitoring
This produces environment-specific event log table and checkpoint references at
generation time:
* **Dev:** ``acme_edw_dev._meta.bronze_load_event_log`` →
checkpoint ``/Volumes/acme_edw_dev/_meta/checkpoints/event_logs/bronze_load/``
* **Prod:** ``acme_edw_prod._monitoring.bronze_load_event_log`` →
checkpoint ``/Volumes/acme_edw_prod/_monitoring/checkpoints/event_logs/bronze_load/``
Migrating From the Pre-V0.8.2 Monitoring Pipeline
--------------------------------------------------
Before V0.8.2, LHP generated a single Lakeflow Declarative Pipeline containing a
``UNION ALL`` SQL source view, a DLT streaming table, and the materialized views. That
architecture had a structural limitation: adding or removing a monitored pipeline
changed the UNION schema and could invalidate the checkpoint, forcing a full reload.
V0.8.2 replaces that with the notebook + MVs-only pipeline + job design described above.
Migration steps:
1. **Add ``checkpoint_path``** to your ``monitoring`` block (now required — typically a
Unity Catalog volume path).
2. **Add ``job_config_path``** to your ``monitoring`` block (now required). Create the
file it points to — ``lhp init`` scaffolds
``config/monitoring_job_config_env.yaml.tmpl`` as a starting point. If you previously
used the ``__eventlog_monitoring`` alias inside ``templates/bundle/job_config.yaml``
for the monitoring job, move those settings into the new dedicated file and drop the
``job_name: __eventlog_monitoring`` wrapper plus the ``project_defaults:`` wrapper —
the monitoring config is a flat single-document mapping.
3. **Re-run ``lhp generate``.** LHP will clean up the old single-pipeline artifacts and
write the new notebook, MVs-only pipeline, and job resource.
4. **Redeploy via Databricks Asset Bundles.** The old pipeline is removed by the bundle
deploy; the new job and (MVs-only) pipeline are deployed in its place.
5. **Backfill the union Delta table** if needed. Historical event log rows written before
the new notebook's first run are not replayed into the union table unless you manually
run a one-off backfill from each event log table.
Troubleshooting
---------------
Monitoring-related errors use codes ``LHP-CFG-006`` through ``LHP-CFG-008`` (event log
and monitoring configuration) and ``LHP-VAL-010``/``LHP-VAL-011`` (pipeline config alias
issues).
Common issues:
* **``LHP-CFG-008`` — "Monitoring checkpoint_path is required"** — add
``checkpoint_path`` under ``monitoring`` or set ``monitoring.enabled: false``.
* **``LHP-CFG-008`` — "Monitoring job_config_path is required"** — add
``job_config_path`` under ``monitoring`` pointing to your dedicated monitoring job
config file, or set ``monitoring.enabled: false``.
* **``LHP-CFG-008`` — "Monitoring job_config file not found"** — the file referenced by
``job_config_path`` does not exist. Check the path (resolved relative to project root)
and create the file if needed. ``lhp init`` provides
``config/monitoring_job_config_env.yaml.tmpl`` as a starter.
* **``LHP-IO-001`` — "Monitoring job_config file not found"** — raised at generate time
when the configured ``job_config_path`` cannot be opened. The error message includes
the resolved absolute path.
* **No rows in ``all_pipelines_event_log``** — check that the Workflow job has run
successfully. The notebook writes via Structured Streaming; the Delta table is created
on first successful write.
* **``mergeSchema`` errors** — the notebook enables ``mergeSchema`` automatically on
write. If you receive schema mismatch errors, ensure your checkpoint directories are
fresh (each per-pipeline directory under ``checkpoint_path`` can be deleted
independently without affecting the others).
* **Missing pipelines in the union** — pipelines that set ``event_log: false`` in
``pipeline_config.yaml`` are excluded by design.
.. seealso::
:doc:`errors_reference` for detailed before/after examples and resolution steps for
each error code.
Related Documentation
---------------------
* :doc:`databricks_bundles` — Bundle integration, pipeline configuration, and resource generation
* :doc:`concepts` — Understanding pipelines, flowgroups, and project configuration
* :doc:`actions/test_reporting` — publish DQ test results to external systems
* :doc:`errors_reference` — Complete error code reference
* :doc:`cli` — Command-line reference