Pipeline Patterns¶
Reusable patterns for common data engineering scenarios encountered in production Lakehouse Plumber projects. Each pattern includes the YAML configuration, generated Python output, and relevant caveats.
Multi-Source Ingestion (Fan-In)¶
Consolidate data from multiple sources into a single streaming table — for example, the same log schema arriving from S3 buckets in different regions or accounts.
LHP natively supports multiple write actions targeting the same streaming table. It
generates a single dp.create_streaming_table() call with multiple
@dp.append_flow() functions — one per source. Each append flow gets its own
independent checkpoint, so if one source has issues the others continue processing
normally.
The key is the create_table flag: the first write action creates the table
(create_table: true, the default), and subsequent writes append to it
(create_table: false).
Configuration¶
pipeline: raw_ingestions
flowgroup: logs_multi_region

actions:
  # Load from US bucket
  - name: load_logs_us
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket-us-east-1/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.maxFilesPerTrigger: 100
    target: v_logs_us

  # Load from EU bucket
  - name: load_logs_eu
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket-eu-west-1/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.maxFilesPerTrigger: 100
    target: v_logs_eu

  # First write — creates the table
  - name: write_logs_us
    type: write
    source: v_logs_us
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: unified_logs
      create_table: true
    description: "Write US region logs to unified table"

  # Second write — appends to the same table
  - name: write_logs_eu
    type: write
    source: v_logs_eu
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: unified_logs
      create_table: false
    description: "Append EU region logs to unified table"
Generated Output¶
from pyspark import pipelines as dp

@dp.temporary_view()
def v_logs_us():
    """Write US region logs to unified table"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .option("cloudFiles.maxFilesPerTrigger", 100) \
        .load("s3://my-bucket-us-east-1/logs/*.parquet")
    return df

@dp.temporary_view()
def v_logs_eu():
    """Append EU region logs to unified table"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .option("cloudFiles.maxFilesPerTrigger", 100) \
        .load("s3://my-bucket-eu-west-1/logs/*.parquet")
    return df

# Single table creation
dp.create_streaming_table(
    name="catalog.bronze.unified_logs",
    comment="Write US region logs to unified table"
)

# One append flow per source
@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_us")
def f_unified_logs_us():
    """Write US region logs to unified table"""
    df = spark.readStream.table("v_logs_us")
    return df

@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_eu")
def f_unified_logs_eu():
    """Append EU region logs to unified table"""
    df = spark.readStream.table("v_logs_eu")
    return df
Important
Each streaming table must have exactly one action with create_table: true across
the entire pipeline. Additional actions targeting the same table must use
create_table: false.
Templatising Multi-Source Ingestion¶
When you have many sources following the same pattern, combine this with templates to eliminate boilerplate. Define a template for the load+write pair and invoke it per source, each targeting the same table.
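A minimal sketch of how that might look. The template file layout, parameter names, and Jinja-style {{ }} substitution shown here are assumptions for illustration only; consult the Templates reference for the exact syntax supported by your LHP version.

# templates/unified_logs_source.yaml (hypothetical template)
name: unified_logs_source
version: "1.0"
parameters:
  - name: region
    required: true
  - name: bucket_path
    required: true
  - name: create_table
    required: true

actions:
  - name: load_logs_{{ region }}
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "{{ bucket_path }}"
      format: parquet
    target: v_logs_{{ region }}

  - name: write_logs_{{ region }}
    type: write
    source: v_logs_{{ region }}
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: unified_logs
      create_table: {{ create_table }}

# flowgroup invoking the template for the US source
pipeline: raw_ingestions
flowgroup: logs_us
use_template: unified_logs_source
template_parameters:
  region: us
  bucket_path: "s3://my-bucket-us-east-1/logs/*.parquet"
  create_table: true

Repeat the invocation for each additional region with create_table: false; every invocation then appends to the same unified_logs table with its own independent checkpoint.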
See also
Write Actions for the full streaming_table reference
CloudFiles Path Filtering¶
Three approaches to exclude specific paths, directories, or files from a CloudFiles Auto Loader pipeline — each useful in different scenarios.
Glob Patterns in the Load Path¶
LHP passes the source.path string directly to the generated .load("...") call
with no escaping or transformation. Any glob syntax that Spark and CloudFiles support
works as-is in your YAML.
Supported glob syntax (Databricks Auto Loader):
| Syntax | Meaning | Example |
|---|---|---|
| * | Match any sequence of characters | *.parquet |
| ? | Match any single character | file_?.csv |
| [abc] | Match any character in set | log_[abc].json |
| [a-z] | Match any character in range | day_[0-9] |
| [^a] | Match any character NOT in set | [^_]* |
| {a,b} | Match any of the alternatives | {sales,marketing} |
actions:
  - name: load_logs_excluding_day16
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.useStrictGlobber: "true"
    target: v_logs_raw
You can also put the glob pattern in an environment substitution token if it varies by environment:

dev:
  log_path_pattern: "s3://dev-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"

source:
  path: "${log_path_pattern}"
Warning
Caveats for complex glob patterns:

- Character classes inside brace alternatives (like {0[1-9],1[0-5]}) should work at the Hadoop GlobFilter level, but this specific combination is not shown in Databricks documentation. Test in a dev environment before relying on it in production.
- Use [^x] not [!x] for negation — the Unix shell [!x] syntax is not supported by Spark's globber (see the sketch after this list).
- ** (globstar) is not documented for Auto Loader — avoid it.
- Notification mode (cloudFiles.useNotifications: "true") with complex globs is undocumented territory. Directory listing mode (the default) is the safe choice for glob-heavy paths.
- Consider adding cloudFiles.useStrictGlobber: "true" (DBR 12.2+) for predictable, Spark-standard globbing behaviour. The default globber is more permissive — for example, * can cross directory boundaries.
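The negation syntax in practice: a minimal sketch, assuming a made-up bucket layout where transient staging directories are prefixed with an underscore (e.g. _tmp). The [^_]* segment matches any directory at that level whose name does not start with an underscore:

actions:
  - name: load_skip_underscore_dirs
    type: load
    readMode: stream
    source:
      type: cloudfiles
      # [^_]* excludes directories like _tmp or _staging (hypothetical names)
      path: "s3://my-bucket/data/[^_]*/*.parquet"
      format: parquet
      options:
        cloudFiles.useStrictGlobber: "true"
    target: v_logs_raw

With the strict globber enabled, the [^_]* segment cannot cross directory boundaries, so it matches exactly one path level.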
Post-Load Filter with SQL Transform¶
CloudFiles natively exposes _metadata.file_path in the loaded DataFrame. Add a
SQL transform action between the load and write to filter out unwanted paths using
full SQL regex power.
actions:
  - name: load_all_files
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/"
      format: parquet
    target: v_raw_data

  - name: filter_excluded_paths
    type: transform
    transform_type: sql
    source: v_raw_data
    target: v_filtered_data
    sql: |
      SELECT * FROM stream(v_raw_data)
      WHERE NOT _metadata.file_path RLIKE '.*/exclude_[^/]+/.*'

  - name: write_bronze
    type: write
    source: v_filtered_data
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: my_table
    description: "Write filtered data to bronze layer"
Note
Auto Loader still reads all files before the filter is applied. This approach
works when the excluded files are readable but contain unwanted data. If excluded files
cannot be read at all (e.g. AccessDenied), use glob patterns or pathGlobFilter
instead.
pathGlobFilter Option¶
To filter by file name (not full path), use the pathGlobFilter reader option.
This is a Spark-native option that filters on the basename of each file after directory
listing.
actions:
  - name: load_parquet_only
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/"
      format: parquet
      options:
        pathGlobFilter: "*.parquet"
    target: v_raw_data
Important
pathGlobFilter filters on the filename only (basename), not the full path. The
.load() path is a prefix filter; pathGlobFilter is for suffix or name filtering.
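The two compose naturally: use the path to narrow which directories are scanned, and pathGlobFilter to narrow which file names within them are ingested. A minimal sketch; the bucket layout and file-name convention here are invented for illustration:

actions:
  - name: load_event_files_only
    type: load
    readMode: stream
    source:
      type: cloudfiles
      # path narrows the directories listed (prefix)
      path: "s3://my-bucket/data/events/"
      format: json
      options:
        # pathGlobFilter narrows the file names ingested (basename)
        pathGlobFilter: "events_*.json"
    target: v_events_raw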
Explicit Include List¶
Use brace expansion to explicitly list only the directories to include:
actions:
  - name: load_selected_sources
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/{sales,marketing,events}/*.parquet"
      format: parquet
    target: v_selected_data
This generates .load("s3://my-bucket/data/{sales,marketing,events}/*.parquet") and
Auto Loader will only scan those three directories.
For truly separate paths (different buckets or unrelated prefixes), use the
multi-source ingestion pattern above — multiple
load+write actions targeting the same table with create_table: false on the
additional writes.
Choosing the Right Approach¶
| Scenario | Approach | Notes |
|---|---|---|
| Exclude specific date partitions | Glob patterns | Use brace expansion to enumerate included segments |
| Exclude paths matching a regex | SQL transform | Full regex power via RLIKE on _metadata.file_path |
| Filter by file extension or name | pathGlobFilter | Set in the load action's options |
| Include specific named directories | Brace expansion | Simple and explicit; documented by Databricks |
| Multiple separate buckets or prefixes | Multiple append flows | Independent checkpoints per source |
See also
- Load Actions for the full CloudFiles load reference
- Write Actions for streaming_table and append flow details
- Enterprise Best Practices for enterprise ingestion patterns