Load Actions¶
Note
The framework currently supports the following load sub-types.
Coming soon: additional source and target types will be supported through plugins.
| Sub-type | Purpose & Source |
|---|---|
| cloudfiles | Databricks Auto Loader (CloudFiles) – stream files from object storage (CSV, JSON, Parquet, etc.). |
| delta | Read from an existing Delta table or Change Data Feed (CDC). |
| kafka | Stream events from Apache Kafka topics (including MSK and Event Hubs). |
| sql | Execute an arbitrary SQL query and load the result as a view. |
| jdbc | Ingest from an external RDBMS via JDBC with secret handling. |
| python | Call custom Python code (path or inline), returning a DataFrame. |
| custom_datasource (PySpark) | Configured under the source block with automatic import management and registration. |
cloudFiles¶
actions:
- name: load_csv_file_from_cloudfiles
type: load
    readMode: "stream"
operational_metadata: ["_source_file_path","_source_file_size","_source_file_modification_time"]
source:
type: cloudfiles
path: "${landing_volume}/{{ landing_folder }}/*.csv"
format: csv
options:
cloudFiles.format: csv
header: True
delimiter: "|"
cloudFiles.maxFilesPerTrigger: 11
cloudFiles.inferColumnTypes: False
cloudFiles.schemaEvolutionMode: "addNewColumns"
cloudFiles.rescuedDataColumn: "_rescued_data"
cloudFiles.schemaHints: "schemas/{{ table_name }}_schema.yaml"
target: v_customer_cloudfiles
description: "Load customer CSV files from landing volume"
Anatomy of a cloudFiles load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Must be stream (CloudFiles only supports streaming mode). This translates to spark.readStream.format("cloudFiles")
operational_metadata: Add custom metadata columns
- source:
type: Use Databricks Auto Loader (CloudFiles)
path: File path pattern with substitution variables
format: Specify the file format as CSV, JSON, Parquet, etc.
schema: Path to a YAML schema file for full schema enforcement (see below)
- options:
cloudFiles.format: Explicitly set CloudFiles format to CSV
header: First row contains column headers
delimiter: Use pipe character as field separator
cloudFiles.maxFilesPerTrigger: Limit number of files processed per trigger
cloudFiles.schemaHints: Schema definition for Auto Loader (supports multiple formats - see below)
target: Name of the temporary view created
description: Optional documentation for the action
cloudFiles.schemaHints Format Options
The cloudFiles.schemaHints option supports three formats, automatically detected by the framework:
Option 1: Inline DDL String (for simple schemas)
cloudFiles.schemaHints: "customer_id BIGINT, name STRING, email STRING"
Option 2: External YAML File (recommended for complex schemas with metadata)
cloudFiles.schemaHints: "schemas/customer_schema.yaml"
Option 3: External DDL/SQL File (for pre-defined DDL statements)
cloudFiles.schemaHints: "schemas/customer_schema.ddl"
# or
cloudFiles.schemaHints: "schemas/customer_schema.sql"
Note
File Path Organization: Schema files can be organized in subdirectories relative to your project root:
Root level: "customer_schema.yaml"
Single directory: "schemas/customer_schema.yaml"
Nested subdirectories: "schemas/bronze/dimensions/customer_schema.yaml"
The framework automatically detects whether the value is an inline DDL string or a file path based on common file indicators (.yaml, .yml, .ddl, .sql, or path separators).
YAML Schema Conversion: When using YAML schema files (Option 2), the nullable field is respected during conversion to DDL:
Columns with nullable: false are converted to include a NOT NULL constraint
Columns with nullable: true (or omitted; the default is true) are converted without constraints
Example: A YAML column defined as {name: c_custkey, type: BIGINT, nullable: false} will generate c_custkey BIGINT NOT NULL in the schema hints.
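To make the conversion rule concrete, here is a minimal sketch of turning a YAML column list into a schema-hints DDL string (illustrative only, not the framework's actual code):

# Illustrative conversion: YAML column entries -> DDL schema hints string.
columns = [
    {"name": "c_custkey", "type": "BIGINT", "nullable": False},
    {"name": "c_name", "type": "STRING"},  # nullable omitted -> defaults to true
]

def to_schema_hints(columns: list) -> str:
    parts = []
    for col in columns:
        ddl = f"{col['name']} {col['type']}"
        if col.get("nullable", True) is False:
            ddl += " NOT NULL"
        parts.append(ddl)
    return ", ".join(parts)

print(to_schema_hints(columns))
# c_custkey BIGINT NOT NULL, c_name STRING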
source.schema — Full Schema Enforcement
The source.schema field provides full schema enforcement by applying a StructType schema
on the DataStreamReader before .load(). This disables schema inference entirely, ensuring
the data conforms exactly to the specified schema.
When to use schema vs schemaHints:
Use schema when you want to enforce a complete, exact schema and disable inference.
Use schemaHints when you want to guide Auto Loader’s inference while still allowing it to discover additional columns.
actions:
- name: load_customer_with_schema
type: load
readMode: stream
source:
type: cloudfiles
path: "/data/customers/*.csv"
format: csv
schema: schemas/customer_schema.yaml
options:
cloudFiles.format: csv
target: v_customer_raw
description: "Load customer CSV with explicit schema enforcement"
The schema file uses the same YAML format as schemaHints files:
name: customer
version: "1.0"
columns:
- name: c_custkey
type: BIGINT
nullable: false
- name: c_name
type: STRING
nullable: true
This generates code with .schema() applied on the reader chain before .load():
from pyspark import pipelines as dp
from pyspark.sql.types import StructType, StructField, LongType, StringType

customer_schema = StructType([
    StructField("c_custkey", LongType(), False),
    StructField("c_name", StringType(), True),
])
@dp.temporary_view()
def v_customer_raw():
df = spark.readStream \
.format("cloudFiles") \
.schema(customer_schema) \
.option("cloudFiles.format", "csv") \
.load("/data/customers/*.csv")
return df
Note
When source.schema is provided, cloudFiles.schemaEvolutionMode defaults to none
because inference is disabled. You cannot combine source.schema with cloudFiles.schemaHints
— they are mutually exclusive approaches.
See also
For a full list of options, see the Databricks Auto Loader documentation.
Operational metadata: Operational Metadata
Important
Lakehouse Plumber uses syntax consistent with Databricks, making it easy to transfer knowledge between the two. All options available here mirror those of Databricks Auto Loader.
The above YAML translates to the following PySpark code
from pyspark import pipelines as dp
from pyspark.sql import functions as F

customer_cloudfiles_schema_hints = """
    c_custkey BIGINT NOT NULL,
    c_name STRING NOT NULL,
    c_address STRING,
    c_nationkey BIGINT NOT NULL,
    c_phone STRING,
    c_acctbal DECIMAL(18,2),
    c_mktsegment STRING,
    c_comment STRING
""".strip().replace("\n", " ")


@dp.temporary_view()
def v_customer_cloudfiles():
    """Load customer CSV files from landing volume"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "csv") \
        .option("header", True) \
        .option("delimiter", "|") \
        .option("cloudFiles.maxFilesPerTrigger", 11) \
        .option("cloudFiles.inferColumnTypes", False) \
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
        .option("cloudFiles.rescuedDataColumn", "_rescued_data") \
        .option("cloudFiles.schemaHints", customer_cloudfiles_schema_hints) \
        .load("/Volumes/acmi_edw_dev/edw_raw/landing_volume/customer/*.csv")

    # Add operational metadata columns
    df = df.withColumn('_source_file_size', F.col('_metadata.file_size'))
    df = df.withColumn('_source_file_modification_time', F.col('_metadata.file_modification_time'))
    df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))

    return df
delta¶
Deprecated since version 0.7.8: The database field (e.g., database: "${catalog}.${schema}") is deprecated
for delta sources. Use explicit catalog and schema fields instead. The old
format is auto-converted with a deprecation warning. Removal in v1.0.0.
actions:
- name: customer_raw_load
type: load
operational_metadata: ["_processing_timestamp"]
readMode: stream
source:
type: delta
catalog: "${catalog}"
schema: "${raw_schema}"
table: customer
target: v_customer_raw
description: "Load customer table from raw schema"
Anatomy of a delta load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
operational_metadata: Add custom metadata columns (e.g., processing timestamp)
readMode: Either batch or stream - translates to spark.read.table() or spark.readStream.table()
- source:
type: Use Delta table as source
catalog: Target catalog using substitution variables
schema: Target schema using substitution variables
table: Name of the Delta table to read from
target: Name of the temporary view created
description: Optional documentation for the action
Important
Delta load actions can read from both regular Delta tables and Change Data Feed (CDC) enabled tables. Use readMode: stream for real-time processing or readMode: batch for one-time loads.
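As a quick reference, the two readMode values correspond to the following reader calls (the fully qualified table name is illustrative):

# Sketch: how the two readMode values translate for a delta source.
batch_df = spark.read.table("acmi_edw_dev.edw_raw.customer")          # readMode: batch
stream_df = spark.readStream.table("acmi_edw_dev.edw_raw.customer")   # readMode: stream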
Delta Options
Delta load actions support the options field to configure Delta-specific reader options:
actions:
- name: load_orders_cdc
type: load
readMode: stream
source:
type: delta
catalog: "${catalog}"
schema: "bronze"
table: orders
options:
readChangeFeed: "true"
startingVersion: "0"
ignoreDeletes: "true"
target: v_orders_changes
description: "Stream order changes using Delta Change Data Feed"
Supported Delta Options:
Note
readChangeFeed works in both stream and batch mode. In batch mode, a starting bound (startingVersion or startingTimestamp) is required.
endingVersion and endingTimestamp are only valid in batch mode.
readChangeFeed and skipChangeCommits are mutually exclusive — one reads all changes, the other skips them.
readChangeFeed cannot be combined with time-travel options (versionAsOf/timestampAsOf).
startingVersion and startingTimestamp are mutually exclusive.
versionAsOf and timestampAsOf are mutually exclusive.
All option values are validated and cannot be None or empty strings.
Batch CDF Example:
actions:
- name: load_order_changes
type: load
readMode: batch
source:
type: delta
catalog: "${catalog}"
schema: "bronze"
table: orders
options:
readChangeFeed: "true"
startingVersion: "5"
endingVersion: "20"
target: v_order_changes
description: "Read order changes between version 5 and 20"
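For reference, a batch CDF load of this shape corresponds roughly to the following reader call (a sketch; the resolved table name is illustrative and the actual generated code may differ):

# Sketch: batch Change Data Feed read bounded by commit versions.
df = (
    spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", "5")
    .option("endingVersion", "20")
    .table("acmi_edw_dev.bronze.orders")
)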
Warning
When using startingVersion, the specified version may become unavailable after
VACUUM runs. Prefer startingTimestamp for durable references, or use
checkpoint-managed streaming for production workloads.
readChangeFeed vs skipChangeCommits:
readChangeFeed: "true" — reads the Change Data Feed, exposing row-level changes (inserts, updates, deletes) with metadata columns. Use this when you need to process individual changes (e.g., CDC into a downstream table).
skipChangeCommits: "true" — skips commits that contain data-changing operations (useful when a table has CDF enabled but you only want the latest state, ignoring change events). Cannot be combined with readChangeFeed.
CDF Metadata Columns:
When readChangeFeed is enabled, the resulting DataFrame includes three additional
columns:
_change_type — the type of change: insert, update_preimage, update_postimage, or delete
_commit_version — the Delta version of the commit
_commit_timestamp — the timestamp of the commit
An UPDATE operation produces two rows: one with _change_type = "update_preimage"
(the old values) and one with _change_type = "update_postimage" (the new values).
If writing CDF data to a non-CDC streaming table, you should filter or drop these columns in a transform action:
SELECT * EXCEPT (_change_type, _commit_version, _commit_timestamp)
FROM stream(v_order_changes)
WHERE _change_type != 'delete'
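The same clean-up expressed with the DataFrame API (a sketch; changes_df stands for a DataFrame read from v_order_changes):

from pyspark.sql import DataFrame, functions as F

def drop_cdf_metadata(changes_df: DataFrame) -> DataFrame:
    # Mirror the SQL above: discard delete rows and the CDF metadata columns
    return (
        changes_df
        .filter(F.col("_change_type") != "delete")
        .drop("_change_type", "_commit_version", "_commit_timestamp")
    )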
Full Refresh Resilience:
When a Delta table undergoes a full refresh (e.g., TRUNCATE followed by reload), the
CDF stream emits a large batch of delete rows followed by insert rows. This can
overwhelm downstream consumers. Mitigation strategies:
Use ignoreDeletes: "true" if deletes are not relevant to your pipeline.
Use skipChangeCommits: "true" on non-CDF consumers that share the same source table.
For CDC targets, rely on Databricks checkpointing to handle reprocessing gracefully.
Time Travel Example:
actions:
- name: load_customers_snapshot
type: load
readMode: batch
source:
type: delta
catalog: "${catalog}"
schema: "silver"
table: customers
options:
versionAsOf: "10"
where_clause: ["status = 'active'"]
select_columns: ["customer_id", "name", "email"]
target: v_customers_snapshot
description: "Load customers at version 10"
See also
For stream readMode see the Databricks documentation on Change Data Feed
For time travel see Delta Time Travel
Operational metadata: Operational Metadata
The above YAML translates to the following PySpark code
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

@dp.temporary_view()
def v_customer_raw():
    """Load customer table from raw schema"""
    df = spark.readStream.table("acmi_edw_dev.edw_raw.customer")

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', current_timestamp())

    return df
kafka¶
actions:
- name: load_kafka_events
type: load
readMode: stream
operational_metadata: ["_processing_timestamp"]
source:
type: kafka
bootstrap_servers: "kafka1.example.com:9092,kafka2.example.com:9092"
subscribe: "events,logs,metrics"
options:
startingOffsets: "latest"
failOnDataLoss: false
kafka.group.id: "lhp-consumer-group"
kafka.session.timeout.ms: 30000
kafka.ssl.truststore.location: "/path/to/truststore.jks"
kafka.ssl.truststore.password: "${secret:scope/truststore-password}"
target: v_kafka_events_raw
description: "Load events from Kafka topics"
Anatomy of a Kafka load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Must be stream - Kafka is always streaming
operational_metadata: Add custom metadata columns (e.g., processing timestamp)
- source:
type: Use Apache Kafka as source
bootstrap_servers: Comma-separated list of Kafka broker addresses (host:port)
subscribe: Comma-separated list of topics to subscribe to (choose ONE subscription method)
subscribePattern: Java regex pattern for topic subscription (alternative to subscribe)
assign: JSON string specifying specific topic partitions (alternative to subscribe)
- options:
startingOffsets: Starting offset position (earliest/latest/JSON)
failOnDataLoss: Whether to fail on potential data loss (default: true)
kafka.group.id: Consumer group ID (use with caution)
kafka.session.timeout.ms: Session timeout in milliseconds
kafka.ssl.*: SSL/TLS configuration options for secure connections
kafka.sasl.*: SASL authentication options
All other kafka.* options from Databricks Kafka connector
target: Name of the temporary view created
description: Optional documentation for the action
See also
For a full list of Kafka options, see the Databricks Kafka documentation.
Operational metadata: Operational Metadata
Important
Kafka always returns a fixed 7-column schema with binary key/value columns:
key, value, topic, partition, offset, timestamp, timestampType.
You must explicitly deserialize the key and value columns using transform actions.
Warning
Subscription Methods: You must specify exactly ONE of:
subscribe: Comma-separated list of specific topics
subscribePattern: Java regex pattern for topic names
assign: JSON with specific topic partitions
Using multiple subscription methods will result in an error.
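For illustration, this is how the alternative subscription methods look on the reader; the pattern and partition values are made up:

# Sketch: same Kafka reader, but subscribing by regex pattern instead of a topic list.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1.example.com:9092")
    .option("subscribePattern", "events_.*")                 # regex over topic names
    # .option("assign", '{"events": [0, 1], "logs": [0]}')   # or: explicit partitions
    .option("startingOffsets", "latest")
    .load()
)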
The above YAML translates to the following PySpark code
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

@dp.temporary_view()
def v_kafka_events_raw():
    """Load events from Kafka topics"""
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka1.example.com:9092,kafka2.example.com:9092") \
        .option("subscribe", "events,logs,metrics") \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", False) \
        .option("kafka.group.id", "lhp-consumer-group") \
        .option("kafka.session.timeout.ms", 30000) \
        .option("kafka.ssl.truststore.location", "/path/to/truststore.jks") \
        .option("kafka.ssl.truststore.password", dbutils.secrets.get("scope", "truststore-password")) \
        .load()

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', current_timestamp())

    return df
Example: Deserializing Kafka Data
Since Kafka returns binary data, you typically need a transform action to deserialize:
actions:
# Load from Kafka (returns binary key/value)
- name: load_kafka_events
type: load
readMode: stream
source:
type: kafka
bootstrap_servers: "localhost:9092"
subscribe: "events"
target: v_kafka_events_raw
# Deserialize and parse JSON
- name: parse_kafka_events
type: transform
transform_type: sql
source: v_kafka_events_raw
target: v_kafka_events_parsed
sql: |
SELECT
CAST(key AS STRING) as message_key,
from_json(CAST(value AS STRING), 'event_type STRING, timestamp BIGINT, data STRING') as parsed_value,
topic,
partition,
offset,
timestamp as kafka_timestamp
FROM $source
Advanced Authentication: AWS MSK IAM
AWS Managed Streaming for Apache Kafka (MSK) supports IAM authentication for secure, credential-free access.
Prerequisites:
AWS MSK cluster configured with IAM authentication enabled
Databricks cluster with IAM role/instance profile with MSK permissions
IAM policy granting kafka-cluster:Connect, kafka-cluster:DescribeCluster, and topic/group permissions
YAML Configuration:
actions:
- name: load_msk_orders
type: load
readMode: stream
source:
type: kafka
bootstrap_servers: "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098"
subscribe: "orders"
options:
kafka.security.protocol: "SASL_SSL"
kafka.sasl.mechanism: "AWS_MSK_IAM"
kafka.sasl.jaas.config: "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;"
kafka.sasl.client.callback.handler.class: "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
startingOffsets: "earliest"
failOnDataLoss: "false"
target: v_msk_orders_raw
description: "Load orders from MSK using IAM authentication"
With Specific IAM Role:
options:
kafka.security.protocol: "SASL_SSL"
kafka.sasl.mechanism: "AWS_MSK_IAM"
kafka.sasl.jaas.config: 'shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="${msk_role_arn}";'
kafka.sasl.client.callback.handler.class: "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Generated PySpark Code:
from pyspark import pipelines as dp

@dp.temporary_view()
def v_msk_orders_raw():
    """Load orders from MSK using IAM authentication"""
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098") \
        .option("subscribe", "orders") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "AWS_MSK_IAM") \
        .option("kafka.sasl.jaas.config", "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;") \
        .option("kafka.sasl.client.callback.handler.class", "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler") \
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", "false") \
        .load()

    return df
See also
For complete MSK IAM documentation see AWS MSK IAM Access Control.
Important
MSK IAM Requirements:
Port 9098 is used for IAM authentication (not the standard 9092)
All four options are required: kafka.security.protocol, kafka.sasl.mechanism, kafka.sasl.jaas.config, kafka.sasl.client.callback.handler.class
IAM role must have appropriate kafka-cluster:* permissions
No credentials are stored - authentication is via IAM
Ensure Databricks cluster has network access to MSK cluster
Advanced Authentication: Azure Event Hubs OAuth
Azure Event Hubs provides Kafka protocol support with OAuth 2.0 authentication using Azure Active Directory.
Prerequisites:
Azure Event Hubs namespace (Premium or Standard tier)
Azure AD App Registration (Service Principal) with appropriate permissions
Service Principal granted “Azure Event Hubs Data Receiver” role on the namespace
Databricks secrets configured for client credentials
YAML Configuration:
actions:
- name: load_event_hubs_data
type: load
readMode: stream
source:
type: kafka
bootstrap_servers: "my-namespace.servicebus.windows.net:9093"
subscribe: "my-event-hub"
options:
kafka.security.protocol: "SASL_SSL"
kafka.sasl.mechanism: "OAUTHBEARER"
kafka.sasl.jaas.config: 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="${secret:azure_secrets/client_id}" clientSecret="${secret:azure_secrets/client_secret}" scope="https://${event_hubs_namespace}/.default" ssl.protocol="SSL";'
      kafka.sasl.oauthbearer.token.endpoint.url: "https://login.microsoftonline.com/${azure_tenant_id}/oauth2/v2.0/token"
kafka.sasl.login.callback.handler.class: "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
startingOffsets: "earliest"
target: v_event_hubs_data_raw
description: "Load data from Azure Event Hubs using OAuth"
Generated PySpark Code:
from pyspark import pipelines as dp

@dp.temporary_view()
def v_event_hubs_data_raw():
    """Load data from Azure Event Hubs using OAuth"""
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "my-namespace.servicebus.windows.net:9093") \
        .option("subscribe", "my-event-hub") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "OAUTHBEARER") \
        .option("kafka.sasl.jaas.config",
            f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{dbutils.secrets.get(scope="azure_secrets", key="client_id")}" clientSecret="{dbutils.secrets.get(scope="azure_secrets", key="client_secret")}" scope="https://my-namespace.servicebus.windows.net/.default" ssl.protocol="SSL";') \
        .option("kafka.sasl.oauthbearer.token.endpoint.url", "https://login.microsoftonline.com/12345678-1234-1234-1234-123456789012/oauth2/v2.0/token") \
        .option("kafka.sasl.login.callback.handler.class", "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler") \
        .option("startingOffsets", "earliest") \
        .load()

    return df
See also
For complete Event Hubs Kafka documentation see Azure Event Hubs for Apache Kafka.
Important
Event Hubs OAuth Requirements:
Port 9093 is always used for Kafka protocol with Event Hubs
Event Hubs namespace must be in the format: <namespace>.servicebus.windows.net
The scope in the JAAS config must match: https://<namespace>.servicebus.windows.net/.default
All five options are required: kafka.security.protocol, kafka.sasl.mechanism, kafka.sasl.jaas.config, kafka.sasl.oauthbearer.token.endpoint.url, kafka.sasl.login.callback.handler.class
Service Principal needs “Azure Event Hubs Data Receiver” role assignment
OAuth token refresh is handled automatically by the callback handler
Always use secrets for client credentials - never hardcode in YAML
sql¶
SQL load actions support both inline SQL and external SQL files.
Option 1: Inline SQL
actions:
- name: load_customer_summary
type: load
readMode: batch
source:
type: sql
sql: |
SELECT
c_custkey,
c_name,
c_mktsegment,
COUNT(*) as order_count,
SUM(o_totalprice) as total_spent
FROM ${catalog}.${raw_schema}.customer c
LEFT JOIN ${catalog}.${raw_schema}.orders o
ON c.c_custkey = o.o_custkey
GROUP BY c_custkey, c_name, c_mktsegment
target: v_customer_summary
description: "Load customer summary with order statistics"
Option 2: External SQL File
actions:
- name: load_customer_metrics
type: load
readMode: batch
source:
type: sql
sql_path: "sql/customer_metrics.sql"
target: v_customer_metrics
description: "Load customer metrics from external SQL file"
Anatomy of an SQL load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - determines execution mode
- source:
type: Use SQL query as source
sql: SQL statement with substitution variables for dynamic values (inline option)
sql_path: Path to external .sql file (external file option)
target: Name of the temporary view created from query results
description: Optional documentation for the action
See also
For SQL syntax see the Databricks SQL documentation.
Substitution variables: Substitutions & Secrets
Important
SQL load actions allow you to create complex views from multiple tables using standard SQL.
Use substitution variables like ${catalog} and ${schema} for environment-specific values.
Note
File Substitution Support
Substitution variables work in both inline SQL and external SQL files (sql_path).
The same ${token} and ${secret:scope/key} syntax from YAML works in .sql files.
Files are processed for substitutions before query execution.
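A minimal sketch of how this token substitution could be applied to a SQL file's contents (illustrative only, not the framework's actual implementation):

import re

def substitute_tokens(sql_text: str, substitutions: dict) -> str:
    """Replace ${token} occurrences using a substitution mapping.
    ${secret:scope/key} references are left untouched here, since secrets
    are resolved separately into secret lookups."""
    def _replace(match):
        token = match.group(1)
        if token.startswith("secret:"):
            return match.group(0)
        return str(substitutions.get(token, match.group(0)))
    return re.sub(r"\$\{([^}]+)\}", _replace, sql_text)

sql = "SELECT * FROM ${catalog}.${raw_schema}.customer"
print(substitute_tokens(sql, {"catalog": "acmi_edw_dev", "raw_schema": "edw_raw"}))
# SELECT * FROM acmi_edw_dev.edw_raw.customer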
Note
File Organization: When using sql_path, the path is relative to your YAML file location.
Common practice is to create a sql/ folder alongside your pipeline YAML files.
The above YAML examples translate to the following PySpark code
For inline SQL:
from pyspark import pipelines as dp

@dp.temporary_view()
def v_customer_summary():
    """Load customer summary with order statistics"""
    return spark.sql("""
        SELECT
            c_custkey,
            c_name,
            c_mktsegment,
            COUNT(*) as order_count,
            SUM(o_totalprice) as total_spent
        FROM acmi_edw_dev.edw_raw.customer c
        LEFT JOIN acmi_edw_dev.edw_raw.orders o
            ON c.c_custkey = o.o_custkey
        GROUP BY c_custkey, c_name, c_mktsegment
    """)
For external SQL file:
from pyspark import pipelines as dp

@dp.temporary_view()
def v_customer_metrics():
    """Load customer metrics from external SQL file"""
    return spark.sql("""
        -- Content from sql/customer_metrics.sql file
        SELECT
            customer_id,
            total_orders,
            avg_order_value,
            last_order_date
        FROM ${catalog}.${silver_schema}.customer_analytics
        WHERE last_order_date >= current_date() - INTERVAL 90 DAYS
    """)
jdbc¶
JDBC load actions connect to external relational databases using JDBC drivers. They support both table queries and custom SQL queries.
Option 1: Query-based JDBC
actions:
- name: load_external_customers
type: load
readMode: batch
operational_metadata: ["_extraction_timestamp"]
source:
type: jdbc
url: "jdbc:postgresql://db.example.com:5432/production"
driver: "org.postgresql.Driver"
user: "${secret:database/username}"
password: "${secret:database/password}"
query: |
SELECT
customer_id,
first_name,
last_name,
email,
registration_date,
country
FROM customers
WHERE status = 'active'
AND registration_date >= CURRENT_DATE - INTERVAL '7 days'
target: v_external_customers
description: "Load active customers from external PostgreSQL database"
Option 2: Table-based JDBC
actions:
- name: load_external_products
type: load
readMode: batch
source:
type: jdbc
url: "jdbc:mysql://mysql.example.com:3306/catalog"
driver: "com.mysql.cj.jdbc.Driver"
user: "${secret:mysql/username}"
password: "${secret:mysql/password}"
table: "products"
target: v_external_products
description: "Load products table from external MySQL database"
Anatomy of a JDBC load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - JDBC typically uses batch mode
operational_metadata: Add custom metadata columns (e.g., extraction timestamp)
- source:
type: Use JDBC connection as source
url: JDBC connection string with database server details
driver: JDBC driver class name (database-specific)
user: Database username (supports secret substitution)
password: Database password (supports secret substitution)
query: Custom SQL query to execute (query option)
table: Table name to read entirely (table option)
target: Name of the temporary view created
description: Optional documentation for the action
See also
For JDBC drivers see the Databricks JDBC documentation.
Secret management: Substitutions & Secrets
Important
JDBC load actions require either a query or table field, but not both.
Use secret substitution (${secret:scope/key}) for secure credential management.
Ensure the appropriate JDBC driver is available in your Databricks cluster.
Note
Secret Management: Always use ${secret:scope/key} syntax for database credentials.
The framework automatically handles secret substitution during code generation.
The above YAML examples translate to the following PySpark code
For query-based JDBC:
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

@dp.temporary_view()
def v_external_customers():
    """Load active customers from external PostgreSQL database"""
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://db.example.com:5432/production") \
        .option("user", "{{ secret_substituted_username }}") \
        .option("password", "{{ secret_substituted_password }}") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", """
            SELECT
                customer_id,
                first_name,
                last_name,
                email,
                registration_date,
                country
            FROM customers
            WHERE status = 'active'
                AND registration_date >= CURRENT_DATE - INTERVAL '7 days'
        """) \
        .load()

    # Add operational metadata columns
    df = df.withColumn('_extraction_timestamp', current_timestamp())

    return df
For table-based JDBC:
from pyspark import pipelines as dp

@dp.temporary_view()
def v_external_products():
    """Load products table from external MySQL database"""
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://mysql.example.com:3306/catalog") \
        .option("user", "{{ secret_substituted_username }}") \
        .option("password", "{{ secret_substituted_password }}") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", "products") \
        .load()

    return df
python¶
Python load actions call custom Python functions that return DataFrames. This allows for complex data extraction logic, API calls, or custom data processing.
YAML Configuration:
actions:
- name: load_api_data
type: load
readMode: batch
operational_metadata: ["_api_call_timestamp"]
source:
type: python
module_path: "extractors/api_extractor.py"
function_name: "extract_customer_data"
parameters:
api_endpoint: "https://api.example.com/customers"
api_key: "${secret:apis/customer_api_key}"
batch_size: 1000
start_date: "2024-01-01"
target: v_api_customers
description: "Load customer data from external API"
Python Function (extractors/api_extractor.py):
import requests
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

def extract_customer_data(spark, parameters: dict) -> DataFrame:
    """Extract customer data from external API.

    Args:
        spark: SparkSession instance
        parameters: Configuration parameters from YAML

    Returns:
        DataFrame: Customer data as PySpark DataFrame
    """
    # Extract parameters from YAML configuration
    api_endpoint = parameters.get("api_endpoint")
    api_key = parameters.get("api_key")
    batch_size = parameters.get("batch_size", 1000)
    start_date = parameters.get("start_date")

    # Call external API
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(
        f"{api_endpoint}?start_date={start_date}&limit={batch_size}",
        headers=headers
    )
    response.raise_for_status()

    # Convert API response to DataFrame
    data = response.json()["customers"]

    # Define schema for the DataFrame
    schema = StructType([
        StructField("customer_id", IntegerType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("registration_date", TimestampType(), True)
    ])

    # Create and return DataFrame
    return spark.createDataFrame(data, schema)
Anatomy of a Python load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - Python actions typically use batch mode
operational_metadata: Add custom metadata columns
- source:
type: Use Python function as source
module_path: Path to Python file containing the extraction function
function_name: Name of function to call (defaults to “get_df” if not specified)
parameters: Dictionary of parameters to pass to the function
target: Name of the temporary view created
description: Optional documentation for the action
See also
For PySpark DataFrame operations see the Databricks PySpark documentation.
Custom functions: Concepts & Architecture
Important
Python functions must accept two parameters: spark (SparkSession) and parameters (dict).
The function must return a PySpark DataFrame that will be used as the view source.
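A minimal skeleton that satisfies this contract (using the default function name get_df; the table name is a placeholder):

from pyspark.sql import DataFrame

def get_df(spark, parameters: dict) -> DataFrame:
    """Minimal load function: takes spark and parameters, returns a DataFrame."""
    table_name = parameters.get("table_name", "some_catalog.some_schema.some_table")  # placeholder
    return spark.read.table(table_name)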
Note
File Organization: When using module_path, the path is relative to your YAML file location.
Common practice is to create an extractors/ or functions/ folder alongside your pipeline YAML files.
Note
Parameter Substitution: The parameters dictionary supports ${token}
substitution for environment-specific values:
parameters:
catalog: "${catalog}"
table_name: "${schema}.users"
api_endpoint: "${api_url}"
batch_size: 1000 # No substitution needed
All tokens are replaced with values from substitutions/{env}.yaml at generation time.
Secret references (${secret:scope/key}) are converted to dbutils.secrets.get() calls.
The above YAML translates to the following PySpark code
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
from extractors.api_extractor import extract_customer_data

@dp.temporary_view()
def v_api_customers():
    """Load customer data from external API"""
    # Call the external Python function with spark and parameters
    parameters = {
        "api_endpoint": "https://api.example.com/customers",
        "api_key": "{{ secret_substituted_api_key }}",
        "batch_size": 1000,
        "start_date": "2024-01-01"
    }
    df = extract_customer_data(spark, parameters)

    # Add operational metadata columns
    df = df.withColumn('_api_call_timestamp', current_timestamp())

    return df
PySpark Custom DataSource¶
Custom data source load actions use PySpark’s DataSource API to implement specialized data ingestion from APIs, custom protocols, or any external system that requires custom logic. This allows for highly flexible data ingestion patterns.
YAML Configuration:
actions:
- name: load_currency_exchange
type: load
readMode: stream
operational_metadata: ["_processing_timestamp"]
source:
type: custom_datasource
module_path: "data_sources/currency_api_source.py"
custom_datasource_class: "CurrencyAPIStreamingDataSource"
options:
apiKey: "${secret:apis/currency_key}"
baseCurrencies: "USD,EUR,GBP"
progressPath: "/Volumes/catalog/schema/checkpoints/"
minCallIntervalSeconds: "300"
workspaceUrl: "adb-XYZ.azuredatabricks.net"
target: v_currency_bronze
description: "Load live currency exchange rates from external API"
Custom DataSource Implementation (data_sources/currency_api_source.py):
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType
from typing import Iterator, Tuple
import requests
import time
import json

class CurrencyInputPartition(InputPartition):
    """Input partition for currency API data source"""
    def __init__(self, start_time, end_time):
        self.start_time = start_time
        self.end_time = end_time

class CurrencyAPIStreamingDataSource(DataSource):
    """
    Custom data source for live currency exchange rates.
    Fetches data from external API with rate limiting and progress tracking.
    """

    @classmethod
    def name(cls):
        return "currency_api_stream"

    def schema(self):
        return """
            base_currency string,
            target_currency string,
            exchange_rate double,
            api_timestamp timestamp,
            fetch_timestamp timestamp,
            rate_change_1h double,
            is_crypto boolean,
            data_source string,
            pipeline_run_id string
        """

    def streamReader(self, schema: StructType):
        return CurrencyAPIStreamingReader(schema, self.options)

class CurrencyAPIStreamingReader(DataSourceStreamReader):
    """Streaming reader implementation with API calls and progress tracking"""

    def __init__(self, schema, options):
        self.schema = schema
        self.options = options
        self.api_key = options.get("apiKey")
        self.base_currencies = options.get("baseCurrencies", "USD").split(",")
        self.progress_path = options.get("progressPath")
        self.min_interval = int(options.get("minCallIntervalSeconds", "300"))

    def initialOffset(self) -> dict:
        return {"fetch_time": int(time.time() * 1000)}

    def latestOffset(self) -> dict:
        return {"fetch_time": int(time.time() * 1000)}

    def partitions(self, start: dict, end: dict):
        return [CurrencyInputPartition(start.get("fetch_time", 0), end.get("fetch_time", 0))]

    def read(self, partition) -> Iterator[Tuple]:
        """Fetch data from external API and yield as tuples"""
        # API call logic here
        for base_currency in self.base_currencies:
            # Make API calls and yield data
            yield (base_currency, "USD", 1.0, time.time(), time.time(), 0.0, False, "API", "run_1")
Anatomy of a custom data source load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - determines if custom DataSource uses batch or stream reader
operational_metadata: Add custom metadata columns (e.g., processing timestamp)
- source: Custom data source configuration
type: Use custom_datasource as source type
module_path: Path to Python file containing the custom DataSource implementation
custom_datasource_class: Name of the DataSource class to register and use
options: Dictionary of parameters passed to the DataSource (available via self.options)
target: Name of the temporary view created
description: Optional documentation for the action
See also
For PySpark DataSource API see the PySpark DataSource documentation.
Custom integrations: Concepts & Architecture
Important
Custom DataSources require implementing the DataSource interface with appropriate reader methods. The framework automatically registers your DataSource and copies the implementation to the generated pipeline. Use options dictionary to pass configuration parameters from YAML to your DataSource.
Note
File Substitution Support
Custom DataSource Python files support substitution variables:
Environment tokens: ${catalog}, ${api_endpoint}, ${environment}
Secret references: ${secret:scope/key} for API keys and credentials
Substitutions are applied before the class is embedded in the generated code.
Key Implementation Requirements:
- Your DataSource class must implement the name() class method returning the format name used in .format()
- The framework uses the return value of the name() method, not the class name, for the format string
- The custom source code is placed before the registration call to ensure proper class definition order
- Import management is handled automatically to resolve conflicts between source file imports and generated code
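Putting these requirements together, a minimal batch-mode DataSource (for readMode: batch) could look like the following sketch; the class name, format name, and data values are illustrative:

from pyspark.sql.datasource import DataSource, DataSourceReader

class StaticRatesDataSource(DataSource):
    """Minimal batch data source that returns a fixed set of rows."""

    @classmethod
    def name(cls):
        # The framework uses this value (not the class name) in .format()
        return "static_rates"

    def schema(self):
        return "base_currency string, target_currency string, exchange_rate double"

    def reader(self, schema):
        return StaticRatesReader(self.options)

class StaticRatesReader(DataSourceReader):
    def __init__(self, options):
        self.options = options

    def read(self, partition):
        # Yield tuples matching the declared schema (illustrative values)
        yield ("USD", "EUR", 0.92)
        yield ("USD", "GBP", 0.79)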
Note
File Organization: The module_path is relative to your YAML file location.
Common practice is to create a data_sources/ folder alongside your pipeline YAML files.
Schema Definition: Define your schema in the schema() method using DDL string format as shown in the example.
This schema should match the data structure returned by your read() method.
Import Management: The framework automatically handles import deduplication and conflict resolution.
If your custom source uses wildcard imports (e.g., from pyspark.sql.functions import *),
they will take precedence over alias imports, and operational metadata expressions will adapt accordingly.
The above YAML translates to the following PySpark code
# Generated by LakehousePlumber
# Pipeline: unirate_api_ingestion
# FlowGroup: api_unirate_ingestion_bronze

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import Iterator, Tuple
from pyspark import pipelines as dp
import json
import os
import requests
import time

# Pipeline Configuration
PIPELINE_ID = "unirate_api_ingestion"
FLOWGROUP_ID = "api_unirate_ingestion_bronze"

# ============================================================================
# CUSTOM DATA SOURCE IMPLEMENTATIONS
# ============================================================================
# The following code was automatically copied from: data_sources/currency_api_source.py
# Used by action: load_currency_exchange

class CurrencyInputPartition(InputPartition):
    """Input partition for currency API data source"""
    def __init__(self, start_time, end_time):
        self.start_time = start_time
        self.end_time = end_time

class CurrencyAPIStreamingDataSource(DataSource):
    """
    Real currency exchange data source powered by UniRateAPI.
    Fetches live exchange rates on each triggered pipeline run.
    """

    @classmethod
    def name(cls):
        return "currency_api_stream"

    def schema(self):
        return """
            base_currency string,
            target_currency string,
            exchange_rate double,
            api_timestamp timestamp,
            fetch_timestamp timestamp,
            rate_change_1h double,
            is_crypto boolean,
            data_source string,
            pipeline_run_id string
        """

    def streamReader(self, schema: StructType):
        return CurrencyAPIStreamingReader(schema, self.options)

# ... rest of custom data source implementation ...

# ============================================================================
# SOURCE VIEWS
# ============================================================================

# Try to register the custom data source
try:
    spark.dataSource.register(CurrencyAPIStreamingDataSource)
except Exception:
    pass  # Ignore if already registered

@dp.temporary_view()
def v_currency_bronze():
    """Load live currency exchange rates from external API"""
    df = spark.readStream \
        .format("currency_api_stream") \
        .option("apiKey", dbutils.secrets.get(scope='apis', key='currency_key')) \
        .option("baseCurrencies", "USD,EUR,GBP") \
        .option("progressPath", "/Volumes/catalog/schema/checkpoints/") \
        .option("minCallIntervalSeconds", "300") \
        .option("workspaceUrl", "adb-XYZ.azuredatabricks.net") \
        .load()

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', current_timestamp())

    return df