Pipeline Patterns

Reusable patterns for common data engineering scenarios encountered in production Lakehouse Plumber projects. Each pattern includes the YAML configuration, generated Python output, and relevant caveats.

Multi-Source Ingestion (Fan-In)

Consolidate data from multiple sources into a single streaming table — for example, the same log schema arriving from S3 buckets in different regions or accounts.

LHP natively supports multiple write actions targeting the same streaming table. It generates a single dp.create_streaming_table() call with multiple @dp.append_flow() functions — one per source. Each append flow gets its own independent checkpoint, so if one source has issues the others continue processing normally.

The key is the create_table flag: the first write action creates the table (create_table: true, the default), and subsequent writes append to it (create_table: false).

Configuration

pipelines/bronze/logs_multi_region.yaml
pipeline: raw_ingestions
flowgroup: logs_multi_region

actions:
  # Load from US bucket
  - name: load_logs_us
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket-us-east-1/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.maxFilesPerTrigger: 100
    target: v_logs_us

  # Load from EU bucket
  - name: load_logs_eu
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket-eu-west-1/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.maxFilesPerTrigger: 100
    target: v_logs_eu

  # First write: creates the table
  - name: write_logs_us
    type: write
    source: v_logs_us
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: unified_logs
      create_table: true
    description: "Write US region logs to unified table"

  # Second write: appends to the same table
  - name: write_logs_eu
    type: write
    source: v_logs_eu
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: unified_logs
      create_table: false
    description: "Append EU region logs to unified table"

Generated Output

Generated logs_multi_region.py
from pyspark import pipelines as dp

@dp.temporary_view()
def v_logs_us():
    """Write US region logs to unified table"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .option("cloudFiles.maxFilesPerTrigger", 100) \
        .load("s3://my-bucket-us-east-1/logs/*.parquet")
    return df

@dp.temporary_view()
def v_logs_eu():
    """Append EU region logs to unified table"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .option("cloudFiles.maxFilesPerTrigger", 100) \
        .load("s3://my-bucket-eu-west-1/logs/*.parquet")
    return df

# Single table creation
dp.create_streaming_table(
    name="catalog.bronze.unified_logs",
    comment="Write US region logs to unified table"
)

# One append flow per source
@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_us")
def f_unified_logs_us():
    """Write US region logs to unified table"""
    df = spark.readStream.table("v_logs_us")
    return df

@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_eu")
def f_unified_logs_eu():
    """Append EU region logs to unified table"""
    df = spark.readStream.table("v_logs_eu")
    return df

Important

Each streaming table must have exactly one action with create_table: true across the entire pipeline. Additional actions targeting the same table must use create_table: false.
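The rule above can be checked mechanically before generation. The following is a minimal sketch, assuming the actions have already been parsed into plain dictionaries with the same field names as the YAML; the function name check_create_table is hypothetical and is not part of LHP, which may well perform its own validation.

```python
from collections import defaultdict


def check_create_table(actions):
    """Return {table: message} for every table that breaks the one-creator rule."""
    creators = defaultdict(int)
    for action in actions:
        if action.get("type") != "write":
            continue  # only write actions carry a write_target
        target = action["write_target"]
        table = f'{target["database"]}.{target["table"]}'
        # create_table is treated as true when omitted, matching the stated default
        creators[table] += 1 if target.get("create_table", True) else 0
    return {
        table: f"expected 1 creator, found {count}"
        for table, count in creators.items()
        if count != 1
    }


def write_action(name, create_table):
    """Build a write action dict shaped like the YAML in this section."""
    return {
        "name": name,
        "type": "write",
        "write_target": {
            "type": "streaming_table",
            "database": "catalog.bronze",
            "table": "unified_logs",
            "create_table": create_table,
        },
    }


valid = [write_action("write_logs_us", True), write_action("write_logs_eu", False)]
invalid = [write_action("write_logs_us", True), write_action("write_logs_eu", True)]

print(check_create_table(valid))
print(check_create_table(invalid))
```

An empty result means every target table has exactly one creating action; any entry names a table with zero or several creators.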

Templatising Multi-Source Ingestion

When you have many sources following the same pattern, combine this with templates to eliminate boilerplate. Define a template for the load+write pair and invoke it per source, each targeting the same table.
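To make the expansion concrete, here is a plain-Python illustration of what one load+write pair per source amounts to. It is not LHP template syntax: the helper fan_in_actions and its parameters are hypothetical, and the dictionaries simply mirror the YAML fields used in the configuration above.

```python
def fan_in_actions(table, sources):
    """Build one load action and one write action per (suffix, path) pair.

    Hypothetical helper for illustration only; it mirrors the YAML fields
    from the multi-source example and is not an LHP API.
    """
    actions = []
    for index, (suffix, path) in enumerate(sources):
        view = f"v_logs_{suffix}"
        actions.append({
            "name": f"load_logs_{suffix}",
            "type": "load",
            "readMode": "stream",
            "source": {"type": "cloudfiles", "path": path, "format": "parquet"},
            "target": view,
        })
        actions.append({
            "name": f"write_logs_{suffix}",
            "type": "write",
            "source": view,
            "write_target": {
                "type": "streaming_table",
                "database": "${catalog}.${bronze_schema}",
                "table": table,
                # Only the first write creates the table; the rest append to it
                "create_table": index == 0,
            },
        })
    return actions


generated = fan_in_actions(
    "unified_logs",
    [
        ("us", "s3://my-bucket-us-east-1/logs/*.parquet"),
        ("eu", "s3://my-bucket-eu-west-1/logs/*.parquet"),
    ],
)
for action in generated:
    print(action["name"])
```

Whatever mechanism produces the actions, the point to preserve is that only one write per table sets create_table: true.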

CloudFiles Path Filtering

Three approaches to exclude specific paths, directories, or files from a CloudFiles Auto Loader pipeline, plus an explicit include list for the inverse case. Each is useful in different scenarios.

Glob Patterns in the Load Path

LHP passes the source.path string directly to the generated .load("...") call with no escaping or transformation. Any glob syntax that Spark and CloudFiles support works as-is in your YAML.

Supported glob syntax (Databricks Auto Loader):

| Syntax  | Meaning                          | Example               |
|---------|----------------------------------|-----------------------|
| *       | Match any sequence of characters | /data/*.parquet       |
| ?       | Match any single character       | /data/file_?.csv      |
| [abc]   | Match any character in set       | /data/[abc]*.csv      |
| [a-z]   | Match any character in range     | /data/part_[0-9].csv  |
| [^x]    | Match any character NOT in set   | /data/[^_]*.csv       |
| {a,b,c} | Match any of the alternatives    | /data/{sales,events}/ |

Exclude a specific day using brace expansion
actions:
  - name: load_logs_excluding_day16
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.useStrictGlobber: "true"
    target: v_logs_raw
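Before deploying a pattern like this, it is worth confirming offline that the day segment excludes exactly the intended day. The sketch below uses Python's fnmatch module with a small hand-written brace expander. This only approximates the Hadoop and Auto Loader globber (for instance, Python negates a class with [!x] rather than [^x]), so treat it as a sanity check of the enumeration, not as proof of runtime behaviour. The helper names are hypothetical.

```python
import re
from fnmatch import fnmatchcase

DAY_GLOB = "{0[1-9],1[0-5],1[7-9],2[0-9]}"


def expand_braces(pattern):
    """Expand {a,b,c} groups into a list of patterns (nested braces unsupported)."""
    match = re.search(r"\{([^{}]*)\}", pattern)
    if match is None:
        return [pattern]
    head, tail = pattern[:match.start()], pattern[match.end():]
    expanded = []
    for alternative in match.group(1).split(","):
        expanded.extend(expand_braces(head + alternative + tail))
    return expanded


def day_matches(day, glob=DAY_GLOB):
    """True if a two-digit day string matches any of the brace alternatives."""
    return any(fnmatchcase(day, alternative) for alternative in expand_braces(glob))


# February 2026 has 28 days
february_2026 = [f"{day:02d}" for day in range(1, 29)]
excluded = [day for day in february_2026 if not day_matches(day)]
print(excluded)  # ['16']
```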

You can also put the glob pattern in an environment substitution token if it varies by environment:

substitutions/dev.yaml
dev:
  log_path_pattern: "s3://dev-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"

flowgroup YAML
source:
  path: "${log_path_pattern}"

Warning

Caveats for complex glob patterns:

  • Character classes inside brace alternatives (like {0[1-9],1[0-5]}) should work at the Hadoop GlobFilter level, but this specific combination is not shown in Databricks documentation. Test in a dev environment before relying on it in production.

  • Use [^x] not [!x] for negation — the Unix shell [!x] syntax is not supported by Spark’s globber.

  • ** (globstar) is not documented for Auto Loader — avoid it.

  • Notification mode (cloudFiles.useNotifications: "true") with complex globs is undocumented territory. Directory listing mode (the default) is the safe choice for glob-heavy paths.

  • Consider adding cloudFiles.useStrictGlobber: "true" (DBR 12.2+) for predictable, Spark-standard globbing behaviour. The default globber is more permissive — for example, * can cross directory boundaries.

Post-Load Filter with SQL Transform

CloudFiles natively exposes _metadata.file_path in the loaded DataFrame. Add a SQL transform action between the load and write to filter out unwanted paths using full SQL regex power.

Filter excluded paths via SQL transform
actions:
  - name: load_all_files
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/"
      format: parquet
    target: v_raw_data

  - name: filter_excluded_paths
    type: transform
    transform_type: sql
    source: v_raw_data
    target: v_filtered_data
    sql: |
      SELECT * FROM stream(v_raw_data)
      WHERE NOT _metadata.file_path RLIKE '.*/exclude_[^/]+/.*'

  - name: write_bronze
    type: write
    source: v_filtered_data
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: my_table
    description: "Write filtered data to bronze layer"

Note

Auto Loader still reads all files before the filter is applied. This approach works when the excluded files are readable but contain unwanted data. If excluded files cannot be read at all (e.g. AccessDenied), use glob patterns or pathGlobFilter instead.
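The exclusion regex can be tried against sample paths before it goes into the transform. This sketch uses Python's re.search as a stand-in for RLIKE, on the assumption that RLIKE matches anywhere in the string as Java's regex find does; the sample paths are made up for illustration.

```python
import re

# Same pattern as in the SQL transform above
EXCLUDE = re.compile(r".*/exclude_[^/]+/.*")


def keep(file_path):
    """True if the row would survive the WHERE NOT ... RLIKE filter."""
    return EXCLUDE.search(file_path) is None


sample_paths = [
    "s3://my-bucket/data/sales/part-0001.parquet",        # kept: no exclude_ directory
    "s3://my-bucket/data/exclude_tmp/part-0002.parquet",  # dropped: inside exclude_tmp/
    "s3://my-bucket/data/exclude_/part-0003.parquet",     # kept: [^/]+ needs one more character
    "s3://my-bucket/data/sales/exclude_me.parquet",       # kept: exclude_ is a file name here
]

kept = [path for path in sample_paths if keep(path)]
for path in kept:
    print(path)
```

The third and fourth paths show the edge cases: the pattern only drops files beneath a directory whose name starts with exclude_ and has at least one further character.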

pathGlobFilter Option

To filter by file name (not full path), use the pathGlobFilter reader option. This is a Spark-native option that filters on the basename of each file after directory listing.

Filter by filename pattern
actions:
  - name: load_parquet_only
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/"
      format: parquet
      options:
        pathGlobFilter: "*.parquet"
    target: v_raw_data

Important

pathGlobFilter filters on the filename only (basename), not the full path. The .load() path is a prefix filter; pathGlobFilter is for suffix or name filtering.
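The basename-only behaviour can be modelled in a few lines. This is a rough sketch using Python's fnmatch on the last path segment; it approximates the semantics described above rather than reproducing Spark's implementation, and the function name and sample paths are hypothetical.

```python
import posixpath
from fnmatch import fnmatchcase


def passes_path_glob_filter(file_path, pattern):
    """Apply the glob to the file name only, ignoring all parent directories."""
    return fnmatchcase(posixpath.basename(file_path), pattern)


# Matches on the file name, regardless of directory depth
print(passes_path_glob_filter("s3://my-bucket/data/2026/02/part-0.parquet", "*.parquet"))

# A directory named "parquet" does not help a non-matching file name
print(passes_path_glob_filter("s3://my-bucket/data/parquet/_SUCCESS", "*.parquet"))

# Directory components in the pattern never match, because only the basename is tested
print(passes_path_glob_filter("s3://my-bucket/data/archive/x.parquet", "archive/*"))
```

The last call is the practical consequence: to include or exclude whole directories, shape the load path or filter on _metadata.file_path instead.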

Explicit Include List

Use brace expansion to explicitly list only the directories to include:

Include only specific directories
actions:
  - name: load_selected_sources
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/{sales,marketing,events}/*.parquet"
      format: parquet
    target: v_selected_data

This generates .load("s3://my-bucket/data/{sales,marketing,events}/*.parquet") and Auto Loader will only scan those three directories.

For truly separate paths (different buckets or unrelated prefixes), use the multi-source ingestion pattern above — multiple load+write actions targeting the same table with create_table: false on the additional writes.

Choosing the Right Approach

| Scenario                              | Approach              | Notes                                              |
|---------------------------------------|-----------------------|----------------------------------------------------|
| Exclude specific date partitions      | Glob patterns         | Use brace expansion to enumerate included segments |
| Exclude paths matching a regex        | SQL transform         | Full regex power via _metadata.file_path           |
| Filter by file extension or name      | pathGlobFilter        | Set in source.options; filters basename only       |
| Include specific named directories    | Brace expansion       | Simple and explicit; documented by Databricks      |
| Multiple separate buckets or prefixes | Multiple append flows | Independent checkpoints per source                 |
