We have just announced the general availability of the IBM watsonx.data integration Python SDK, marking a major milestone for the team.
New Batch features
Parameter Sets
Parameter Sets allow you to manage configuration variables across your flows, making it easy to switch between environments (dev, test, prod) or manage multi-tenant deployments without modifying your pipeline code.
Example:
# Create and configure parameter set
paramset = project.create_parameter_set('testparamset')
paramset.add_parameter(parameter_type='INTEGER', name='qty', value='100')
paramset.add_parameter(parameter_type='STRING', name='region', value='us-south')
project.update_parameter_set(paramset)
# Retrieve parameter sets
paramsets = project.parameter_sets
paramset = project.parameter_sets.get(name='testparamset')
# Delete parameter set
project.delete_parameter_set(paramset)
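Because parameter sets are ordinary project objects, the environment-switching pattern described above can be expressed as one parameter set per environment. A minimal sketch, using only the calls shown in this section; the set names and values are illustrative:
# Define one parameter set per environment (names and values are illustrative)
dev_params = project.create_parameter_set('dev_config')
dev_params.add_parameter(parameter_type='STRING', name='region', value='us-south')
dev_params.add_parameter(parameter_type='INTEGER', name='qty', value='10')
project.update_parameter_set(dev_params)
prod_params = project.create_parameter_set('prod_config')
prod_params.add_parameter(parameter_type='STRING', name='region', value='eu-de')
prod_params.add_parameter(parameter_type='INTEGER', name='qty', value='1000')
project.update_parameter_set(prod_params)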
Value Sets
Value Sets extend Parameter Sets by letting you define multiple configurations within a single parameter set. This is perfect for scenarios like customer tiers or regional variants where the same parameters need different values.
Example:
# Get parameter set
paramset = project.parameter_sets.get(name='testparamset')
# Create value sets for different customer tiers
premium_set = ValueSet(name='premium_customers').add_value(name='qty', value='1000')
standard_set = ValueSet(name='standard_customers').add_value(name='qty', value='500')
# Add to parameter set
paramset.add_value_set(premium_set)
paramset.add_value_set(standard_set)
project.update_parameter_set(paramset)
Subflows
Subflows are reusable flow components that help you build modular pipelines. Define common processing logic once (like data validation or transformation patterns) and reuse it across multiple flows, reducing duplication and making maintenance easier.
Example:
# Create subflow with entry/exit nodes
subflow = project.create_subflow(name='validation_subflow', description='...')
entry_node = subflow.add_entry_node('Entry_node_1')
exit_node = subflow.add_exit_node('Exit_node_1')
# Add stages and connect
filter_stage = subflow.add_stage("Filter", "Filter_1")
link = entry_node.connect_output_to(filter_stage)
link.name = "Link_1"
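# Optionally wire the stage to the exit node before updating
# (assumes stages expose the same connect_output_to method shown for entry nodes)
exit_link = filter_stage.connect_output_to(exit_node)
exit_link.name = "Link_2"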
# Update and retrieve
project.update_subflow(subflow)
subflows = project.subflows
subflow = project.subflows.get(name='validation_subflow')
# Duplicate and delete
duplicated = project.duplicate_subflow(subflow, name='copy', description='...')
project.delete_subflow(duplicated)
Batch Flow Runtime Settings
Configure job-level runtime behavior directly through the SDK, including warning limits, log retention, and notification preferences. This gives you programmatic control over how your jobs execute and report their status.
Example:
# Create job from flow
job = project.create_job(name="my_job", flow=flow, description="Processing job")
# Configure runtime settings
job.edit_configuration(
    warn_limit=50,
    retention_amount=10,
    notify_success=True,
    notify_failure=True
)
New Authentication Methods & On-Premises Support
This release adds support for BearerToken, ZenApiKeyAuthenticator, and ICP4DAuthenticator authentication methods, enabling seamless integration with IBM Cloud Pak for Data on-premises deployments. You can now use the SDK across both cloud and on-premises environments with the appropriate authentication method for your infrastructure.
IAMAuthenticator (IBM Cloud)
Use this for IBM Cloud SaaS deployments. It authenticates using your IBM Cloud API key.
Example:
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration import Platform
auth = IAMAuthenticator(api_key='your_ibm_cloud_api_key')
platform = Platform(auth=auth, base_api_url='https://api.ca-tor.dai.cloud.ibm.com')
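To avoid hard-coding credentials, you can read the API key from an environment variable instead, as the code-generation example later in this post does:
import os
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
auth = IAMAuthenticator(api_key=os.getenv('IBM_CLOUD_API_KEY'))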
ICP4DAuthenticator (Cloud Pak for Data - Username/Password)
Use this for on-premises Cloud Pak for Data deployments with username and password authentication.
Example:
from ibm_watsonx_data_integration.common.auth import ICP4DAuthenticator
from ibm_watsonx_data_integration import Platform
auth = ICP4DAuthenticator(
    username='your_username',
    password='your_password',
    url='https://your-cpd-cluster.com'
)
platform = Platform(auth=auth, base_url='https://your-cpd-cluster.com', base_api_url='https://your-cpd-cluster.com')
ZenApiKeyAuthenticator (Cloud Pak for Data - API Key)
Use this for on-premises Cloud Pak for Data deployments when authenticating with a Zen API key instead of username/password.
Example:
from ibm_watsonx_data_integration.common.auth import ZenApiKeyAuthenticator
from ibm_watsonx_data_integration import Platform
auth = ZenApiKeyAuthenticator(
    api_key='your_zen_api_key',
    url='https://your-cpd-cluster.com'
)
platform = Platform(auth=auth, base_url='https://your-cpd-cluster.com', base_api_url='https://your-cpd-cluster.com')
BearerToken (Advanced Use Cases)
Use this when you already have a valid bearer token from another authentication flow or service.
Example:
from ibm_watsonx_data_integration.common.auth import BearerTokenAuthenticator
from ibm_watsonx_data_integration import Platform
auth = BearerTokenAuthenticator(bearer_token='your_bearer_token')
platform = Platform(auth=auth, base_api_url='https://your-api-url.com')
For a complete guide on getting started with on-premises deployments, see our blog post: Getting Started with the watsonx.data integration Python SDK (DataStage on Cloud Pak for Data).
New Streaming features
The SDK now supports importing and exporting StreamingFlows. This capability enables users to move streaming pipelines across projects, environments, and teams using a simple programmatic interface. StreamingFlows can be packaged as portable artifacts and restored with full fidelity, including stage configuration and flow metadata.
Exporting
Exporting StreamingFlows packages one or more streaming pipelines into a zip archive containing the full flow definition. The export function returns the file path where the zip was written. This enables users to promote streaming pipelines across environments, create backups, and share reusable pipeline templates without rebuilding flows manually.
Example:
# The call returns the path to the generated zip archive
archive_path = project.export_streaming_flows(flows=project.flows)
Importing
Importing StreamingFlows recreates streaming pipelines from an exported zip archive. The function returns the imported StreamingFlow object or a list of StreamingFlow objects. Import support ensures consistent and repeatable pipeline creation, reducing configuration errors and speeding up environment setup.
Example:
project.import_streaming_flows(source='flows_to_import.zip', conflict_resolution='skip')
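Putting the two together, a typical promotion workflow exports from one project and imports into another. A minimal sketch, assuming you already hold handles to a source and a target project (source_project and target_project are illustrative names):
# Export all streaming flows from the source project; the call returns the zip path
archive_path = source_project.export_streaming_flows(flows=source_project.flows)
# Recreate them in the target project, skipping any flows that already exist there
imported = target_project.import_streaming_flows(source=archive_path, conflict_resolution='skip')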
Code Generation for Streaming Flows (Reverse Engineering)
This feature takes an existing flow and generates the Python SDK code required to rebuild it. It is useful for migration and for learning the SDK: you can design a flow visually in the UI, then use this tool to see how to write the equivalent code.
Example:
import os
from ibm_watsonx_data_integration.platform import Platform
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration.codegen import PythonGenerator
auth = IAMAuthenticator(api_key=os.getenv('IBM_CLOUD_API_KEY'))
platform = Platform(auth, base_api_url='https://api.ca-tor.dai.cloud.ibm.com')
project = platform.projects.get(name='Test Project')
flow = project.flows.get(name='Flow name')
generator = PythonGenerator(
    source=flow,
    destination='/tmp/output.py',
    auth=auth,
    base_api_url='https://api.ca-tor.dai.cloud.ibm.com'
)
generator.save()
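The generated file is ordinary Python, so you can open it to study the equivalent SDK calls before running it:
# Inspect the generated SDK code
with open('/tmp/output.py') as f:
    print(f.read())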
Code generation for Streaming Connections
The same capability is available for streaming connections: take an existing connection and generate the Python SDK code required to rebuild it. You can create a connection visually in the UI, then use this tool to see how to write the equivalent code.
Example:
from ibm_watsonx_data_integration.codegen import PythonGenerator
connection_name = "Connection name"
connection = project.connections.get(name=connection_name)
generator = PythonGenerator(
    source=connection,
    destination='/tmp/output.py',
    auth=auth,
    base_api_url='https://api.ca-tor.dai.cloud.ibm.com'
)
generator.save()
Added support for managing Project Collaborators
The SDK now supports retrieving, adding, removing, and updating project collaborators, allowing teams to manage access and roles programmatically. This enables automated team management and access control workflows for your data integration projects.
Example:
# Add collaborator to project
project.add_collaborator(user_id='user@example.com', role='Editor')
# Retrieve collaborators
collaborators = project.collaborators
# Update collaborator role
project.update_collaborator(user_id='user@example.com', role='Viewer')
# Remove collaborator
project.remove_collaborator(user_id='user@example.com')
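These calls compose into simple automation. A minimal sketch of syncing a project against a desired access list; the dictionary is illustrative, and it assumes each collaborator object exposes user_id and role attributes:
# Desired access list (illustrative); assumes collaborator objects expose user_id and role
desired_roles = {'user@example.com': 'Editor', 'analyst@example.com': 'Viewer'}
for collaborator in project.collaborators:
    wanted = desired_roles.get(collaborator.user_id)
    if wanted is None:
        project.remove_collaborator(user_id=collaborator.user_id)
    elif wanted != collaborator.role:
        project.update_collaborator(user_id=collaborator.user_id, role=wanted)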
Other updates