Operational Metadata¶
Column Definitions¶
Operational metadata consists of automatically generated columns that provide lineage, data provenance, and processing context. These columns are added to your tables without requiring manual SQL modifications.
Note
Operational metadata columns are defined in the project-level configuration file (lhp.yaml), under the operational_metadata key.
Project-level configuration:
# LakehousePlumber Project Configuration
name: my_lakehouse_project
version: "1.0"

operational_metadata:
  columns:
    _processing_timestamp:
      expression: "F.current_timestamp()"
      description: "When the record was processed by the pipeline"
      applies_to: ["streaming_table", "materialized_view", "view"]

    _source_file_path:
      expression: "F.col('_metadata.file_path')"
      description: "Source file path for lineage tracking"
      applies_to: ["view"]

    _record_hash:
      expression: "F.xxhash64(*[F.col(c) for c in df.columns])"
      description: "Hash of all record fields for change detection"
      applies_to: ["streaming_table", "materialized_view", "view"]
      additional_imports:
        - "from pyspark.sql.functions import xxhash64"

    _pipeline_name:
      expression: "F.lit('${pipeline_name}')"
      description: "Name of the processing pipeline"
      applies_to: ["streaming_table", "materialized_view", "view"]
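The ${pipeline_name} token in the _pipeline_name expression above is resolved at generation time by substitution. A minimal sketch of that kind of token substitution, assuming a simple ${key} syntax (the substitute_tokens helper and its context dict are illustrative, not LHP's actual API):

```python
import re

def substitute_tokens(expression: str, context: dict) -> str:
    """Replace ${key} tokens in a metadata expression with values from context."""
    return re.sub(r"\$\{(\w+)\}", lambda m: context[m.group(1)], expression)

# The _pipeline_name expression, resolved for a concrete pipeline
expr = "F.lit('${pipeline_name}')"
print(substitute_tokens(expr, {"pipeline_name": "customer_ingestion"}))
# F.lit('customer_ingestion')
```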
Version Requirements¶
LakehousePlumber supports version enforcement to ensure consistent code generation across development and CI environments. This prevents “works on my machine” issues and ensures reproducible builds.
Basic configuration:
# LakehousePlumber Project Configuration
name: my_lakehouse_project
version: "1.0"

# Enforce version requirements (optional)
required_lhp_version: ">=0.4.1,<0.5.0"  # Allow patch updates within 0.4.x
Version specification formats:
# Exact version pin (strict)
required_lhp_version: "==0.4.1"
# Allow patch updates only
required_lhp_version: "~=0.4.1" # Equivalent to >=0.4.1,<0.5.0
# Range with exclusions
required_lhp_version: ">=0.4.1,<0.5.0,!=0.4.3" # Exclude known bad version
# Allow minor updates
required_lhp_version: ">=0.4.0,<1.0.0"
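The ~=0.4.1 equivalence noted above can be checked with a small version-tuple comparison. This is a deliberate simplification of PEP 440 (real specifier parsing should use the packaging library); satisfies_compatible is an illustrative helper, not part of LHP:

```python
def parse(version: str) -> tuple:
    """Parse a simple X.Y.Z version string into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))

def satisfies_compatible(version: str, base: str) -> bool:
    """Check '~=base' semantics: >= base, < next minor.

    e.g. ~=0.4.1 is equivalent to >=0.4.1,<0.5.0
    """
    v, b = parse(version), parse(base)
    upper = (b[0], b[1] + 1, 0)  # bump the minor segment
    return b <= v < upper

print(satisfies_compatible("0.4.3", "0.4.1"))  # True  (patch update allowed)
print(satisfies_compatible("0.5.0", "0.4.1"))  # False (minor bump excluded)
```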
Behavior:
- When required_lhp_version is set, lhp validate and lhp generate will fail if the installed version doesn't satisfy the requirement.
- Informational commands like lhp show skip version checking to allow inspection even with mismatches.
- Version checking uses PEP 440 version specifiers.
Emergency bypass:
# Temporarily bypass version checking
export LHP_IGNORE_VERSION=1
lhp generate -e dev
# Or inline
LHP_IGNORE_VERSION=1 lhp validate -e prod
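Inside a tool, a bypass like this typically reduces to an environment-variable check. A sketch of the pattern (version_check_enabled is an illustrative helper, not LHP's actual code):

```python
import os

def version_check_enabled() -> bool:
    """Return False when the LHP_IGNORE_VERSION=1 bypass is set."""
    return os.environ.get("LHP_IGNORE_VERSION") != "1"

os.environ["LHP_IGNORE_VERSION"] = "1"
print(version_check_enabled())  # False
```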
CI/CD integration:
# Install exact version matching project requirements
pip install "lakehouse-plumber$(yq -r .required_lhp_version lhp.yaml)"
# Or use range-compatible version
pip install "lakehouse-plumber>=0.4.1,<0.5.0"
# Validate and generate (will fail if version mismatch)
lhp validate -e prod
lhp generate -e prod
Note
Version enforcement is optional. Projects without required_lhp_version work normally with any installed LakehousePlumber version.
Warning
Use the bypass environment variable (LHP_IGNORE_VERSION=1) only in emergencies. It’s not recommended for production environments as it defeats the purpose of version consistency.
Event Log Configuration¶
LakehousePlumber supports project-level event log configuration in lhp.yaml. When
configured, event log blocks are automatically injected into all pipeline resource files
during generation — no -pc flag or pipeline_config.yaml required.
name: my_lakehouse_project
version: "1.0"

event_log:
  catalog: "${catalog}"
  schema: _meta
  name_suffix: "_event_log"
When event_log is defined, each generated pipeline resource will include an event_log
block with the table name derived from the pipeline name (e.g., bronze_load_event_log).
Individual pipelines can override or opt out of project-level event logging through
pipeline_config.yaml.
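The derived name described above follows the pattern {catalog}.{schema}.{pipeline_name}{name_suffix}. A sketch with a hypothetical helper (event_log_table is illustrative, not LHP's internals):

```python
def event_log_table(pipeline_name: str, catalog: str, schema: str,
                    name_suffix: str = "_event_log") -> str:
    """Build the fully qualified event log table name for a pipeline."""
    return f"{catalog}.{schema}.{pipeline_name}{name_suffix}"

print(event_log_table("bronze_load", "dev", "_meta"))
# dev._meta.bronze_load_event_log
```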
See also
For complete details including per-pipeline overrides, opt-out, monitoring pipeline setup, and all configuration options, see Pipeline Monitoring.
Target Type Compatibility¶
The applies_to field controls which DLT table types can use each operational metadata column.
LHP automatically filters columns based on the target type to prevent runtime errors.
Purpose of target type restrictions:
When defining operational metadata columns at the project level, the applies_to field serves as a
safeguard mechanism to protect end users from accidentally using incompatible columns in their
pipeline configurations. This is a defensive design pattern that prevents common mistakes.
Best practice for project administrators:
- Set restrictive applies_to values for source-specific columns (e.g., CloudFiles metadata).
- Use broader applies_to values for universal columns (e.g., timestamps, pipeline names).
- This protects pipeline developers from runtime failures and provides clear usage guidance.
Target types:
- ``view`` - Source views created by load actions (@dp.temporary_view())
- ``streaming_table`` - Live tables with streaming updates (created via dp.create_streaming_table())
- ``materialized_view`` - Batch-computed views for analytics (@dp.materialized_view())
Source-specific metadata limitations:
Warning
Metadata columns that depend on CloudFiles features (like _metadata.file_path) are only available in views that load data from CloudFiles sources. These columns will cause runtime errors if used with JDBC, SQL, Delta, or custom_datasource sources. Custom data sources may provide their own metadata columns depending on their implementation, but CloudFiles-specific metadata will not be available.
See also
For complete details on file metadata columns available in Databricks CloudFiles, refer to the Databricks documentation: File Metadata Columns
Examples of source-restricted columns:
operational_metadata:
  columns:
    _source_file_name:
      expression: "F.col('_metadata.file_name')"
      description: "Original file name with extension"
      applies_to: ["view"]  # Only views, and only CloudFiles sources

    _file_modification_time:
      expression: "F.col('_metadata.file_modification_time')"
      description: "When the source file was last modified"
      applies_to: ["view"]  # Only views, and only CloudFiles sources

    _processing_timestamp:
      expression: "F.current_timestamp()"
      description: "When record was processed (works everywhere)"
      applies_to: ["streaming_table", "materialized_view", "view"]
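The filtering that applies_to enables can be sketched as a simple lookup against the column definitions. This is an illustrative reduction, not LHP's internals; COLUMNS and columns_for_target are hypothetical names:

```python
# Abbreviated column definitions keyed by column name
COLUMNS = {
    "_source_file_name": {"applies_to": ["view"]},
    "_processing_timestamp": {
        "applies_to": ["streaming_table", "materialized_view", "view"]
    },
}

def columns_for_target(requested: list, target_type: str) -> list:
    """Keep only the requested metadata columns valid for this target type."""
    return [name for name in requested
            if target_type in COLUMNS[name]["applies_to"]]

print(columns_for_target(
    ["_source_file_name", "_processing_timestamp"], "streaming_table"))
# ['_processing_timestamp']
```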
Safe usage patterns:
# CloudFiles load action - can use file metadata
- name: load_files
  type: load
  source:
    type: cloudfiles
    path: "/mnt/data/*.json"
  operational_metadata:
    - "_source_file_name"        # ✓ Available in CloudFiles
    - "_file_modification_time"  # ✓ Available in CloudFiles
    - "_processing_timestamp"    # ✓ Available everywhere
  target: v_file_data

# JDBC load action - file metadata not available
- name: load_database
  type: load
  source:
    type: jdbc
    table: "customers"
  operational_metadata:
    - "_processing_timestamp"  # ✓ Available everywhere
    # DO NOT USE: "_source_file_name" would cause runtime error
  target: v_database_data

# Custom data source - metadata depends on implementation
- name: load_api_data
  type: load
  module_path: "data_sources/api_source.py"
  custom_datasource_class: "APIDataSource"
  options:
    api_endpoint: "https://api.example.com/data"
  operational_metadata:
    - "_processing_timestamp"  # ✓ Available everywhere
    # Custom metadata depends on DataSource implementation
  target: v_api_data
Usage in YAML Files¶
Operational metadata can be configured at multiple levels with additive behavior - columns from all levels are combined together:
Important
Additive Behavior: Operational metadata columns are never overridden between levels.
Instead, columns from preset + flowgroup + action levels are combined together.
The only exception is operational_metadata: false at action level, which disables all metadata.
Preset level
name: bronze_layer
version: "1.0"

defaults:
  operational_metadata: ["_processing_timestamp", "_source_file_path"]
FlowGroup level
pipeline: customer_ingestion
flowgroup: load_customers
presets: ["bronze_layer"]
operational_metadata: ["_record_hash"]  # Adds to preset columns

actions:
  - name: load_customer_files
    type: load
    source:
      type: cloudfiles
      path: "/mnt/landing/customers/*.json"
      format: json
    target: v_customers_raw
Action level
actions:
  - name: load_with_custom_metadata
    type: load
    source:
      type: cloudfiles
      path: "/mnt/data/*.parquet"
      format: parquet
    operational_metadata:  # Adds to flowgroup + preset columns
      - "_pipeline_name"
      - "_custom_business_logic"
    target: v_enriched_data

  - name: load_without_metadata
    type: load
    source:
      type: sql
      sql: "SELECT * FROM source_table"
    operational_metadata: false  # Disables all metadata
    target: v_clean_data
Additive behavior example:
# Preset defines base columns
# presets/bronze_layer.yaml
defaults:
  operational_metadata: ["_processing_timestamp"]

# FlowGroup adds more columns
pipeline: customer_ingestion
flowgroup: load_customers
operational_metadata: ["_source_file_path", "_record_hash"]

actions:
  - name: load_customer_files
    type: load
    source:
      type: cloudfiles
      path: "/mnt/data/*.json"
    # Action adds even more columns
    operational_metadata:
      - "_pipeline_name"
      - "_custom_business_logic"
    target: v_customers_raw

# Final result: ALL columns combined
# ✓ _processing_timestamp (from preset)
# ✓ _source_file_path (from flowgroup)
# ✓ _record_hash (from flowgroup)
# ✓ _pipeline_name (from action)
# ✓ _custom_business_logic (from action)
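The additive merge shown above amounts to an order-preserving union of the three levels. A sketch (combine_metadata is an illustrative helper, not LHP's internals):

```python
def combine_metadata(preset, flowgroup, action):
    """Union preset + flowgroup + action columns, preserving first-seen order."""
    combined = []
    for level in (preset, flowgroup, action):
        for column in level or []:  # a level may be absent (None)
            if column not in combined:
                combined.append(column)
    return combined

print(combine_metadata(
    ["_processing_timestamp"],
    ["_source_file_path", "_record_hash"],
    ["_pipeline_name", "_custom_business_logic"],
))
# ['_processing_timestamp', '_source_file_path', '_record_hash',
#  '_pipeline_name', '_custom_business_logic']
```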
Usage Patterns¶
Enable all available columns:
operational_metadata: true
Select specific columns:
operational_metadata:
- "_processing_timestamp"
- "_source_file_path"
- "_record_hash"
Disable metadata:
operational_metadata: false
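Resolving these three forms (true, an explicit list, false) against the project's defined columns might look like the following. This is a hedged sketch; resolve_selection is an illustrative name, not LHP's API:

```python
def resolve_selection(setting, available):
    """Resolve an operational_metadata setting.

    True -> all available columns, list -> that subset, False/None -> none.
    """
    if setting is True:
        return list(available)
    if setting is False or setting is None:
        return []
    return [column for column in setting if column in available]

available = ["_processing_timestamp", "_source_file_path", "_record_hash"]
print(resolve_selection(True, available))            # all three columns
print(resolve_selection(["_record_hash"], available))  # ['_record_hash']
print(resolve_selection(False, available))           # []
```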
Generated Python code:
@dp.temporary_view()
def v_customers_raw():
    """Load customer files from landing zone"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/mnt/landing/customers/*.json")

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', F.current_timestamp())
    df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))
    df = df.withColumn('_record_hash', F.xxhash64(*[F.col(c) for c in df.columns]))

    return df
Danger
If a downstream action is a transformation (for example, a SQL transform), make sure any operational metadata columns added by upstream actions are included in its SQL query; otherwise those columns will be dropped from the output.
Internal Implementation Note¶
The codebase maintains strict semantic separation between single and multi-document YAML files:
- load_yaml_file() - For single-document files (configs, templates, presets). Validates that exactly one document exists and raises MultiDocumentError (LHP-IO-003) for empty files or files with multiple documents. Used for templates, presets, configs, and other single-document files.
- load_yaml_documents_all() - For multi-document files (flowgroup files only). Returns a list of all documents. Used exclusively for flowgroup YAML files that may contain multiple flowgroups.
This strict validation prevents accidental misuse and catches bugs early. If you encounter a
MultiDocumentError, the error message will guide you to the correct loading method.
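The single-document guard can be sketched without a YAML parser by splitting on document separators. This is a deliberate simplification (real code should use a YAML library's multi-document loader); MultiDocumentError here is a stand-in for LHP's LHP-IO-003 error, and both helpers are illustrative:

```python
class MultiDocumentError(Exception):
    """Stand-in for LHP's MultiDocumentError (LHP-IO-003)."""

def split_documents(text: str) -> list:
    """Naive YAML document splitter on '---' separators (simplification)."""
    docs = [doc.strip() for doc in text.split("\n---\n")]
    return [doc for doc in docs if doc]  # drop empty documents

def load_single_document(text: str) -> str:
    """Mirror load_yaml_file(): require exactly one document, else raise."""
    docs = split_documents(text)
    if len(docs) != 1:
        raise MultiDocumentError(f"expected 1 document, found {len(docs)}")
    return docs[0]

print(load_single_document("name: proj\nversion: '1.0'"))
# name: proj
# version: '1.0'
```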