Automating DataStage to Databricks Migration Using Python: End-to-End Guide
Published on: December 21, 2025 10:34 PM
By a Principal Data Engineer & Platform Architect
I’ve spent the better part of two decades designing, building, and maintaining DataStage environments. For the last ten years, I've been leading the charge to migrate these sprawling estates to modern platforms like Databricks. Early on, we tried manual conversions. It was a slow, expensive, error-prone disaster. We quickly realized the only way to tackle thousands of jobs with any semblance of sanity and consistency was through automation.
This guide details the Python-based automation framework we've built and refined over multiple large-scale programs. This isn't a theoretical exercise or a sales pitch for a vendor tool. This is a pragmatic, battle-tested approach that I know works. It also details where automation fails and human expertise must take over.
1. Why Automation Is Necessary (and Where It Breaks)
A mature DataStage environment is often a complex web of thousands of jobs, sequences, parameter sets, and scripts built up over 10-20 years. The sheer volume makes manual migration a non-starter.
Manual Migration Bottlenecks:
- Discovery: Manually documenting every job, stage, parameter, and dependency is a six-month project in itself, and it's outdated the moment you finish.
- Repetitive Coding: 80% of your DataStage jobs use the same 5-10 patterns (e.g., DB source -> Transform -> Filter -> DB target). Manually rewriting this pattern hundreds of times is a colossal waste of senior engineering talent.
- Inconsistency: Ten different developers will translate the same DataStage job into ten slightly different PySpark jobs, creating a maintenance nightmare.
- Validation: Manually comparing millions of rows between source and target systems for every single job is impossible.
Automation is the only viable path. However, the goal is not 100% automated conversion. That’s a fantasy. The goal is to automate the repetitive, low-value tasks to free up your best engineers to focus on the complex, high-value ones.
| What to Automate | What NOT to Automate (and why) |
|---|---|
| Inventory & Complexity Analysis: Parsing all jobs and stages. | Final Design Decisions: Automation can't decide if a job is better re-architected as a streaming pipeline. That's an architect's job. |
| Code Scaffolding: Generating boilerplate PySpark code for common patterns. | Complex Business Logic: A 500-line routine in a Transformer stage needs to be re-written by a human who understands the business context. |
| Orchestration Conversion: Translating simple sequences to Databricks Workflows. | Performance Tuning: Automated code is rarely performant out of the box. An engineer must optimize partitioning, caching, and join strategies. |
| Data Validation: Row counts, checksums, and basic schema checks. | Semantic Validation: Automation can't tell you if a PROFIT calculation is conceptually correct, only if the numbers match. |
| Deployment & Execution: Running jobs and workflows via APIs. | Root Cause Analysis of Complex Failures: An engineer must debug tricky data-dependent errors or Spark memory issues. |
2. High-Level Automation Architecture
Our framework is a collection of Python scripts and modules, orchestrated by a master control script or a simple Makefile. It's not a single monolithic application. It's a toolkit.
Core Components:
- Parser: Ingests DataStage `.dsx` files (which are just XML) and outputs structured JSON metadata.
- Analyzer: Reads the JSON metadata to generate inventories, complexity reports, and dependency graphs.
- Converter: Uses rule-based logic to map the JSON metadata into PySpark code scaffolds and Databricks Workflow JSON definitions.
- Executor: Uses the Databricks REST API to run jobs, manage workflows, and trigger data loads.
- Validator: A library of functions for comparing source and target data (counts, schemas, aggregates).
Control Flow:
This isn't a "push button, get Databricks" solution. It's an iterative process:
1. Run Discovery: Get a full inventory.
2. Analyze & Scope: Use the reports to group jobs into "waves" (e.g., simple jobs first).
3. Run Conversion: Generate the first-pass code for a wave of jobs.
4. Developer Refinement: Engineers take the scaffolded code, fill in the blanks, test, and optimize. This is the human-in-the-loop step.
5. Deploy & Validate: Use automation to run the jobs and validate the output.
3. Discovery & Assessment Automation
Objective: To get a complete, data-driven view of your DataStage estate without spending months in spreadsheets.
Input: A full export of your DataStage projects into .dsx files.
Process:
The .dsx file is your ground truth. It’s a verbose XML file, but it contains everything. We use Python's built-in xml.etree.ElementTree library to parse it.
- Iterate through `<Job>` tags: Each one represents a DataStage job.
- Extract Job Metadata: Name, description, job type (Server, Parallel, Sequence).
- Iterate through `<Record>` tags within a job: These are your stages, links, and parameters.
- Identify Stages: A `Record` with `Type="CStage"` is a stage. We extract its name and, crucially, its `StageType` (e.g., `CTransformerStage`, `COracleInput`, `CPeek`).
- Identify Links: A `Record` with `Type="CLink"` defines the data flow between stages. This is how you build your dependency graph.
- Extract SQL & Expressions: Look for `Record` properties that contain SQL queries (`SelectStatement`) or Transformer derivations.
Python Snippet: Parsing a .dsx file
import xml.etree.ElementTree as ET
import json

def parse_dsx(dsx_file_path):
    tree = ET.parse(dsx_file_path)
    root = tree.getroot()
    inventory = []
    for job in root.findall('.//Job'):
        job_name = job.get('Identifier')
        job_info = {
            'name': job_name,
            'stages': [],
            'dependencies': []
        }
        for record in job.findall('.//Record'):
            record_type = record.get('Type')
            if record_type == 'CStage':
                stage_name = record.findtext(".//Property[@Name='Name']")
                stage_type = record.findtext(".//Property[@Name='StageType']")
                job_info['stages'].append({'name': stage_name, 'type': stage_type})
        inventory.append(job_info)
    return inventory

# Usage
# job_inventory = parse_dsx('datastage_project_export.dsx')
# with open('inventory.json', 'w') as f:
#     json.dump(job_inventory, f, indent=2)
Output: Complexity Scoring
From the parsed data, we generate a CSV report. A key part of this is a calculated complexity_score.
- Simple Score = (`Transformer Stage Count` * 5) + (`Custom Routine Count` * 10) + (`Stage Count` * 1)
- Jobs with a score < 10 are "Low Complexity" (good candidates for full automation).
- Jobs with a score > 50 are "High Complexity" (flag for manual review and possible re-architecture).
This simple score is incredibly effective for prioritizing migration work.
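To make that concrete, here is a minimal scoring sketch that runs over the inventory JSON produced by the parser above. The weights follow the formula just described; the stage-type names used to spot Transformers and custom routines are assumptions you would adjust to your own estate.

```python
import csv

# Stage types used to spot Transformers and custom routines in the parsed
# inventory. These type names are assumptions; adjust them to your estate.
TRANSFORMER_TYPES = {"CTransformerStage"}
CUSTOM_ROUTINE_TYPES = {"CCustomStage", "CBuildOp"}

def complexity_score(job_info):
    stages = job_info["stages"]
    transformers = sum(1 for s in stages if s["type"] in TRANSFORMER_TYPES)
    routines = sum(1 for s in stages if s["type"] in CUSTOM_ROUTINE_TYPES)
    return (transformers * 5) + (routines * 10) + (len(stages) * 1)

def write_complexity_report(inventory, csv_path):
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["job_name", "stage_count", "complexity_score", "band"])
        for job in inventory:
            score = complexity_score(job)
            band = "Low" if score < 10 else ("High" if score > 50 else "Medium")
            writer.writerow([job["name"], len(job["stages"]), score, band])

# write_complexity_report(job_inventory, "complexity_report.csv")
```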
4. Mapping & Conversion Automation
Objective: To generate functional, standardized PySpark code that developers can then refine.
Process:
This is a rule-based engine. It takes the JSON from the Parser and maps each DataStage concept to a Databricks/Spark concept.
| DataStage Stage | Databricks/Spark Pattern | Automation Confidence |
|---|---|---|
| DB Connector (Source) | `spark.read.format("jdbc").option(...)` | High |
| DB Connector (Target) | `df.write.format("jdbc").mode(...)` | High |
| Sequential File | `spark.read.csv(...)` or `spark.read.text(...)` | High |
| Transformer | `df.withColumn(...)` for each derivation | Medium |
| Filter | `df.filter(...)` | High |
| Aggregator | `df.groupBy(...).agg(...)` | Medium-High |
| Join / Merge / Lookup | `df1.join(df2, ...)` | Medium |
| Copy | Direct passthrough (no code needed, just connect dataframes) | High |
| Custom Routine / BuildOp | `# TODO: Manually implement logic for [RoutineName]` | None |
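To illustrate the rule-based engine itself, here is a stripped-down sketch of a stage-type dispatch table. The handler functions and the templates they emit are illustrative stand-ins, not the exact generators from our framework.

```python
# Illustrative dispatch table: stage type -> code-generator function.
# The handlers below are toy versions of the real generators.

def gen_jdbc_source(stage):
    # A real handler pulls the table name and connection from stage properties.
    return (f'{stage["name"].lower()}_df = spark.read.format("jdbc")'
            f'.option("dbtable", "TODO_TABLE").load()  # SOURCE: {stage["name"]}')

def gen_unmapped(stage):
    return f'# TODO: Manually implement logic for stage {stage["name"]} ({stage["type"]})'

STAGE_HANDLERS = {
    "COracleInput": gen_jdbc_source,
    "CTransformerStage": gen_unmapped,  # real handler expands each derivation
    "CTeradataOutput": gen_unmapped,    # real handler emits a write block
}

def convert_job(job_info):
    lines = [f'# AUTO-GENERATED SCAFFOLD for job: {job_info["name"]}']
    for stage in job_info["stages"]:
        handler = STAGE_HANDLERS.get(stage["type"], gen_unmapped)
        lines.append(handler(stage))
    return "\n".join(lines)
```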
Handling Expressions:
The hardest part of Transformer conversion is the expressions. We maintain a Python dictionary that maps DataStage functions to PySpark SQL functions.
# A small subset of our mapping dictionary
DS_TO_SPARK_FUNC_MAP = {
    "TRIM": "trim",
    "UPCASE": "upper",
    "StringToDecimal": "cast('decimal(p,s)')",  # Requires parameter extraction
    "IndexOf": "instr",
    "DateFromDaysSince": "date_add",  # Requires logic adjustment
    # ... and hundreds more
}
The converter iterates through Transformer derivations, attempts a lookup in this map, and generates a withColumn expression. If a function is not found, it inserts a clear TODO comment.
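A rough sketch of that lookup-and-generate step, assuming derivations have already been parsed into simple (target column, DataStage function, source column) triples, which is a simplification of the real parser output:

```python
# Subset of the mapping dictionary shown above.
DS_TO_SPARK_FUNC_MAP = {"TRIM": "trim", "UPCASE": "upper", "IndexOf": "instr"}

def derivation_to_code(target_col, ds_function, source_col, df_name="transformed_df"):
    # Emit one withColumn(...) line for a Transformer derivation, or a TODO marker.
    spark_func = DS_TO_SPARK_FUNC_MAP.get(ds_function)
    if spark_func is None:
        return (f"# TODO: Unmapped derivation: '{ds_function}({source_col})'. "
                f"Please implement manually for column {target_col}.")
    return (f'{df_name} = {df_name}.withColumn('
            f'"{target_col}", {spark_func}(col("{source_col}")))')

# derivation_to_code("CUSTOMER_NAME", "UPCASE", "CUST_NM")
# -> transformed_df = transformed_df.withColumn("CUSTOMER_NAME", upper(col("CUST_NM")))
```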
Code Generation Example:
Given a simple job Source -> Transform -> Target, the converter would generate a Python file like this:
# AUTO-GENERATED SCAFFOLD for job: J_LOAD_CUSTOMERS
# Generation Timestamp: 2023-10-27 10:00:00
#
# Human intervention is required. Review TODO comments.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, trim

def run_job(spark, env_params):
    # --- 1. SOURCE: ORACLE_SRC ---
    source_df = spark.read.format("jdbc") \
        .option("url", env_params["JDBC_URL_ORACLE"]) \
        .option("dbtable", "SRC.CUSTOMERS") \
        .option("user", env_params["DB_USER"]) \
        .option("password", env_params["DB_PASSWORD"]) \
        .load()

    # --- 2. TRANSFORM: XFM_CLEAN_DATA ---
    transformed_df = source_df \
        .withColumn("CUSTOMER_NAME", upper(col("CUST_NM"))) \
        .withColumn("ADDRESS_LINE_1", trim(col("ADDR1"))) \
        .withColumn("STATUS_CODE", col("STATUS_IND"))  # Direct mapping

    # TODO: Unmapped derivation: 'SetNull()'. Please implement manually.
    # transformed_df = transformed_df.withColumn("LEGACY_FIELD", ...)

    # --- 3. TARGET: TERADATA_TGT ---
    transformed_df.write.format("jdbc") \
        .option("url", env_params["JDBC_URL_TERADATA"]) \
        .option("dbtable", "TGT.CUSTOMERS") \
        .mode("overwrite") \
        .save()

if __name__ == "__main__":
    spark = SparkSession.builder.appName("J_LOAD_CUSTOMERS").getOrCreate()
    # TODO: Populate env_params from Databricks widgets or secrets
    env_params = {
        "JDBC_URL_ORACLE": "...",
        "JDBC_URL_TERADATA": "...",
        "DB_USER": "...",
        "DB_PASSWORD": "...",
    }
    run_job(spark, env_params)
Crucially, this is not production code. It's a high-quality starting point that saves a developer hours of tedious boilerplate writing. The human review is mandatory.
5. Orchestration Automation
Objective: Automatically convert DataStage Sequences into Databricks Workflows.
Process:
A DataStage Sequence is a directed graph. We parse the sequence from the .dsx file and use the networkx Python library to build a graph representation.
- Parse Activities: Identify all `JobActivity` records. These are the nodes of our graph.
- Parse Triggers: Identify dependencies between activities (e.g., "Run Job B after Job A finishes OK"). These are the edges of our graph.
- Build Graph: `G = networkx.DiGraph()`, then add nodes and edges.
- Translate to Workflow JSON: Traverse the graph and generate the JSON structure required by the Databricks Jobs API 2.1 `/jobs/create` endpoint. Each node becomes a "task," and each edge becomes a "depends_on" entry (a sketch of this translation follows below).
This works beautifully for 80-90% of sequences. Where it breaks is complex conditional logic (if-then-else), loops, and exception handling routines. These must be flagged for manual redesign using modern Databricks features like the if/else condition task.
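Here is a minimal sketch of that graph-to-Workflow translation, assuming activities and triggers have already been extracted into plain Python lists. The notebook paths and cluster key in the payload are placeholders you would substitute with your own.

```python
import networkx as nx

def sequence_to_workflow(sequence_name, activities, triggers):
    """activities: list of job names; triggers: list of (upstream, downstream) pairs."""
    G = nx.DiGraph()
    G.add_nodes_from(activities)
    G.add_edges_from(triggers)

    tasks = []
    for node in nx.topological_sort(G):
        task = {
            "task_key": node,
            # Placeholder: point each task at the converted notebook for that job.
            "notebook_task": {"notebook_path": f"/Migrated/{node}"},
            "job_cluster_key": "migration_cluster",
        }
        upstream = list(G.predecessors(node))
        if upstream:
            task["depends_on"] = [{"task_key": u} for u in upstream]
        tasks.append(task)

    # Payload for POST /api/2.1/jobs/create (job_clusters definition omitted here).
    return {"name": sequence_name, "tasks": tasks}

# workflow = sequence_to_workflow(
#     "SEQ_DAILY_LOAD",
#     activities=["J_LOAD_CUSTOMERS", "J_LOAD_ORDERS", "J_BUILD_MART"],
#     triggers=[("J_LOAD_CUSTOMERS", "J_BUILD_MART"), ("J_LOAD_ORDERS", "J_BUILD_MART")],
# )
```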
6. Data Migration & Execution Automation
This phase is about operationalizing the converted jobs. We use Python scripts wrapping the Databricks CLI or REST API.
- Job Runner: A Python script that takes a job name and a parameter file, then triggers a job run using the `databricks jobs run-now` command (a REST-based sketch follows below).
- Workflow Deployer: A script that takes our generated Workflow JSON and uses the API to `create` or `reset` a workflow in Databricks.
- Incremental Logic: For jobs that handle deltas, the automation framework can generate a basic CDC (Change Data Capture) pattern using `MERGE INTO` on Delta Lake tables. The developer is still responsible for correctly identifying the natural keys and watermark columns.
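For reference, a minimal job-runner sketch against the Jobs API 2.1 `run-now` and `runs/get` endpoints might look like this; it assumes the workspace URL and a personal access token are available as environment variables.

```python
import os
import time
import requests

HOST = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
HEADERS = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}

def run_job_and_wait(job_id, notebook_params=None, poll_seconds=30):
    """Trigger a Databricks job and block until it reaches a terminal state."""
    resp = requests.post(
        f"{HOST}/api/2.1/jobs/run-now",
        headers=HEADERS,
        json={"job_id": job_id, "notebook_params": notebook_params or {}},
    )
    resp.raise_for_status()
    run_id = resp.json()["run_id"]

    while True:
        run = requests.get(
            f"{HOST}/api/2.1/jobs/runs/get",
            headers=HEADERS,
            params={"run_id": run_id},
        ).json()
        state = run["state"]
        if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            return state  # contains result_state such as SUCCESS or FAILED
        time.sleep(poll_seconds)
```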
7. Data Validation & Reconciliation Automation
Trust is everything in a migration. If the business doesn't trust the new data, the project has failed. Automation is your best friend here.
Our Python Validator performs a tiered check:
Tier 1: Pre-flight Checks (Metadata)
* Compare the source table schema (from JDBC) with the target Delta table schema (spark.table(name).schema). The script flags discrepancies in column names, data types, and nullability.
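A simple version of that schema check is a set comparison over (column name, data type) pairs, for example (nullability can be added to the tuple the same way):

```python
def compare_schemas(source_df, target_df):
    """Flag columns whose name or Spark data type differs between source and target."""
    src = {(f.name.upper(), f.dataType.simpleString()) for f in source_df.schema.fields}
    tgt = {(f.name.upper(), f.dataType.simpleString()) for f in target_df.schema.fields}

    only_in_source = src - tgt
    only_in_target = tgt - src
    if only_in_source or only_in_target:
        print(f"Schema drift. Source-only: {only_in_source}; Target-only: {only_in_target}")
    return not (only_in_source or only_in_target)
```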
Tier 2: Shallow Reconciliation (Aggregates)
* This is the workhorse. It's fast and catches 90% of issues.
def validate_tables(spark, source_query, target_table):
    # Source read: JDBC connection options (url, user, password) are omitted here.
    source_df = spark.read.format("jdbc") \
        .option("dbtable", f"({source_query}) src") \
        .load()
    target_df = spark.table(target_table)

    # 1. Row Count
    source_count = source_df.count()
    target_count = target_df.count()
    print(f"Source Count: {source_count}, Target Count: {target_count}")
    assert source_count == target_count, "Row counts do not match!"

    # 2. Checksums on Numeric/String Columns
    for col_name in ["TRANSACTION_AMOUNT", "CUSTOMER_ID"]:
        source_sum = source_df.agg({col_name: "sum"}).collect()[0][0]
        target_sum = target_df.agg({col_name: "sum"}).collect()[0][0]
        # Compare with a tolerance for floating point issues
        assert abs(float(source_sum) - float(target_sum)) < 0.01, \
            f"Checksum mismatch for {col_name}: {source_sum} vs {target_sum}"
        print(f"Checksum for {col_name} matches.")

    return True
Tier 3: Deep Reconciliation (Row-level)
* This is slow and expensive, reserved for critical datasets or for debugging. The most effective method is a MINUS or ANTI JOIN operation.
* Find rows in Source but not Target.
* Find rows in Target but not in Source.
* If both sets are empty, the data matches perfectly.
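A minimal sketch of that row-level comparison using Spark's `exceptAll`, assuming both sides share a common column set:

```python
def deep_reconcile(source_df, target_df):
    """Row-level reconciliation: return the rows missing on either side."""
    common_cols = [c for c in source_df.columns if c in target_df.columns]

    # exceptAll keeps duplicates, so repeated rows are reconciled correctly.
    missing_in_target = source_df.select(common_cols).exceptAll(target_df.select(common_cols))
    missing_in_source = target_df.select(common_cols).exceptAll(source_df.select(common_cols))

    print(f"Rows in Source but not Target: {missing_in_target.count()}")
    print(f"Rows in Target but not Source: {missing_in_source.count()}")
    return missing_in_target, missing_in_source
```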
Limitations: Automated validation can't catch everything. If your DataStage Transformer had a bug that dropped all transactions for 'Store 123', and your converted PySpark code faithfully reproduces that bug, the validation will pass with flying colors. It validates that the transformation is identical, not that it's correct.
8. Performance & Cost Guardrails
A common migration failure is blowing the cloud budget with poorly optimized Spark jobs. Automation can help enforce best practices.
- Cluster Policy Checks: A Python script can use the Databricks API to list all job cluster definitions and flag those that violate policy:
- No auto-scaling configured.
- Instance types that are too large (or too small).
- No auto-termination set.
- Query Analysis: We parse Spark's query execution logs (from the event logs) to automatically flag jobs with signs of trouble:
- A `BroadcastHashJoin` that fell back to a `SortMergeJoin` (indicates a large broadcast).
- Excessive data spillage to disk.
This acts as an automated "code review" for performance, alerting the team to potential issues before they become major problems.
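As an example of the cluster policy checks described above, here is a minimal sketch against the Clusters API 2.0 list endpoint; the thresholds are illustrative, and host/token handling mirrors the job-runner sketch earlier.

```python
import requests

def flag_cluster_policy_violations(host, headers, max_workers=20):
    """Scan all-purpose clusters and report common cost/policy violations."""
    clusters = requests.get(
        f"{host}/api/2.0/clusters/list", headers=headers
    ).json().get("clusters", [])

    findings = []
    for c in clusters:
        name = c.get("cluster_name", c.get("cluster_id"))
        if "autoscale" not in c:
            findings.append(f"{name}: no auto-scaling configured")
        elif c["autoscale"].get("max_workers", 0) > max_workers:
            findings.append(f"{name}: max_workers exceeds policy limit of {max_workers}")
        if c.get("autotermination_minutes", 0) == 0:
            findings.append(f"{name}: no auto-termination set")
    return findings
```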
9. Logging, Auditing & Observability
Standardization is key. Every auto-generated job scaffold includes a standardized logging setup.
- Centralized Logging: We configure a standard Python logger in a shared utility module. It automatically injects context like `job_name`, `workflow_id`, and `run_id` into every log message.
- Audit Trail: The migration framework itself logs its actions to a central table. For example:
  - `[INFO] Converter: Job 'J_LOAD_SALES' converted with 7/10 stages mapped automatically.`
  - `[WARN] Converter: Job 'J_COMPLEX_LOGIC' has unmapped BuildOp stage. Manual intervention required.`
  - `[SUCCESS] Validator: Table 'FCT_SALES' reconciled successfully. Row Count: 1,456,789.`
- Failure Diagnostics: When a job fails, the standard error output is captured by the orchestration tool. The centralized logs provide the context needed for a developer to quickly debug without having to hunt for information.
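A minimal version of that shared logging utility, using a `logging.LoggerAdapter` to stamp the run context onto every message (how you obtain `workflow_id` and `run_id`, e.g. from widgets or the job context, is up to you):

```python
import logging

def get_job_logger(job_name, workflow_id="n/a", run_id="n/a"):
    """Return a logger that stamps every message with the migration run context."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s job=%(job_name)s "
               "workflow=%(workflow_id)s run=%(run_id)s %(message)s",
    )
    base = logging.getLogger(job_name)
    # LoggerAdapter injects the context dict as extra fields on every record.
    return logging.LoggerAdapter(
        base, {"job_name": job_name, "workflow_id": workflow_id, "run_id": run_id}
    )

# logger = get_job_logger("J_LOAD_CUSTOMERS", workflow_id="wf_42", run_id="1234")
# logger.info("Converted 7/10 stages automatically.")
```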
10. End-to-End Example
Let's trace a simple but realistic job: ORA_SRC_ORDERS -> XFM_ENRICH -> FIL_VALID_ONLY -> TD_TGT_ORDERS
- Discovery: Our parser script runs on the `.dsx` and outputs this JSON snippet for the job:

  {
    "name": "J_PROCESS_ORDERS",
    "stages": [
      {"name": "ORA_SRC_ORDERS", "type": "COracleInput"},
      {"name": "XFM_ENRICH", "type": "CTransformerStage"},
      {"name": "FIL_VALID_ONLY", "type": "CFilterStage"},
      {"name": "TD_TGT_ORDERS", "type": "CTeradataOutput"}
    ],
    "links": [...]
  }
The analyzer gives it a complexity score of 9 (1 for each of the four stages + 5 for the Transformer), marking it as "Low Complexity."
- Conversion: The converter script reads the JSON.
  - It maps `COracleInput` to a `spark.read.jdbc` block.
  - It parses the `XFM_ENRICH` derivations: `ORDER_VALUE * 1.2` becomes `withColumn("ORDER_TOTAL", col("ORDER_VALUE") * 1.2)` and `UPCASE(CUSTOMER_NAME)` becomes `withColumn("CUSTOMER_NAME", upper(col("CUSTOMER_NAME")))`.
  - It translates the `FIL_VALID_ONLY` where clause `STATUS = 'A'` to `.filter("STATUS = 'A'")`.
  - It maps `CTeradataOutput` to a `df.write.jdbc` block.
  - It spits out a `.py` file with the complete scaffold.
- Refinement: A developer opens the file. They see the code is 95% complete. They add proper secrets management for the DB passwords (using `dbutils.secrets.get`) instead of hardcoded values, and commit the code.
- Orchestration & Execution: The job is part of a sequence. The orchestration converter creates a Databricks Workflow task for it. We run the workflow via the API.
- Validation: After the run, our validation script is triggered with `validate_tables(spark, source_query="SELECT * FROM ORDERS", target_table="hive_metastore.sales.orders")`. The script outputs:

  Source Count: 50123, Target Count: 48911
  VALIDATION WARNING: Row counts differ. This may be expected due to filtering.
  - Checking filter logic...
  - Source rows with STATUS = 'A': 48911. Match found.
  Checksum for ORDER_TOTAL matches.
  Validation Passed.
11. Common Automation Pitfalls
I've seen automation efforts fail more than they succeed. Here's why:
- The 100% Conversion Myth: Teams get obsessed with trying to automate everything. They spend months trying to convert a bizarre, 20-year-old DataStage parallel routine that's used in one job. The ROI is negative. Know when to declare something "manual effort" and move on.
- Over-Engineering the Framework: The goal is to migrate jobs, not to build a perfect, commercially-viable software product. Build a "good enough" toolkit that solves 80% of the problem. Don't add a UI, complex configuration layers, or a plugin architecture. It will slow you down.
- False Confidence: Automation can produce code that is syntactically correct but logically wrong. "It was auto-converted" is not an excuse. Rigorous testing and developer oversight are non-negotiable.
- Garbage In, Garbage Out: The automation framework will faithfully replicate the sins of your existing DataStage implementation. It won't magically fix bad design. Use the discovery phase to identify jobs that need to be re-designed, not just converted.
12. When to Stop Automating
The "long tail" is where automation breaks down. You'll find that 20% of your jobs contain 80% of the complexity and unique patterns. These are the signals that automation is no longer helping for a specific job:
- The job contains multiple complex Transformer stages with deep procedural logic.
- The job relies heavily on non-standard plugins, BuildOps, or custom C++ routines.
- The job's logic is fundamentally flawed and needs a complete business process re-engineering, not a lift-and-shift.
- The cost of adding a new rule to your converter to handle a single, unique job is higher than the cost of a developer converting it manually.
When you see these signals, flag the job in your inventory as "Manual Conversion / Re-Design." Your automation has done its job by identifying and isolating the problem, allowing your experts to focus their efforts where they are most needed. The framework's value isn't just in what it converts, but also in what it wisely chooses not to convert.