Operational Metadata
====================

.. meta::
   :description: Configure operational metadata columns for lineage, provenance, and processing context in Lakehouse Plumber pipelines.

Column Definitions
------------------

Operational metadata consists of automatically generated columns that provide lineage, data provenance, and processing context. These columns are added to your tables without requiring manual SQL modifications.

.. note::

   Operational metadata columns are defined in the project-level configuration file (``lhp.yaml``) under the ``operational_metadata`` key.

**Project-level configuration:**

.. code-block:: yaml
   :caption: lhp.yaml - Project operational metadata configuration
   :linenos:

   # LakehousePlumber Project Configuration
   name: my_lakehouse_project
   version: "1.0"

   operational_metadata:
     columns:
       _processing_timestamp:
         expression: "F.current_timestamp()"
         description: "When the record was processed by the pipeline"
         applies_to: ["streaming_table", "materialized_view", "view"]

       _source_file_path:
         expression: "F.col('_metadata.file_path')"
         description: "Source file path for lineage tracking"
         applies_to: ["view"]

       _record_hash:
         expression: "F.xxhash64(*[F.col(c) for c in df.columns])"
         description: "Hash of all record fields for change detection"
         applies_to: ["streaming_table", "materialized_view", "view"]
         additional_imports:
           - "from pyspark.sql.functions import xxhash64"

       _pipeline_name:
         expression: "F.lit('${pipeline_name}')"
         description: "Name of the processing pipeline"
         applies_to: ["streaming_table", "materialized_view", "view"]

Version Requirements
--------------------

LakehousePlumber supports version enforcement to ensure consistent code generation across development and CI environments. This prevents "works on my machine" issues and ensures reproducible builds.

**Basic configuration:**

.. code-block:: yaml
   :caption: lhp.yaml - Version enforcement examples
   :linenos:

   # LakehousePlumber Project Configuration
   name: my_lakehouse_project
   version: "1.0"

   # Enforce version requirements (optional)
   required_lhp_version: ">=0.4.1,<0.5.0"  # Allow patch updates within 0.4.x

**Version specification formats:**

.. code-block:: yaml
   :caption: Version requirement examples

   # Exact version pin (strict)
   required_lhp_version: "==0.4.1"

   # Allow patch updates only
   required_lhp_version: "~=0.4.1"  # Equivalent to >=0.4.1,<0.5.0

   # Range with exclusions
   required_lhp_version: ">=0.4.1,<0.5.0,!=0.4.3"  # Exclude known bad version

   # Allow minor updates
   required_lhp_version: ">=0.4.0,<1.0.0"

**Behavior:**

- When ``required_lhp_version`` is set, ``lhp validate`` and ``lhp generate`` will fail if the installed version doesn't satisfy the requirement
- Informational commands like ``lhp show`` skip version checking to allow inspection even with mismatches
- Version checking uses `PEP 440 <https://peps.python.org/pep-0440/>`_ version specifiers (illustrated in the sketch below)

**Emergency bypass:**

.. code-block:: bash
   :caption: Bypass version checking in emergencies

   # Temporarily bypass version checking
   export LHP_IGNORE_VERSION=1
   lhp generate -e dev

   # Or inline
   LHP_IGNORE_VERSION=1 lhp validate -e prod

**CI/CD integration:**

.. code-block:: bash
   :caption: CI pipeline with version enforcement

   # Install a version matching the project requirement
   pip install "lakehouse-plumber$(yq -r .required_lhp_version lhp.yaml)"

   # Or use a range-compatible version directly
   pip install "lakehouse-plumber>=0.4.1,<0.5.0"

   # Validate and generate (will fail on a version mismatch)
   lhp validate -e prod
   lhp generate -e prod
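The check performed by ``lhp validate`` and ``lhp generate`` follows standard PEP 440 semantics. The snippet below is a minimal sketch of such a check using the ``packaging`` library; it is illustrative only and does not reproduce LakehousePlumber's internal implementation.

.. code-block:: python
   :caption: Illustrative PEP 440 version check (not LHP internals)

   from importlib.metadata import version
   from packaging.specifiers import SpecifierSet
   from packaging.version import Version

   # Example requirement; in practice this value comes from lhp.yaml
   required = SpecifierSet(">=0.4.1,<0.5.0")
   installed = Version(version("lakehouse-plumber"))

   if installed not in required:
       raise SystemExit(
           f"lakehouse-plumber {installed} does not satisfy '{required}'"
       )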
.. note::

   Version enforcement is **optional**. Projects without ``required_lhp_version`` work normally with any installed LakehousePlumber version.

.. warning::

   Use the bypass environment variable (``LHP_IGNORE_VERSION=1``) only in emergencies. It is not recommended for production environments because it defeats the purpose of version consistency.

Event Log Configuration
-----------------------

LakehousePlumber supports project-level event log configuration in ``lhp.yaml``. When configured, event log blocks are automatically injected into all pipeline resource files during generation; no ``-pc`` flag or ``pipeline_config.yaml`` is required.

.. code-block:: yaml
   :caption: lhp.yaml - Event log configuration

   name: my_lakehouse_project
   version: "1.0"

   event_log:
     catalog: "${catalog}"
     schema: _meta
     name_suffix: "_event_log"

When ``event_log`` is defined, each generated pipeline resource will include an ``event_log`` block with the table name derived from the pipeline name (e.g., ``bronze_load_event_log``). Individual pipelines can override or opt out of project-level event logging through ``pipeline_config.yaml``.

.. seealso::

   For complete details including per-pipeline overrides, opt-out, monitoring pipeline setup, and all configuration options, see :doc:`monitoring`.

Target Type Compatibility
-------------------------

The ``applies_to`` field controls which DLT table types can use each operational metadata column. LHP automatically filters columns based on the target type to prevent runtime errors.

**Purpose of target type restrictions:**

When defining operational metadata columns at the project level, the ``applies_to`` field serves as a **safeguard mechanism** that protects end users from accidentally using incompatible columns in their pipeline configurations. This is a defensive design pattern that prevents common mistakes.

**Best practice for project administrators:**

- Set restrictive ``applies_to`` values for source-specific columns (e.g., CloudFiles metadata)
- Use broader ``applies_to`` values for universal columns (e.g., timestamps, pipeline names)
- This protects pipeline developers from runtime failures and provides clear usage guidance

**Target types:**

- ``view`` - Source views created by load actions (``@dp.temporary_view()``)
- ``streaming_table`` - Live tables with streaming updates
- ``materialized_view`` - Batch-computed views for analytics (``@dp.materialized_view()``)

**Source-specific metadata limitations:**

.. warning::

   - Metadata columns that depend on CloudFiles features (such as ``_metadata.file_path``) are **only available in views** that load data from CloudFiles sources. These columns will cause runtime errors if used with JDBC, SQL, Delta, or ``custom_datasource`` sources.
   - Custom data sources may provide their own metadata columns depending on their implementation, but CloudFiles-specific metadata will not be available.

.. seealso::

   For complete details on the file metadata columns available with Databricks CloudFiles, refer to the *File Metadata Columns* page in the Databricks documentation.
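The restriction exists because the hidden ``_metadata`` column is a feature of file-based sources. The sketch below contrasts a file-based read, where the column is available, with a JDBC read, where it is not; paths and connection details are placeholders.

.. code-block:: python
   :caption: Sketch - file metadata exists only for file-based sources

   from pyspark.sql import SparkSession, functions as F

   spark = SparkSession.builder.getOrCreate()

   # File-based source: the hidden _metadata column can be selected
   files_df = (
       spark.read.format("json")
       .load("/mnt/landing/customers/")  # placeholder path
       .select("*", F.col("_metadata.file_path").alias("_source_file_path"))
   )

   # JDBC source: no _metadata column exists for this source type
   jdbc_df = (
       spark.read.format("jdbc")
       .option("url", "jdbc:postgresql://host:5432/db")  # placeholder connection
       .option("dbtable", "customers")
       .load()
   )
   # jdbc_df.select(F.col("_metadata.file_path"))  # would raise AnalysisException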
**Examples of source-restricted columns:**

.. code-block:: yaml
   :caption: CloudFiles-only operational metadata
   :linenos:
   :emphasize-lines: 6

   operational_metadata:
     columns:
       _source_file_name:
         expression: "F.col('_metadata.file_name')"
         description: "Original file name with extension"
         applies_to: ["view"]  # Only views, and only CloudFiles sources

       _file_modification_time:
         expression: "F.col('_metadata.file_modification_time')"
         description: "When the source file was last modified"
         applies_to: ["view"]  # Only views, and only CloudFiles sources

       _processing_timestamp:
         expression: "F.current_timestamp()"
         description: "When record was processed (works everywhere)"
         applies_to: ["streaming_table", "materialized_view", "view"]

**Safe usage patterns:**

.. code-block:: yaml
   :caption: Source-aware metadata configuration
   :linenos:

   # CloudFiles load action - can use file metadata
   - name: load_files
     type: load
     source:
       type: cloudfiles
       path: "/mnt/data/*.json"
     operational_metadata:
       - "_source_file_name"        # ✓ Available in CloudFiles
       - "_file_modification_time"  # ✓ Available in CloudFiles
       - "_processing_timestamp"    # ✓ Available everywhere
     target: v_file_data

   # JDBC load action - file metadata not available
   - name: load_database
     type: load
     source:
       type: jdbc
       table: "customers"
     operational_metadata:
       - "_processing_timestamp"    # ✓ Available everywhere
       # DO NOT USE: "_source_file_name" would cause a runtime error
     target: v_database_data

   # Custom data source - metadata depends on implementation
   - name: load_api_data
     type: load
     module_path: "data_sources/api_source.py"
     custom_datasource_class: "APIDataSource"
     options:
       api_endpoint: "https://api.example.com/data"
     operational_metadata:
       - "_processing_timestamp"    # ✓ Available everywhere
       # Custom metadata depends on the DataSource implementation
     target: v_api_data

Usage in YAML Files
-------------------

Operational metadata can be configured at multiple levels with **additive behavior**; columns from all levels are combined together:

.. important::

   **Additive Behavior**: Operational metadata columns are **never overridden** between levels. Instead, columns from preset + flowgroup + action levels are **combined together**. The only exception is ``operational_metadata: false`` at action level, which disables **all** metadata.

**Preset level**

.. code-block:: yaml
   :caption: presets/bronze_layer.yaml
   :linenos:

   name: bronze_layer
   version: "1.0"

   defaults:
     operational_metadata: ["_processing_timestamp", "_source_file_path"]

**FlowGroup level**

.. code-block:: yaml
   :caption: pipelines/customer_ingestion/load_customers.yaml
   :linenos:
   :emphasize-lines: 4

   pipeline: customer_ingestion
   flowgroup: load_customers
   presets: ["bronze_layer"]
   operational_metadata: ["_record_hash"]  # Adds to preset columns

   actions:
     - name: load_customer_files
       type: load
       source:
         type: cloudfiles
         path: "/mnt/landing/customers/*.json"
         format: json
       target: v_customers_raw

**Action level**

.. code-block:: yaml
   :caption: Action-specific metadata configuration
   :linenos:
   :emphasize-lines: 8-10

   actions:
     - name: load_with_custom_metadata
       type: load
       source:
         type: cloudfiles
         path: "/mnt/data/*.parquet"
         format: parquet
       operational_metadata:  # Adds to flowgroup + preset columns
         - "_pipeline_name"
         - "_custom_business_logic"
       target: v_enriched_data

     - name: load_without_metadata
       type: load
       source:
         type: sql
         sql: "SELECT * FROM source_table"
       operational_metadata: false  # Disables all metadata
       target: v_clean_data
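Conceptually, the additive behavior amounts to an ordered union of the column selections from each level, with ``false`` at the action level short-circuiting everything. The following sketch illustrates that rule; the function and values are purely illustrative, not LakehousePlumber's actual resolution code.

.. code-block:: python
   :caption: Sketch - additive merge of metadata column selections

   def resolve_metadata_columns(preset, flowgroup, action):
       """Illustrative additive merge of operational metadata selections."""
       # operational_metadata: false at the action level disables ALL metadata
       if action is False:
           return []
       combined = []
       for level in (preset, flowgroup, action):
           for column in level or []:
               if column not in combined:
                   combined.append(column)
       return combined

   # Preset + flowgroup + action selections are combined, never overridden
   resolve_metadata_columns(
       preset=["_processing_timestamp"],
       flowgroup=["_source_file_path", "_record_hash"],
       action=["_pipeline_name", "_custom_business_logic"],
   )
   # ['_processing_timestamp', '_source_file_path', '_record_hash',
   #  '_pipeline_name', '_custom_business_logic']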
**Additive behavior example:**

.. code-block:: yaml
   :caption: Complete example showing additive behavior
   :linenos:
   :emphasize-lines: 4, 9, 18-20

   # Preset defines base columns
   # presets/bronze_layer.yaml
   defaults:
     operational_metadata: ["_processing_timestamp"]

   # FlowGroup adds more columns
   pipeline: customer_ingestion
   flowgroup: load_customers
   operational_metadata: ["_source_file_path", "_record_hash"]

   actions:
     - name: load_customer_files
       type: load
       source:
         type: cloudfiles
         path: "/mnt/data/*.json"
       # Action adds even more columns
       operational_metadata:
         - "_pipeline_name"
         - "_custom_business_logic"
       target: v_customers_raw

   # Final result: ALL columns combined
   # ✓ _processing_timestamp   (from preset)
   # ✓ _source_file_path       (from flowgroup)
   # ✓ _record_hash            (from flowgroup)
   # ✓ _pipeline_name          (from action)
   # ✓ _custom_business_logic  (from action)

Usage Patterns
--------------

**Enable all available columns:**

.. code-block:: yaml

   operational_metadata: true

**Select specific columns:**

.. code-block:: yaml

   operational_metadata:
     - "_processing_timestamp"
     - "_source_file_path"
     - "_record_hash"

**Disable metadata:**

.. code-block:: yaml

   operational_metadata: false

**Generated Python code:**

.. code-block:: python
   :caption: Generated DLT code with operational metadata
   :linenos:
   :emphasize-lines: 8-11

   @dp.temporary_view()
   def v_customers_raw():
       """Load customer files from landing zone"""
       df = spark.readStream \
           .format("cloudFiles") \
           .option("cloudFiles.format", "json") \
           .load("/mnt/landing/customers/*.json")
       # Add operational metadata columns
       df = df.withColumn('_processing_timestamp', F.current_timestamp())
       df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))
       df = df.withColumn('_record_hash', F.xxhash64(*[F.col(c) for c in df.columns]))

       return df

.. danger::

   When you add operational metadata columns to an upstream action and a downstream action is a transformation (for example, a SQL transform), make sure the metadata columns are included in the SQL query; otherwise they will not be carried through to the downstream target.

Internal Implementation Note
----------------------------

The codebase maintains a strict semantic separation between single-document and multi-document YAML files:

- ``load_yaml_file()`` - For single-document files (configs, templates, presets)

  * Validates that exactly one document exists
  * Raises ``MultiDocumentError`` (LHP-IO-003) for empty files or files with multiple documents
  * Used for templates, presets, configs, and other single-document files

- ``load_yaml_documents_all()`` - For multi-document files (flowgroup files only)

  * Returns a list of all documents
  * Used exclusively for flowgroup YAML files that may contain multiple flowgroups

This strict validation prevents accidental misuse and catches bugs early. If you encounter a ``MultiDocumentError``, the error message will guide you to the correct loading method.
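The sketch below mimics those two loading behaviors with plain PyYAML, to make the distinction concrete. The function names and the exception are simplified stand-ins, not the actual LakehousePlumber helpers.

.. code-block:: python
   :caption: Sketch - single- vs multi-document YAML loading semantics

   from pathlib import Path

   import yaml

   def load_single_document(path):
       """Single-document semantics: exactly one YAML document, or an error."""
       docs = list(yaml.safe_load_all(Path(path).read_text()))
       if len(docs) != 1:
           # Stands in for MultiDocumentError (LHP-IO-003)
           raise ValueError(
               f"{path}: expected exactly 1 YAML document, found {len(docs)}"
           )
       return docs[0]

   def load_all_documents(path):
       """Multi-document semantics, as used for flowgroup files."""
       return list(yaml.safe_load_all(Path(path).read_text()))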