From Black Box to Glass Box: A Practitioner's Guide to DataStage to Databricks Migration
So, you're moving from DataStage to Databricks. You’re trading a mature, GUI-driven ETL platform for a code-first, distributed computing powerhouse. This is more than a technology swap; it's a fundamental shift in how your team builds, manages, and thinks about data pipelines.
I've been in the trenches on these projects. I've seen teams struggle with the paradigm shift, trying to force DataStage's rigid, stage-based thinking onto Spark's flexible, lazy-evaluation model. I've also seen teams flourish by embracing the new patterns and unlocking performance and agility they never thought possible.
This is not a theoretical guide. This is a collection of hard-won lessons, practical coding patterns, and architectural principles drawn from real-world migration deliveries. We'll bypass the high-level marketing slides and get straight to the code and the "why" behind it.
The Core Paradigm Shift: Visual ETL vs. Code-First DataFrames
The most significant hurdle is mental. DataStage abstracts the "how." You drag a Transformer stage onto a canvas, connect links, and define mappings in a GUI. The underlying C++ code and parallel execution are largely a black box.
Databricks, powered by Apache Spark, is the opposite. It's a glass box. The primary tool is code (typically Python or Scala) and the primary abstraction is the DataFrame. You have complete control over—and responsibility for—the execution logic.
Understanding this difference is step one. Trying to replicate a DataStage job one-for-one, stage-by-stage, in a Databricks notebook is a recipe for unmaintainable, inefficient code. The goal is not to replicate the DataStage job; it's to achieve the same business outcome using idiomatic Databricks and Spark patterns.
Foundational Mapping: Translating DataStage Concepts to Databricks
Before we dive into code, let's establish a common language. When your team talks about a "DataStage Job," what is the equivalent in the Databricks world?
| DataStage Concept | Databricks Equivalent(s) | Practitioner's Nuance |
|---|---|---|
| Project | Databricks Workspace & Repos | A Workspace is the overarching environment. Git-backed Repos within the workspace are where you'll store your code, providing version control that is far superior to DataStage's export/import files. |
| Job (Parallel/Server) | Databricks Job (running a Notebook or Python Wheel) | A Notebook is for development and exploration. A production Databricks Job runs a notebook or a packaged Python/Scala library (a Wheel file) on a specific cluster configuration. Don't confuse the development artifact (notebook) with the production execution entity (job). |
| Job Sequence | Databricks Workflows | Workflows are the direct equivalent, allowing you to chain multiple tasks (jobs) together, define dependencies, and handle conditional logic (e.g., run task B only if task A succeeds). |
| Stage (e.g., Transformer, Filter, Join) | DataFrame Transformation | This is the core shift. A series of stages becomes a series of DataFrame transformations (.select(), .withColumn(), .join(), .filter()). These are not executed immediately; they build a logical plan that Spark optimizes and executes. |
| Link | DataFrame Variable | The lines connecting stages in DataStage are simply DataFrames being passed between transformations in your code. df_filtered = df_raw.filter(...) |
| Parameter Set | Job Parameters & Widgets | Job Parameters are key-value pairs passed at runtime. Notebook Widgets are for interactive parameterization during development. Production jobs should always use Job Parameters for clean execution. |
| Lookup Stage | Broadcast Join | A classic DataStage pattern. In Spark, this is almost always a join operation. For small lookup tables, a broadcast join is the direct, high-performance equivalent. |
| Reject Link | Explicit Filtering and Error Handling Logic | Spark doesn't have a built-in "reject" concept. You build it yourself. This is a good thing. You can filter for bad data, write it to a quarantine table (e.g., bad_records_df.write.saveAsTable(...)), and continue with the good data. It's more transparent and flexible. |
| Data Set / Persistent Stage | Delta Table | A Data Set is a temporary, DataStage-specific file format used to stage data between jobs. The modern, vastly superior equivalent is a Delta Lake table. It provides ACID transactions, time travel, and schema enforcement, making it the bedrock of a reliable data lakehouse. |
Migration Philosophy: Re-engineer, Don't Just Replicate
When faced with hundreds or thousands of DataStage jobs, the first question is always "How do we automate this?" While some automation can help, a blind "lift and shift" is a critical mistake.
The Pitfall of Automated Converters
Several tools claim to convert DataStage job exports (.dsx files) directly into PySpark code.
My Experience: The code they generate is often procedural, verbose, and un-idiomatic. It mimics the stage-by-stage flow of DataStage, creating dozens of intermediate DataFrames and failing to leverage Spark's Catalyst Optimizer effectively. You spend more time refactoring and debugging this machine-generated code than you would have spent writing it correctly from scratch.
- When to use them: For initial analysis. Running a converter on a representative set of jobs can give you a rough idea of the transformation complexity and help in estimation. Use it as a "dictionary" to see how a specific DataStage function might be translated, but do not check the output directly into your repository.
The Recommended Approach: Strategic Re-engineering
Treat this migration as an opportunity to pay down technical debt. Most DataStage estates have jobs that are overly complex, redundant, or built around limitations that no longer exist in the Databricks world.
- Inventory and Rationalize: Group jobs by business function (e.g., Sales Ingestion, Customer Dimension Build). Identify redundancy. You'll often find five slightly different DataStage jobs that can be replaced with a single, parameterized Databricks job.
- Prioritize by Value and Complexity: Start with a job that is medium-complexity but high-value. This gives you a meaningful win and allows the team to learn the new patterns on a manageable scope. Don't start with the simplest "file-to-table" job (too trivial) or the gnarliest, 100-stage monster (too risky).
- Focus on the What, Not the How: For each job, document its business purpose.
- What are the sources? (e.g.,
customers.csv,sales_topicon Kafka) - What are the key business rules? (e.g., "Calculate
order_totalasquantity * price * (1 - discount)," "Flag customers in 'CA', 'OR', 'WA' as 'West Coast'") - What is the final output and who consumes it? (e.g.,
fact_daily_salestable for a Power BI report)
- What are the sources? (e.g.,
- Design and Build Idiomatically: With the "what" defined, design a new solution using Databricks best practices (which we'll cover next). This almost always means a cleaner, simpler, and more efficient pipeline.
Core Coding Practices: From Stages to Spark
Let's get practical. Here's how to translate common DataStage patterns into clean, efficient, and maintainable PySpark code within Databricks.
Reading Data: Beyond the Sequential File Stage
DataStage jobs often start with a Sequential File stage or a database connector. In Databricks, data ingestion is far more powerful.
The DataStage Way
You configure a stage for a specific file path and format. For incremental data, you might have complex pre-job scripts to move files between 'new' and 'processed' directories.
The Databricks Way: Auto Loader and Structured Streaming
For incremental file ingestion, Auto Loader is the gold standard. It's a Structured Streaming source that automatically and efficiently processes new files as they land in cloud storage (S3, ADLS, GCS).
- It's idempotent: It uses a checkpoint location to track which files have been processed, preventing data duplication.
- It handles schema evolution: It can detect when the schema of your source data changes and either stop the job, ignore the new columns, or merge the new schema.
- It's scalable: It can use file notifications (SQS, Event Grid) to scale to millions of incoming files without expensive directory listings.
Example: Ingesting Incremental CSVs with Auto Loader
# Best Practice: Define schema for reliability, but Auto Loader can also infer it.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
# It's better to manage schema explicitly in a production setting
# This can be stored as JSON in a separate config file
source_schema = StructType([
StructField("order_id", StringType(), True),
StructField("customer_id", StringType(), True),
StructField("order_date", StringType(), True),
StructField("quantity", IntegerType(), True),
StructField("price", DoubleType(), True),
StructField("discount", DoubleType(), True)
])
# Configure the Auto Loader stream
raw_stream_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", "/path/to/schema_checkpoint") # Manages schema evolution
.schema(source_schema)
.load("/path/to/landing_zone/orders/")
)
# Add metadata for traceability
from pyspark.sql.functions import input_file_name, current_timestamp
# This is a common pattern for adding audit columns right at the source
raw_with_metadata_df = raw_stream_df.withColumn("source_file", input_file_name()) \
.withColumn("processing_time", current_timestamp())
# Write the stream to a Bronze Delta table
(raw_with_metadata_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/path/to/write_checkpoint") # Manages write progress
.trigger(availableNow=True) # Runs a micro-batch to process all available data, then stops. Perfect for job-based ETL.
.toTable("bronze_orders")
)
Practitioner's Tip: Notice the trigger(availableNow=True). This transforms a continuous streaming job into a batch-like job, which is exactly how most DataStage sequences operate. It's the perfect pattern for migrating daily or hourly file-based ETL.
Transformations: The Transformer Stage Reimagined
The Transformer is the workhorse of DataStage. It's where you derive new columns, apply functions, and implement business logic.
The DataStage Way
You open a GUI, drag links from input to output columns, and write expressions in a proprietary syntax. It's powerful but hard to version control and test in isolation.
The Databricks Way: Chaining DataFrame Transformations
In Spark, this becomes a sequence of .withColumn() and .select() calls. This code is self-documenting, testable, and version-controllable.
Example: Applying Business Rules
Let's continue with the orders data we ingested. We need to calculate the total price and categorize the order.
from pyspark.sql.functions import col, expr, when
# Assuming we read from our Bronze table in a new notebook or step
silver_df = spark.read.table("bronze_orders")
# 1. Type casting and basic data cleaning
# In DataStage, you'd set column types and handle nulls in the Transformer.
# Here, we do it explicitly.
cleaned_df = silver_df.withColumn("order_date", col("order_date").cast("date")) \
.withColumn("quantity", col("quantity").cast("int")) \
.withColumn("price", col("price").cast("double")) \
.na.fill(value=0, subset=["discount"]) # Replace NULL discounts with 0
# 2. Deriving new columns
# This is equivalent to stage variables and column derivations in the Transformer.
final_df = cleaned_df.withColumn(
"order_total",
# Using expr() allows for SQL-like expressions which are often more readable
expr("quantity * price * (1 - discount)")
).withColumn(
"priority",
# Equivalent to an IF-THEN-ELSE statement in a Transformer derivation
when(col("order_total") > 1000, "High")
.when((col("order_total") <= 1000) & (col("order_total") > 500), "Medium")
.otherwise("Low")
)
# 3. Selecting and renaming columns for the final output
# This is like defining the output link in the Transformer.
gold_df = final_df.select(
col("order_id"),
col("customer_id"),
col("order_date"),
col("order_total").alias("total_revenue"), # Renaming for the business-facing layer
col("priority")
)
gold_df.write.format("delta").mode("overwrite").saveAsTable("gold_daily_sales_summary")
Practitioner's Tip: Embrace functional programming. Instead of one monolithic notebook, break down complex logic into Python functions that each take a DataFrame as input and return a transformed DataFrame. This makes your code modular, reusable, and much easier to unit test.
# A better, more modular approach
def calculate_order_total(df):
return df.withColumn("order_total", expr("quantity * price * (1 - discount)"))
def assign_priority(df):
return df.withColumn("priority", when(col("order_total") > 1000, "High").otherwise("Low"))
# In your main script:
# The flow is now clean and readable
processed_df = spark.read.table("bronze_orders") \
.transform(calculate_order_total) \
.transform(assign_priority)
The .transform() method is a hidden gem in Spark that allows you to chain your custom functions like this, making the lineage clear.
Lookups: The Join Stage and Its Powerful Successor
Lookups are fundamental to ETL. You need to enrich your fact data with attributes from your dimension tables.
The DataStage Way
You use a Lookup stage, which typically reads the entire lookup table into memory for each partition of the main data flow. You configure conditions and specify what to do if a lookup fails (reject, continue with nulls, etc.).
The Databricks Way: Broadcast Joins
In Spark, a lookup is just a join. The key to performance is ensuring that small dimension tables are broadcast to every executor. This avoids a costly "shuffle" of the large fact table across the network.
Spark's optimizer is smart and will often broadcast small tables automatically (controlled by spark.sql.autoBroadcastJoinThreshold), but being explicit is a good practice for clarity and guaranteed performance.
Example: Enriching Sales Data with Customer Information
from pyspark.sql.functions import broadcast
# The "fact" table - our large sales data
sales_df = spark.read.table("gold_daily_sales_summary")
# The "dimension" table - our small customer data
customers_df = spark.read.table("dim_customer") # e.g., 50,000 customers
# Check the plan before and after to see the difference!
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # Disable to see the "bad" plan
# A standard left join. For a small customer_df, Spark will likely broadcast it.
# The `broadcast()` function gives the optimizer a strong hint.
enriched_sales_df = sales_df.join(
broadcast(customers_df), # Explicitly telling Spark to broadcast the smaller DataFrame
sales_df["customer_id"] == customers_df["customer_id"],
"left" # Use 'left' to keep all sales records, even if a customer is missing (like a DataStage lookup)
).select(
sales_df["order_id"],
sales_df["total_revenue"],
customers_df["customer_name"],
customers_df["state"],
customers_df["segment"]
)
Practitioner's Tip: Always verify the broadcast happened. After running a join, go to the Spark UI in the Databricks notebook. Look at the query plan for your job. You should see a BroadcastHashJoin or BroadcastNestedLoopJoin. If you see a SortMergeJoin, it means a shuffle occurred, and you need to investigate why (e.g., the dimension table was too large, statistics were stale).
Slowly Changing Dimensions (SCDs): Mastering the MERGE Command
This is one of the most critical ETL patterns to get right. DataStage has dedicated stages (like the Slowly Changing Dimension stage) to handle this. In Databricks, Delta Lake's MERGE statement is the elegant and powerful solution.
SCD Type 1: Overwriting Existing Records
An SCD Type 1 update overwrites the existing dimension record with new values.
The DataStage Way: You'd look up the target, and if a match is found on the business key, you route the flow to an "update" path that overwrites the target record.
The Databricks Way: A MERGE statement with a WHEN MATCHED THEN UPDATE clause.
Example: Updating Customer State (SCD Type 1)
# Target Delta table: dim_customer
# Source DataFrame with updates: customer_updates_df
(spark.table("dim_customer").alias("target")
.merge(
customer_updates_df.alias("source"),
"target.customer_id = source.customer_id"
)
.whenMatchedUpdate(
set={
"state": "source.state",
"city": "source.city",
"last_updated_ts": "current_timestamp()"
}
)
.whenNotMatchedInsert(
values={
"customer_id": "source.customer_id",
"customer_name": "source.customer_name",
"state": "source.state",
"city": "source.city",
# Additional columns for new records
"is_active": "true",
"effective_start_date": "current_date()",
"effective_end_date": "null",
"last_updated_ts": "current_timestamp()"
}
)
.execute()
)
SCD Type 2: Preserving History
An SCD Type 2 update preserves history by expiring the old record and inserting a new one. This is essential for historical reporting.
The DataStage Way: This requires complex logic with multiple lookups, transformers, and conditional paths to expire old records and insert new ones. It's often slow and difficult to maintain.
The Databricks Way: MERGE handles this with astonishing elegance. The key is to have two source DataFrames: one for new/changed records and one to find records that need expiring. The common pattern is to UNION these into a single source for the MERGE operation.
Example: Implementing Full SCD Type 2 Logic
Assume dim_customer has is_active, effective_start_date, and effective_end_date columns.
# 1. Read the target table and the source updates
target_table = spark.table("dim_customer")
source_updates_df = spark.read.table("staging_customer_updates")
# 2. Identify new and changed records.
# We join the source to the active records in the target.
# A NULL target.customer_id means it's a new record.
# A non-NULL target but different address means it's a changed record.
updates_to_insert = (source_updates_df.alias("source")
.join(
target_table.alias("target"),
(source_updates_df.customer_id == target_table.customer_id) & (target_table.is_active == True),
"leftouter"
)
.where("target.customer_id IS NULL OR (source.address <> target.address)")
.select(
"source.*",
# Action flag to be used in the MERGE statement
expr("'INSERT' as merge_action")
)
)
# 3. Identify records in the target that need to be expired.
# This happens when a record with the same key has changed in the source.
# We do a semi-join (inner join where we only care about existence)
updates_to_expire = (source_updates_df.alias("source")
.join(
target_table.alias("target"),
(source_updates_df.customer_id == target_table.customer_id) & (target_table.is_active == True)
)
.where("source.address <> target.address")
.select(
"target.*",
expr("'EXPIRE' as merge_action")
)
)
# 4. Combine the two sets of changes
all_changes_df = updates_to_expire.unionByName(updates_to_insert)
# 5. Execute the MERGE
# This single, atomic operation handles all inserts and updates.
(target_table.alias("target")
.merge(
all_changes_df.alias("source"),
"target.customer_id = source.customer_id"
)
.whenMatchedUpdate( # This clause handles the expiration
condition="target.is_active = true AND source.merge_action = 'EXPIRE'",
set={
"is_active": "false",
"effective_end_date": "current_date()"
}
)
.whenNotMatchedInsert( # This clause handles the new/changed records
condition="source.merge_action = 'INSERT'",
values={
"customer_id": "source.customer_id",
"address": "source.address",
"is_active": "true",
"effective_start_date": "current_date()",
"effective_end_date": "null",
# other columns...
}
)
.execute()
)
Practitioner's Tip: This SCD2 pattern is the single most valuable piece of reusable code you will build during a migration. Turn it into a generic function that takes a target table, a source DataFrame, a list of key columns, and a list of columns to track for changes. This one function can then replace hundreds of complex DataStage jobs.
Architectural Best Practices
Good code is not enough. You need a solid architecture to ensure your new data platform is reliable, scalable, and maintainable.
The Medallion Architecture: Your New Job Structure
Forget the arbitrary folder structures of DataStage projects. Embrace the Medallion Architecture. It's a data quality-centric approach for structuring your data lakehouse.
- Bronze (Raw): This is your landing zone. Data is ingested here from source systems with minimal transformation. The schema should match the source as closely as possible, plus audit columns (source file name, processing time). Auto Loader streams land data into Bronze Delta tables.
- Silver (Cleansed, Conformed): This is where the real ETL work happens. You read from Bronze, apply data quality rules, perform type casting, join and enrich data, and conform it into a clean, enterprise-wide view. Your SCD dimension tables live here. The output is one or more Silver Delta tables.
- Gold (Aggregated, Business-Ready): This is the consumption layer. You read from Silver tables to create aggregated, business-level tables or views optimized for analytics and reporting. A
gold_daily_salesorgold_customer_360table would live here.
How it relates to DataStage:
A complex DataStage job sequence often implicitly follows this pattern. A first job lands the raw file. A second job cleans and merges it. A final job creates an aggregate. The Medallion Architecture just formalizes this into distinct, quality-gated layers in your data lake.
Orchestration: From Sequences to Workflows
DataStage Job Sequences are powerful but are tied to the DataStage engine. Databricks Workflows are the cloud-native equivalent.
Best Practices for Workflows:
- Use Multiple Tasks: Don't put your entire Bronze-to-Silver-to-Gold logic in one giant notebook. Create separate notebooks (or better, Python Wheel tasks) for each logical step and orchestrate them as tasks in a workflow.
- Task 1: Ingest from Source to Bronze (runs
ingest_notebook.py) - Task 2: Build Dimensions (runs
build_dims_notebook.py, depends on Task 1) - Task 3: Build Facts (runs
build_facts_notebook.py, depends on Task 2)
- Task 1: Ingest from Source to Bronze (runs
- Pass Parameters Between Tasks: Use Task Values (
dbutils.jobs.taskValues.set() / get()) to pass small pieces of metadata between tasks, like the number of rows processed or a specific run ID. - Use Repair and Rerun: Workflows have built-in capabilities to rerun only the failed tasks and their dependents, which is a massive improvement over manually rerunning DataStage sequences.
- Consider Cluster Configurations: A small ingestion task might only need a small, single-node cluster. A heavy transformation task might need a large, auto-scaling cluster. Workflows allow you to assign the right-sized cluster for each task, optimizing cost.
Configuration Management: Kill the Hardcoding
A common sin in DataStage is hardcoding file paths, database names, and thresholds directly in jobs. This makes moving between Dev, Test, and Prod environments a nightmare.
The Databricks Way: Config Files and Secrets
- Use JSON or YAML Config Files: Store all your environmental configurations (paths, table names, connection strings) in a configuration file. Your code should read this file at runtime.
# Example: config.json
{
"dev": {
"source_path": "/mnt/dev/raw/orders",
"target_database": "sales_dev"
},
"prod": {
"source_path": "/mnt/prod/raw/orders",
"target_database": "sales_prod"
}
}
# In your code
import json
env = dbutils.widgets.get("environment") # e.g., "dev" or "prod"
with open("config.json", "r") as f:
config = json.load(f)[env]
source_path = config["source_path"]
target_db = config["target_database"]
- Use Databricks Secrets for Credentials: Never, ever store passwords, API keys, or connection strings in code or config files. Store them in a Databricks-backed or Azure Key Vault-backed secret scope.
# This is safe and does not expose the password
password = dbutils.secrets.get(scope="my-key-vault-scope", key="sql-db-password")
# Use it in a JDBC connection URL
jdbc_url = f"jdbc:sqlserver://...;password={password}"
CI/CD: From DSX Exports to Git and GitHub Actions
DevOps in DataStage often involves exporting .dsx files, checking them into a version control system, and then manually importing them into the target environment. It's clunky and error-prone.
With a code-first approach, you can implement true CI/CD.
- Databricks Repos: Connect your Databricks workspace to your Git provider (GitHub, Azure DevOps, etc.). This allows developers to work on branches, create pull requests, and have their code reviewed, just like software engineers.
dbxor the Databricks CLI: These tools allow you to bundle your code (notebooks, Python libraries) and workflow definitions (as YAML) and deploy them from a CI/CD pipeline.- A Typical Workflow (with GitHub Actions):
- A developer pushes code to a feature branch.
- They create a Pull Request to merge into
main. - CI: A GitHub Action triggers automatically. It runs unit tests (using
pytest) on the Python functions and integration tests (usingdbx launch) on a temporary job in a Dev Databricks workspace. - CD: Once the PR is approved and merged into
main, another GitHub Action triggers. It usesdbx deployto push the updated job definition and libraries to the Staging or Production Databricks workspace.
This brings a level of rigor, automation, and safety to your ETL development that is simply not achievable in a traditional GUI-based tool.
Case Study: Migrating a Daily Sales Aggregation Job
Let's tie this all together with a real-world example.
The "Before": A Classic DataStage Job Sequence
Business Goal: Create a daily aggregated sales fact table and update the customer dimension with any new addresses found in the daily sales feed.
The DataStage Sequence (seq_daily_sales.pjb):
Job_1_Land_Files: A parallel job that reads asales_YYYYMMDD.csvfile from an FTP server. It performs basic data type validation and writes the output to a persistent Data Set stage calledStagedSales.ds.Job_2_Update_Customer_Dim:- Reads from
StagedSales.ds. - Uses a Lookup stage against the
DIM_CUSTOMERtable in a DB2 database. - A Transformer stage compares the address from the sales file with the address in the dimension.
- If the address is different, it uses the Slowly Changing Dimension stage to perform an SCD Type 2 update on the
DIM_CUSTOMERtable (expire old record, insert new one). This part of the job is notoriously slow.
- Reads from
Job_3_Create_Sales_Fact:- Reads from
StagedSales.dsagain. - Joins with the (now updated)
DIM_CUSTOMERtable to get the correct surrogate key for the customer. - Uses an Aggregator stage to group by date and customer key, summing the sales amount.
- Writes the final result to the
FACT_SALEStable in DB2.
- Reads from
This sequence is fragile. If Job_2 fails, FACT_SALES might be built with incorrect customer keys. The dependency on persistent Data Sets makes it hard to debug and rerun specific parts.
The "After": A Modern Databricks Workflow
Architecture: We will use the Medallion Architecture and a Databricks Workflow.
Databricks Workflow (wf_daily_sales):
Task 1: ingest_sales_to_bronze (Notebook: bronze_ingestion.py)
This task replaces Job_1_Land_Files. It uses Auto Loader to pick up the CSV files.
# bronze_ingestion.py
# Parameters: source_path, bronze_table_name
source_path = dbutils.widgets.get("source_path")
bronze_table = dbutils.widgets.get("bronze_table_name")
schema_location = f"/path/to/checkpoints/{bronze_table}/schema"
write_checkpoint = f"/path/to/checkpoints/{bronze_table}/write"
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("cloudFiles.schemaLocation", schema_location)
.load(source_path)
.withColumn("source_file", input_file_name())
.withColumn("processing_ts", current_timestamp())
.writeStream
.format("delta")
.option("checkpointLocation", write_checkpoint)
.trigger(availableNow=True)
.toTable(bronze_table)
)
Task 2: update_customer_dim_silver (Notebook: silver_customer_dim.py)
This task replaces the complex and slow Job_2_Update_Customer_Dim. It depends on the successful completion of Task 1. It uses the powerful SCD Type 2 pattern with MERGE.
# silver_customer_dim.py
# Parameters: bronze_table_name, silver_dim_table_name
bronze_table = dbutils.widgets.get("bronze_table_name")
silver_dim = dbutils.widgets.get("silver_dim_table_name")
# Extract unique customer info from the latest batch of bronze data
source_updates_df = spark.read.table(bronze_table).select(
"customer_id", "customer_name", "customer_address"
).distinct()
# --- Reusable SCD2 Function would be called here ---
# (For brevity, using the logic directly)
# This MERGE statement is atomic, fast, and handles all SCD2 logic in one go.
target_table = spark.table(silver_dim)
# ... [Full SCD Type 2 logic as shown in the previous section] ...
# After the merge, the silver_dim_customer table is perfectly up-to-date.
Task 3: build_sales_fact_gold (Notebook: gold_sales_fact.py)
This task replaces Job_3_Create_Sales_Fact. It depends on Task 2, ensuring the dimension is updated before the fact is built.
# gold_sales_fact.py
# Parameters: bronze_table_name, silver_dim_table_name, gold_fact_table_name
bronze_table = dbutils.widgets.get("bronze_table_name")
silver_dim = dbutils.widgets.get("silver_dim_table_name")
gold_fact = dbutils.widgets.get("gold_fact_table_name")
bronze_df = spark.read.table(bronze_table)
customer_dim_df = spark.read.table(silver_dim).where("is_active = true") # Always join to the active records
# Join to get the dimension surrogate key
# This is a simple, fast join, as the dimension is already correct.
fact_data = bronze_df.join(
customer_dim_df,
"customer_id",
"inner"
).groupBy("order_date", "customer_sk") # Assuming customer_sk is the key in the dim
.agg(
sum("order_total").alias("total_sales")
)
# Append the new day's aggregates to the gold fact table
fact_data.write.format("delta").mode("append").saveAsTable(gold_fact)
Results & Lessons Learned
- Performance: The total run time for the workflow was 20 minutes, down from 2 hours for the DataStage sequence. The SCD Type 2
MERGEoperation, which was the main bottleneck, completed in under 5 minutes. - Reliability: The transactional nature of Delta Lake and the atomic
MERGEoperation eliminated the risk of data inconsistency between the dimension and fact tables. If a step fails, the tables are simply not updated, leaving them in a consistent state. - Maintainability: The code is modular, stored in Git, and uses centralized configuration. Onboarding a new developer is a matter of explaining three Python scripts, not a complex web of proprietary GUI components.
- Cost: By using job clusters that shut down after the workflow completes, the compute cost was significantly lower than maintaining a dedicated, always-on DataStage engine.
This migration was a success not because we translated DataStage stages into PySpark, but because we understood the business objective and re-implemented it using the strengths of the Databricks platform.
The journey from DataStage to Databricks is challenging, but the payoff in terms of performance, scalability, and developer agility is immense. Embrace the code, think in terms of data layers, and focus on re-engineering for the new paradigm. You're not just migrating jobs; you're building a foundation for a modern data platform.