Data Integration

Data Integration

Connect with experts and peers to elevate technical expertise, solve problems and share insights.


#Data
#Data
 View Only

Getting started with the watsonx.data integration Python SDK

By John Wen posted Wed November 05, 2025 04:16 PM

  

This guide walks through your first SDK workflow including authentication, creating a project, building a simple DataStage flow, running it, and regenerating code from a flow export.

Prerequisites

  • A valid watsonx.data integration instance - Create your free instance here
  • A valid API key for watsonx.data integration
  • Python environment and the SDK installed
    • To install the package run 
    • pip3 install ibm-watsonx-data-integration

You can generate an API key in the platform. Reference the watsonx.data integration SDK documentation for API key and set up help.

1: Set API key 

api_key = 'SET_API_KEY_HERE'

2: Authentication

from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration import Platform
from ibm_watsonx_data_integration.services.datastage import * auth = IAMAuthenticator(api_key=api_key)
platform = Platform(auth, base_api_url="https://api.ca-tor.dai.cloud.ibm.com")

3: Create a project

To create a new project, call Platform.create_project(). A name is required. Other fields such as description, tags, public and project_type are optional. If project_type is not provided it defaults to wx.

project = platform.create_project(
    name='My first project',
    description='Building sample batch flows',
    tags=['flow_test_project'],
    public=True,
    project_type='wx'
) project

Expected output

Project(guid='xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', name='My first project')

4: Create a basic RowGenPeek flow

# Flow
flow = project.create_flow(
    name="RowGenPeek",
    environment=None,
    flow_type="datastage"
) # Stages
row_generator = flow.add_stage("Row Generator", "Row_Generator")
row_generator.configuration.runtime_column_propagation = False peek = flow.add_stage("Peek", "Peek")
peek.configuration.runtime_column_propagation = False # Graph
link_1 = row_generator.connect_output_to(peek)
link_1.name = "Link_1" row_generator_schema = link_1.create_schema()
row_generator_schema.add_field("VARCHAR", "COLUMN_1").length(100) project.update_flow(flow)
print("Flow created successfully:", flow.name)

5: Run a job from the flow

row_gen_peek_job = project.create_job(name="RowGenPeek_job", flow=flow) job_start = row_gen_peek_job.start(name="RowGenPeek_job_run", description="") print(f"Job name: {job_start.job_name}") print(f"Job state: {job_start.state}")

6: Regenerate the SDK code for an existing flow

This requires exporting the flow first. Documentation reference

from ibm_watsonx_data_integration.services.datastage.codegen import PythonGenerator code_gen = PythonGenerator()
code_gen.configuration.mode = "file_per_flow"
code_gen.generate(input_path="RowGenPeek.zip", output_path="generated_code")

7: Full working example script

# Set API key api_key = "SET_API_KEY_HERE" # Authentication from ibm_watsonx_data_integration.common.auth import IAMAuthenticator from ibm_watsonx_data_integration import Platform from ibm_watsonx_data_integration.services.datastage import * auth = IAMAuthenticator(api_key=api_key) platform = Platform(auth, base_api_url="https://api.ca-tor.dai.cloud.ibm.com") # Create Project project = platform.create_project( name="My first project", description="Building sample batch flows", tags=["flow_test_project"], public=True, project_type="wx" ) print(project) # Expected: Project(guid='xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', name='My first project') # Create Flow flow = project.create_flow( name="RowGenPeek", environment=None, flow_type="datastage" ) # Stages row_generator = flow.add_stage("Row Generator", "Row_Generator") row_generator.configuration.runtime_column_propagation = False peek = flow.add_stage("Peek", "Peek") peek.configuration.runtime_column_propagation = False # Graph and Schema link_1 = row_generator.connect_output_to(peek) link_1.name = "Link_1" row_generator_schema = link_1.create_schema() row_generator_schema.add_field("VARCHAR", "COLUMN_1").length(100) # Persist flow project.update_flow(flow)
print("Flow created successfully:", flow.name)
# Create and Run Job row_gen_peek_job = project.create_job(name="RowGenPeek_job", flow=flow) job_start = row_gen_peek_job.start( name="RowGenPeek_job_run", description="" ) print("Job name:", job_start.job_name) print("Job state:", job_start.state)

0 comments
79 views

Permalink