Substitutions & Secrets¶
Summary¶
LakehousePlumber uses multiple substitution syntaxes, each resolved at a different stage of the generation pipeline. The table below shows all forms, their scope, and the processing order.
| Syntax | Name | Scope | Defined In |
|---|---|---|---|
| `%{var}` | Local variable | Flowgroup | `variables:` block of the flowgroup |
| `{{ param }}` | Template parameter | Template | Template file, via `template_parameters` |
| `${token}` | Environment token | Global / per-environment | `substitutions/<env>.yaml` |
| `${secret:scope/key}` | Secret reference | Global / per-environment | Pipeline YAML; scopes mapped in `substitutions/<env>.yaml` |
Processing order:
1. `%{var}`: Local variables are resolved first, within the flowgroup
2. `{{ param }}`: Template parameters are expanded via Jinja2
3. `${token}`: Environment tokens are substituted from the env file
4. `${secret:scope/key}`: Secret references are converted to secure `dbutils.secrets.get()` calls
Each phase only processes its own syntax and passes all other forms through untouched,
so tokens from later phases can safely appear in earlier contexts (e.g., ${catalog}
inside a %{var} value).
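For example, a minimal sketch (hypothetical names) of an environment token riding through the local-variable phase:

```yaml
# Hypothetical flowgroup fragment. Phase 1 expands %{target_db} to
# "${catalog}.bronze"; the ${catalog} token survives that phase untouched
# and is resolved later, during environment substitution.
variables:
  target_db: "${catalog}.bronze"

actions:
  - name: write_customers
    type: write
    source: v_customers
    write_target:
      type: streaming_table
      database: "%{target_db}"
      table: customers
```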
Environment Configuration¶
Tokens wrapped in ${token} are replaced at generation time using files under
substitutions/<env>.yaml. This enables environment-specific configurations
while keeping pipeline definitions portable.
Example substitution file:
```yaml
# Environment-specific tokens
dev:
  catalog: dev_catalog
  bronze_schema: bronze
  silver_schema: silver
  landing_path: /mnt/dev/landing
  checkpoint_path: /mnt/dev/checkpoints

# Secret configuration
secrets:
  default_scope: dev_secrets
  scopes:
    database_secrets: dev_db_secrets
    storage_secrets: dev_azure_secrets
    api_secrets: dev_external_apis
```
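Because pipeline YAML references only the token names, switching environments means supplying another file. A hypothetical `substitutions/prod.yaml` counterpart:

```yaml
# Hypothetical prod counterpart: same token names, production values,
# so the pipeline definitions themselves stay unchanged.
prod:
  catalog: prod_catalog
  bronze_schema: bronze
  silver_schema: silver
  landing_path: /mnt/prod/landing
  checkpoint_path: /mnt/prod/checkpoints

secrets:
  default_scope: prod_secrets
  scopes:
    database_secrets: prod_db_secrets
    storage_secrets: prod_azure_secrets
    api_secrets: prod_external_apis
```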
Local Variables¶
Local variables allow you to define reusable values within a single flowgroup, reducing repetition and improving maintainability. They are resolved before templates and environment substitutions.
Syntax: `%{variable_name}`

Key Features:

- Flowgroup-scoped: Variables are only accessible within the flowgroup where they're defined
- Inline substitution: Supports patterns like `prefix_%{var}_suffix`
- Strict validation: Undefined variables cause immediate errors with clear messages
- Processed first: Resolved before templates, presets, and environment substitutions
Example:
```yaml
pipeline: acmi_edw_bronze
flowgroup: customer_pipeline

variables:
  entity: customer
  source_table: customer_raw
  target_table: customer

actions:
  - name: "load_%{entity}_raw"
    type: load
    source:
      type: delta
      database: "${catalog}.${raw_schema}"
      table: "%{source_table}"
    target: "v_%{entity}_raw"

  - name: "write_%{entity}_bronze"
    type: write
    source: "v_%{entity}_cleaned"
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: "%{target_table}"
```
See also
For complete details on local variables, see Templates Reference.
Secret Management¶
Secret references use the ${secret:scope/key} syntax and are converted to
secure dbutils.secrets.get() calls in generated Python code. LHP validates
scope aliases and collects every secret used by the pipeline, making security
reviews and approvals easier.
Secret reference formats:
- `${secret:scope_alias/key}`: Uses a specific scope alias (resolved to the actual Databricks scope)
- `${secret:key}`: Uses `default_scope` if configured

A resolution sketch follows the note below.
Note
Scope aliases (like database_secrets) are mapped to actual Databricks secret scope
names (like dev_db_secrets) in the substitution file. This provides flexibility
to use different scope names across environments while keeping pipeline definitions portable.
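Putting the two formats together with the dev substitution file above, a hedged sketch (hypothetical keys) of how references resolve:

```yaml
# Alias form: database_secrets -> dev_db_secrets
password: "${secret:database_secrets/db_password}"
# -> dbutils.secrets.get(scope='dev_db_secrets', key='db_password')

# Default-scope form: falls back to default_scope (dev_secrets)
api_token: "${secret:api_token}"
# -> dbutils.secrets.get(scope='dev_secrets', key='api_token')
```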
File Substitution Support¶
Added in version Latest.
LakehousePlumber now supports substitutions in external files, providing the same environment-specific flexibility for Python functions and SQL files that you have in YAML configurations.
Supported File Types:
| File Type | Where Used |
|---|---|
| Python Files | Python transforms, batch handlers, custom datasources, snapshot CDC functions, custom sinks |
| SQL Files | SQL-based load sources and transforms |
Example Python Function with Substitutions:
```python
from typing import Optional, Tuple
from pyspark.sql import DataFrame

catalog = "${catalog}"
schema = "${bronze_schema}"

def next_customer_snapshot(latest_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
    if latest_version is None:
        df = spark.sql(f"""
            SELECT * FROM {catalog}.{schema}.customers
            WHERE snapshot_id = 1
        """)
        return (df, 1)
    return None
```
Example SQL File with Substitutions:
```sql
SELECT
    customer_id,
    customer_name,
    '${environment}' as source_env
FROM ${catalog}.${bronze_schema}.customers
WHERE created_date >= '${cutoff_date}'
```
Secret Support in Files:
Both Python and SQL files support secret substitutions with the same syntax as YAML:
```python
# Environment token
api_endpoint = "${api_base_url}"

# Secret reference
api_key = "${secret:api_keys/service_key}"
db_password = "${secret:database/password}"
```
Processing Behavior:
Tokens and secrets are processed before the file content is used
Python files have substitutions applied before import management
SQL files have substitutions applied before query execution
Backward compatible - files without substitution variables work unchanged
Same syntax as YAML substitutions for consistency
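As a concrete illustration, a hedged before/after sketch of a Python file passing through generation, with token values taken from the dev substitution file above:

```python
# As written in the source file:
#
#   catalog = "${catalog}"
#   api_key = "${secret:api_secrets/service_key}"
#
# What the generated file might contain after substitution, given
# catalog: dev_catalog and the api_secrets alias -> dev_external_apis:
catalog = "dev_catalog"
api_key = dbutils.secrets.get(scope="dev_external_apis", key="service_key")
```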
Example pipeline with secrets:
```yaml
pipeline: customer_ingestion
flowgroup: external_load

actions:
  - name: load_from_postgres
    type: load
    source:
      type: jdbc
      url: "jdbc:postgresql://${secret:database_secrets/host}:5432/customers"
      user: "${secret:database_secrets/username}"
      password: "${secret:database_secrets/password}"
      driver: "org.postgresql.Driver"
      table: "customers"
    target: v_customers_raw
```
Generated Python code:
```python
@dp.temporary_view()
def v_customers_raw():
    """Load from external database"""
    df = spark.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{dbutils.secrets.get(scope='dev_db_secrets', key='host')}:5432/customers") \
        .option("user", f"{dbutils.secrets.get(scope='dev_db_secrets', key='username')}") \
        .option("password", f"{dbutils.secrets.get(scope='dev_db_secrets', key='password')}") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", "customers") \
        .load()

    return df
```
Substitution Syntax¶
LakehousePlumber supports multiple substitution syntaxes for different purposes:
Local Variables (Flowgroup-scoped): `%{variable}`

```yaml
variables:
  entity: customer

actions:
  - name: "load_%{entity}_raw"
    target: "v_%{entity}_raw"
```

Environment Substitution: `${token}`

```yaml
catalog: ${my_catalog}
table: ${catalog}.${schema}.customers
```

Secret References: `${secret:scope/key}`

```yaml
password: ${secret:database/db_password}
```

Template Parameters: `{{ parameter }}`

```yaml
use_template: my_template
template_parameters:
  table_name: customer
# In the template: table: "{{ table_name }}"
```
Note
Syntax Distinction:
- `%{var}` = Local variable (flowgroup-scoped)
- `${token}` = Environment substitution
- `${secret:scope/key}` = Secret reference
- `{{ parameter }}` = Template parameter (Jinja2)
Warning
Legacy syntax: The bare {token} form (without $) is still supported for
backward compatibility but is deprecated. In external Python files (transforms,
batch handlers, custom datasources, snapshot CDC functions, custom sinks), the
{token} pattern directly collides with Python f-string syntax — if a Python
runtime variable like {catalog} in f"SELECT * FROM {catalog}.{schema}.table"
matches a substitution token name, it will be silently replaced at generation time,
breaking your code. The ${token} syntax avoids this entirely because ${} is
not valid Python f-string syntax. Use ${token} in all new configurations.
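A minimal sketch of the failure mode, assuming `catalog` is defined both as a Python runtime variable and as a substitution token (hypothetically mapped to `dev_catalog`):

```python
schema = "sales"
catalog = "main"  # Python runtime variable, same name as the token

# As written with the deprecated bare-{token} form:
query_written = f"SELECT * FROM {catalog}.{schema}.orders"

# What the generated file contains: {catalog} was textually replaced
# at generation time, so the runtime variable is no longer consulted.
query_generated = f"SELECT * FROM dev_catalog.{schema}.orders"

# Both lines are valid Python, so nothing fails at generation time;
# the bug only surfaces when the wrong catalog is queried at run time.
assert query_written != query_generated
```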
Note
Processing Order:
1. Local variables (`%{var}`) are resolved first, within the flowgroup
2. Template parameters (`{{ }}`) are resolved when templates are applied
3. Environment substitutions (`${ }`) are resolved at generation time
4. Secret references (`${secret:}`) are converted to `dbutils.secrets.get()` calls
Warning
Python Code Context: When using LHP substitution tokens inside external Python
files (batch handlers, Python transforms, custom datasources, snapshot CDC functions,
custom sinks), you must use ${} syntax. LHP applies substitution to these
files at generation time, and the legacy {token} pattern matches Python f-string
variables.
```python
# ${catalog} is replaced by LHP at generation time
default_catalog = "${catalog}"

# {default_catalog} and {table} are ordinary f-string placeholders; LHP
# leaves them alone as long as they don't match a substitution token name
spark.sql(f"SELECT * FROM {default_catalog}.{table}")
```
By contrast, a transform that reuses a token name as a runtime variable breaks:

```python
def my_transform(df, spark, parameters):
    catalog = parameters.get("catalog", "main")
    schema = parameters.get("schema", "bronze")
    # If 'catalog' is also a substitution token, LHP replaces {catalog}
    # at generation time, breaking this f-string!
    return spark.sql(f"SELECT * FROM {catalog}.{schema}.lookup")
```
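A hedged rewrite of that transform under the same assumptions: default to the generation-time values via `${}`, and rename the runtime variables so their f-string placeholders cannot match a token name:

```python
def my_transform(df, spark, parameters):
    # ${catalog}/${bronze_schema} are filled in by LHP at generation time;
    # target_catalog/target_schema are not substitution token names, so
    # their f-string placeholders pass through generation untouched.
    target_catalog = parameters.get("catalog", "${catalog}")
    target_schema = parameters.get("schema", "${bronze_schema}")
    return spark.sql(f"SELECT * FROM {target_catalog}.{target_schema}.lookup")
```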