Transform Actions¶
| Sub-type | Purpose |
|---|---|
| sql | Run an inline SQL statement or external .sql file. |
| python | Apply arbitrary PySpark code; useful for complex logic. |
| schema | Add, drop, or rename columns, or change data types. |
| data_quality | Attach expectations (fail, warn, drop) to validate data. |
| temp_table | Create an intermediate temp table or view for re-use. |
sql¶
SQL transform actions execute SQL queries to transform data between views. They support both inline SQL and external SQL files.
Option 1: Inline SQL
actions:
  - name: customer_bronze_cleanse
    type: transform
    transform_type: sql
    source: v_customer_raw
    target: v_customer_bronze_cleaned
    sql: |
      SELECT
        c_custkey as customer_id,
        c_name as name,
        c_address as address,
        c_nationkey as nation_id,
        c_phone as phone,
        c_acctbal as account_balance,
        c_mktsegment as market_segment,
        c_comment as comment,
        _source_file_path,
        _source_file_size,
        _source_file_modification_time,
        _record_hash,
        _processing_timestamp
      FROM stream(v_customer_raw)
    description: "Cleanse and standardize customer data for bronze layer"
Option 2: External SQL File
actions:
  - name: customer_enrichment
    type: transform
    transform_type: sql
    source: v_customer_bronze
    target: v_customer_enriched
    sql_path: "sql/customer_enrichment.sql"
    description: "Enrich customer data with additional attributes"
Anatomy of an SQL transform action
name: Unique name for this action within the FlowGroup
type: Action type - transforms data from one view to another
transform_type: Specifies this is an SQL-based transformation
source: Name of the input view to transform
target: Name of the output view to create
sql: SQL statement that defines the transformation logic (inline option)
sql_path: Path to external .sql file (external file option)
description: Optional documentation for the action
See also
For SQL syntax see the Databricks SQL documentation.
Stream syntax: Use stream(view_name) for streaming transformations
Important
SQL transforms can use the stream() function for streaming data or direct view references for batch processing.
Column aliasing and data type transformations are common patterns in bronze layer cleansing.
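For instance, a batch-style SQL transform simply references the source view directly instead of wrapping it in stream(). The following is a minimal sketch using the same action fields as the inline example above; the action, view, and column names are illustrative:
actions:
  - name: customer_segment_counts
    type: transform
    transform_type: sql
    source: v_customer_bronze_cleaned
    target: v_customer_segment_counts
    sql: |
      SELECT
        market_segment,
        COUNT(*) AS customer_count
      FROM v_customer_bronze_cleaned
      GROUP BY market_segment
    description: "Batch aggregation over the cleansed customer view (no stream() call)"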
Note
File Substitution Support
Substitution variables work in both inline SQL and external SQL files (sql_path).
The same ${token} and ${secret:scope/key} syntax from YAML works in .sql files.
Files are processed for substitutions before query execution.
Warning
When writing SQL statements, if your source or target is a streaming table you must use the stream() function.
For example: `FROM stream(v_customer_raw)`
Note
File Organization: When using sql_path, the path is relative to your YAML file location.
Common practice is to create a sql/ folder alongside your pipeline YAML files.
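A layout like the following keeps SQL files next to the pipeline definition that uses them (folder and file names are illustrative):
pipelines/
├── customer_pipeline.yaml
└── sql/
    └── customer_enrichment.sql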
The above YAML examples translate to the following PySpark code
For inline SQL:
from pyspark import pipelines as dp

@dp.temporary_view(comment="Cleanse and standardize customer data for bronze layer")
def v_customer_bronze_cleaned():
    """Cleanse and standardize customer data for bronze layer"""
    return spark.sql("""
        SELECT
            c_custkey as customer_id,
            c_name as name,
            c_address as address,
            c_nationkey as nation_id,
            c_phone as phone,
            c_acctbal as account_balance,
            c_mktsegment as market_segment,
            c_comment as comment,
            _source_file_path,
            _source_file_size,
            _source_file_modification_time,
            _record_hash,
            _processing_timestamp
        FROM stream(v_customer_raw)
    """)
For external SQL file:
from pyspark import pipelines as dp

@dp.temporary_view(comment="Enrich customer data with additional attributes")
def v_customer_enriched():
    """Enrich customer data with additional attributes"""
    return spark.sql("""
        -- Content from sql/customer_enrichment.sql file
        SELECT
            c.*,
            n.name as nation_name,
            r.name as region_name,
            CASE
                WHEN account_balance > 5000 THEN 'High Value'
                WHEN account_balance > 1000 THEN 'Medium Value'
                ELSE 'Standard'
            END as customer_tier
        FROM ${catalog}.${bronze_schema}.customer c
        LEFT JOIN ${catalog}.${bronze_schema}.nation n ON c.nation_id = n.nation_id
        LEFT JOIN ${catalog}.${bronze_schema}.region r ON n.region_id = r.region_id
    """)
python¶
Python transform actions call custom Python functions to apply complex transformation logic that goes beyond SQL capabilities.
Tip
The framework automatically copies your Python functions into the generated pipeline and handles import management.
actions:
  - name: customer_advanced_enrichment
    type: transform
    transform_type: python
    source: v_customer_bronze
    module_path: "transformations/customer_transforms.py"
    function_name: "enrich_customer_data"
    parameters:
      api_endpoint: "https://api.example.com/geocoding"
      api_key: "${secret:apis/geocoding_key}"
      batch_size: 1000
    target: v_customer_enriched
    readMode: batch
    operational_metadata: ["_processing_timestamp"]
    description: "Apply advanced customer enrichment using external APIs"
Multiple Source Views Example:
actions:
  - name: customer_order_analysis
    type: transform
    transform_type: python
    source: ["v_customer_bronze", "v_orders_bronze"]
    module_path: "analytics/customer_analysis.py"
    function_name: "analyze_customer_orders"
    parameters:
      analysis_window_days: 90
      min_order_count: 5
    target: v_customer_order_insights
    readMode: batch
    description: "Analyze customer order patterns from multiple sources"
Python Function (transformations/customer_transforms.py):
import requests
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, lit, udf
from pyspark.sql.types import StringType

def enrich_customer_data(df: DataFrame, spark, parameters: dict) -> DataFrame:
    """Apply advanced customer enrichment using external APIs.

    Args:
        df: Input DataFrame from source view
        spark: SparkSession instance
        parameters: Configuration parameters from YAML

    Returns:
        DataFrame: Enriched customer data
    """
    # Extract parameters from YAML configuration
    api_endpoint = parameters.get("api_endpoint")
    api_key = parameters.get("api_key")
    batch_size = parameters.get("batch_size", 1000)

    # Define UDF for geocoding
    def geocode_address(address):
        if not address:
            return None
        try:
            response = requests.get(
                f"{api_endpoint}?address={address}&key={api_key}",
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                return data.get("coordinates", {}).get("lat")
            return None
        except Exception:
            return None

    geocode_udf = udf(geocode_address, StringType())

    # Apply transformations
    enriched_df = df.withColumn(
        "latitude", geocode_udf(col("address"))
    ).withColumn(
        "customer_risk_score",
        when(col("account_balance") < 0, lit("High"))
        .when(col("account_balance") < 1000, lit("Medium"))
        .otherwise(lit("Low"))
    ).withColumn(
        "address_normalized",
        col("address").cast("string")
    )

    return enriched_df
Multiple Sources Function Example (analytics/customer_analysis.py):
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, sum, avg, datediff, current_date
from typing import List

def analyze_customer_orders(dataframes: List[DataFrame], spark, parameters: dict) -> DataFrame:
    """Analyze customer order patterns from multiple source views.

    Args:
        dataframes: List of DataFrames [customers_df, orders_df]
        spark: SparkSession instance
        parameters: Configuration parameters from YAML

    Returns:
        DataFrame: Customer order insights
    """
    customers_df, orders_df = dataframes
    analysis_window_days = parameters.get("analysis_window_days", 90)
    min_order_count = parameters.get("min_order_count", 5)

    # Join customers with their orders
    customer_orders = customers_df.alias("c").join(
        orders_df.alias("o"),
        col("c.customer_id") == col("o.customer_id"),
        "left"
    )

    # Filter orders within analysis window
    recent_orders = customer_orders.filter(
        datediff(current_date(), col("o.order_date")) <= analysis_window_days
    )

    # Calculate customer insights
    insights = recent_orders.groupBy(
        col("c.customer_id"),
        col("c.customer_name"),
        col("c.market_segment")
    ).agg(
        count("o.order_id").alias("order_count"),
        sum("o.total_price").alias("total_spent"),
        avg("o.total_price").alias("avg_order_value")
    ).filter(
        col("order_count") >= min_order_count
    )

    return insights
Anatomy of a Python transform action
name: Unique name for this action within the FlowGroup
type: Action type - transforms data from one view to another
transform_type: Specifies this is a Python-based transformation
source: Source view name(s) to transform (string for single view, list for multiple views)
module_path: Path to Python file containing the transformation function (relative to project root)
function_name: Name of function to call (required)
parameters: Dictionary of parameters to pass to the function (optional)
target: Name of the output view to create
readMode: Either batch or stream - determines execution mode
operational_metadata: Add custom metadata columns (optional)
description: Optional documentation for the action
File Management & Copying Process
Lakehouse Plumber automatically handles Python function deployment:
Automatic File Copying: Your Python functions are copied to generated/pipeline_name/custom_python_functions/ during generation
Substitution Processing: Files are processed for ${token} and ${secret:scope/key} substitutions before copying
Import Management: Imports are automatically generated as from custom_python_functions.module_name import function_name
Warning Headers: Copied files include prominent warnings not to edit them directly
State Tracking: All copied files are tracked and cleaned up when the source YAML is removed
Package Structure: An __init__.py file is automatically created to make the directory a Python package
Note
File Substitution Support
Python transform files support the same substitution syntax as YAML:
Environment tokens: ${catalog}, ${schema}, ${environment}
Secret references: ${secret:scope/key} or ${secret:key}
Substitutions are applied before the file is copied and imported.
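As a hypothetical illustration (not taken from the framework docs), a transform module could hold an environment token in a constant; the constant name and column below are made up, and the token is replaced before the file is copied into the generated pipeline:
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

# Hypothetical example: "${environment}" is substituted (e.g. with "dev" or "prod")
# before this file is copied into custom_python_functions/.
ENVIRONMENT = "${environment}"

def tag_environment(df: DataFrame, spark, parameters: dict) -> DataFrame:
    """Add a column recording which environment processed the data."""
    return df.withColumn("_environment", lit(ENVIRONMENT))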
See also
For PySpark DataFrame operations see the Databricks PySpark documentation.
Custom functions: Concepts & Architecture
Important
Function Requirements: Python functions must accept the appropriate parameters based on source configuration:
Single source: function_name(df: DataFrame, spark: SparkSession, parameters: dict)
Multiple sources: function_name(dataframes: List[DataFrame], spark: SparkSession, parameters: dict)
No sources: function_name(spark: SparkSession, parameters: dict) (for data generators)
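The following skeletons sketch what each signature looks like in practice. The signatures match the requirements above; the function names, column names, and parameter keys are illustrative placeholders:
from typing import List
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import lit

def single_source_transform(df: DataFrame, spark: SparkSession, parameters: dict) -> DataFrame:
    """Single source: receives one DataFrame from the source view."""
    return df.withColumn("_example_flag", lit(parameters.get("flag", "none")))

def multi_source_transform(dataframes: List[DataFrame], spark: SparkSession, parameters: dict) -> DataFrame:
    """Multiple sources: receives the source DataFrames in declaration order."""
    left_df, right_df = dataframes
    return left_df.join(right_df, on=parameters.get("join_key", "id"), how="left")

def generator_transform(spark: SparkSession, parameters: dict) -> DataFrame:
    """No sources: builds a DataFrame from scratch (data generator)."""
    return spark.range(parameters.get("row_count", 10)).withColumnRenamed("id", "generated_id")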
Note
File Organization Tips:
Keep your Python functions in a dedicated folder (e.g., transformations/, functions/)
Use descriptive function names that clearly indicate their purpose
Always edit the original files in your project, never the copied files in generated/
The module_path is relative to your project root directory
Multiple transforms can reference the same Python file with different functions
Warning
DO NOT Edit Generated Files: The copied Python files in custom_python_functions/ are automatically regenerated and include warning headers. Always edit your original source files.
Generated File Structure
After generation, your Python functions appear in the pipeline output with warning headers:
generated/
└── pipeline_name/
    ├── flowgroup_name.py
    └── custom_python_functions/
        ├── __init__.py
        └── customer_transforms.py
Example of Generated File with Warning Header:
# ╔══════════════════════════════════════════════════════════════════════════════╗
# ║ WARNING ║
# ║ DO NOT EDIT THIS FILE DIRECTLY ║
# ╠══════════════════════════════════════════════════════════════════════════════╣
# ║ This file was automatically copied from: transformations/customer_transforms.py ║
# ║ during pipeline generation. Any changes made here will be OVERWRITTEN ║
# ║ on the next generation cycle. ║
# ║ ║
# ║ To make changes: ║
# ║ 1. Edit the original file: transformations/customer_transforms.py ║
# ║ 2. Regenerate the pipeline ║
# ╚══════════════════════════════════════════════════════════════════════════════╝
import requests
from pyspark.sql import DataFrame
# ... rest of your original function code ...
The above YAML translates to the following PySpark code
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
from custom_python_functions.customer_transforms import enrich_customer_data

@dp.temporary_view()
def v_customer_enriched():
    """Apply advanced customer enrichment using external APIs"""
    # Load source view(s)
    v_customer_bronze_df = spark.read.table("v_customer_bronze")

    # Apply Python transformation
    parameters = {
        "api_endpoint": "https://api.example.com/geocoding",
        "api_key": "{{ secret_substituted_api_key }}",
        "batch_size": 1000
    }
    df = enrich_customer_data(v_customer_bronze_df, spark, parameters)

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', current_timestamp())

    return df
For multiple source views:
from pyspark import pipelines as dp
from custom_python_functions.customer_analysis import analyze_customer_orders

@dp.temporary_view()
def v_customer_order_insights():
    """Analyze customer order patterns from multiple sources"""
    # Load source views
    v_customer_bronze_df = spark.read.table("v_customer_bronze")
    v_orders_bronze_df = spark.read.table("v_orders_bronze")

    # Apply Python transformation with multiple sources
    parameters = {
        "analysis_window_days": 90,
        "min_order_count": 5
    }
    dataframes = [v_customer_bronze_df, v_orders_bronze_df]
    df = analyze_customer_orders(dataframes, spark, parameters)

    return df
data_quality¶
Data quality transform actions apply data validation rules using Databricks DLT expectations. They automatically handle data that fails validation based on configured actions.
actions:
  - name: customer_bronze_DQE
    type: transform
    transform_type: data_quality
    source: v_customer_bronze_cleaned
    target: v_customer_bronze_DQE
    readMode: stream
    expectations_file: "expectations/customer_quality.json"
    description: "Apply data quality checks to customer data"
Expectations File (expectations/customer_quality.json):
{
  "version": "1.0",
  "table": "customer",
  "expectations": [
    {
      "name": "valid_custkey",
      "expression": "customer_id IS NOT NULL AND customer_id > 0",
      "failureAction": "fail"
    },
    {
      "name": "valid_customer_name",
      "expression": "name IS NOT NULL AND LENGTH(TRIM(name)) > 0",
      "failureAction": "fail"
    },
    {
      "name": "valid_phone_format",
      "expression": "phone IS NULL OR LENGTH(phone) >= 10",
      "failureAction": "warn"
    },
    {
      "name": "valid_account_balance",
      "expression": "account_balance IS NULL OR account_balance >= -10000",
      "failureAction": "warn"
    },
    {
      "name": "suspicious_balance",
      "expression": "account_balance IS NULL OR account_balance < 50000",
      "failureAction": "drop"
    }
  ]
}
Anatomy of a data quality transform action
name: Unique name for this action within the FlowGroup
type: Action type - transforms data with quality validation
transform_type: Specifies this is a data quality transformation
source: Name of the input view to validate
target: Name of the output view after validation
readMode: Must be stream - data quality transforms require streaming mode
expectations_file: Path to JSON file containing validation rules
description: Optional documentation for the action
Expectation Actions:
fail: Stop the pipeline if any records violate the rule
warn: Log warnings but continue processing all records
drop: Remove records that violate the rule but continue processing
See also
For DLT expectations see the Databricks DLT expectations documentation.
Data quality patterns: Concepts & Architecture
Important
Data quality transforms require readMode: stream and generate DLT streaming tables with built-in quality monitoring.
Use fail for critical business rules, warn for monitoring, and drop for data cleansing.
Note
File Organization: Expectations files are typically stored in an expectations/ folder.
JSON format allows for version control and reuse across multiple pipelines.
The above YAML translates to the following PySpark code
from pyspark import pipelines as dp

@dp.temporary_view()
# These expectations will fail the pipeline if violated
@dp.expect_all_or_fail({
    "valid_custkey": "customer_id IS NOT NULL AND customer_id > 0",
    "valid_customer_name": "name IS NOT NULL AND LENGTH(TRIM(name)) > 0"
})
# These expectations will drop rows that violate them
@dp.expect_all_or_drop({
    "suspicious_balance": "account_balance IS NULL OR account_balance < 50000"
})
# These expectations will log warnings but not drop rows
@dp.expect_all({
    "valid_phone_format": "phone IS NULL OR LENGTH(phone) >= 10",
    "valid_account_balance": "account_balance IS NULL OR account_balance >= -10000"
})
def v_customer_bronze_DQE():
    """Apply data quality checks to customer data"""
    df = spark.readStream.table("v_customer_bronze_cleaned")

    return df
See also
For advanced quarantine mode with Dead Letter Queue (DLQ) recycling — routing failed rows to an external table and automatically recycling fixed records back into the pipeline — see Quarantine (Dead Letter Queue).
schema¶
Schema transform actions apply column mapping, type casting, and schema enforcement to standardize data structures.
Action Format Structure
Schema transform actions use a flat structure with schema definition at the action level:
actions:
  - name: standardize_customer_schema
    type: transform
    transform_type: schema
    source: v_customer_raw                                   # Simple view name (string)
    target: v_customer_standardized
    schema_file: "schemas/bronze/customer_transform.yaml"    # OR use inline schema
    enforcement: strict                                      # Optional: strict or permissive (default: permissive)
    readMode: stream                                         # Optional: stream (default) or batch
    description: "Standardize customer schema and data types"
Key Points:
source: Must be a simple string (view name), not a dictionary
schema_file OR schema_inline: Choose one (external file or inline definition)
enforcement: Action-level field (strict or permissive)
readMode: Defaults to stream (not batch)
External Schema Files (Recommended)
External schema files contain only column definitions (no enforcement):
# In schemas/bronze/customer_transform.yaml:
columns:
- "c_custkey -> customer_id: BIGINT" # Rename and cast
- "c_name -> customer_name" # Rename only
- "c_address -> address" # Rename only
- "account_balance: DECIMAL(18,2)" # Cast only
- "phone_number: STRING" # Cast only
Arrow Format Syntax:
"old_col -> new_col: TYPE" - Rename and cast in one line
"old_col -> new_col" - Rename only
"col: TYPE" - Cast only (no rename)
"col" - Pass-through (strict mode only, explicitly keep column)
Schema File Paths:
Schema files can be organized in subdirectories relative to your project root:
# Root level
schema_file: "customer_transform.yaml"
# In schemas/ directory
schema_file: "schemas/customer_transform.yaml"
# Nested subdirectories (recommended for organization by layer)
schema_file: "schemas/bronze/dimensions/customer_transform.yaml"
# Absolute paths also supported
schema_file: "/absolute/path/to/schema.yaml"
Inline Schema (Arrow Format)
For simple transformations, use inline schema with arrow syntax:
- name: standardize_customer_schema
  type: transform
  transform_type: schema
  source: v_customer_raw
  target: v_customer_standardized
  enforcement: strict
  schema_inline: |
    c_custkey -> customer_id: BIGINT
    c_name -> customer_name
    account_balance: DECIMAL(18,2)
    phone_number: STRING
Inline Schema (Full YAML Format)
For more complex schemas, use full YAML structure inline:
- name: standardize_customer_schema
  type: transform
  transform_type: schema
  source: v_customer_raw
  target: v_customer_standardized
  enforcement: strict
  schema_inline: |
    columns:
      - "c_custkey -> customer_id: BIGINT"
      - "c_name -> customer_name"
      - "account_balance: DECIMAL(18,2)"
Schema Enforcement Modes:
Schema transforms support two enforcement modes (specified at action level):
strict: Only explicitly defined columns are kept in the output. All other columns are dropped. This ensures data quality and prevents schema drift.
permissive (default): Explicitly defined columns are transformed, but all other columns pass through unchanged. Useful when you only want to standardize specific columns.
# Strict enforcement example
- name: transform_strict
  type: transform
  transform_type: schema
  source: v_raw
  target: v_clean
  enforcement: strict
  schema_file: "schemas/transform.yaml"

# Input:  c_custkey, c_name, c_address, c_phone, extra_col
# Output: customer_id, customer_name (+ operational metadata)
#         ↑ All unmapped columns dropped

# Permissive enforcement example (default)
- name: transform_permissive
  type: transform
  transform_type: schema
  source: v_raw
  target: v_clean
  enforcement: permissive   # or omit (permissive is default)
  schema_file: "schemas/transform.yaml"

# Input:  c_custkey, c_name, c_address, c_phone, extra_col
# Output: customer_id, customer_name, c_address, c_phone, extra_col
#         ↑ All unmapped columns kept
Breaking Change: Old Format No Longer Supported
Warning
The nested format with schema definition inside source is no longer supported and will raise an error:
# OLD FORMAT (NO LONGER WORKS):
source:
  view: v_customer_raw
  schema_file: "path.yaml"

# NEW FORMAT (REQUIRED):
source: v_customer_raw
schema_file: "path.yaml"
enforcement: strict
If you see an error about “deprecated nested format”, move schema_inline or schema_file to the top level of the action.
Anatomy of a schema transform action
name: Unique name for this action within the FlowGroup
type: Must be transform
transform_type: Must be schema
source: Name of the input view to transform (simple string, not a dictionary)
target: Name of the output view with transformed schema
schema_file: Path to external schema file (arrow or legacy format) - use this OR schema_inline
schema_inline: Inline schema definition (arrow or YAML format) - use this OR schema_file
enforcement: Optional - Schema enforcement mode (strict or permissive, default: permissive)
readMode: Optional - Either stream (default) or batch - determines execution mode
description: Optional documentation for the action
See also
For Spark data types see the PySpark SQL types documentation.
Schema evolution: Concepts & Architecture
Important
Schema transforms preserve operational metadata columns automatically. These columns are never renamed or cast, and are always included in the output regardless of enforcement mode. Use schema transforms for standardizing column names and ensuring consistent data types across your lakehouse.
The above YAML translates to the following PySpark code
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.types import StructType

@dp.temporary_view()
def v_customer_standardized():
    """Standardize customer schema and data types"""
    df = spark.readStream.table("v_customer_raw")  # Default is stream mode

    # Apply column renaming
    df = df.withColumnRenamed("c_custkey", "customer_id")
    df = df.withColumnRenamed("c_name", "customer_name")
    df = df.withColumnRenamed("c_address", "address")

    # Apply type casting
    df = df.withColumn("customer_id", F.col("customer_id").cast("BIGINT"))
    df = df.withColumn("account_balance", F.col("account_balance").cast("DECIMAL(18,2)"))
    df = df.withColumn("phone_number", F.col("phone_number").cast("STRING"))

    # Strict schema enforcement - select only specified columns
    # Schema-defined columns (will fail if missing)
    columns_to_select = [
        "customer_id",
        "customer_name",
        "address",
        "account_balance",
        "phone_number"
    ]

    # Add operational metadata columns only if they exist (optional)
    available_columns = set(df.columns)
    metadata_columns = [
        "_ingestion_timestamp",
        "_source_file"
    ]
    for meta_col in metadata_columns:
        if meta_col in available_columns:
            columns_to_select.append(meta_col)

    df = df.select(*columns_to_select)

    return df
temp_table¶
Temp table transform actions create temporary streaming tables for intermediate processing and reuse across multiple downstream actions.
Option 1: Simple Passthrough
actions:
  - name: create_customer_temp
    type: transform
    transform_type: temp_table
    source: v_customer_processed
    target: customer_intermediate
    readMode: stream
    description: "Create temporary table for customer intermediate processing"
Option 2: With SQL Transformation
actions:
  - name: create_temp_aggregate
    type: transform
    transform_type: temp_table
    source: v_customer_raw
    target: temp_daily_summary
    readMode: stream
    sql: |
      SELECT
        DATE(order_date) as date,
        COUNT(*) as order_count,
        SUM(total_amount) as total_amount
      FROM stream({source})
      GROUP BY DATE(order_date)
    description: "Create temporary aggregate table with daily summaries"
Anatomy of a temp table transform action
name: Unique name for this action within the FlowGroup
type: Action type - creates temporary table
transform_type: Specifies this is a temporary table transformation
source: Name of the input view to materialize as temporary table
target: Name of the temporary table to create
readMode: Either batch or stream - determines table type
sql: Optional SQL transformation to apply (inline option)
description: Optional documentation for the action
See also
For SDP table types see the Databricks SDP table types documentation.
Intermediate processing: Concepts & Architecture
Important
Temp tables are automatically cleaned up when the pipeline completes. Use for complex multi-step transformations where intermediate materialization improves performance.
For instance, if you have a complex transformation that will be used by several downstream actions, you can create a temporary table to prevent the transformation from being recomputed each time.
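As an illustrative sketch (the downstream action and view names are hypothetical), the temp table created in Option 1 above could feed several consumers without re-running the upstream transformation:
actions:
  - name: high_value_customers
    type: transform
    transform_type: sql
    source: customer_intermediate
    target: v_high_value_customers
    sql: |
      SELECT * FROM stream(customer_intermediate)
      WHERE account_balance > 5000
    description: "First consumer of the shared temp table"

  - name: negative_balance_customers
    type: transform
    transform_type: sql
    source: customer_intermediate
    target: v_negative_balance_customers
    sql: |
      SELECT * FROM stream(customer_intermediate)
      WHERE account_balance < 0
    description: "Second consumer of the same temp table"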
Warning
When using the sql property with streaming tables (readMode: stream), you must use the
stream() function in your SQL query to maintain streaming semantics. Without it, the query
will process data in batch mode.
The above YAML examples translate to the following PySpark code
For simple passthrough:
from pyspark import pipelines as dp

@dp.table(
    temporary=True,
)
def customer_intermediate():
    """Create temporary table for customer intermediate processing"""
    df = spark.readStream.table("v_customer_processed")

    return df
For SQL transformation:
from pyspark import pipelines as dp

@dp.table(
    temporary=True,
)
def temp_daily_summary():
    """Create temporary aggregate table with daily summaries"""
    df = spark.sql("""
        SELECT
            DATE(order_date) as date,
            COUNT(*) as order_count,
            SUM(total_amount) as total_amount
        FROM stream(v_customer_raw)
        GROUP BY DATE(order_date)
    """)

    return df