What's new in the IBM watsonx.data integration Python SDK

By Michael Dobson posted yesterday

New Batch features

Parameter Sets

Parameter Sets allow you to manage configuration variables across your flows, making it easy to switch between environments (dev, test, prod) or manage multi-tenant deployments without modifying your pipeline code.

Example:

# Create and configure parameter set
paramset = project.create_parameter_set('testparamset')
paramset.add_parameter(parameter_type='INTEGER', name='qty', value='100')
paramset.add_parameter(parameter_type='STRING', name='region', value='us-south')
project.update_parameter_set(paramset)

# Retrieve parameter sets
paramsets = project.parameter_sets
paramset = project.parameter_sets.get(name='testparamset')

# Delete parameter set
project.delete_parameter_set(paramset)
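
Because parameter sets are ordinary SDK objects, their creation can be scripted per environment using only the calls shown above. Here is a minimal sketch; the environment names and values are illustrative, not part of the SDK:

# Illustrative sketch: one parameter set per environment (names and values are examples)
environments = {
    'dev':  {'qty': '10',   'region': 'us-south'},
    'prod': {'qty': '1000', 'region': 'us-east'},
}

for env, params in environments.items():
    ps = project.create_parameter_set(f'{env}_paramset')
    ps.add_parameter(parameter_type='INTEGER', name='qty', value=params['qty'])
    ps.add_parameter(parameter_type='STRING', name='region', value=params['region'])
    project.update_parameter_set(ps)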

Value Sets

Value Sets extend Parameter Sets by letting you define multiple configurations within a single parameter set. This is perfect for scenarios like customer tiers or regional variants where the same parameters need different values.

Example:

# Get parameter set
paramset = project.parameter_sets.get(name='testparamset')

# Create value sets for different customer tiers
# (ValueSet is a class from the SDK; import it per the package docs)
premium_set = ValueSet(name='premium_customers').add_value(name='qty', value='1000')
standard_set = ValueSet(name='standard_customers').add_value(name='qty', value='500')

# Add to parameter set
paramset.add_value_set(premium_set)
paramset.add_value_set(standard_set)
project.update_parameter_set(paramset)
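
Since add_value_set is just a method call, defining many variants reduces to a simple loop. A short sketch, where the tier names and quantities are made up for illustration:

# Illustrative sketch: one value set per customer tier (ValueSet as imported above)
tiers = {'premium_customers': '1000', 'standard_customers': '500', 'basic_customers': '100'}

paramset = project.parameter_sets.get(name='testparamset')
for tier_name, qty in tiers.items():
    paramset.add_value_set(ValueSet(name=tier_name).add_value(name='qty', value=qty))
project.update_parameter_set(paramset)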

Subflows

Subflows are reusable flow components that help you build modular pipelines. Define common processing logic once (like data validation or transformation patterns) and reuse it across multiple flows, reducing duplication and making maintenance easier.

Example:

# Create subflow with entry/exit nodes
subflow = project.create_subflow(name='validation_subflow', description='...')
entry_node = subflow.add_entry_node('Entry_node_1')
exit_node = subflow.add_exit_node('Exit_node_1')

# Add stages and connect
filter_stage = subflow.add_stage("Filter", "Filter_1")
link = entry_node.connect_output_to(filter_stage)
link.name = "Link_1"

# Update and retrieve
project.update_subflow(subflow)
subflows = project.subflows
subflow = project.subflows.get(name='validation_subflow')

# Duplicate and delete
duplicated = project.duplicate_subflow(subflow, name='copy', description='...')
project.delete_subflow(duplicated)

Batch Flow Runtime Settings

Configure job-level runtime behavior directly through the SDK, including warning limits, log retention, and notification preferences. This gives you programmatic control over how your jobs execute and report their status.

Example:

# Create job from flow
job = project.create_job(name="my_job", flow=flow, description="Processing job")

# Configure runtime settings
job.edit_configuration(
    warn_limit=50,
    retention_amount=10,
    notify_success=True,
    notify_failure=True
)
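
If you create several jobs from the same flow, the same settings can be applied in a loop. A minimal sketch, assuming the job names are your own:

# Illustrative sketch: apply one runtime configuration to several jobs
for job_name in ('nightly_load', 'weekly_rollup'):
    job = project.create_job(name=job_name, flow=flow, description='Scheduled processing job')
    job.edit_configuration(
        warn_limit=50,
        retention_amount=10,
        notify_success=True,
        notify_failure=True,
    )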

New Authentication Methods & On-Premises Support

This release adds support for the BearerTokenAuthenticator, ZenApiKeyAuthenticator, and ICP4DAuthenticator authentication methods, enabling seamless integration with IBM Cloud Pak for Data on-premises deployments. You can now use the SDK across both cloud and on-premises environments with the appropriate authentication method for your infrastructure.

IAMAuthenticator (IBM Cloud)

Use this for IBM Cloud SaaS deployments. It authenticates using your IBM Cloud API key.

Example:

from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration import Platform

auth = IAMAuthenticator(api_key='your_ibm_cloud_api_key')
platform = Platform(auth=auth, base_api_url='https://api.ca-tor.dai.cloud.ibm.com')

ICP4DAuthenticator (Cloud Pak for Data - Username/Password)

Use this for on-premises Cloud Pak for Data deployments with username and password authentication.

Example:

from ibm_watsonx_data_integration.common.auth import ICP4DAuthenticator
from ibm_watsonx_data_integration import Platform

auth = ICP4DAuthenticator(
    username='your_username',
    password='your_password',
    url='https://your-cpd-cluster.com'
)
platform = Platform(auth=auth, base_url='https://your-cpd-cluster.com', base_api_url='https://your-cpd-cluster.com')

ZenApiKeyAuthenticator (Cloud Pak for Data - API Key)

Use this for on-premises Cloud Pak for Data deployments when authenticating with a Zen API key instead of username/password.

Example:

from ibm_watsonx_data_integration.common.auth import ZenApiKeyAuthenticator
from ibm_watsonx_data_integration import Platform

auth = ZenApiKeyAuthenticator(
    api_key='your_zen_api_key',
    url='https://your-cpd-cluster.com'
)
platform = Platform(auth=auth, base_url='https://your-cpd-cluster.com', base_api_url='https://your-cpd-cluster.com')

BearerTokenAuthenticator (Advanced Use Cases)

Use this when you already have a valid bearer token from another authentication flow or service.

Example:

from ibm_watsonx_data_integration.common.auth import BearerTokenAuthenticator
from ibm_watsonx_data_integration import Platform

auth = BearerTokenAuthenticator(bearer_token='your_bearer_token')
platform = Platform(auth=auth, base_api_url='https://your-api-url.com')
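
Since all four authenticators plug into the same Platform constructor, a small factory can pick one from environment variables. A sketch follows; the variable names are this example's convention, not part of the SDK:

import os

from ibm_watsonx_data_integration.common.auth import (
    BearerTokenAuthenticator,
    IAMAuthenticator,
    ICP4DAuthenticator,
    ZenApiKeyAuthenticator,
)

def make_authenticator():
    """Pick an authenticator from environment variables (variable names are illustrative)."""
    cpd_url = os.getenv('CPD_URL')
    if os.getenv('IBM_CLOUD_API_KEY'):
        return IAMAuthenticator(api_key=os.environ['IBM_CLOUD_API_KEY'])
    if os.getenv('ZEN_API_KEY'):
        return ZenApiKeyAuthenticator(api_key=os.environ['ZEN_API_KEY'], url=cpd_url)
    if os.getenv('CPD_USERNAME'):
        return ICP4DAuthenticator(
            username=os.environ['CPD_USERNAME'],
            password=os.environ['CPD_PASSWORD'],
            url=cpd_url,
        )
    return BearerTokenAuthenticator(bearer_token=os.environ['BEARER_TOKEN'])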

For a complete guide on getting started with on-premises deployments, see our blog post: Getting Started with the watsonx.data integration Python SDK (DataStage on Cloud Pak for Data).

New Streaming features

The SDK now supports importing and exporting StreamingFlows. This capability enables users to move streaming pipelines across projects, environments, and teams using a simple programmatic interface. StreamingFlows can be packaged as portable artifacts and restored with full fidelity, including stage configuration and flow metadata.

Exporting

Exporting StreamingFlows packages one or more streaming pipelines into a zip archive containing the full flow definition. The export function returns the file path where the zip was written. This enables users to promote streaming pipelines across environments, create backups, and share reusable pipeline templates without rebuilding flows manually.

Example:

# Export all flows; returns the path of the written zip archive
zip_path = project.export_streaming_flows(flows=project.flows)
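
You can also export a specific subset rather than every flow in the project, and the returned path tells you where the archive landed. A small sketch, where the flow names are placeholders:

# Illustrative sketch: export two named flows and report the archive location
selected = [
    project.flows.get(name='orders_stream'),
    project.flows.get(name='payments_stream'),
]
zip_path = project.export_streaming_flows(flows=selected)
print(f'Flows exported to {zip_path}')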

Importing

Importing StreamingFlows recreates streaming pipelines from an exported zip archive. The function returns the imported StreamingFlow object or a list of StreamingFlow objects. Import support ensures consistent and repeatable pipeline creation, reducing configuration errors and speeding up environment setup.

Example:

# Import from an exported archive; returns the imported flow(s)
imported = project.import_streaming_flows(source='flows_to_import.zip', conflict_resolution='skip')
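
Since the call returns either a single StreamingFlow or a list, normalizing the result keeps downstream handling uniform. A short sketch continuing from the call above (the name attribute is an assumption for illustration):

# Illustrative sketch: treat single-flow and multi-flow imports uniformly
flows = imported if isinstance(imported, list) else [imported]
for flow in flows:
    print(f'Imported flow: {flow.name}')  # `name` attribute assumed for illustration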

Code Generation for Streaming Flows (Reverse Engineering)

This powerful feature allows users to take an existing flow and generate the Python SDK code required to rebuild it. It is useful for migration and for learning: a user can design a flow visually in the UI, then use this tool to see how to write the equivalent SDK code.

Example:

import os
from ibm_watsonx_data_integration.platform import Platform
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration.codegen import PythonGenerator

auth = IAMAuthenticator(api_key=os.getenv('IBM_CLOUD_API_KEY'))
platform = Platform(auth, base_api_url='https://api.ca-tor.dai.cloud.ibm.com')
project = platform.projects.get(name='Test Project')
flow = project.flows.get(name='Flow name')

generator = PythonGenerator(
    source=flow,
    destination='/tmp/output.py',
    auth=auth,
    base_api_url='https://api.ca-tor.dai.cloud.ibm.com'
)

generator.save()

Code generation for Streaming Connections

Similarly, you can take an existing streaming connection and generate the Python SDK code required to rebuild it. A user can create a connection visually in the UI, then use this tool to see how to write the equivalent SDK code.

Example:

from ibm_watsonx_data_integration.codegen import PythonGenerator

# Reuses the `project` and `auth` objects from the previous example
connection_name = "Connection name"
connection = project.connections.get(name=connection_name)

generator = PythonGenerator(
    source=connection,
    destination='/tmp/output.py',
    auth=auth,
    base_api_url='https://api.ca-tor.dai.cloud.ibm.com'
)

generator.save()

Added support for managing Project Collaborators

The SDK now supports retrieving, adding, removing, and updating project collaborators, allowing teams to manage access and roles programmatically. This enables automated team management and access control workflows for your data integration projects.

Example:

# Add collaborator to project
project.add_collaborator(user_id='user@example.com', role='Editor')

# Retrieve collaborators
collaborators = project.collaborators

# Update collaborator role
project.update_collaborator(user_id='user@example.com', role='Viewer')

# Remove collaborator
project.remove_collaborator(user_id='user@example.com')
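
These calls compose naturally into a simple roster sync. A minimal sketch, assuming the email addresses and role are your own:

# Illustrative sketch: grant a team of editors access in one pass
team = ['alice@example.com', 'bob@example.com', 'carol@example.com']
for email in team:
    project.add_collaborator(user_id=email, role='Editor')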

Other updates

  • Bug fixes and improvements

  • Minor updates for enhanced SDK functionality
