Transform Actions

Sub-type        Purpose
sql             Run an inline SQL statement or external .sql file.
python          Apply arbitrary PySpark code; useful for complex logic.
schema          Add, drop, or rename columns, or change data types.
data_quality    Attach expectations (fail, warn, drop) to validate data.
temp_table      Create an intermediate temp table or view for re-use.

sql

SQL transform actions execute SQL queries to transform data between views. They support both inline SQL and external SQL files.

Option 1: Inline SQL

actions:
  - name: customer_bronze_cleanse
    type: transform
    transform_type: sql
    source: v_customer_raw
    target: v_customer_bronze_cleaned
    sql: |
      SELECT
        c_custkey as customer_id,
        c_name as name,
        c_address as address,
        c_nationkey as nation_id,
        c_phone as phone,
        c_acctbal as account_balance,
        c_mktsegment as market_segment,
        c_comment as comment,
        _source_file_path,
        _source_file_size,
        _source_file_modification_time,
        _record_hash,
        _processing_timestamp
      FROM stream(v_customer_raw)
    description: "Cleanse and standardize customer data for bronze layer"

Option 2: External SQL File

actions:
  - name: customer_enrichment
    type: transform
    transform_type: sql
    source: v_customer_bronze
    target: v_customer_enriched
    sql_path: "sql/customer_enrichment.sql"
    description: "Enrich customer data with additional attributes"

Anatomy of an SQL transform action

  • name: Unique name for this action within the FlowGroup

  • type: Must be transform - this action transforms data from one view to another

  • transform_type: Specifies this is an SQL-based transformation

  • source: Name of the input view to transform

  • target: Name of the output view to create

  • sql: SQL statement that defines the transformation logic (inline option)

  • sql_path: Path to external .sql file (external file option)

  • description: Optional documentation for the action

Important

SQL transforms can use the stream() function for streaming sources or direct view references for batch processing. Column aliasing and data type casting are common patterns in bronze-layer cleansing.

Note

File Substitution Support

Substitution variables work in both inline SQL and external SQL files (sql_path). The same ${token} and ${secret:scope/key} syntax from YAML works in .sql files. Files are processed for substitutions before query execution.
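The substitution pass amounts to token replacement over the file text before execution. The sketch below is illustrative only; the `substitute` and `resolve_secret` names are hypothetical, not framework API:

```python
import re

def substitute(text: str, tokens: dict, resolve_secret) -> str:
    """Replace ${token} and ${secret:scope/key} markers in SQL or Python text.

    Illustrative sketch: the real framework applies this pass before query
    execution (inline sql) or before copying (sql_path / module_path).
    """
    # ${secret:scope/key} references are resolved via a secret lookup
    text = re.sub(
        r"\$\{secret:([^}]+)\}",
        lambda m: resolve_secret(m.group(1)),
        text,
    )
    # Plain environment tokens such as ${catalog}; unknown tokens are left as-is
    return re.sub(
        r"\$\{(\w+)\}",
        lambda m: tokens.get(m.group(1), m.group(0)),
        text,
    )
```

For example, `substitute("FROM ${catalog}.${bronze_schema}.customer", {"catalog": "dev", "bronze_schema": "bronze"}, ...)` would produce `FROM dev.bronze.customer`.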

Warning

When writing SQL statements, if your source or target is a streaming table you must read it with the stream() function, for example: `FROM stream(v_customer_raw)`

Note

File Organization: When using sql_path, the path is relative to your YAML file location. Common practice is to create a sql/ folder alongside your pipeline YAML files.

The above YAML examples translate to the following PySpark code

For inline SQL:

from pyspark import pipelines as dp

@dp.temporary_view(comment="Cleanse and standardize customer data for bronze layer")
def v_customer_bronze_cleaned():
    """Cleanse and standardize customer data for bronze layer"""
    return spark.sql("""
        SELECT
          c_custkey as customer_id,
          c_name as name,
          c_address as address,
          c_nationkey as nation_id,
          c_phone as phone,
          c_acctbal as account_balance,
          c_mktsegment as market_segment,
          c_comment as comment,
          _source_file_path,
          _source_file_size,
          _source_file_modification_time,
          _record_hash,
          _processing_timestamp
        FROM stream(v_customer_raw)
    """)

For external SQL file:

from pyspark import pipelines as dp

@dp.temporary_view(comment="Enrich customer data with additional attributes")
def v_customer_enriched():
    """Enrich customer data with additional attributes"""
    return spark.sql("""
        -- Content from sql/customer_enrichment.sql file
        SELECT
          c.*,
          n.name as nation_name,
          r.name as region_name,
          CASE
            WHEN account_balance > 5000 THEN 'High Value'
            WHEN account_balance > 1000 THEN 'Medium Value'
            ELSE 'Standard'
          END as customer_tier
        FROM ${catalog}.${bronze_schema}.customer c
        LEFT JOIN ${catalog}.${bronze_schema}.nation n ON c.nation_id = n.nation_id
        LEFT JOIN ${catalog}.${bronze_schema}.region r ON n.region_id = r.region_id
    """)

python

Python transform actions call custom Python functions to apply complex transformation logic that goes beyond SQL capabilities.

Tip

The framework automatically copies your Python functions into the generated pipeline and handles import management.

actions:
  - name: customer_advanced_enrichment
    type: transform
    transform_type: python
    source: v_customer_bronze
    module_path: "transformations/customer_transforms.py"
    function_name: "enrich_customer_data"
    parameters:
      api_endpoint: "https://api.example.com/geocoding"
      api_key: "${secret:apis/geocoding_key}"
      batch_size: 1000
    target: v_customer_enriched
    readMode: batch
    operational_metadata: ["_processing_timestamp"]
    description: "Apply advanced customer enrichment using external APIs"

Multiple Source Views Example:

actions:
  - name: customer_order_analysis
    type: transform
    transform_type: python
    source: ["v_customer_bronze", "v_orders_bronze"]
    module_path: "analytics/customer_analysis.py"
    function_name: "analyze_customer_orders"
    parameters:
      analysis_window_days: 90
      min_order_count: 5
    target: v_customer_order_insights
    readMode: batch
    description: "Analyze customer order patterns from multiple sources"

Python Function (transformations/customer_transforms.py):

import requests
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, lit, udf
from pyspark.sql.types import StringType

def enrich_customer_data(df: DataFrame, spark, parameters: dict) -> DataFrame:
    """Apply advanced customer enrichment using external APIs.

    Args:
        df: Input DataFrame from source view
        spark: SparkSession instance
        parameters: Configuration parameters from YAML

    Returns:
        DataFrame: Enriched customer data
    """
    # Extract parameters from YAML configuration
    api_endpoint = parameters.get("api_endpoint")
    api_key = parameters.get("api_key")
    batch_size = parameters.get("batch_size", 1000)

    # Define UDF for geocoding
    def geocode_address(address):
        if not address:
            return None
        try:
            response = requests.get(
                f"{api_endpoint}?address={address}&key={api_key}",
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                return data.get("coordinates", {}).get("lat")
            return None
        except requests.RequestException:
            return None

    geocode_udf = udf(geocode_address, StringType())

    # Apply transformations
    enriched_df = df.withColumn(
        "latitude", geocode_udf(col("address"))
    ).withColumn(
        "customer_risk_score",
        when(col("account_balance") < 0, lit("High"))
        .when(col("account_balance") < 1000, lit("Medium"))
        .otherwise(lit("Low"))
    ).withColumn(
        "address_normalized",
        col("address").cast("string")
    )

    return enriched_df

Multiple Sources Function Example (analytics/customer_analysis.py):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, sum, avg, datediff, current_date
from typing import List

def analyze_customer_orders(dataframes: List[DataFrame], spark, parameters: dict) -> DataFrame:
    """Analyze customer order patterns from multiple source views.

    Args:
        dataframes: List of DataFrames [customers_df, orders_df]
        spark: SparkSession instance
        parameters: Configuration parameters from YAML

    Returns:
        DataFrame: Customer order insights
    """
    customers_df, orders_df = dataframes
    analysis_window_days = parameters.get("analysis_window_days", 90)
    min_order_count = parameters.get("min_order_count", 5)

    # Join customers with their orders
    customer_orders = customers_df.alias("c").join(
        orders_df.alias("o"),
        col("c.customer_id") == col("o.customer_id"),
        "left"
    )

    # Filter orders within analysis window
    recent_orders = customer_orders.filter(
        datediff(current_date(), col("o.order_date")) <= analysis_window_days
    )

    # Calculate customer insights
    insights = recent_orders.groupBy(
        col("c.customer_id"),
        col("c.customer_name"),
        col("c.market_segment")
    ).agg(
        count("o.order_id").alias("order_count"),
        sum("o.total_price").alias("total_spent"),
        avg("o.total_price").alias("avg_order_value")
    ).filter(
        col("order_count") >= min_order_count
    )

    return insights

Anatomy of a Python transform action

  • name: Unique name for this action within the FlowGroup

  • type: Must be transform - this action transforms data from one view to another

  • transform_type: Specifies this is a Python-based transformation

  • source: Source view name(s) to transform (string for single view, list for multiple views)

  • module_path: Path to Python file containing the transformation function (relative to project root)

  • function_name: Name of function to call (required)

  • parameters: Dictionary of parameters to pass to the function (optional)

  • target: Name of the output view to create

  • readMode: Either batch or stream - determines execution mode

  • operational_metadata: Add custom metadata columns (optional)

  • description: Optional documentation for the action

File Management & Copying Process

Lakehouse Plumber automatically handles Python function deployment:

  1. Automatic File Copying: Your Python functions are copied to generated/pipeline_name/custom_python_functions/ during generation

  2. Substitution Processing: Files are processed for ${token} and ${secret:scope/key} substitutions before copying

  3. Import Management: Imports are automatically generated as from custom_python_functions.module_name import function_name

  4. Warning Headers: Copied files include prominent warnings not to edit them directly

  5. State Tracking: All copied files are tracked and cleaned up when source YAML is removed

  6. Package Structure: A __init__.py file is automatically created to make the directory a Python package
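Steps 1, 4, and 6 can be pictured with a small helper; this is an illustrative sketch, not the framework's actual code, and `copy_python_function` plus the shortened warning header are invented for the example:

```python
from pathlib import Path

# Abbreviated stand-in for the framework's full warning banner
WARNING = (
    "# WARNING: DO NOT EDIT. Copied from {src} during generation;\n"
    "# changes here are OVERWRITTEN on the next generation cycle.\n\n"
)

def copy_python_function(src: Path, out_dir: Path) -> Path:
    """Copy a user module into custom_python_functions/ with a warning header."""
    out_dir.mkdir(parents=True, exist_ok=True)
    # Package marker so `from custom_python_functions.<module> import ...` works
    (out_dir / "__init__.py").touch()
    dest = out_dir / src.name
    dest.write_text(WARNING.format(src=src) + src.read_text())
    return dest
```

In the real framework, substitution processing happens before the copy and the copied paths are recorded for state tracking.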

Note

File Substitution Support

Python transform files support the same substitution syntax as YAML:

  • Environment tokens: ${catalog}, ${schema}, ${environment}

  • Secret references: ${secret:scope/key} or ${secret:key}

Substitutions are applied before the file is copied and imported.

Important

Function Requirements: Python functions must accept the appropriate parameters based on source configuration:

  • Single source: function_name(df: DataFrame, spark: SparkSession, parameters: dict)

  • Multiple sources: function_name(dataframes: List[DataFrame], spark: SparkSession, parameters: dict)

  • No sources: function_name(spark: SparkSession, parameters: dict) (for data generators)
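The three conventions above can be pictured as a dispatch on the action's source field; `call_transform` and `load_view` are hypothetical names for illustration, not framework API:

```python
def call_transform(func, source, load_view, spark, parameters: dict):
    """Invoke a user transform with the signature implied by `source`.

    source is None (no sources), a str (single view), or a list of str
    (multiple views); load_view resolves a view name to a DataFrame.
    """
    if source is None:
        # Data generators: function_name(spark, parameters)
        return func(spark, parameters)
    if isinstance(source, str):
        # Single source: function_name(df, spark, parameters)
        return func(load_view(source), spark, parameters)
    # Multiple sources: function_name([df1, df2, ...], spark, parameters)
    return func([load_view(s) for s in source], spark, parameters)
```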

Note

File Organization Tips:

  • Keep your Python functions in a dedicated folder (e.g., transformations/, functions/)

  • Use descriptive function names that clearly indicate their purpose

  • Always edit the original files in your project, never the copied files in generated/

  • The module_path is relative to your project root directory

  • Multiple transforms can reference the same Python file with different functions

Warning

DO NOT Edit Generated Files: The copied Python files in custom_python_functions/ are automatically regenerated and include warning headers. Always edit your original source files.

Generated File Structure

After generation, your Python functions appear in the pipeline output with warning headers:

generated/
└── pipeline_name/
    ├── flowgroup_name.py
    └── custom_python_functions/
        ├── __init__.py
        └── customer_transforms.py

Example of Generated File with Warning Header:

# ╔══════════════════════════════════════════════════════════════════════════════╗
# ║                                    WARNING                                   ║
# ║                          DO NOT EDIT THIS FILE DIRECTLY                      ║
# ╠══════════════════════════════════════════════════════════════════════════════╣
# ║ This file was automatically copied from: transformations/customer_transforms.py ║
# ║ during pipeline generation. Any changes made here will be OVERWRITTEN        ║
# ║ on the next generation cycle.                                                ║
# ║                                                                              ║
# ║ To make changes:                                                             ║
# ║ 1. Edit the original file: transformations/customer_transforms.py           ║
# ║ 2. Regenerate the pipeline                                                   ║
# ╚══════════════════════════════════════════════════════════════════════════════╝

import requests
from pyspark.sql import DataFrame
# ... rest of your original function code ...

The above YAML translates to the following PySpark code

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
from custom_python_functions.customer_transforms import enrich_customer_data

@dp.temporary_view()
def v_customer_enriched():
    """Apply advanced customer enrichment using external APIs"""
    # Load source view(s)
    v_customer_bronze_df = spark.read.table("v_customer_bronze")

    # Apply Python transformation
    parameters = {
        "api_endpoint": "https://api.example.com/geocoding",
        "api_key": "{{ secret_substituted_api_key }}",
        "batch_size": 1000
    }
    df = enrich_customer_data(v_customer_bronze_df, spark, parameters)

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', current_timestamp())

    return df

For multiple source views:

from pyspark import pipelines as dp
from custom_python_functions.customer_analysis import analyze_customer_orders

@dp.temporary_view()
def v_customer_order_insights():
    """Analyze customer order patterns from multiple sources"""
    # Load source views
    v_customer_bronze_df = spark.read.table("v_customer_bronze")
    v_orders_bronze_df = spark.read.table("v_orders_bronze")

    # Apply Python transformation with multiple sources
    parameters = {
        "analysis_window_days": 90,
        "min_order_count": 5
    }
    dataframes = [v_customer_bronze_df, v_orders_bronze_df]
    df = analyze_customer_orders(dataframes, spark, parameters)

    return df

data_quality

Data quality transform actions apply data validation rules using Databricks DLT expectations. They automatically handle data that fails validation based on configured actions.

actions:
  - name: customer_bronze_DQE
    type: transform
    transform_type: data_quality
    source: v_customer_bronze_cleaned
    target: v_customer_bronze_DQE
    readMode: stream
    expectations_file: "expectations/customer_quality.json"
    description: "Apply data quality checks to customer data"

Expectations File (expectations/customer_quality.json):

{
  "version": "1.0",
  "table": "customer",
  "expectations": [
    {
      "name": "valid_custkey",
      "expression": "customer_id IS NOT NULL AND customer_id > 0",
      "failureAction": "fail"
    },
    {
      "name": "valid_customer_name",
      "expression": "name IS NOT NULL AND LENGTH(TRIM(name)) > 0",
      "failureAction": "fail"
    },
    {
      "name": "valid_phone_format",
      "expression": "phone IS NULL OR LENGTH(phone) >= 10",
      "failureAction": "warn"
    },
    {
      "name": "valid_account_balance",
      "expression": "account_balance IS NULL OR account_balance >= -10000",
      "failureAction": "warn"
    },
    {
      "name": "suspicious_balance",
      "expression": "account_balance IS NULL OR account_balance < 50000",
      "failureAction": "drop"
    }
  ]
}

Anatomy of a data quality transform action

  • name: Unique name for this action within the FlowGroup

  • type: Must be transform - transforms data with quality validation

  • transform_type: Specifies this is a data quality transformation

  • source: Name of the input view to validate

  • target: Name of the output view after validation

  • readMode: Must be stream - data quality transforms require streaming mode

  • expectations_file: Path to JSON file containing validation rules

  • description: Optional documentation for the action

Expectation Actions:

  • fail: Stop the pipeline if any records violate the rule

  • warn: Log warnings but continue processing all records

  • drop: Remove records that violate the rule but continue processing
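The mapping from an expectations file to the generated decorators can be sketched as a grouping step over failureAction; `group_expectations` below is an illustrative helper, not framework code:

```python
from typing import Dict, List

def group_expectations(expectations: List[Dict[str, str]]) -> Dict[str, Dict[str, str]]:
    """Group expectation rules by failureAction.

    Returns name -> expression maps for the three decorator families:
    'fail' -> expect_all_or_fail, 'drop' -> expect_all_or_drop,
    'warn' -> expect_all.
    """
    groups: Dict[str, Dict[str, str]] = {"fail": {}, "warn": {}, "drop": {}}
    for rule in expectations:
        groups[rule["failureAction"]][rule["name"]] = rule["expression"]
    return groups
```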

Important

Data quality transforms require readMode: stream and generate DLT streaming tables with built-in quality monitoring. Use fail for critical business rules, warn for monitoring, and drop for data cleansing.

Note

File Organization: Expectations files are typically stored in an expectations/ folder. JSON format allows for version control and reuse across multiple pipelines.

The above YAML translates to the following PySpark code

from pyspark import pipelines as dp

@dp.temporary_view()
# These expectations will fail the pipeline if violated
@dp.expect_all_or_fail({
    "valid_custkey": "customer_id IS NOT NULL AND customer_id > 0",
    "valid_customer_name": "name IS NOT NULL AND LENGTH(TRIM(name)) > 0"
})
# These expectations will drop rows that violate them
@dp.expect_all_or_drop({
    "suspicious_balance": "account_balance IS NULL OR account_balance < 50000"
})
# These expectations will log warnings but not drop rows
@dp.expect_all({
    "valid_phone_format": "phone IS NULL OR LENGTH(phone) >= 10",
    "valid_account_balance": "account_balance IS NULL OR account_balance >= -10000"
})
def v_customer_bronze_DQE():
    """Apply data quality checks to customer data"""
    df = spark.readStream.table("v_customer_bronze_cleaned")

    return df

See also

For advanced quarantine mode with Dead Letter Queue (DLQ) recycling — routing failed rows to an external table and automatically recycling fixed records back into the pipeline — see Quarantine (Dead Letter Queue).

schema

Schema transform actions apply column mapping, type casting, and schema enforcement to standardize data structures.

Action Format Structure

Schema transform actions use a flat structure with schema definition at the action level:

actions:
  - name: standardize_customer_schema
    type: transform
    transform_type: schema
    source: v_customer_raw                              # Simple view name (string)
    target: v_customer_standardized
    schema_file: "schemas/bronze/customer_transform.yaml"  # OR use inline schema
    enforcement: strict                                  # Optional: strict or permissive (default: permissive)
    readMode: stream                                     # Optional: stream (default) or batch
    description: "Standardize customer schema and data types"

Key Points:

  • source: Must be a simple string (view name), not a dictionary

  • schema_file OR schema: Choose one (external file or inline definition)

  • enforcement: Action-level field (strict or permissive)

  • readMode: Defaults to stream (not batch)

External Schema Files (Recommended)

External schema files contain only column definitions (no enforcement):

# In schemas/bronze/customer_transform.yaml:
columns:
  - "c_custkey -> customer_id: BIGINT"     # Rename and cast
  - "c_name -> customer_name"               # Rename only
  - "c_address -> address"                  # Rename only
  - "account_balance: DECIMAL(18,2)"        # Cast only
  - "phone_number: STRING"                  # Cast only

Arrow Format Syntax:

  • "old_col -> new_col: TYPE" - Rename and cast in one line

  • "old_col -> new_col" - Rename only

  • "col: TYPE" - Cast only (no rename)

  • "col" - Pass-through (strict mode only, explicitly keep column)
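The four shapes above can be parsed with a couple of string splits; the parser below is a sketch for illustration, not the framework's implementation:

```python
from typing import Optional, Tuple

def parse_arrow_column(spec: str) -> Tuple[str, str, Optional[str]]:
    """Parse one arrow-format line into (old_name, new_name, cast_type).

    cast_type is None when the line has no ": TYPE" suffix; a plain "col"
    line maps a column to itself with no cast (pass-through).
    """
    # The cast type, if any, follows the first colon
    left, _, cast_type = spec.partition(":")
    if "->" in left:
        old, _, new = left.partition("->")
    else:
        old = new = left
    return old.strip(), new.strip(), cast_type.strip() or None
```

For example, `parse_arrow_column("c_custkey -> customer_id: BIGINT")` yields `("c_custkey", "customer_id", "BIGINT")`.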

Schema File Paths:

Schema files can be organized in subdirectories relative to your project root:

# Root level
schema_file: "customer_transform.yaml"

# In schemas/ directory
schema_file: "schemas/customer_transform.yaml"

# Nested subdirectories (recommended for organization by layer)
schema_file: "schemas/bronze/dimensions/customer_transform.yaml"

# Absolute paths also supported
schema_file: "/absolute/path/to/schema.yaml"

Inline Schema (Arrow Format)

For simple transformations, use inline schema with arrow syntax:

- name: standardize_customer_schema
  type: transform
  transform_type: schema
  source: v_customer_raw
  target: v_customer_standardized
  enforcement: strict
  schema_inline: |
    c_custkey -> customer_id: BIGINT
    c_name -> customer_name
    account_balance: DECIMAL(18,2)
    phone_number: STRING

Inline Schema (Full YAML Format)

For more complex schemas, use full YAML structure inline:

- name: standardize_customer_schema
  type: transform
  transform_type: schema
  source: v_customer_raw
  target: v_customer_standardized
  enforcement: strict
  schema_inline: |
    columns:
      - "c_custkey -> customer_id: BIGINT"
      - "c_name -> customer_name"
      - "account_balance: DECIMAL(18,2)"

Schema Enforcement Modes:

Schema transforms support two enforcement modes (specified at action level):

  • strict: Only explicitly defined columns are kept in the output. All other columns are dropped. This ensures data quality and prevents schema drift.

  • permissive (default): Explicitly defined columns are transformed, but all other columns pass through unchanged. Useful when you only want to standardize specific columns.

# Strict enforcement example
- name: transform_strict
  type: transform
  transform_type: schema
  source: v_raw
  target: v_clean
  enforcement: strict
  schema_file: "schemas/transform.yaml"

# Input:  c_custkey, c_name, c_address, c_phone, extra_col
# Output: customer_id, customer_name (+ operational metadata)
#         ↑ All unmapped columns dropped

# Permissive enforcement example (default)
- name: transform_permissive
  type: transform
  transform_type: schema
  source: v_raw
  target: v_clean
  enforcement: permissive  # or omit (permissive is default)
  schema_file: "schemas/transform.yaml"

# Input:  c_custkey, c_name, c_address, c_phone, extra_col
# Output: customer_id, customer_name, c_address, c_phone, extra_col
#         ↑ All unmapped columns kept
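The column selection implied by the two modes can be sketched in pure Python (operational metadata handling is omitted); `select_output_columns` and its arguments are illustrative names, not framework API:

```python
from typing import Dict, List

def select_output_columns(
    input_columns: List[str],
    mapping: Dict[str, str],  # old_name -> new_name from the schema definition
    enforcement: str = "permissive",
) -> List[str]:
    """Compute the output column list for strict vs permissive enforcement."""
    if enforcement == "strict":
        # Keep only explicitly mapped columns, in input order
        return [mapping[c] for c in input_columns if c in mapping]
    # Permissive: mapped columns are renamed, everything else passes through
    return [mapping.get(c, c) for c in input_columns]
```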

Breaking Change: Old Format No Longer Supported

Warning

The nested format with schema definition inside source is no longer supported and will raise an error:

# OLD FORMAT (NO LONGER WORKS):
source:
  view: v_customer_raw
  schema_file: "path.yaml"

# NEW FORMAT (REQUIRED):
source: v_customer_raw
schema_file: "path.yaml"
enforcement: strict

If you see an error about “deprecated nested format”, move schema_inline or schema_file to the top level of the action.

Anatomy of a schema transform action

  • name: Unique name for this action within the FlowGroup

  • type: Must be transform

  • transform_type: Must be schema

  • source: Name of the input view to transform (simple string, not a dictionary)

  • target: Name of the output view with transformed schema

  • schema_file: Path to external schema file (arrow or legacy format) - use this OR schema_inline

  • schema_inline: Inline schema definition (arrow or YAML format) - use this OR schema_file

  • enforcement: Optional - Schema enforcement mode (strict or permissive, default: permissive)

  • readMode: Optional - Either stream (default) or batch - determines execution mode

  • description: Optional documentation for the action

Important

Schema transforms preserve operational metadata columns automatically. These columns are never renamed or cast, and are always included in the output regardless of enforcement mode. Use schema transforms for standardizing column names and ensuring consistent data types across your lakehouse.

The above YAML translates to the following PySpark code

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.types import StructType

@dp.temporary_view()
def v_customer_standardized():
    """Standardize customer schema and data types"""
    df = spark.readStream.table("v_customer_raw")  # Default is stream mode

    # Apply column renaming
    df = df.withColumnRenamed("c_custkey", "customer_id")
    df = df.withColumnRenamed("c_name", "customer_name")
    df = df.withColumnRenamed("c_address", "address")

    # Apply type casting
    df = df.withColumn("customer_id", F.col("customer_id").cast("BIGINT"))
    df = df.withColumn("account_balance", F.col("account_balance").cast("DECIMAL(18,2)"))
    df = df.withColumn("phone_number", F.col("phone_number").cast("STRING"))

    # Strict schema enforcement - select only specified columns
    # Schema-defined columns (will fail if missing)
    columns_to_select = [
        "customer_id",
        "customer_name",
        "address",
        "account_balance",
        "phone_number"
    ]

    # Add operational metadata columns only if they exist (optional)
    available_columns = set(df.columns)
    metadata_columns = [
        "_ingestion_timestamp",
        "_source_file"
    ]
    for meta_col in metadata_columns:
        if meta_col in available_columns:
            columns_to_select.append(meta_col)

    df = df.select(*columns_to_select)

    return df

temp_table

Temp table transform actions create temporary streaming tables for intermediate processing and reuse across multiple downstream actions.

Option 1: Simple Passthrough

actions:
  - name: create_customer_temp
    type: transform
    transform_type: temp_table
    source: v_customer_processed
    target: customer_intermediate
    readMode: stream
    description: "Create temporary table for customer intermediate processing"

Option 2: With SQL Transformation

actions:
  - name: create_temp_aggregate
    type: transform
    transform_type: temp_table
    source: v_customer_raw
    target: temp_daily_summary
    readMode: stream
    sql: |
      SELECT
        DATE(order_date) as date,
        COUNT(*) as order_count,
        SUM(total_amount) as total_amount
      FROM stream({source})
      GROUP BY DATE(order_date)
    description: "Create temporary aggregate table with daily summaries"

Anatomy of a temp table transform action

  • name: Unique name for this action within the FlowGroup

  • type: Must be transform - creates a temporary table

  • transform_type: Specifies this is a temporary table transformation

  • source: Name of the input view to materialize as temporary table

  • target: Name of the temporary table to create

  • readMode: Either batch or stream - determines table type

  • sql: Optional SQL transformation to apply (inline option)

  • description: Optional documentation for the action

Important

Temp tables are automatically cleaned up when the pipeline completes. Use for complex multi-step transformations where intermediate materialization improves performance.

For instance, if you have a complex transformation that will be used by several downstream actions, you can create a temporary table to prevent the transformation from being recomputed each time.

Warning

When using the sql property with streaming tables (readMode: stream), you must use the stream() function in your SQL query to maintain streaming semantics. Without it, the query will process data in batch mode.

The above YAML examples translate to the following PySpark code

For simple passthrough:

from pyspark import pipelines as dp

@dp.table(
    temporary=True,
)
def customer_intermediate():
    """Create temporary table for customer intermediate processing"""
    df = spark.readStream.table("v_customer_processed")

    return df

For SQL transformation:

from pyspark import pipelines as dp

@dp.table(
    temporary=True,
)
def temp_daily_summary():
    """Create temporary aggregate table with daily summaries"""
    df = spark.sql("""
        SELECT
          DATE(order_date) as date,
          COUNT(*) as order_count,
          SUM(total_amount) as total_amount
        FROM stream(v_customer_raw)
        GROUP BY DATE(order_date)
    """)

    return df
17    return df