Templates Reference

Templates are reusable action patterns that eliminate repetitive configuration and standardize common data pipeline workflows. They use parameter substitution to generate customized actions from a single template definition.

Templates Overview

Templates transform parametrized action patterns into concrete pipeline actions through variable substitution. Think of them as functions that accept parameters and return a list of actions configured for your specific use case.

Key Benefits:

  • Code Reuse: Define once, use many times across different tables and data sources

  • Standardization: Enforce consistent patterns and configurations across your data platform

  • Maintainability: Update logic in one place, automatically propagate changes to all users of the template

  • Simplification: Reduce complex 100+ line configurations to simple 5-line parameter definitions
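
To make the Simplification benefit concrete, here is a sketch of a complete flowgroup that relies on the csv_ingestion_template defined later on this page; everything else comes from the template:

Minimal flowgroup using a template
pipeline: raw_ingestions
flowgroup: customer_ingestion

use_template: csv_ingestion_template
template_parameters:
  table_name: customer
  landing_folder: customer_data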

Template vs FlowGroup:

  • Templates are reusable patterns stored in templates/ directory

  • FlowGroups are concrete pipeline definitions that may use templates

  • Template Parameters customize the template for each specific use case

Template Structure

Every template file contains these core elements:

Basic template structure
name: my_template                    # Unique template identifier
version: "1.0"                      # Template version for tracking
description: "Template description" # Documentation

presets: []                         # Optional presets to apply

parameters:                         # Parameter definitions
  - name: param_name
    type: string
    required: true
    description: "Parameter description"
    default: "default_value"

actions:                           # Action patterns with {{ }} expressions
  - name: "load_{{ table_name }}"
    type: load
    source:
      type: cloudfiles
      path: "{{ data_path }}/*.csv"
    target: "v_{{ table_name }}_raw"

Required Fields:

  • name: Unique identifier for the template across your project

  • actions: List of action patterns that will be generated

Optional Fields:

  • version: Template version for change tracking and compatibility

  • description: Human-readable explanation of template purpose

  • presets: List of preset names to apply to generated actions

  • parameters: Parameter definitions with types and validation
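
A minimal sketch using only the required fields; the action configuration is illustrative and follows the load action shape used elsewhere on this page:

Minimal template with required fields only
name: minimal_events_template

actions:
  - name: load_events_raw
    type: load
    source:
      type: cloudfiles
      path: "${landing_volume}/events/*.json"
      format: json
    target: v_events_raw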

Parameter Types

Templates support multiple parameter types with automatic type conversion and validation:

string

String parameters are the most common type for names, paths, and configuration values:

String parameter examples
parameters:
  - name: table_name
    type: string
    required: true
    description: "Name of the target table"

  - name: file_format
    type: string
    required: false
    default: "parquet"
    description: "Input file format (csv, json, parquet)"

Usage in templates:

actions:
  - name: "load_{{ table_name }}_data"
    source:
      type: cloudfiles
      format: "{{ file_format }}"
      path: "/data/{{ table_name }}/*.{{ file_format }}"

object

Object parameters accept complex nested configurations as natural YAML objects:

Object parameter examples
parameters:
  - name: table_properties
    type: object
    required: false
    default: {}
    description: "Delta table properties for optimization"

  - name: spark_conf
    type: object
    required: false
    default: {}
    description: "Spark configuration for the streaming operation"

Usage in FlowGroup (Natural YAML):

FlowGroup using object parameters
use_template: advanced_streaming_template
template_parameters:
  table_name: customer_data
  table_properties:
    delta.enableChangeDataFeed: true
    delta.autoOptimize.optimizeWrite: true
    delta.autoOptimize.autoCompact: true
    custom.business.owner: "data_team"
  spark_conf:
    spark.sql.streaming.stateStore.rebalancing.enabled: true
    spark.sql.adaptive.coalescePartitions.enabled: true

Template usage:

Template usage
actions:
  - name: "write_{{ table_name }}_table"
    type: write
    write_target:
      type: streaming_table
      table_properties: "{{ table_properties }}"
      spark_conf: "{{ spark_conf }}"

array

Array parameters accept lists of values using natural YAML array syntax:

Array parameter examples
parameters:
  - name: partition_columns
    type: array
    required: false
    default: []
    description: "Columns to partition the table by"

  - name: cluster_columns
    type: array
    required: false
    default: []
    description: "Columns for Liquid Clustering optimization"

Usage in FlowGroup (Natural YAML):

FlowGroup using array parameters
use_template: partitioned_table_template
template_parameters:
  table_name: sales_transactions
  partition_columns:
    - "year"
    - "month"
    - "region"
  cluster_columns:
    - "customer_id"
    - "product_id"

Template usage:

Template usage
actions:
  - name: "write_{{ table_name }}_table"
    type: write
    write_target:
      type: streaming_table
      partition_columns: "{{ partition_columns }}"
      cluster_columns: "{{ cluster_columns }}"

boolean

Boolean parameters control conditional behavior with true/false values:

Boolean parameter examples
parameters:
  - name: enable_cdc
    type: boolean
    required: false
    default: true
    description: "Enable Change Data Feed on the target table"

  - name: create_table
    type: boolean
    required: false
    default: true
    description: "Whether to create the target table"

Usage in FlowGroup:

FlowGroup using boolean parameters
use_template: configurable_table_template
template_parameters:
  table_name: customer_master
  enable_cdc: true
  create_table: false  # Append to existing table

Template usage:

Template usage
actions:
  - name: "write_{{ table_name }}_table"
    type: write
    write_target:
      type: streaming_table
      create_table: "{{ create_table }}"
      table_properties:
        delta.enableChangeDataFeed: "{{ enable_cdc }}"

number

Number parameters accept integer and floating-point values:

Number parameter examples
parameters:
  - name: max_files_per_trigger
    type: number
    required: false
    default: 1000
    description: "Maximum files to process per streaming trigger"

  - name: batch_size
    type: number
    required: false
    default: 50000
    description: "Number of records to process in each batch"

Usage in FlowGroup:

FlowGroup using number parameters
use_template: optimized_ingestion_template
template_parameters:
  table_name: transaction_logs
  max_files_per_trigger: 500
  batch_size: 100000

Template usage:

Template usage
actions:
  - name: "load_{{ table_name }}_files"
    type: load
    source:
      type: cloudfiles
      options:
        cloudFiles.maxFilesPerTrigger: "{{ max_files_per_trigger }}"

Template Examples

Simple Ingestion Template

A basic template for standardized CSV ingestion with schema hints:

templates/csv_ingestion_template.yaml
name: csv_ingestion_template
version: "1.0"
description: "Standard template for ingesting CSV files with schema enforcement"

presets:
  - bronze_layer

parameters:
  - name: table_name
    type: string
    required: true
    description: "Name of the table to ingest"
  - name: landing_folder
    type: string
    required: true
    description: "Name of the landing folder"
  - name: table_properties
    type: object
    required: false
    description: "Optional table properties as key-value pairs"
    default: {}
  - name: cluster_columns
    type: array
    required: false
    description: "Optional Liquid clustering columns"
    default: []

actions:
  - name: "load_{{ table_name }}_csv"
    type: load
    readMode: stream
    operational_metadata:
      - "_source_file_path"
      - "_processing_timestamp"
    source:
      type: cloudfiles
      path: "${landing_volume}/{{ landing_folder }}/*.csv"
      format: csv
      options:
        cloudFiles.format: csv
        header: true
        delimiter: ","
        cloudFiles.maxFilesPerTrigger: 50
        cloudFiles.inferColumnTypes: false
        cloudFiles.schemaEvolutionMode: addNewColumns
        cloudFiles.rescuedDataColumn: _rescued_data
        cloudFiles.schemaHints: "schemas/{{ table_name }}_schema.yaml"
    target: "v_{{ table_name }}_cloudfiles"
    description: "Load {{ table_name }} CSV files from landing volume"

  - name: "write_{{ table_name }}_bronze"
    type: write
    source: "v_{{ table_name }}_cloudfiles"
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: "{{ table_name }}"
      cluster_columns: "{{ cluster_columns }}"
      table_properties: "{{ table_properties }}"
    description: "Write {{ table_name }} to bronze layer"

Using the CSV Ingestion Template

pipelines/ingestion/customer_ingestion.yaml
pipeline: raw_ingestions
flowgroup: customer_ingestion

use_template: csv_ingestion_template
template_parameters:
  table_name: customer
  landing_folder: customer_data
  cluster_columns:
    - "customer_id"
    - "region"
  table_properties:
    delta.autoOptimize.optimizeWrite: true
    custom.business.domain: "customer_data"

The above template usage generates this Python code:

Generated customer_ingestion.py
# Generated by LakehousePlumber
# Pipeline: raw_ingestions
# FlowGroup: customer_ingestion

from pyspark.sql import functions as F
from pyspark import pipelines as dp

# Schema hints for customer_cloudfiles table
customer_cloudfiles_schema_hints = """
    customer_id BIGINT,
    name STRING,
    email STRING,
    region STRING,
    registration_date DATE
""".strip().replace("\n", " ")

@dp.temporary_view()
def v_customer_cloudfiles():
    """Load customer CSV files from landing volume"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "csv") \
        .option("header", True) \
        .option("delimiter", ",") \
        .option("cloudFiles.maxFilesPerTrigger", 50) \
        .option("cloudFiles.inferColumnTypes", False) \
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
        .option("cloudFiles.rescuedDataColumn", "_rescued_data") \
        .option("cloudFiles.schemaHints", customer_cloudfiles_schema_hints) \
        .load("/Volumes/dev/raw/landing_volume/customer_data/*.csv")

    # Add operational metadata columns
    df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))
    df = df.withColumn('_processing_timestamp', F.current_timestamp())

    return df

# Create the streaming table
dp.create_streaming_table(
    name="dev_catalog.bronze.customer",
    comment="Write customer to bronze layer",
    table_properties={
        "delta.autoOptimize.optimizeWrite": True,
        "custom.business.domain": "customer_data"
    },
    cluster_by=["customer_id", "region"]
)

@dp.append_flow(
    target="dev_catalog.bronze.customer",
    name="f_customer_bronze"
)
def f_customer_bronze():
    """Write customer to bronze layer"""
    df = spark.readStream.table("v_customer_cloudfiles")
    return df

Multi-Format Ingestion Template

A more advanced template supporting multiple file formats with format-specific configurations:

templates/multi_format_ingestion_template.yaml
name: multi_format_ingestion_template
version: "2.0"
description: "Advanced template supporting multiple file formats with custom configurations"

parameters:
  - name: table_name
    type: string
    required: true
    description: "Name of the target table"

  - name: file_format
    type: string
    required: true
    description: "File format: csv, json, parquet, avro"

  - name: source_path
    type: string
    required: true
    description: "Source data path pattern"

  - name: format_options
    type: object
    required: false
    default: {}
    description: "Format-specific reader options"

  - name: cloudfiles_options
    type: object
    required: false
    default: {}
    description: "CloudFiles-specific options"

  - name: enable_dqe
    type: boolean
    required: false
    default: false
    description: "Enable data quality expectations"

  - name: expectation_file
    type: string
    required: false
    description: "Path to data quality expectations file"

  - name: partition_columns
    type: array
    required: false
    default: []
    description: "Columns to partition the target table by"

actions:
  - name: "load_{{ table_name }}_{{ file_format }}"
    type: load
    readMode: stream
    operational_metadata:
      - "_source_file_path"
      - "_source_file_modification_time"
      - "_processing_timestamp"
    source:
      type: cloudfiles
      path: "{{ source_path }}"
      format: "{{ file_format }}"
      format_options: "{{ format_options }}"
      options: "{{ cloudfiles_options }}"
    target: "v_{{ table_name }}_raw"
    description: "Load {{ table_name }} {{ file_format }} files from {{ source_path }}"

  {% if enable_dqe %}
  # This action is only generated when enable_dqe is true
  - name: "validate_{{ table_name }}_quality"
    type: transform
    transform_type: data_quality
    source: "v_{{ table_name }}_raw"
    target: "v_{{ table_name }}_validated"
    readMode: stream
    expectations_file: "{{ expectation_file }}"
    description: "Apply data quality validations to {{ table_name }}"
  {% endif %}

  - name: "write_{{ table_name }}_bronze"
    type: write
    source: "{% if enable_dqe %}v_{{ table_name }}_validated{% else %}v_{{ table_name }}_raw{% endif %}"
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: "{{ table_name }}"
      partition_columns: "{{ partition_columns }}"
      table_properties:
        delta.enableChangeDataFeed: true
        delta.autoOptimize.optimizeWrite: true
        source.format: "{{ file_format }}"
        source.path: "{{ source_path }}"
    description: "Write {{ table_name }} to bronze streaming table"

Using the Multi-Format Template for JSON data:

pipelines/ingestion/events_ingestion.yaml
pipeline: event_ingestion
flowgroup: user_events

use_template: multi_format_ingestion_template
template_parameters:
  table_name: user_events
  file_format: json
  source_path: "/Volumes/prod/landing/events/user_events/*.json"
  format_options:
    multiline: true
    allowComments: false
    timestampFormat: "yyyy-MM-dd HH:mm:ss"
  cloudfiles_options:
    cloudFiles.maxFilesPerTrigger: 100
    cloudFiles.schemaEvolutionMode: addNewColumns
    cloudFiles.rescuedDataColumn: "_rescued_data"
  enable_dqe: true
  expectation_file: "expectations/user_events_quality.json"
  partition_columns:
    - "event_date"
    - "event_type"

Using the Multi-Format Template for Parquet data:

pipelines/ingestion/sales_ingestion.yaml
pipeline: sales_ingestion
flowgroup: sales_transactions

use_template: multi_format_ingestion_template
template_parameters:
  table_name: sales_transactions
  file_format: parquet
  source_path: "/Volumes/prod/landing/sales/*.parquet"
  cloudfiles_options:
    cloudFiles.maxFilesPerTrigger: 200
    cloudFiles.schemaEvolutionMode: rescue
  enable_dqe: false
  partition_columns:
    - "transaction_date"
    - "store_region"

CDC Template with SCD Type 2

A template for implementing Change Data Capture with Slowly Changing Dimensions:

templates/scd_type2_template.yaml
name: scd_type2_template
version: "1.0"
description: "Template for SCD Type 2 implementation with CDC"

parameters:
  - name: table_name
    type: string
    required: true
    description: "Name of the dimension table"

  - name: source_table
    type: string
    required: true
    description: "Source table for CDC changes"

  - name: primary_keys
    type: array
    required: true
    description: "Primary key columns for the dimension"

  - name: track_history_column_list
    type: array
    required: false
    default: []
    description: "Columns to track history for (empty = all columns)"

  - name: sequence_column
    type: string
    required: true
    description: "Column to determine order of changes"

  - name: ignore_null_updates
    type: boolean
    required: false
    default: true
    description: "Ignore updates where all tracked columns are null"

actions:
  - name: "load_{{ table_name }}_changes"
    type: load
    readMode: stream
    source:
      type: delta
      database: "${catalog}.${bronze_schema}"
      table: "{{ source_table }}"
      options:
        readChangeFeed: "true"
    target: "v_{{ table_name }}_changes"
    description: "Load change data from {{ source_table }}"

  - name: "write_{{ table_name }}_dimension"
    type: write
    source: "v_{{ table_name }}_changes"
    write_target:
      type: streaming_table
      database: "${catalog}.${silver_schema}"
      table: "dim_{{ table_name }}"
      mode: cdc
      cdc_config:
        keys: "{{ primary_keys }}"
        sequence_by: "{{ sequence_column }}"
        scd_type: 2
        track_history_column_list: "{{ track_history_column_list }}"
        ignore_null_updates: "{{ ignore_null_updates }}"
      table_properties:
        delta.enableChangeDataFeed: true
        table.type: "dimension"
        scd.type: "2"
    description: "Create SCD Type 2 dimension for {{ table_name }}"

Using the SCD Type 2 Template:

pipelines/dimensions/customer_dimension.yaml
pipeline: silver_dimensions
flowgroup: customer_dimension

use_template: scd_type2_template
template_parameters:
  table_name: customer
  source_table: customer_bronze
  primary_keys:
    - "customer_id"
  track_history_column_list:
    - "name"
    - "address"
    - "phone"
    - "email"
    - "market_segment"
  sequence_column: "_commit_timestamp"
  ignore_null_updates: true

Environment and Secret Substitutions

In addition to template parameters, both template definitions and flowgroup YAML files support environment-specific substitutions and secret references. These use different syntax than template parameters and are resolved at generation time.

Substitution Types

Templates interact with four substitution syntaxes: local variables (%{var}), environment tokens (${token}), secret references (${secret:scope/key}), and template parameters ({{ param }}). Each is resolved at a different stage of the processing pipeline.
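
As a quick illustration, here is a sketch of a flowgroup action combining local variables, environment tokens, and secret references (the {{ param }} syntax appears only inside template files; the names here are illustrative):

Combining substitution syntaxes
variables:
  entity: customer

actions:
  - name: "load_%{entity}_raw"                        # %{...} local variable
    type: load
    source:
      type: jdbc
      url: "${jdbc_url}"                              # ${...} environment token
      user: "${secret:database_secrets/username}"     # ${secret:scope/key} secret reference
      password: "${secret:database_secrets/password}"
      query: "SELECT * FROM %{entity}"
    target: "v_%{entity}_raw"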

See also

For the complete substitution reference — syntax details, processing order, file substitution support, and examples — see Substitutions & Secrets.

Local Variables

Local variables allow you to define reusable values within a single flowgroup, reducing repetition and improving maintainability. They are scoped to the flowgroup and resolved before templates, presets, and environment substitution.

Syntax: %{variable_name}

Definition: Add a variables section to your flowgroup YAML:

pipelines/bronze/customer_pipeline.yaml
pipeline: acme_bronze
flowgroup: customer_pipeline

# Define local variables
variables:
  entity: customer
  source_table: customer_raw
  target_table: customer

actions:
  # Use variables throughout the flowgroup
  - name: "load_%{entity}_raw"
    type: load
    source:
      type: delta
      database: "${catalog}.${raw_schema}"  # Environment tokens still work!
      table: "%{source_table}"
    target: "v_%{entity}_raw"
    description: "Load %{entity} table from raw schema"

  - name: "%{entity}_cleanse"
    type: transform
    transform_type: sql
    source: "v_%{entity}_raw"
    target: "v_%{entity}_cleaned"
    sql_path: "sql/brz/%{entity}_cleanse.sql"

  - name: "write_%{entity}_bronze"
    type: write
    source: "v_%{entity}_cleaned"
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: "%{target_table}"

Key Features:

  • Inline Substitution: Supports prefix_%{var}_suffix patterns

  • Recursive Variables: Variables can reference other variables (see the sketch after this list)

  • Strict Validation: Undefined variables cause immediate errors

  • Flowgroup-Scoped: Variables are NOT shared across flowgroups
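
A minimal sketch of recursive variables, assuming the resolution behavior described above; the entity/layer names are illustrative:

Recursive local variables
variables:
  entity: customer
  layer: bronze
  # Recursive: built from the two variables above
  target_table: "%{entity}_%{layer}"

actions:
  - name: "write_%{entity}_%{layer}"
    type: write
    source: "v_%{entity}_cleaned"
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: "%{target_table}"  # resolves to customer_bronze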

Benefits:

  • Single Source of Truth: Change “customer” to “order” in one place

  • Consistency: All action names follow the same pattern

  • Readability: Clear intent with meaningful variable names

  • Maintainability: Easy to refactor or convert to templates

Example - Before and After:

Before (repetitive)
actions:
  - name: "load_customer_raw"
    target: "v_customer_raw"
  - name: "customer_cleanse"
    source: "v_customer_raw"
    target: "v_customer_cleaned"
  - name: "write_customer_bronze"
    source: "v_customer_cleaned"
After (with local variables)
variables:
  entity: customer

actions:
  - name: "load_%{entity}_raw"
    target: "v_%{entity}_raw"
  - name: "%{entity}_cleanse"
    source: "v_%{entity}_raw"
    target: "v_%{entity}_cleaned"
  - name: "write_%{entity}_bronze"
    source: "v_%{entity}_cleaned"

Using Substitutions in Templates

Templates can include environment and secret substitutions alongside template parameters:

templates/secure_jdbc_template.yaml
name: secure_jdbc_template
version: "1.0"
description: "Template for secure JDBC ingestion with environment and secret support"

parameters:
  - name: table_name
    type: string
    required: true
    description: "Name of the source table"

  - name: query_filter
    type: string
    required: false
    description: "Optional WHERE clause filter"

actions:
  - name: "load_{{ table_name }}_from_database"
    type: load
    readMode: batch
    source:
      type: jdbc
      # Environment substitution - resolved from substitutions/{env}.yaml
      url: "${jdbc_url}"
      driver: "${jdbc_driver}"
      # Secret substitutions - resolved to dbutils.secrets.get() calls
      user: "${secret:database_secrets/username}"
      password: "${secret:database_secrets/password}"
      # Template parameter - resolved from template_parameters
      query: |
        SELECT * FROM {{ table_name }}
        {% if query_filter %}WHERE {{ query_filter }}{% endif %}
    target: "v_{{ table_name }}_raw"
    description: "Load {{ table_name }} from external database"

  - name: "write_{{ table_name }}_bronze"
    type: write
    source: "v_{{ table_name }}_raw"
    write_target:
      type: streaming_table
      # Environment substitutions for database targeting
      database: "${catalog}.${bronze_schema}"
      table: "{{ table_name }}"
      table_properties:
        # Mixed substitutions and template parameters
        source.database: "${source_database}"
        source.table: "{{ table_name }}"
        ingestion.environment: "${environment}"
    description: "Write {{ table_name }} to bronze layer"

Example substitutions/dev.yaml:

substitutions/dev.yaml
dev:
  catalog: "dev_catalog"
  bronze_schema: "bronze"
  environment: "development"
  source_database: "external_prod_db"
  jdbc_url: "jdbc:postgresql://dev-db.company.com:5432/analytics"
  jdbc_driver: "org.postgresql.Driver"

secrets:
  default_scope: "dev_secrets"
  scopes:
    database_secrets: "dev_database_secrets"

Using the template in a flowgroup:

pipelines/external_ingestion/customers_from_postgres.yaml
pipeline: external_ingestion
flowgroup: customer_data_load

use_template: secure_jdbc_template
template_parameters:
  table_name: customers
  query_filter: "status = 'active' AND created_date >= CURRENT_DATE - INTERVAL '30 days'"

Generated Python code shows all three substitution types resolved:

Generated customer_data_load.py
@dp.temporary_view()
def v_customers_raw():
    """Load customers from external database"""
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://dev-db.company.com:5432/analytics") \
        .option("driver", "org.postgresql.Driver") \
        .option("user", dbutils.secrets.get(scope="dev_database_secrets", key="username")) \
        .option("password", dbutils.secrets.get(scope="dev_database_secrets", key="password")) \
        .option("query", """
            SELECT * FROM customers
            WHERE status = 'active' AND created_date >= CURRENT_DATE - INTERVAL '30 days'
        """) \
        .load()
    return df

# Create the streaming table
dp.create_streaming_table(
    name="dev_catalog.bronze.customers",
    comment="Write customers to bronze layer",
    table_properties={
        "source.database": "external_prod_db",
        "source.table": "customers",
        "ingestion.environment": "development"
    }
)

@dp.append_flow(target="dev_catalog.bronze.customers", name="f_customers_bronze")
def f_customers_bronze():
    """Write customers to bronze layer"""
    return spark.readStream.table("v_customers_raw")

Using Substitutions in FlowGroups

FlowGroups can also use environment and secret substitutions directly without templates:

pipelines/direct_ingestion/events_load.yaml
pipeline: event_ingestion
flowgroup: user_events_direct

actions:
  - name: load_events_from_api
    type: load
    readMode: batch
    source:
      type: python
      module_path: "extractors/events_api.py"
      function_name: "fetch_events"
      parameters:
        # Environment substitution
        api_endpoint: "${events_api_endpoint}"
        # Secret substitution
        api_key: "${secret:api_secrets/events_api_key}"
        # Direct value
        batch_size: 1000
    target: v_events_raw
    description: "Load events from external API"

  - name: write_events_bronze
    type: write
    source: v_events_raw
    write_target:
      type: streaming_table
      # Environment substitutions
      database: "${catalog}.${bronze_schema}"
      table: user_events
      table_properties:
        # Mix of environment substitutions and direct values
        source.api: "${events_api_endpoint}"
        ingestion.frequency: "hourly"
        environment: "${environment}"
    description: "Write events to bronze layer"

Multi-Environment Examples

The same template or flowgroup works across environments by changing substitution files:

Development Environment:

substitutions/dev.yaml
dev:
  catalog: "dev_catalog"
  bronze_schema: "bronze_dev"
  events_api_endpoint: "https://dev-api.company.com/events"
  environment: "development"

secrets:
  default_scope: "dev_secrets"
  scopes:
    api_secrets: "dev_api_secrets"
    database_secrets: "dev_db_secrets"

Production Environment:

substitutions/prod.yaml
prod:
  catalog: "prod_catalog"
  bronze_schema: "bronze"
  events_api_endpoint: "https://api.company.com/events"
  environment: "production"

secrets:
  default_scope: "prod_secrets"
  scopes:
    api_secrets: "prod_api_secrets"
    database_secrets: "prod_db_secrets"

Same template generates different configurations:

# Development deployment
lhp generate --env dev
# Uses dev_catalog.bronze_dev, dev API endpoint, dev secrets

# Production deployment
lhp generate --env prod
# Uses prod_catalog.bronze, prod API endpoint, prod secrets

Advanced Substitution Patterns

Conditional Secret Usage

Templates can conditionally use secrets based on environment:

Template with conditional secrets
actions:
  - name: "load_{{ table_name }}_data"
    type: load
    source:
      type: cloudfiles
      path: "${data_path}/{{ table_name }}/*.parquet"
      {% if environment == "prod" %}
      # Only use encryption in production
      reader_options:
        spark.sql.parquet.encryption.kms.client.class: "org.apache.parquet.crypto.keytools.KmsClient"
        spark.sql.parquet.encryption.key.retrieval.kms.instance.id: "${secret:encryption_secrets/kms_instance}"
      {% endif %}

Dynamic Database Targeting

Use substitutions for flexible database targeting:

Environment-aware database targeting
write_target:
  type: streaming_table
  # Dynamic catalog and schema based on environment and data classification
  database: "${catalog}.${bronze_schema}_${data_classification}"
  table: "{{ table_name }}"
  table_properties:
    data.classification: "${data_classification}"
    governance.retention: "${retention_policy}"

Secret Scope Aliases

Use scope aliases for flexible secret management:

substitutions/staging.yaml
staging:
  catalog: "staging_catalog"
  bronze_schema: "bronze_staging"

secrets:
  default_scope: "staging_secrets"
  scopes:
    # Alias mapping for different secret scope organization
    external_apis: "staging_external_secrets"
    databases: "staging_rds_secrets"
    storage: "staging_azure_secrets"
Template using scope aliases
source:
  type: jdbc
  url: "${jdbc_url}"
  # Uses mapped scope from substitutions
  user: "${secret:databases/readonly_user}"
  password: "${secret:databases/readonly_password}"

Best Practices for Substitutions

When to Use Each Type:

  • Template Parameters {{ }}: values that change per template usage within the same environment (e.g., {{ table_name }}, {{ file_format }})

  • Environment Tokens ${token}: values that change between dev/staging/prod but stay consistent within an environment (e.g., ${catalog}, ${bronze_schema})

  • Secret References ${secret:scope/key}: sensitive data like passwords, API keys, and connection strings (e.g., ${secret:db/password}, ${secret:apis/key})

Security Guidelines:

Warning

Never put secrets in template parameters or direct values:

❌ NEVER do this
template_parameters:
  api_key: "sk-1234567890abcdef"  # ❌ Exposed in YAML
  password: "mypassword"          # ❌ Stored in plain text
✅ Always use secret substitutions
source:
  user: "${secret:database_secrets/username}"     # ✅ Secure
  password: "${secret:database_secrets/password}" # ✅ Secure

Organization Tips:

  1. Group related substitutions in your environment files

  2. Use consistent naming across environments (dev/staging/prod)

  3. Document secret scope mappings in your substitution files

  4. Validate secret references using lhp validate --env {env}

Template Expressions

Template expressions use Jinja2-style {{ }} syntax for parameter substitution and support advanced templating features:

Basic Substitution

Simple parameter replacement:

# Template parameter
parameters:
  - name: table_name
    type: string
    required: true

# Template usage
actions:
  - name: "process_{{ table_name }}_data"
    target: "v_{{ table_name }}_processed"
    source:
      path: "/data/{{ table_name }}/*.parquet"

Conditional Logic

Use conditional expressions for dynamic action generation:

# Template with conditional logic
actions:
  - name: "load_{{ table_name }}_data"
    type: load
    source:
      type: cloudfiles
      path: "{{ data_path }}"
      {% if file_format == "csv" %}
      options:
        header: true
        delimiter: ","
      {% elif file_format == "json" %}
      options:
        multiline: true
      {% endif %}
    target: "v_{{ table_name }}_raw"

Note: Complex conditional logic should be used sparingly. Consider creating separate templates for significantly different patterns.

String Operations

Jinja2 filters for string manipulation:

# Template with string operations
actions:
  - name: "{{ table_name | lower }}_processing"
    target: "v_{{ table_name | upper }}_CLEANED"
    description: "Process {{ table_name | title }} data from {{ source_path | basename }}"

Natural YAML Syntax

Templates support natural YAML syntax for complex parameters, eliminating the need for JSON strings:

Object Parameters

Traditional approach (JSON strings):

❌ Old way - JSON strings (avoid this)
template_parameters:
  table_properties: '{"delta.enableChangeDataFeed": "true", "delta.autoOptimize.optimizeWrite": "true"}'

Natural YAML approach:

✅ New way - Natural YAML objects
template_parameters:
  table_properties:
    delta.enableChangeDataFeed: true
    delta.autoOptimize.optimizeWrite: true
    delta.autoOptimize.autoCompact: true
    custom.business.domain: "customer_data"

Array Parameters

Traditional approach (JSON strings):

❌ Old way - JSON strings (avoid this)
template_parameters:
  partition_columns: '["year", "month", "region"]'

Natural YAML approach:

✅ New way - Natural YAML arrays
template_parameters:
  partition_columns:
    - "year"
    - "month"
    - "region"

Mixed Complex Parameters

Natural YAML syntax enables readable complex configurations:

Complex template parameters with natural YAML
use_template: advanced_data_platform_template
template_parameters:
  table_name: customer_360

  # Natural YAML array
  partition_columns:
    - "year"
    - "month"
    - "region"

  # Natural YAML object
  table_properties:
    delta.enableChangeDataFeed: true
    delta.autoOptimize.optimizeWrite: true
    delta.autoOptimize.autoCompact: true
    delta.deletedFileRetentionDuration: "interval 30 days"
    custom.business.owner: "customer_analytics_team"
    custom.data.classification: "sensitive"
    custom.refresh.frequency: "daily"

  # Natural YAML object with nested structure
  cloudfiles_options:
    cloudFiles.maxFilesPerTrigger: 100
    cloudFiles.schemaEvolutionMode: addNewColumns
    cloudFiles.rescuedDataColumn: "_rescued_data"
    cloudFiles.inferColumnTypes: false

  # Natural YAML array of strings
  operational_metadata:
    - "_source_file_path"
    - "_processing_timestamp"
    - "_record_hash"

  # Simple boolean
  enable_data_quality: true

  # Simple number
  max_files_per_trigger: 250

Best Practices

Template Design Principles

Single Responsibility

Each template should solve one specific pattern or use case. Avoid overly generic templates that try to handle every scenario.

Clear Parameter Naming

Use descriptive parameter names that clearly indicate their purpose and expected values.

Sensible Defaults

Provide reasonable default values for optional parameters to minimize required configuration.

Documentation

Include comprehensive descriptions for the template and all parameters.

Parameter Validation

Use Strong Typing

✅ Good parameter definitions
parameters:
  - name: file_format
    type: string
    required: true
    description: "File format: csv, json, parquet, avro, orc"

  - name: max_files_per_trigger
    type: number
    required: false
    default: 1000
    description: "Maximum files to process per trigger (1-10000)"

  - name: partition_columns
    type: array
    required: false
    default: []
    description: "Table partitioning columns (recommended: 2-4 columns max)"

Provide Examples

Parameter documentation with examples
parameters:
  - name: cdc_config
    type: object
    required: false
    default: {}
    description: |
      CDC configuration for change data capture.
      Example:
        keys: ["customer_id"]
        sequence_by: "_commit_timestamp"
        scd_type: 2

Template Organization

File Structure

templates/
├── ingestion/
│   ├── csv_ingestion_template.yaml
│   ├── json_ingestion_template.yaml
│   └── multi_format_template.yaml
├── transformation/
│   ├── bronze_to_silver_template.yaml
│   └── data_quality_template.yaml
├── dimension/
│   ├── scd_type1_template.yaml
│   └── scd_type2_template.yaml
└── analytics/
    ├── materialized_view_template.yaml
    └── aggregation_template.yaml

Naming Conventions

  • Use descriptive names that indicate the template’s purpose

  • Include the layer or function in the name (e.g., bronze_ingestion_template)

  • Add version numbers for breaking changes (e.g., csv_ingestion_template_v2.yaml)

Error Handling

Parameter Validation

Templates should validate critical parameters and provide clear error messages:

parameters:
  - name: primary_keys
    type: array
    required: true
    description: "Primary key columns (at least one column required)"

Defensive Defaults

Use safe defaults that won’t cause runtime errors:

parameters:
  - name: cloudfiles_options
    type: object
    required: false
    default:
      cloudFiles.maxFilesPerTrigger: 1000
      cloudFiles.schemaEvolutionMode: addNewColumns
    description: "CloudFiles options with safe defaults"

Integration with Presets

Templates and presets work together to provide maximum reusability:

Template with Preset

templates/bronze_ingestion_template.yaml
name: bronze_ingestion_template
version: "1.0"
description: "Bronze layer ingestion with standard configurations"

presets:
  - bronze_layer_defaults  # Applies to all generated actions

parameters:
  - name: table_name
    type: string
    required: true

actions:
  # Preset values are automatically applied to these actions
  - name: "load_{{ table_name }}"
    type: load
    # ... action configuration

Preset Definition

presets/bronze_layer_defaults.yaml
name: bronze_layer_defaults
version: "1.0"
description: "Standard defaults for bronze layer operations"

defaults:
  operational_metadata:
    - "_processing_timestamp"
    - "_source_file_path"

  write_target:
    table_properties:
      delta.enableChangeDataFeed: true
      delta.autoOptimize.optimizeWrite: true
      quality: bronze

Combination Result

When the template is used, actions automatically inherit both template parameters and preset defaults, providing consistent configuration across your platform.
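
Conceptually, the merged load action would look something like this sketch; the exact merge semantics are simplified here:

Sketch of the merged action
- name: load_customer              # "{{ table_name }}" resolved from template_parameters
  type: load
  operational_metadata:            # inherited from the bronze_layer_defaults preset
    - "_processing_timestamp"
    - "_source_file_path"
  # ... remaining configuration from the template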

Troubleshooting Templates

Common Issues

Parameter Type Mismatches

Error: Expected array for parameter 'partition_columns', got string

Solution: Ensure parameter types match template expectations:

✅ Correct usage
template_parameters:
  partition_columns:  # Array type
    - "year"
    - "month"
❌ Incorrect usage
template_parameters:
  partition_columns: "year,month"  # String type

Missing Required Parameters

Error: Required parameter 'table_name' not provided

Solution: Check template parameter definitions and provide all required parameters.

Template Not Found

Error: Template 'my_template' not found

Solution: Verify that the template file exists in the templates/ directory and has the correct name.

Debugging Template Rendering

Use Dry Run Mode

# Preview generated actions without creating files
lhp generate --env dev --dry-run --verbose

Check Template Syntax

# Validate template files
lhp validate --env dev --templates-only

Inspect Generated Actions

Enable verbose logging to see parameter substitution details:

lhp generate --env dev --verbose
