Examples

This section showcases realistic Lakehouse Plumber configurations. The first example – ACMI – ships with the repository and demonstrates a full Bronze → Silver → Gold medallion pipeline based on the TPC-H dataset.

ACMI Retail Demo

Folder: Example_Projects/acmi

Highlights

  • Multi-format ingestion – CSV, JSON, Parquet using cloudfiles.

  • Bronze → Silver → Gold layers encoded as separate pipelines.

  • Change-Data-Feed (CDC) enabled on streaming tables.

  • Data-Quality Expectations (expectations/*.json).

  • Templates & Presets to avoid repetition.

  • Environment substitutions for dev, tst, prod.

Walk-through

# 1. Install prereqs & enter project root
pip install lakehouse-plumber
cd Example_Projects/acmi

# 2. Validate all pipelines for dev environment
lhp validate --env dev

# 3. Generate Bronze layer code (raw ingestion)
lhp generate --env dev --pipeline 01_raw_ingestion

# Check ./generated/ for Python DLT scripts

# 4. Generate Silver layer
lhp generate --env dev --pipeline 03_silver

# 5. Generate Gold analytics views
lhp generate --env dev --pipeline 04_gold

Customising the Example

  1. Edit substitutions/dev.yaml to match your catalog and storage paths.

  2. Tweak presets under presets/ (e.g., change table properties).

  3. Adjust schema hints or expectations JSON to enforce your data contract.

Multi-Flowgroup Files

For projects with many similar flowgroups, you can combine multiple flowgroups into a single YAML file to reduce file proliferation and improve organization.

Example: SAP Master Data (3 files → 1 file)

Instead of:

  • brand_ingestion.yaml

  • category_ingestion.yaml

  • carrier_ingestion.yaml

Use one file sap_master_data.yaml:

pipeline: raw_ingestions_sap
use_template: TMPL003_parquet_ingestion_template

flowgroups:
  - flowgroup: sap_brand_ingestion
    template_parameters:
      table_name: raw_sap_brand
      landing_folder: brand

  - flowgroup: sap_cat_ingestion
    template_parameters:
      table_name: raw_sap_cat
      landing_folder: category

  - flowgroup: sap_carrier_ingestion
    template_parameters:
      table_name: raw_sap_carrier
      landing_folder: carrier

Result: 67% file reduction with identical functionality.

See Multi-Flowgroup YAML Files for complete documentation with inheritance rules, syntax options, migration guides, and real-world examples.

Local Variables

Local variables reduce repetition within a single flowgroup by defining reusable values. They use %{variable} syntax and are resolved before templates and environment substitutions.

Simple Example

Instead of repeating “customer” throughout your flowgroup:

Without local variables (repetitive)
pipeline: acmi_edw_bronze
flowgroup: customer_pipeline

actions:
  - name: "load_customer_raw"
    source:
      table: "customer_raw"
    target: "v_customer_raw"

  - name: "customer_cleanse"
    source: "v_customer_raw"
    target: "v_customer_cleaned"

  - name: "write_customer_bronze"
    source: "v_customer_cleaned"
    write_target:
      table: "customer"

Use local variables to define it once:

With local variables (DRY principle)
pipeline: acmi_edw_bronze
flowgroup: customer_pipeline

variables:
  entity: customer
  source_table: customer_raw
  target_table: customer

actions:
  - name: "load_%{entity}_raw"
    source:
      table: "%{source_table}"
    target: "v_%{entity}_raw"

  - name: "%{entity}_cleanse"
    source: "v_%{entity}_raw"
    target: "v_%{entity}_cleaned"

  - name: "write_%{entity}_bronze"
    source: "v_%{entity}_cleaned"
    write_target:
      table: "%{target_table}"

Benefits:

  • Single source of truth: Change “customer” to “order” in one place

  • Reduced errors: No risk of inconsistent naming across actions

  • Better readability: Intent is clear from the variables section

  • Works everywhere: Inline patterns like prefix_%{var}_suffix supported

Real-World Example

Here’s a production pattern combining local variables with environment substitutions:

pipelines/bronze/product_ingestion.yaml
 1pipeline: acmi_edw_bronze
 2flowgroup: product_pipeline
 3
 4variables:
 5  entity: product
 6  source_table: product_raw
 7  target_table: product
 8  schema_file: product_schema.yaml
 9
10actions:
11  - name: "load_%{entity}_raw"
12    type: load
13    operational_metadata: ["_processing_timestamp"]
14    readMode: stream
15    source:
16      type: delta
17      database: "${catalog}.${raw_schema}"  # Environment token
18      table: "%{source_table}"            # Local variable
19    target: "v_%{entity}_raw"
20    description: "Load %{entity} table from raw schema"
21
22  - name: "%{entity}_quality_check"
23    type: transform
24    transform_type: sql
25    source: "v_%{entity}_raw"
26    target: "v_%{entity}_validated"
27    sql_path: "sql/quality_checks/%{entity}_check.sql"
28    expectations_path: "expectations/%{entity}_expectations.json"
29
30  - name: "write_%{entity}_bronze"
31    type: write
32    source: "v_%{entity}_validated"
33    write_target:
34      type: streaming_table
35      database: "${catalog}.${bronze_schema}"  # Environment token
36      table: "%{target_table}"               # Local variable
37      schema_hints_path: "schemas/%{schema_file}"

Notice: Local variables (%{entity}) and environment tokens (${catalog}) work together seamlessly.

See Templates Reference for complete documentation on local variables, including:

  • Recursive variable definitions

  • Error handling for undefined variables

  • Interaction with templates and presets

  • Processing order details

Sink Examples

The ACME Supermarkets project includes comprehensive sink examples demonstrating data export to external systems. These examples showcase Delta, Kafka, and custom API sinks for streaming data to destinations beyond traditional DLT-managed tables.

Location: Example_Projects/acme_supermarkets_lhp/pipelines/06_sink_examples/

Delta Sink Example

Export aggregated sales metrics to external Unity Catalog for cross-workspace analytics.

File: 01_delta_sink_external_catalog.yaml

cd Example_Projects/acme_supermarkets_lhp
lhp generate --env dev --pipeline acme_supermarkets_sinks_pipeline
cat generated/acme_supermarkets_sinks_pipeline/delta_sink_example.py

Key features:

  • Aggregates silver layer data

  • Writes to external catalog table

  • Schema evolution enabled

  • Optimized writes for performance

Kafka Sink Example

Stream order fulfillment events to Kafka for real-time processing by downstream systems.

File: 02_kafka_sink_order_events.yaml

Key features:

  • Transforms data to Kafka key/value format using to_json()

  • JSON serialization of order events

  • Kafka headers for event metadata

  • Security and performance tuning configuration

Important

Kafka sinks require explicit key and value columns created in a transform action before writing.

Azure Event Hubs Example

Stream inventory alerts to Azure Event Hubs using OAuth authentication.

File: 03_event_hubs_sink_inventory_alerts.yaml

Key features:

  • OAuth authentication with Azure Event Hubs

  • Kafka-compatible interface (sink_type: kafka)

  • Priority-based alert routing

  • Unity Catalog service credentials

Custom API Sink Example

Push customer profile updates to external CRM via REST API.

File: 04_custom_api_sink_customer_updates.yaml

Custom implementation: sinks/customer_api_sink.py

Key features:

  • HTTP POST with bearer token authentication

  • Batch processing with configurable batch size

  • Retry logic with exponential backoff

  • Dead letter queue for failed records

  • Comprehensive error logging

Walk-through

cd Example_Projects/acme_supermarkets_lhp

# Validate sink configurations
lhp validate --env dev

# Generate all sink examples
lhp generate --env dev --pipeline acme_supermarkets_sinks_pipeline

# Inspect generated Python code
cat generated/acme_supermarkets_sinks_pipeline/delta_sink_example.py
cat generated/acme_supermarkets_sinks_pipeline/kafka_sink_example.py
cat generated/acme_supermarkets_sinks_pipeline/custom_api_sink_example.py

# Deploy with Databricks bundles
databricks bundle deploy -t dev

For more details on sink configuration and options, see Write Actions.

More Examples (Coming Soon)

  • JDBC ingestion with on-prem Oracle.

  • Incremental snapshot tables using delta load and materialized_view write.

  • Python transform with Pandas-UDF cleaning.

Contributions welcome – open a PR adding a folder under Example_Projects!