You want to use a Kafka client application to consume messages from Kafka topics that are shared in Event Endpoint Management. In this post, I'll give some pointers for what you need to do to configure your application to connect through the Event Gateway.
The Developer Portal displays most of the sample code and connection properties that you need. You can get these from the sample code at the top of the page, or if you're configuring an existing application, from the table at the bottom of the page.

When you run a Kafka client application, part of the connection includes an SSL handshake. For this to be successful, you need to provide your app with a truststore that includes the CA for the certificate being presented by your Kafka cluster. This is something we've written about before, in a Kafka post on How to avoid SSL handshake errors in your Kafka client because of a self-signed cluster CA.
When you're using Event Endpoint Management, your client applications are connecting to the Event Gateway (rather than directly to the back-end Kafka brokers) so you need a truststore that includes the CA for the certificate presented by the Event Gateway. Otherwise the principle is the same as described in that post.
In this post, I'll share a couple of examples for how you can set up the truststore that you will need.
- option 1: retrieve the keystore from the API Manager, and put it into a new truststore
- option 2: make a connection to the Event Gateway, get the certificate that the Gateway presents, and put it into a new truststore
option 1: retrieve the keystore from the API Manager
The apic
CLI can be used to retrieve the keystore used by gateway services.
# -------------------------------------------------------------------
# update these to match your Event Endpoint Management instance
# -------------------------------------------------------------------
NAMESPACE=eventendpointmanagement
INSTANCE=eem
echo "\n\033[1;33m getting SSL/TLS details for Event Gateway in...\033[0m"
echo "namespace : $NAMESPACE"
echo "instance : $INSTANCE"
# -------------------------------------------------------------------
# verify dependencies are all available
# -------------------------------------------------------------------
echo "\n\033[1;33m checking for script dependencies...\033[0m"
check_dependency () {
if hash $1 2>/dev/null; then
echo "verified $1"
else
echo "$1 could not be found"
exit
fi
}
check_dependency "apic"
check_dependency "curl"
check_dependency "jq"
check_dependency "keytool"
check_dependency "oc"
# -------------------------------------------------------------------
# cleanup from previous runs
# -------------------------------------------------------------------
rm my.p12
# -------------------------------------------------------------------
# log into apic CLI
# -------------------------------------------------------------------
echo "\n\033[1;33m logging into apic CLI...\033[0m"
CP4I_NAMESPACE=$(oc get zenservice -A -o jsonpath='{..namespace}')
echo "creating IAM token"
CS_HOST=https://$(oc -n kube-public get cm ibmcloud-cluster-info -o jsonpath='{.data.cluster_address}')
IAM_PASSWORD=$(oc get secret -n ibm-common-services platform-auth-idp-credentials -o jsonpath='{..admin_password}' | base64 -d)
IAM_TOKEN=$(curl -k -s -X POST -H 'Content-Type: application/x-www-form-urlencoded' -H 'Accept: application/json' -d "grant_type=password&username=admin&password=${IAM_PASSWORD}&scope=openid" "${CS_HOST}"/v1/auth/identitytoken | jq -r .access_token)
echo "creating Zen token"
ZEN_HOST=https://$(oc get route -n $CP4I_NAMESPACE cpd -o=jsonpath='{.spec.host}')
ZEN_TOKEN=$(curl -k -s "${ZEN_HOST}"/v1/preauth/validateAuth -H "username: admin" -H "iam-token: ${IAM_TOKEN}" | jq -r .accessToken)
echo "downloading apic config json file"
PLATFORM_API_URL=$(oc get eventendpointmanagers $INSTANCE -n $NAMESPACE -o=jsonpath='{.status.endpoints[?(@.name=="platformApi")].uri}')
TOOLKIT_CREDS_URL="$PLATFORM_API_URL/cloud/settings/toolkit-credentials"
curl -k $TOOLKIT_CREDS_URL -H "Authorization: Bearer ${ZEN_TOKEN}" -H "Accept: application/json" -H "Content-Type: application/json" -o creds.json
yes | apic client-creds:set creds.json
echo "creating apic API key"
APIC_APIKEY=$(curl -k -s -X POST "${PLATFORM_API_URL}"/cloud/api-keys -H "Authorization: Bearer ${ZEN_TOKEN}" -H "Accept: application/json" -H "Content-Type: application/json" -d '{"client_type":"toolkit","description":"Tookit API key"}' | jq -r .api_key)
echo "logging into API manager"
APIM_ENDPOINT=$(oc -n $NAMESPACE get mgmt $INSTANCE-mgmt -o jsonpath="https://{.status.zenRoute}")
yes n | apic login --context provider --server $APIM_ENDPOINT --sso --apiKey $APIC_APIKEY
rm creds.json
# -------------------------------------------------------------------
# setting up truststore
# -------------------------------------------------------------------
echo "\n\033[1;33m retrieving keystore from APIC and putting into a truststore...\033[0m"
apic keystores:get \
--server $APIM_ENDPOINT \
--org admin \
--format json \
tls-server-for-gateway-services-default-keystore \
--output - | jq -r .public_certificate_entry.pem > gateway.pem
keytool -import -noprompt \
-alias gatewayca \
-file gateway.pem \
-keystore my.p12 -storetype pkcs12 \
-storepass password
rm gateway.pem
# -------------------------------------------------------------------
# get Event Gateway connection address
# -------------------------------------------------------------------
echo "\n\033[1;33m querying openshift for gateway connection address...\033[0m"
GATEWAY_ROUTE=$(oc get route -n $NAMESPACE -lapp.kubernetes.io/instance=$INSTANCE-egw -lapp.kubernetes.io/name=event-gateway -o name | grep gw-client)
GATEWAY_ADDRESS=$(oc get $GATEWAY_ROUTE -n $NAMESPACE -o jsonpath="{.spec.host}")
echo "gateway address: $GATEWAY_ADDRESS"
# -------------------------------------------------------------------
# outputting results
# -------------------------------------------------------------------
echo "\n\033[1;33m connection properties:\033[0m"
echo "\033[1m bootstrap.servers=$GATEWAY_ADDRESS:443\033[0m"
echo "\033[1m ssl.truststore.location=my.p12\033[0m"
echo "\033[1m ssl.truststore.type=PKCS12\033[0m"
echo "\033[1m ssl.truststore.password=password\033[0m"
echo "\033[1m ssl.endpoint.identification.algorithm=\033[0m"
option 2: use the certificate presented by the Gateway
An alternative approach is to just make a connection to the Gateway URL and assume that you can trust whatever certificate is presented. This is simpler as it avoids the need to authenticate with the API Manager, however it does depend on you being able to trust your initial connection to the Gateway.
# -------------------------------------------------------------------
# update these to match your Event Endpoint Management instance
# -------------------------------------------------------------------
NAMESPACE=eventendpointmanagement
INSTANCE=eem
echo "\n\033[1;33m getting SSL/TLS details for Event Gateway in...\033[0m"
echo "namespace : $NAMESPACE"
echo "instance : $INSTANCE"
# -------------------------------------------------------------------
# verify dependencies are all available
# -------------------------------------------------------------------
echo "\n\033[1;33m checking for script dependencies...\033[0m"
check_dependency () {
if hash $1 2>/dev/null; then
echo "verified $1"
else
echo "$1 could not be found"
exit
fi
}
check_dependency "keytool"
check_dependency "oc"
check_dependency "openssl"
# -------------------------------------------------------------------
# cleanup from previous runs
# -------------------------------------------------------------------
rm my.p12
# -------------------------------------------------------------------
# get Event Gateway connection address
# -------------------------------------------------------------------
echo "\n\033[1;33m querying openshift for gateway connection address...\033[0m"
GATEWAY_ROUTE=$(oc get route -n $NAMESPACE -lapp.kubernetes.io/instance=$INSTANCE-egw -lapp.kubernetes.io/name=event-gateway -o name | grep gw-client)
GATEWAY_ADDRESS=$(oc get $GATEWAY_ROUTE -n $NAMESPACE -o jsonpath="{.spec.host}")
echo "gateway address: $GATEWAY_ADDRESS"
# -------------------------------------------------------------------
# setting up truststore
# -------------------------------------------------------------------
echo "\n\033[1;33m putting the certificate presented by the Gateway into a truststore...\033[0m"
echo -n | openssl s_client -connect $GATEWAY_ADDRESS:443 -servername $GATEWAY_ADDRESS -showcerts | openssl x509 > bootstrap.crt
keytool -import -noprompt \
-alias bootstrapca \
-file bootstrap.crt \
-keystore my.p12 -storetype pkcs12 \
-storepass password
rm bootstrap.crt
# -------------------------------------------------------------------
# outputting results
# -------------------------------------------------------------------
echo "\n\033[1;33m connection properties:\033[0m"
echo "\033[1m bootstrap.servers=$GATEWAY_ADDRESS:443\033[0m"
echo "\033[1m ssl.truststore.location=my.p12\033[0m"
echo "\033[1m ssl.truststore.type=PKCS12\033[0m"
echo "\033[1m ssl.truststore.password=password\033[0m"
echo "\033[1m ssl.endpoint.identification.algorithm=\033[0m"
Whichever approach you pick, you'll end up with a .p12
file that you can use with your Kafka client application. For example:
kafka-console-consumer.sh \
--bootstrap-server "$GATEWAY_ADDRESS:443" \
--consumer-property "security.protocol=SASL_SSL" \
--consumer-property "sasl.mechanism=PLAIN" \
--consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"app\" password=\"password\";" \
--topic "YOUR.TOPIC" \
--consumer-property 'client.id=YOUR-CLIENT-ID' \
--consumer-property 'ssl.truststore.location=my.p12' \
--consumer-property 'ssl.truststore.type=PKCS12' \
--consumer-property 'ssl.truststore.password=password' \
--consumer-property 'ssl.endpoint.identification.algorithm='
#eventendpointmanagement