Getting started with the watsonx.data integration Python SDK

By John Wen posted 14 hours ago

  

This guide walks through your first SDK workflow: authenticating, creating a project, building a simple DataStage flow, running it as a job, and regenerating SDK code from a flow export.

Prerequisites

  • A valid watsonx.data integration instance - Create your free instance here
  • A valid API key for watsonx.data integration
  • A Python environment with the SDK installed
    • To install the package, run:
    • pip3 install ibm-watsonx-data-integration
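To confirm the install succeeded before going further, a small check like the following may help. It is a minimal sketch that uses only the Python standard library; the helper name installed_version is made up for illustration and is not part of the SDK.

```python
from importlib import metadata


def installed_version(package_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # The distribution name matches the pip3 install command in the prerequisites.
    version = installed_version("ibm-watsonx-data-integration")
    if version is None:
        print("SDK not found; run: pip3 install ibm-watsonx-data-integration")
    else:
        print("SDK version:", version)
```

If this reports that the SDK is missing, check that pip3 and your Python interpreter point at the same environment.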

You can generate an API key in the platform. See the watsonx.data integration SDK documentation for help with API keys and setup.

1: Set API key 

api_key = 'SET_API_KEY_HERE'
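Hard-coding the key is fine for a quick test, but you may prefer to keep it out of your source files. The sketch below reads it from an environment variable instead. The variable name WATSONX_DI_API_KEY and the helper load_api_key are assumptions chosen for this example, not names defined by the SDK.

```python
import os


def load_api_key(env_var="WATSONX_DI_API_KEY"):
    """Read the API key from an environment variable.

    The default variable name is hypothetical; use whatever name suits your setup.
    """
    value = os.environ.get(env_var, "").strip()
    if not value:
        raise RuntimeError(f"Set the {env_var} environment variable before running.")
    return value
```

With the variable exported in your shell, api_key = load_api_key() can replace the hard-coded assignment.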

2: Authentication

from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration import Platform
from ibm_watsonx_data_integration.services.datastage import *

auth = IAMAuthenticator(api_key=api_key)
platform = Platform(auth, base_api_url="https://api.ca-tor.dai.cloud.ibm.com")

3: Create a project

To create a new project, call Platform.create_project(). A name is required. Other fields such as description, tags, public and project_type are optional. If project_type is not provided it defaults to wx.

project = platform.create_project(
    name='My first project',
    description='Building sample batch flows',
    tags=['flow_test_project'],
    public=True,
    project_type='wx'
)
project

Expected output

Project(guid='xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', name='My first project')

4: Create a basic RowGenPeek flow

# Flow
flow = project.create_flow(
    name="RowGenPeek",
    environment=None,
    flow_type="datastage"
)

# Stages
row_generator = flow.add_stage("Row Generator", "Row_Generator")
row_generator.configuration.runtime_column_propagation = False

peek = flow.add_stage("Peek", "Peek")
peek.configuration.runtime_column_propagation = False

# Graph
link_1 = row_generator.connect_output_to(peek)
link_1.name = "Link_1"

row_generator_schema = link_1.create_schema()
row_generator_schema.add_field("VARCHAR", "COLUMN_1").length(100)

project.update_flow(flow)
print("Flow created successfully:", flow.name)

5: Run a job from the flow

row_gen_peek_job = project.create_job(name="RowGenPeek_job", flow=flow)
job_start = row_gen_peek_job.start(name="RowGenPeek_job_run", description="")
print(f"Job name: {job_start.job_name}")
print(f"Job state: {job_start.state}")
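The state printed right after starting a run is only a snapshot, so you may want to wait until the run finishes. The helper below is a generic polling sketch, not an SDK feature. Both fetch_state (a callable you supply that returns the current state) and terminal_states (the set of values you treat as finished) are placeholders, because this guide does not show how the SDK refreshes a run or which state names it reports.

```python
import time


def wait_for_state(fetch_state, terminal_states, timeout_s=600.0, interval_s=5.0):
    """Poll fetch_state() until it returns a terminal state or the timeout expires.

    fetch_state and terminal_states are supplied by the caller; nothing here
    calls the SDK directly.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = fetch_state()
        if state in terminal_states:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still in state {state!r} after {timeout_s} seconds")
        time.sleep(interval_s)
```

Consult the SDK documentation for the supported way to re-read a run's state and for the actual state values, then pass those in.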

6: Regenerate the SDK code for an existing flow

This requires exporting the flow first. See the SDK documentation for how to export a flow.

from ibm_watsonx_data_integration.services.datastage.codegen import PythonGenerator

code_gen = PythonGenerator()
code_gen.configuration.mode = "file_per_flow"
code_gen.generate(input_path="RowGenPeek.zip", output_path="generated_code")
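Before running the generator, it can be useful to confirm that the export archive exists and is a readable zip file. The following is a minimal standard-library sketch; list_export_entries is an illustrative helper, not part of the SDK, and it makes no assumption about what the archive contains.

```python
import zipfile
from pathlib import Path


def list_export_entries(archive_path):
    """Return the file names inside a flow export archive, raising if it is unusable."""
    path = Path(archive_path)
    if not path.is_file():
        raise FileNotFoundError(f"Export archive not found: {path}")
    if not zipfile.is_zipfile(path):
        raise ValueError(f"Not a valid zip archive: {path}")
    with zipfile.ZipFile(path) as archive:
        return archive.namelist()
```

Calling list_export_entries("RowGenPeek.zip") before code_gen.generate(...) surfaces a missing or corrupt export as a clear error.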

7: Full working example script

# Set API key
api_key = "SET_API_KEY_HERE"

# Authentication
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration import Platform
from ibm_watsonx_data_integration.services.datastage import *

auth = IAMAuthenticator(api_key=api_key)
platform = Platform(auth, base_api_url="https://api.ca-tor.dai.cloud.ibm.com")

# Create Project
project = platform.create_project(
    name="My first project",
    description="Building sample batch flows",
    tags=["flow_test_project"],
    public=True,
    project_type="wx"
)
print(project)
# Expected: Project(guid='xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', name='My first project')

# Create Flow
flow = project.create_flow(
    name="RowGenPeek",
    environment=None,
    flow_type="datastage"
)

# Stages
row_generator = flow.add_stage("Row Generator", "Row_Generator")
row_generator.configuration.runtime_column_propagation = False

peek = flow.add_stage("Peek", "Peek")
peek.configuration.runtime_column_propagation = False

# Graph and Schema
link_1 = row_generator.connect_output_to(peek)
link_1.name = "Link_1"

row_generator_schema = link_1.create_schema()
row_generator_schema.add_field("VARCHAR", "COLUMN_1").length(100)

# Persist flow
project.update_flow(flow)
print("Flow created successfully:", flow.name)

# Create and Run Job
row_gen_peek_job = project.create_job(name="RowGenPeek_job", flow=flow)
job_start = row_gen_peek_job.start(
    name="RowGenPeek_job_run",
    description=""
)
print("Job name:", job_start.job_name)
print("Job state:", job_start.state)
