How to Build a Python Framework for DataStage to Databricks Migration
Published on: December 25, 2025 08:15 PM
A Principal Engineer's Blueprint for Enterprise-Scale ETL Modernization
For over 15 years, I worked extensively with DataStage, building complex data pipelines for large enterprises. For the last 10, my focus has been on moving those workloads to modern cloud platforms, specifically Databricks. Early on, we realized that manual, one-by-one migrations were a recipe for failure—slow, expensive, and inconsistent. The only viable path forward was through disciplined, robust automation.
This guide is the blueprint for the Python framework we built and refined over numerous successful migrations. It's not a "magic button" solution; such a thing doesn't exist. Instead, it's an engineering accelerator that automates 70-80% of the repetitive work, freeing up your best engineers to focus on the complex 20% that truly requires their expertise.
1. Framework Design Principles
Before writing a single line of code, we established our architectural tenets. A poorly designed framework creates more technical debt than it solves.
- Modular and Decoupled: The framework must be composed of independent Python packages/modules for each phase: discovery, conversion, orchestration, and validation. These modules communicate through well-defined data contracts (typically JSON or YAML files), not direct function calls. This allows a team to work on the validation module without needing to understand the internals of the conversion engine.
[DataStage DSX Exports] -> (Discovery) -> [Metadata JSON Store] -> (Conversion) -> [PySpark Code & Config] -> (Orchestration) -> [Databricks Workflows] -> (Execution) -> [Logs & Data] -> (Validation)
- Configuration-Driven: No hardcoded paths, server names, or stage mappings. Every aspect of the migration—from source credentials to stage-to-PySpark function mappings—must be externalized into configuration files (we prefer YAML for its readability). This allows you to retarget the entire migration from Dev to QA to Prod by simply switching a config file.
- Idempotent and Re-runnable: Running the conversion for a job that has already been converted should either overwrite it with the latest version or do nothing if the source hasn't changed. This is critical for CI/CD and iterative development.
- Extensible by Design: We knew we couldn't predict every DataStage stage or custom routine. We designed the conversion module using a Factory Pattern. A central registry maps DataStage stage types (e.g., CTransformerStage, CDb2Connector, CLookupStage) to specific Python converter classes. Adding support for a new stage type means writing a new class and adding one line to the registry, not refactoring the core engine.
2. The Discovery Module: Creating the Bill of Materials
You can't migrate what you don't understand. The goal of this module is to parse DataStage assets and create a structured, machine-readable inventory.
Input: A collection of DataStage job export files (.dsx).
Output: A structured metadata store (e.g., a folder of JSON files, one per job, or a simple database like SQLite).
Implementation Steps:
- Extract Metadata: The .dsx files are essentially XML. We use Python's built-in xml.etree.ElementTree to parse them. We're interested in Job, Stage, Link, and Record tags. We extract properties like stage type, stage name, SQL queries, table names, column derivations, and constraints.
- Dependency Mapping: As we parse, we build a dependency graph for each job. The Python library networkx is perfect for this. Stages are nodes, and links are edges. This graph is crucial for understanding data flow within a job and later for translating sequences.
  - We also build a higher-level dependency graph between jobs (e.g., Job B depends on a DataSet written by Job A).
- Complexity Scoring: We created a heuristic scoring system to prioritize our migration waves. This is stored in the output JSON for each job. A sketch of the graph build and scoring heuristic follows the parser snippet below.
  - Low (1-3): Simple pass-through jobs. E.g., DB Connector -> Copy -> Sequential File.
  - Medium (4-7): Jobs with standard joins, lookups, and simple transformations.
  - High (8-10): Jobs with complex Transformer stages (many derivations, stage variables), obscure function calls, or complex job control logic (loops).
Python Snippet: Simplified DSX Parser
import xml.etree.ElementTree as ET
import json
def parse_dsx_job(dsx_file_path: str) -> dict:
"""Parses a DSX file and extracts basic job and stage information."""
tree = ET.parse(dsx_file_path)
root = tree.getroot()
job_info = {"name": "", "stages": [], "links": []}
# Find the Job element and its name
job_element = root.find(".//Job")
    if job_element is not None:
        job_info["name"] = job_element.get("Identifier")
# Find all stages and their types
for stage in root.findall(".//Stage"):
stage_data = {
"name": stage.get("Identifier"),
"type": stage.get("StageType"),
"properties": {} # Placeholder for deep property extraction
}
# A real implementation would dive deep into <Record> tags here
job_info["stages"].append(stage_data)
# Find all links
for link in root.findall(".//Link"):
link_data = {
"source": link.get("SourceStage"),
"target": link.get("TargetStage")
}
job_info["links"].append(link_data)
# In reality, you'd save this to a file or DB
return job_info
# Example usage:
# job_metadata = parse_dsx_job("export_job_01.dsx")
# with open(f"{job_metadata['name']}.json", "w") as f:
# json.dump(job_metadata, f, indent=2)
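To illustrate the dependency-mapping and complexity-scoring steps, here is a minimal sketch that builds a networkx graph from the metadata returned by parse_dsx_job above and derives a heuristic 1-10 score. The per-stage weights and the degree-based bonus are illustrative assumptions, not the exact values we used.
import networkx as nx

# Illustrative per-stage-type weights -- an assumption, not our production values
STAGE_WEIGHTS = {
    "CTransformerStage": 3,
    "CLookupStage": 2,
    "COracleConnector": 1,
    "CDb2Connector": 1,
}

def build_job_graph(job_metadata: dict) -> nx.DiGraph:
    """Stages become nodes, links become directed edges."""
    graph = nx.DiGraph()
    for stage in job_metadata["stages"]:
        graph.add_node(stage["name"], stage_type=stage["type"])
    for link in job_metadata["links"]:
        graph.add_edge(link["source"], link["target"])
    return graph

def score_job_complexity(graph: nx.DiGraph) -> int:
    """Heuristic 1-10 score based on the stage mix and branching."""
    score = sum(STAGE_WEIGHTS.get(attrs.get("stage_type"), 1)
                for _, attrs in graph.nodes(data=True))
    score += sum(1 for node in graph.nodes if graph.degree(node) > 2)  # fan-in/fan-out adds complexity
    return min(10, max(1, score // 2))

# graph = build_job_graph(job_metadata)
# print(score_job_complexity(graph))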
3. The Conversion Module: The Translation Engine
This is the heart of the framework. It reads the JSON metadata from the Discovery phase and generates runnable PySpark code.
Input: The JSON metadata for a single job.
Output: A parameterized Python/PySpark script and a corresponding YAML configuration file.
Implementation Strategy:
- Template-based Code Generation: We use Jinja2 as our templating engine. We have a base template for a PySpark job that includes sections for imports, argument parsing, SparkSession creation, and function definitions. The conversion module's job is to fill in the "blanks" in this template.
- The Stage Converter Factory:
  - We define a base class AbstractStageConverter.
  - We create concrete classes for common stages: OracleConnectorConverter, TransformerConverter, LookupConverter, etc.
  - A dictionary maps the stage type string from the metadata to the appropriate class instance.
# A simplified view of the factory
class TransformerConverter(AbstractStageConverter):
def convert(self, stage_metadata, input_dfs):
# ... complex logic to generate withColumn calls ...
return pyspark_code_string, output_df_name
CONVERTER_REGISTRY = {
"CTransformerStage": TransformerConverter(),
"COracleConnector": OracleConnectorConverter(),
# ... etc.
}
- Handling Key Stages:
  - Connectors (Source/Target): These are the easiest. The OracleConnectorConverter generates spark.read.format("jdbc")... or df.write.format("delta").... The connection details are parameterized and read from the job's YAML config.
  - Transformer Stage: This is the most complex.
    - We parse the column derivations one by one. DSLink.Column -> NewColumn becomes df.withColumn("NewColumn", col("Column")). Trim(DSLink.Column) becomes df.withColumn("NewColumn", trim(col("Column"))).
    - We maintain a large mapping file (functions.yml) that translates DataStage functions (TimestampFromDateTime, StringToDecimal) to their PySpark equivalents. If a mapping doesn't exist, the conversion fails with a clear error, prompting an engineer to add it.
    - Stage variables are handled by chaining withColumn transformations in the correct order.
  - Lookup Stage: This maps directly to a join. The LookupConverter generates source_df.join(lookup_df, join_condition, "left"). It correctly identifies the lookup keys and the payload columns.
  - Filter/Aggregator: These map cleanly to df.filter() and df.groupBy().agg().
- Code Generation Flow: The main conversion script iterates through the job's stages in dependency order (using the graph). It calls the appropriate converter for each stage, passing in the names of the input DataFrames. The converter returns a snippet of PySpark code and the name of its output DataFrame, which becomes the input for the next stage. The final output is a single, clean, and commented PySpark file.
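Here is a minimal sketch of that loop, assuming the CONVERTER_REGISTRY shown earlier, a networkx graph from the discovery phase, and a hypothetical Jinja2 template named job.py.j2 with job_name and body placeholders:
import networkx as nx
from jinja2 import Environment, FileSystemLoader

def generate_job_code(job_metadata: dict, graph: nx.DiGraph) -> str:
    """Walks stages in dependency order and renders the PySpark job template."""
    env = Environment(loader=FileSystemLoader("templates"))
    template = env.get_template("job.py.j2")  # hypothetical template name

    snippets = []
    produced_dfs = {}  # stage name -> name of the DataFrame it produced
    stages_by_name = {s["name"]: s for s in job_metadata["stages"]}

    for stage_name in nx.topological_sort(graph):
        stage_meta = stages_by_name[stage_name]
        converter = CONVERTER_REGISTRY[stage_meta["type"]]
        input_dfs = [produced_dfs[p] for p in graph.predecessors(stage_name)]
        code, output_df = converter.convert(stage_meta, input_dfs)
        snippets.append(code)
        produced_dfs[stage_name] = output_df

    return template.render(job_name=job_metadata["name"], body="\n\n".join(snippets))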
4. The Orchestration Module: From Sequences to Workflows
DataStage jobs rarely run in isolation. This module translates DataStage sequences into multi-task Databricks Workflows.
Input: Metadata for DataStage sequences.
Output: Configuration files for deployment (e.g., Databricks Asset Bundle YAML).
Implementation Strategy:
- Parse Sequence Logic: Similar to job discovery, we parse exported sequences to understand the flow: start, job activity, conditional logic, and termination.
- Translate to Workflow Tasks: We use the Databricks Python SDK or generate Databricks Asset Bundle (DABs) YAML files. A generation sketch follows this list.
  - A Job Activity stage becomes a notebook_task or spark_python_task in the workflow.
  - Dependencies (link in DataStage) are translated into the depends_on property of a Databricks task.
  - Conditional logic (if-then-else) is mapped to the run_if conditions in Databricks Workflows (e.g., ALL_SUCCESS, AT_LEAST_ONE_FAILED).
- Handling Edge Cases:
  - Loops: DataStage loops are notoriously difficult to translate directly. Our framework flags these for manual intervention. The recommendation is almost always to refactor the logic. For example, a loop that processes files can be replaced by a single Databricks job using Auto Loader.
  - Exception Handlers: These are mapped to separate Databricks tasks that run on failure conditions.
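As a rough sketch of the generation step, the snippet below maps already-parsed sequence activities onto a Databricks Asset Bundle job resource and dumps it as YAML. The shape of the activity dicts (name, script_path, depends_on, run_if) is an assumption made for illustration; the task fields follow the Databricks Workflows schema.
import yaml

def sequence_to_bundle_job(seq_name: str, activities: list) -> dict:
    """Builds a DABs 'resources.jobs' entry from parsed sequence activities."""
    tasks = []
    for activity in activities:
        task = {
            "task_key": activity["name"],
            "spark_python_task": {"python_file": activity["script_path"]},
        }
        if activity.get("depends_on"):
            task["depends_on"] = [{"task_key": dep} for dep in activity["depends_on"]]
        if activity.get("run_if"):  # e.g. "ALL_SUCCESS" or "AT_LEAST_ONE_FAILED"
            task["run_if"] = activity["run_if"]
        tasks.append(task)
    return {"resources": {"jobs": {seq_name: {"name": seq_name, "tasks": tasks}}}}

# bundle = sequence_to_bundle_job("seq_daily_load", parsed_activities)
# with open("databricks_workflows/seq_daily_load.yml", "w") as f:
#     yaml.safe_dump(bundle, f, sort_keys=False)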
5. The Validation & Reconciliation Module
Automation is useless if you can't trust the output. This module is non-negotiable for gaining business sign-off.
Input: Pointers to the "legacy" output (e.g., a Teradata table) and the "modern" output (e.g., a Delta table).
Output: A detailed reconciliation report (JSON/HTML).
Implementation Strategy: A standalone Python utility that can be run as part of a CI/CD pipeline or a Databricks Workflow.
- Level 1: Metadata & Counts:
  - Compare row counts. SELECT COUNT(1) FROM legacy_table vs. spark.table("new_table").count().
  - Compare schemas (column names, data types). Be mindful of type differences (e.g., VARCHAR vs. string).
- Level 2: Aggregate Validation:
  - For all numeric columns, calculate and compare SUM(), AVG(), MIN(), MAX(). This catches many transformation errors without a full data scan.
- Level 3: Row-level Hash Comparison (The Ultimate Check):
  - This is the most definitive but also the most expensive check.
  - Create a consistent, concatenated string of all columns for each row (handling nulls carefully).
  - Compute a hash (e.g., md5 or sha256) on this string for both the legacy and new datasets.
  - The PySpark equivalent of MINUS or EXCEPT is df_legacy_hashes.exceptAll(df_new_hashes). If the result of this operation on both sides is zero, the data is identical.
Python Snippet: Conceptual Validator
def validate_datasets(legacy_table_ref, new_table_ref, key_columns):
"""A conceptual data validator."""
report = {}
# L1: Row Counts
legacy_count = run_legacy_sql(f"SELECT COUNT(1) FROM {legacy_table_ref}")
new_count = spark.table(new_table_ref).count()
report["counts"] = {"legacy": legacy_count, "new": new_count, "match": legacy_count == new_count}
if not report["counts"]["match"]:
return report # Stop here if counts don't match
# L2: Aggregate Checks (example for one column)
# ... code to calculate and compare SUM(some_col) ...
# L3: Hash Comparison (simplified)
# This requires reading both datasets into Spark
df_legacy = spark.read.jdbc(...)
df_new = spark.table(new_table_ref)
# Assume a function create_row_hash(df) exists
df_legacy_hashes = create_row_hash(df_legacy)
df_new_hashes = create_row_hash(df_new)
mismatched_in_new = df_new_hashes.exceptAll(df_legacy_hashes).count()
mismatched_in_legacy = df_legacy_hashes.exceptAll(df_new_hashes).count()
report["row_hash_validation"] = {"mismatched_rows": mismatched_in_new + mismatched_in_legacy}
return report
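The create_row_hash helper assumed above can be sketched along these lines: sort the columns so both sides hash in the same order, normalize nulls to a sentinel, cast everything to string, and hash the concatenation. The delimiter and null sentinel are arbitrary choices that simply must match on both sides.
from pyspark.sql import DataFrame
from pyspark.sql.functions import coalesce, col, concat_ws, lit, sha2

def create_row_hash(df: DataFrame) -> DataFrame:
    """Returns a single-column DataFrame of row hashes suitable for exceptAll comparison."""
    ordered_cols = sorted(df.columns)  # identical column order on both sides
    normalized = [coalesce(col(c).cast("string"), lit("<NULL>")) for c in ordered_cols]
    return df.select(sha2(concat_ws("||", *normalized), 256).alias("row_hash"))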
6. Logging, Monitoring, and Observability
This is what makes the framework manageable at scale.
- Framework Logging: Our entire Python framework uses the standard logging library, configured to output structured JSON logs. We log every decision: "Discovered 50 jobs," "Starting conversion for Job_X," "Mapping DS function 'Abs' to PySpark 'abs'," "Validation failed for Job_Y: Row count mismatch." These logs are shipped to a central location (e.g., an S3 bucket or Elasticsearch).
- Job Metrics: We track the outcome of each automated step in a central database table: (job_name, discovery_status, conversion_status, validation_status, conversion_timestamp). This gives us a real-time dashboard of the migration's progress.
- Performance and Cost: The generated PySpark code includes logging for execution time and reads the cluster specs. We correlate this with Databricks cost reports to track the cost per job run.
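A minimal sketch of the structured JSON logging described above, using only the standard logging library (the field names are illustrative):
import json
import logging

class JsonFormatter(logging.Formatter):
    """Renders each log record as one JSON line, ready to ship to S3/Elasticsearch."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("framework.conversion")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Starting conversion for Job_X")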
7. Environment & Deployment Strategy
- Parameterization: We use a hierarchy of YAML config files (base.yml, dev.yml, prod.yml) managed with a library like OmegaConf. The generated PySpark code uses argparse to accept an environment name, which determines which config to load. A loading sketch follows this list.
- Secrets: All credentials are stored in Databricks Secrets, backed by Azure Key Vault or AWS Secrets Manager. The generated code references secrets using dbutils.secrets.get(scope="...", key="...").
- CI/CD Integration: We live and die by our Git repository and CI/CD pipeline (GitHub Actions/Azure DevOps).
  - Repo Structure: datastage_exports/, framework/src/, config/, generated_code/, databricks_workflows/.
  - Pipeline on Merge to main:
    - Lint and test the framework code itself.
    - Run the Discovery module on datastage_exports/.
    - Run the Conversion module.
    - Commit the generated_code/ and databricks_workflows/ back to the repository.
    - Use the Databricks CLI or DABs to deploy the updated jobs and workflows to the Dev workspace.
    - Trigger the deployed workflow.
    - Run the Validation module against the output.
    - Report success/failure in the pipeline.
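A minimal sketch of the layered configuration loading, assuming base.yml and <env>.yml live under a config/ directory:
import argparse
from omegaconf import OmegaConf

def load_config(env: str):
    """Merges base settings with environment-specific overrides (later wins)."""
    base = OmegaConf.load("config/base.yml")
    overrides = OmegaConf.load(f"config/{env}.yml")
    return OmegaConf.merge(base, overrides)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", choices=["dev", "qa", "prod"], default="dev")
    args = parser.parse_args()
    config = load_config(args.env)
    print(OmegaConf.to_yaml(config))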
8. Sample End-to-End Workflow
DataStage Job: ORA_SRC_CUSTOMERS (Oracle) -> XFM_CLEAN_NAMES (Transformer) -> SEQ_TGT_CUSTOMERS (Sequential File)
- Transformer Logic: TRIM(FirstName), UPPER(LastName), create FullName as FirstName || ' ' || LastName.
- Discovery: The module runs, producing ORA_SRC_CUSTOMERS.json. It identifies the job, the Oracle source, the Transformer with 3 derivations, and the target file. Complexity score: 4 (Medium).
- Conversion:
  - The conversion engine is invoked: python -m framework.convert --job-name ORA_SRC_CUSTOMERS.
  - It uses the Jinja2 template and the OracleConnectorConverter and TransformerConverter.
  - Generated ora_src_customers.py (snippet):
# ... boilerplate SparkSession setup ...
from pyspark.sql.functions import col, trim, upper, concat_ws
# Read from Oracle source
source_df = (spark.read.format("jdbc")
.option("url", config["source_db"]["url"])
.option("dbtable", "CUSTOMERS")
.option("user", dbutils.secrets.get(scope="...", key="..."))
# ... other options
.load())
# Transformations from XFM_CLEAN_NAMES
transformed_df = (source_df
.withColumn("FirstName_clean", trim(col("FirstName")))
.withColumn("LastName_clean", upper(col("LastName")))
.withColumn("FullName", concat_ws(" ", col("FirstName_clean"), col("LastName_clean")))
)
# Write to Delta Lake target
(transformed_df.write.format("delta")
.mode("overwrite")
.save(config["target"]["path"]))
  - Generated ora_src_customers.yml:
source_db:
url: "jdbc:oracle:thin:@..."
target:
path: "/mnt/data/customers_delta"
- Orchestration: A simple one-task Databricks Workflow is defined in a DABs YAML file pointing to the generated Python script.
- Validation: After both jobs run, the validation script compares the legacy sequential file with the new Delta table, checking counts and hashes, and produces a report.
9. Common Challenges and Mitigation
- Ultra-Complex Transformers: For jobs with hundreds of derivations and obscure functions, the automation will likely fail or produce convoluted code. Mitigation: Our framework is designed to fail gracefully. It flags these jobs as "Manual Conversion Required." The goal is to assist, not to achieve 100% conversion. The generated code, even if incomplete, serves as a great starting point.
- Data Skew and Performance: The auto-generated PySpark code is functionally correct but may not be performant for skewed data. Mitigation: This is where you need senior engineers. The framework gets the logic right. The engineer then applies performance tuning techniques like salting, repartitioning, or broadcast hints. The framework provides the 80%, the expert provides the crucial 20%.
- Sequence Loops and Branching: Translating procedural if/else and for loops from DataStage sequences is often a fool's errand. Mitigation: We flag these for mandatory manual review and refactoring. The goal is to adopt a cloud-native, declarative pattern, not to lift-and-shift an old one.
- Proprietary Functions/Routines: Many legacy systems have custom-built DataStage routines. Mitigation: Our functions.yml mapping file is the key. For each custom routine, an engineer writes an equivalent PySpark UDF and adds the mapping. The framework can then handle it for all future jobs.
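As an example of that pattern, a hypothetical custom routine (called GenerateCustomerKey here purely for illustration) gets a PySpark UDF counterpart, and functions.yml is extended to point at it:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def generate_customer_key(region_code, customer_id):
    """Illustrative PySpark equivalent of a legacy GenerateCustomerKey DataStage routine."""
    if region_code is None or customer_id is None:
        return None
    return f"{region_code.strip().upper()}-{str(customer_id).zfill(10)}"

# functions.yml entry added by the engineer (illustrative):
#   GenerateCustomerKey: generate_customer_key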
10. Best Practices & Maintenance
- Treat the Framework as a Product: It needs a product owner, a backlog, and a regular release cycle. It's a living piece of software, not a one-off script.
- Document Everything: Every module has a README.md. Every converter class has a docstring explaining what it does and its limitations. We use Sphinx to auto-generate documentation.
- Onboard New Team Members: We have a "Day 1" guide for new migration engineers: "How to set up the framework, convert your first job, and interpret the validation report."
- The Feedback Loop is Everything: When an engineer manually fixes a generated job, the first question is always: "Can we teach the framework to handle this pattern?" If yes, a ticket is created to enhance the converter. This continuous improvement is what turns a good framework into a great one.
By embracing this structured, engineering-first approach, you move from a chaotic series of individual migrations to a predictable, scalable, and repeatable modernization factory.