
Enhancements in ACE v11 for Integration with a Kafka Server

Mon July 27, 2020 09:22 AM

In this article, I will describe how to use the new KafkaRead node and the new Kafka policy type, which were added in IBM App Connect Enterprise v11 fixpack 7, to integrate with a Kafka server.

Overview of Apache Kafka

Apache Kafka is a distributed streaming platform which allows applications to publish and subscribe to streams of records. Kafka is architected as a cluster of one or more servers. A stream of records is called a ‘topic’. A ‘Producer’ can publish messages to a topic. A ‘Consumer’ can subscribe to a topic and consume the messages that were published.

When messages are published on a Kafka topic, they are added to the tail of a log. As each message is published on the topic, it is identified by its 'offset' in the log. All consumers retrieve messages from the same log, and as messages are consumed they are not destroyed but remain in the log for a pre-defined retention period, which can be defined by time or by log size. Consumers use the offset to identify their position when receiving messages.

When a consumer starts, it must tell Kafka where in the commit log it wishes to start receiving messages from: either the earliest available message or the latest. If a consumer disconnects and later re-connects, and it wants to see all messages that were published while it was disconnected, it must remember the offset of the last message it consumed.

To balance load and achieve scalability, Kafka allows topics to be divided into ‘partitions’. Each partition is an ordered, immutable sequence of records that is continually appended to in a structured commit log.

If a topic is defined to have multiple partitions, messages published on the topic are distributed equally amongst the partitions, and consumers in a consumer group are likewise spread equally across the partitions. Kafka manages the distribution of messages and the assignment of consumers.

For more information, see the Apache Kafka documentation.

Kafka Nodes

In IBM Integration Bus v10, the following nodes were provided:

  • KafkaProducer – publishes messages to a Kafka topic.
  • KafkaConsumer – subscribes to a Kafka topic and propagates each message published on the topic to the message flow.

In IBM App Connect Enterprise v11 fixpack 7, a new mid-flow node has been added:

  • KafkaRead – reads an individual message from a specified offset on a Kafka topic partition.

KafkaRead Node

Using the new KafkaRead node, you can read a message from any offset in the topic.

Typical uses of the KafkaRead node might be:

  • In error-processing scenarios, to re-process an individual message that encountered a failure while being processed in a KafkaConsumer message flow.
  • In conjunction with a Timer node, to process a defined range of messages on a partition of a topic.

In this example, the KafkaRead node has the topic name, partition number and message offset configured in the Basic tab. The details of the bootstrap server are defined in a policy.


The message to be read is identified by topic name, partition number, and message offset, which are supplied as node properties or as Local Environment overrides.

The Local Environment overrides can be set using a Compute node or a Mapping node.

For example, using a Compute node:
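The following is a minimal ESQL sketch. The Local Environment folder Destination.Kafka.Read and its field names topicName, partitionNumber and messageOffset, as well as the topic name 'films', are assumptions for illustration; check the KafkaRead node documentation for the exact override paths.

    CREATE COMPUTE MODULE SetKafkaReadOverrides
      CREATE FUNCTION Main() RETURNS BOOLEAN
      BEGIN
        -- Pass the input message through unchanged
        SET OutputRoot = InputRoot;
        -- Preserve any existing Local Environment content
        SET OutputLocalEnvironment = InputLocalEnvironment;
        -- Override the KafkaRead node properties for this invocation.
        -- NOTE: these field names are assumed for illustration; verify them
        -- against the KafkaRead node documentation for your fixpack.
        SET OutputLocalEnvironment.Destination.Kafka.Read.topicName = 'films';
        SET OutputLocalEnvironment.Destination.Kafka.Read.partitionNumber = 0;
        SET OutputLocalEnvironment.Destination.Kafka.Read.messageOffset = 42;
        RETURN TRUE;
      END;
    END MODULE;

Note that the Compute node's 'Compute mode' property must be set to include the LocalEnvironment for these overrides to be propagated.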

Or using a Mapping node, where the same Local Environment elements can be set in the map output.

Kafka Policy

The bootstrap server details can be defined on the node, or in a policy that has a policy type of Kafka. The policy is referenced on the node's Policy tab.

The Kafka policy is created within a Policy Project.
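As a sketch, the policy is an XML file (with a .policyxml extension) in the Policy Project. The property names shown here, bootstrapServers and securityProtocol, are assumptions based on the Kafka policy template; use the Policy editor in the ACE Toolkit to see the full set of properties.

    <?xml version="1.0" encoding="UTF-8"?>
    <policies>
      <!-- MyKafkaPolicy and the broker address are hypothetical example values -->
      <policy policyType="Kafka" policyName="MyKafkaPolicy" policyTemplate="Kafka">
        <bootstrapServers>mybroker.example.com:9092</bootstrapServers>
        <securityProtocol>PLAINTEXT</securityProtocol>
      </policy>
    </policies>

A node then refers to the policy as {PolicyProjectName}:MyKafkaPolicy in its Policy property.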

No Match Terminal

If an offset is specified that does not exist, you can specify what action to take. By default, the input message tree is propagated to the No Match terminal.


Alternatively, you can specify that the earliest or the latest message in the topic should be read instead. If you change the action to 'earliest', the earliest message on the topic that has not yet expired is read. If 'latest' is specified, the message flow blocks, waiting for the next message to be published to the topic.

Merge Options

You can use properties in the Result tab to indicate:

  • What part of the message data should be read from the offset in the Kafka topic.
  • Where the message data should be placed in the message tree that is propagated from the KafkaRead node.

By default the values are:

  • Output data location: $OutputRoot
  • Result data location: $ResultRoot

With these default values, the entire message is read from the offset in the Kafka topic and replaces the input message in the output message tree.

The picture below illustrates what happens to the Output Message tree when the default values are used in the Result tab.
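In outline, assuming an XMLNSC message (element names are illustrative):

    Input message tree                 Output message tree (propagated)
    InputRoot                          OutputRoot
    └─ XMLNSC                          └─ XMLNSC
       └─ <input message>                 └─ <message read from the Kafka topic>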

If the input data and result data needed to be merged, then the Result tab could contain, for example, a Result data location of $ResultRoot/film and an Output data location of $OutputRoot/XMLNSC/original.

In this case, the result data is read starting at the <film> element of the message at the given offset in the Kafka topic, and is placed in the output message tree under the <original> element.

The picture below illustrates what happens to the Output Message tree when these values are used in the Result tab.
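In outline (again with illustrative element names):

    OutputRoot
    └─ XMLNSC
       └─ original                  <- input message is preserved here
          ├─ <input elements>
          └─ film                   <- result data read from the Kafka topic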

Integration with IBM Event Streams

The Kafka nodes can publish and subscribe to topics that are defined in IBM Event Streams. To interact with one of these topics, a username and password are required, using the SASL_SSL security protocol. I have taken the security credentials from IBM Event Streams and used them to configure a policy.

The policy references a credential called 'event_streams'. In production, you would normally store the username and password in an encrypted vault. For development purposes and for this demo, I am storing the username and password in a Kafka credential defined in server.conf.yaml:
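For example, assuming the Credentials syntax used by recent v11 fix packs (check the exact structure for your fix pack level):

    Credentials:
      Kafka:
        event_streams:            # must match the security identity named in the policy
          username: 'token'       # Event Streams typically uses the literal user name 'token'
          password: '<API key>'   # placeholder -- supply your Event Streams API key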

The message can now be read from a specified offset in the Kafka topic in IBM Event Streams using the KafkaRead node.

If incorrect username or password details are used, the input message tree is propagated to the Failure terminal, and the Exception tree contains the BIP message indicating that authentication failed.

Summary

This article has described how to use the new KafkaRead node to read a message from a specified offset in a Kafka topic. A new Kafka policy type has also been added, which can be used by all Kafka nodes to specify the connection details for the Kafka server being used.


#AppConnectEnterprise(ACE)
#ACEV11
#Integration
#kafka
#ApacheKafka