Prerequisites
Before starting this integration, ensure the following prerequisites are in place:
✅ IBM Event Streams
- IBM Event Streams is installed and running as part of IBM Cloud Pak for Integration (CP4I) on OpenShift.
- Access to the IBM Event Streams UI.
- Permissions to:
  - Deploy Kafka Connect.
  - Create and manage KafkaConnector custom resources.
- The Connectivity Pack is installed in the same cluster as IBM Event Streams, either in the same namespace or in a different one.
✅ Salesforce
- A Salesforce org with:
  - Change Data Capture (CDC) enabled (or permission to enable it).
  - OAuth 2.0 authorization grant flow enabled for API access.
✅ Slack
- A Slack workspace where incoming webhooks can be configured.
✅ Technical Familiarity
This blog assumes basic familiarity with:
- Kafka concepts such as topics and partitions.
- Kafka Connect fundamentals.
- Hands-on experience with IBM Event Streams.
- Building a custom Single Message Transform (SMT) and bundling it into a Kafka Connect container image.
- Salesforce Change Data Capture (CDC) events.
✅ Connectivity Pack in IBM Event Streams
The Connectivity Pack acts as an interface between Kafka Connect connectors and the external systems you want to connect to. It provides a ready-to-use integration runtime that communicates with external services while integrating seamlessly with Event Streams.
The Connectivity Pack:
- Is deployed using a Helm chart.
- Runs inside your OpenShift or Kubernetes cluster as a standard Kubernetes workload.
- Provides the runtime required to configure and run supported connectors.
- Integrates securely with IBM Event Streams.
For detailed information on installing the Connectivity Pack, supported IBM Event Streams versions, and the list of supported connectors, refer to the official documentation:
https://github.com/ibm-messaging/connectivity-pack-kafka-connectors/blob/main/README.md
Use Case Introduction
The purpose of this use case is to demonstrate how Salesforce CDC events can be ingested into an IBM Event Streams pipeline using Kafka Connect.
The use case is intentionally simple:
- Salesforce acts as the event producer.
- IBM Event Streams acts as the event backbone.
- Slack acts as a lightweight downstream consumer.
This example is not intended to position Slack notifications as the primary value. Instead, it is used to illustrate how events flow once they enter IBM Event Streams through the Salesforce Source Connector provided by the Connectivity Pack.
High-Level Flow
- A support Case is created in Salesforce.
- Salesforce emits a CDC event (CaseChangeEvent).
- The Connectivity Pack's Salesforce Source Connector receives the event and publishes it to a Kafka topic.
- The event becomes available to any downstream consumer.
- An HTTP Sink Connector forwards the event to Slack.
ℹ️ Salesforce Change Data Capture (CDC) Overview
Salesforce Change Data Capture (CDC) enables applications to receive near-real-time changes to Salesforce records and synchronize corresponding records in external systems. CDC publishes change events that represent modifications to Salesforce data: the creation of a new record, the update of an existing record, and the deletion or undeletion of a record.
1. Key characteristics:
- CDC events are emitted for create, update, delete, and undelete operations.
- Events include metadata such as:
  - Change type.
  - Record identifiers.
  - Replay ID.
- No custom Apex code is required.
2. CDC in this example:
- CDC is enabled on the Case object.
- Salesforce emits a CaseChangeEvent whenever a Case is created or updated.
CDC allows Salesforce to integrate naturally with event-driven systems instead of relying on polling or synchronous APIs.
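To make the event shape concrete, the routing metadata can be read out of a CDC event like this. This is a minimal Python sketch over an illustrative, trimmed payload (not a live Salesforce event); the field names follow the ChangeEventHeader structure shown in the full payload later in this post.

```python
import json

# Illustrative, trimmed CDC event fragment. The field names match the
# ChangeEventHeader structure of a real CaseChangeEvent.
cdc_event = json.loads("""
{
  "ChangeEventHeader": {
    "entityName": "Case",
    "changeType": "CREATE",
    "recordIds": ["500Ih000005GDPrIAO"]
  },
  "event": {"replayId": 5298907}
}
""")

header = cdc_event["ChangeEventHeader"]
print(header["entityName"])            # Case
print(header["changeType"])            # CREATE
print(header["recordIds"][0])          # 500Ih000005GDPrIAO
print(cdc_event["event"]["replayId"])  # 5298907
```

A consumer can use entityName and changeType to filter events, and replayId to resume a subscription from a known position.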
3. Enabling CDC for Salesforce objects:
To enable Change Data Capture in Salesforce:
- Go to Setup.
- Navigate to Integration → Change Data Capture.
- Move the required standard or custom object from Available Entities to Selected Entities.
- Click Save.
Salesforce Change Data Capture documentation:
https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_intro.htm
ℹ️ Slack Configuration
Slack is used only as a simple HTTP-based consumer to visualize events flowing through the pipeline.
1. Slack Channel
- A channel such as #support-notifier is created.
- Support team members are added to the channel.
2. Incoming Webhook
- A Slack App is created.
- Incoming Webhooks are enabled.
- A webhook URL is generated for the #support-notifier channel.
The webhook URL acts as an HTTP endpoint that can receive JSON payloads via HTTP POST.
Slack webhook documentation:
https://docs.slack.dev/messaging/sending-messages-using-incoming-webhooks/
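Slack Incoming Webhooks accept a JSON body with at least a text field. As a minimal sketch, this is the body shape the pipeline needs to produce; the actual POST to the webhook URL is omitted here because, in this pipeline, the HTTP Sink Connector performs it.

```python
import json

def slack_payload(message: str) -> str:
    """Build the minimal JSON body accepted by a Slack Incoming Webhook."""
    return json.dumps({"text": message})

body = slack_payload(":new: *Salesforce Case Created*")
print(body)  # {"text": ":new: *Salesforce Case Created*"}
```

The body would be sent as an HTTP POST with Content-Type: application/json to the webhook URL.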
Kafka Connector configuration in IBM Event Streams
⬅️ Salesforce Source Connector Configuration
Create a KafkaConnector custom resource in IBM Event Streams using the following configuration.
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: salesforce-source
  labels:
    backup.eventstreams.ibm.com/component: kafkaconnector
    eventstreams.ibm.com/cluster: kafka-connect
  namespace: eem-setup
spec:
  autoRestart:
    enabled: true
    maxRestarts: 2
  class: com.ibm.eventstreams.connect.connectivitypack.source.ConnectivityPackSourceConnector
  config:
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    connectivitypack.source: salesforce
    connectivitypack.source.url: 'https://eu15.salesforce.com'
    connectivitypack.topic.name.format: '${object}-${eventType}'
    connectivitypack.source.objects: caseChangeEvent
    connectivitypack.source.caseChangeEvent.events: CREATED
    connectivitypack.source.endpoint.instanceUrlBasicOauth: 'https://eu15.salesforce.com'
    connectivitypack.source.credentials.authType: BASIC_OAUTH
    connectivitypack.source.credentials.refreshTokenBasicOauth: '${file:/mnt/connectivitypack/sf:refreshtoken}'
    connectivitypack.source.credentials.accessTokenBasicOauth: '${file:/mnt/connectivitypack/sf:accesstoken}'
    connectivitypack.source.credentials.clientIdentity: '${file:/mnt/connectivitypack/sf:clientid}'
    connectivitypack.source.credentials.clientSecret: '${file:/mnt/connectivitypack/sf:clientsecret}'
  state: running
  tasksMax: 1
```
For detailed information about the Salesforce Source Connector configuration options, refer to the official documentation:
https://github.com/ibm-messaging/connectivity-pack-kafka-connectors/blob/main/systems/source%20systems/salesforce.md
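The topic the connector writes to is derived from connectivitypack.topic.name.format. The connector performs this substitution internally; the sketch below only illustrates the resulting name for this configuration, using Python's string Template as a stand-in.

```python
from string import Template

# Mirrors connectivitypack.topic.name.format from the connector config.
topic_format = Template("${object}-${eventType}")

# Values taken from connectivitypack.source.objects and
# connectivitypack.source.caseChangeEvent.events in the config above.
topic = topic_format.substitute(object="caseChangeEvent", eventType="CREATED")
print(topic)  # caseChangeEvent-CREATED
```

This is why the HTTP Sink Connector later subscribes to the topic caseChangeEvent-CREATED.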
➡️ HTTP Sink Connector configuration
The HTTP Sink Connector consumes records from a Kafka topic and sends them to an external HTTP endpoint.
In this pipeline:
- Source topic: caseChangeEvent-CREATED.
- Target endpoint: Slack Incoming Webhook.
- HTTP method: POST.
The connector demonstrates how IBM Event Streams can push events to external systems without writing custom consumer code.
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: http-sink-connector
  labels:
    backup.eventstreams.ibm.com/component: kafkaconnector
    eventstreams.ibm.com/cluster: kafka-connect
  namespace: eem-setup
spec:
  autoRestart:
    enabled: true
    maxRestarts: 2
  class: io.aiven.kafka.connect.http.HttpSinkConnector
  config:
    value.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.KeepFields.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    topics: caseChangeEvent-CREATED
    tasks.max: '1'
    transforms: 'KeepFields,CaseToSlack'
    http.url: '${file:/mnt/connectivitypack/slack:slackurl}'
    transforms.CaseToSlack.type: com.ibm.slacktext.smt.SalesforceCaseSlackText
    value.converter.schemas.enable: false
    transforms.CaseToSlack.only.create: true
    key.converter: org.apache.kafka.connect.storage.StringConverter
    http.authorization.type: none
    batching.enabled: false
    key.converter.schemas.enable: false
    http.headers.content.type: application/json
    transforms.KeepFields.whitelist: 'CaseNumber,Subject,Priority,Status,Origin,CreatedDate,Product__c,PotentialLiability__c,ChangeEventHeader,event'
  state: running
```
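The KeepFields transform applies Kafka Connect's built-in ReplaceField$Value SMT with a whitelist, dropping every field not in the list before CaseToSlack runs. Conceptually it behaves like the following Python sketch (an illustration of the whitelist semantics, not the actual Connect implementation):

```python
# Fields listed in transforms.KeepFields.whitelist above.
WHITELIST = {
    "CaseNumber", "Subject", "Priority", "Status", "Origin", "CreatedDate",
    "Product__c", "PotentialLiability__c", "ChangeEventHeader", "event",
}

def keep_fields(record_value: dict) -> dict:
    """Mimic ReplaceField$Value with a whitelist: retain only listed keys."""
    return {k: v for k, v in record_value.items() if k in WHITELIST}

event = {
    "CaseNumber": "00001099",
    "Subject": "Issue in mechanical part",
    "ContactId": None,       # dropped by the whitelist
    "SuppliedEmail": None,   # dropped by the whitelist
    "ChangeEventHeader": {"changeType": "CREATE"},
}
print(sorted(keep_fields(event)))
# ['CaseNumber', 'ChangeEventHeader', 'Subject']
```

Trimming the record first keeps the downstream SMT and the Slack payload small.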
🗨 Single Message Transforms (SMTs)
Salesforce CDC events contain a rich payload, including metadata and fields that may not be required by downstream consumers.
In this pipeline, the HTTP Sink Connector posts events to a Slack Incoming Webhook. Slack notifications work best when the message is compact, readable, and formatted (for example, highlighting Case number, priority, subject, and tagging the support team).
To construct a clean Slack message payload (for example {"text": "...formatted message..."}) directly inside the Kafka Connect pipeline, we use a custom Single Message Transform (SMT).
SalesforceCaseSlackText.java
```java
package com.ibm.slacktext.smt;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.List;
import java.util.Map;

public class SalesforceCaseSlackText<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String CFG_ONLY_CREATE = "only.create";
    private boolean onlyCreate = true;

    @Override
    public void configure(Map<String, ?> configs) {
        Object v = configs.get(CFG_ONLY_CREATE);
        this.onlyCreate = v == null || Boolean.parseBoolean(v.toString());
    }

    @Override
    public R apply(R record) {
        if (record == null || record.value() == null) return record;
        Object v = record.value();

        // Support schemaless JSON (Map) and schemaful (Struct)
        String entity = get(v, "ChangeEventHeader.entityName");
        String type = get(v, "ChangeEventHeader.changeType");
        if (!"Case".equals(entity)) return record;
        if (onlyCreate && !"CREATE".equals(type)) return record;

        String caseNumber = get(v, "CaseNumber");
        String subject = get(v, "Subject");
        String description = get(v, "Description");
        String priority = get(v, "Priority");
        String status = get(v, "Status");
        String origin = get(v, "Origin");
        String created = get(v, "CreatedDate");
        String product = get(v, "Product__c");
        String liability = get(v, "PotentialLiability__c");
        String recordId = firstArray(v, "ChangeEventHeader.recordIds");

        String text = ":new: *Salesforce Case Created*\n"
                + "*Case:* " + nvl(caseNumber) + " (Id: " + nvl(recordId) + ")\n"
                + "*Priority:* " + nvl(priority) + " | *Status:* " + nvl(status) + " | *Origin:* " + nvl(origin) + "\n"
                + "*Subject:* " + nvl(subject) + "\n"
                + (isBlank(description) ? "" : "*Description:* " + description + "\n")
                + (isBlank(product) ? "" : "*Product:* " + product + "\n")
                + (isBlank(liability) ? "" : "*Potential Liability:* " + liability + "\n")
                + "*Created:* " + nvl(created);

        String slackBody = "{\"text\":" + jsonString(text) + "}";

        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                Schema.OPTIONAL_STRING_SCHEMA,
                slackBody,
                record.timestamp());
    }

    // ---------- helpers ----------

    private static String nvl(String s) {
        return isBlank(s) ? "(n/a)" : s;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    private static String jsonString(String s) {
        String esc = s.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\r", "\\r")
                .replace("\n", "\\n");
        return "\"" + esc + "\"";
    }

    @SuppressWarnings("unchecked")
    private static String get(Object root, String path) {
        String[] parts = path.split("\\.");
        Object cur = root;
        for (String p : parts) {
            if (cur == null) return null;
            if (cur instanceof Struct st) {
                if (st.schema().field(p) == null) return null;
                cur = st.get(p);
            } else if (cur instanceof Map<?, ?> m) {
                cur = ((Map<String, Object>) m).get(p);
            } else {
                return null;
            }
        }
        return cur == null ? null : String.valueOf(cur);
    }

    @SuppressWarnings("unchecked")
    private static String firstArray(Object root, String path) {
        String[] parts = path.split("\\.");
        Object cur = root;
        for (String p : parts) {
            if (cur == null) return null;
            if (cur instanceof Struct st) {
                if (st.schema().field(p) == null) return null;
                cur = st.get(p);
            } else if (cur instanceof Map<?, ?> m) {
                cur = ((Map<String, Object>) m).get(p);
            } else {
                return null;
            }
        }
        Object arr = cur;
        if (arr instanceof List<?> l && !l.isEmpty()) return String.valueOf(l.get(0));
        if (arr instanceof Object[] a && a.length > 0) return String.valueOf(a[0]);
        return null;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define(CFG_ONLY_CREATE, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW,
                        "If true, only transform CREATE Case events.");
    }

    @Override
    public void close() { }

    @Override
    public String toString() {
        return "SalesforceCaseSlackText";
    }
}
```
What the SMT Does
The custom SMT performs the following steps:
- Accepts Salesforce CDC events coming from the Kafka topic.
- Checks the entityName and processes only Case CREATE events.
- Extracts a small set of useful fields (CaseNumber, Subject, Priority, Status, etc.).
- Constructs the Slack message text in a readable format.
- Outputs a Slack webhook-compatible JSON body (for example: {"text":"..."}), so the HTTP Sink Connector can post it directly to Slack.
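For readers who want to see the input/output shape without building the Java SMT, here is a simplified Python sketch of the same logic. The field names match the CDC payload; the message formatting is abbreviated, and unlike the real SMT (which passes non-matching records through unchanged), this sketch simply returns None for them.

```python
import json

def case_to_slack(event):
    """Simplified mirror of the SMT: Case CREATE CDC event -> Slack webhook body."""
    header = event.get("ChangeEventHeader", {})
    if header.get("entityName") != "Case" or header.get("changeType") != "CREATE":
        return None  # the real SMT returns the record unchanged instead

    def nvl(key):
        value = event.get(key)
        return value if value else "(n/a)"

    text = (":new: *Salesforce Case Created*\n"
            f"*Case:* {nvl('CaseNumber')}\n"
            f"*Priority:* {nvl('Priority')} | *Status:* {nvl('Status')}\n"
            f"*Subject:* {nvl('Subject')}")
    return json.dumps({"text": text})

body = case_to_slack({
    "ChangeEventHeader": {"entityName": "Case", "changeType": "CREATE"},
    "CaseNumber": "00001099",
    "Priority": "High",
    "Status": "New",
    "Subject": "Issue in mechanical part",
})
print(json.loads(body)["text"].splitlines()[0])  # :new: *Salesforce Case Created*
```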
End-to-End Data flow
This section explains how a Salesforce Case creation flows through the Salesforce Connector provided by the Connectivity Pack and into IBM Event Streams, before being consumed by downstream systems.
Step 1: Create a Case in Salesforce
A Salesforce Case is created using the Salesforce REST API.
```shell
curl -X POST \
  https://d2x000005mwipea4-dev-ed.my.salesforce.com/services/data/v59.0/sobjects/Case/ \
  -H "Authorization: Bearer $SF_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "Subject": "Issue in mechanical part",
    "Description": "Customer reports abnormal noise in mechanical unit during operation.",
    "Priority": "High",
    "Status": "New",
    "Origin": "Web",
    "Type": "Mechanical",
    "Product__c": "GC3020",
    "PotentialLiability__c": "Yes"
  }'
```
Note: Generate a Salesforce access token using OAuth 2.0 and export it as SF_ACCESS_TOKEN in the environment where the curl command is executed.
This API call creates a new Case record in Salesforce.
Response of the above curl command
Step 2: Salesforce Publishes a CDC Event
After the Case is created:
- Salesforce generates a Change Data Capture (CDC) event.
- The event type is CaseChangeEvent.
- The event represents a CREATE operation on the Case object.
Step 3: Event Is Published to a Kafka Topic (Kafka Connect)
- The Salesforce connector, provided by the Connectivity Pack, receives the CaseChangeEvent from Salesforce.
- The Kafka Connect Salesforce Source Connector publishes the converted event to an IBM Event Streams Kafka topic.
- In this example, the event is written to the topic caseChangeEvent-CREATED.
Sample payload from the Kafka topic, as viewed in the IBM Event Streams UI.
Below is the complete CDC event payload from the Kafka topic.
```json
{
  "schema": "SxvVKbyfRU5zmwl5KAZqqg",
  "ChangeEventHeader": {
    "entityName": "Case",
    "recordIds": [
      "500Ih000005GDPrIAO"
    ],
    "changeType": "CREATE",
    "changeOrigin": "com/salesforce/api/rest/59.0",
    "transactionKey": "00061bcd-d629-d18e-8228-636f916b2497",
    "sequenceNumber": 1,
    "commitTimestamp": 1767106787000,
    "commitNumber": 12186067490463,
    "commitUser": "0052x0000023wCuAAI",
    "changedFields": []
  },
  "CaseNumber": "00001099",
  "ContactId": null,
  "AccountId": null,
  "AssetId": null,
  "SourceId": null,
  "BusinessHoursId": null,
  "ParentId": null,
  "SuppliedName": null,
  "SuppliedEmail": null,
  "SuppliedPhone": null,
  "SuppliedCompany": null,
  "Type": "Mechanical",
  "Status": "New",
  "Reason": null,
  "Origin": "Web",
  "Subject": "Issue in mechanical part",
  "Priority": "High",
  "Description": "Customer reports abnormal noise in mechanical unit during operation.",
  "IsClosed": false,
  "ClosedDate": null,
  "IsEscalated": false,
  "OwnerId": "0052x0000023wCuAAI",
  "IsClosedOnCreate": null,
  "CreatedDate": "2025-12-30T14:59:47.000Z",
  "CreatedById": "0052x0000023wCuAAI",
  "LastModifiedDate": "2025-12-30T14:59:47.000Z",
  "LastModifiedById": "0052x0000023wCuAAI",
  "AssetWarrantyId": null,
  "EngineeringReqNumber__c": null,
  "SLAViolation__c": null,
  "Product__c": "GC3020",
  "PotentialLiability__c": "Yes",
  "event": {
    "replayId": 5298907
  }
}
```
Once published, the event becomes part of the IBM Event Streams pipeline and is available for consumption by any downstream application or connector.
Step 4: Event Is Delivered to Slack
- The HTTP Sink Connector consumes records from the caseChangeEvent-CREATED topic.
- The custom SMT formats each event into a Slack-friendly message.
- The connector posts the message to the #support-notifier Slack channel using the Incoming Webhook.
Message in the #support-notifier Slack channel.