In this post, I’ll describe how to use App Connect Enterprise to process Kafka messages that were serialized to a stream of bytes using Apache Avro schemas.
Best practice when using Apache Kafka is to define Apache Avro schemas that describe the structure of your Kafka messages.
(For more detail about this, see my last post, From bytes to objects: describing Kafka events, or the intro to Avro that I wrote a couple of years ago.)
In this post, I’m assuming that you have embraced Avro, and you have Kafka topics with messages that were serialized using Avro schemas.
Perhaps you used a Java producer with an Avro SerDe that handled the serialization automatically for you.
Or your messages are coming from a Kafka Connect source connector, with an Avro converter that is handling the serialization for you.
Or you are doing the serialization yourself, for example by producing Avro-serialized messages from a Python app.
Now you want to use IBM App Connect Enterprise to develop and host integrations for processing those Kafka messages. But you need App Connect to know how to:
- retrieve the Avro schemas it needs
- use the schemas to turn the binary stream of bytes on your Kafka topics into structured objects that are easy for ACE to manipulate and process
Using a Java compute node
The simplest way to do this is using a JavaCompute node that serves the same purpose as a SerDe client in a traditional Java Kafka application.
I’ve written a sample to demonstrate how this could be done, which is available on GitHub.
Instructions for how to use the sample are in the README.md in that repository, but I’ll add some additional background and context here.
Configure a KafkaConsumer node to start consuming messages from your Kafka topics. It will emit BLOB binary messages, each containing the raw stream of bytes from a Kafka message.
For example, it could be emitting binary data such as this (which was created by a PostgreSQL Kafka Connect connector from Debezium using an Avro converter).
Screenshot from IBM Event Streams but the principle holds for any Avro-serialized messages on any Kafka cluster
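To give a feel for what sits at the start of bytes like these: many Avro serializers prefix the encoded record with an identifier for the schema that was used. As a minimal sketch, assuming a Confluent-style framing (a single zero "magic" byte followed by a 4-byte big-endian schema ID), pulling out that ID could look something like this. The class and method names are hypothetical and are not taken from the sample. Other serializers may use a different ID size, or carry the ID in Kafka message headers instead, so treat the layout as an assumption to check against your own producer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration only.
final class SchemaIdReader {
    // Assumption: one zero "magic" byte, then a 4-byte big-endian schema ID,
    // then the Avro-encoded record.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    private SchemaIdReader() {}

    /** Returns the schema ID stored in the first five bytes of the message. */
    static int readSchemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message is too short to contain a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte at start of message");
        }
        return buffer.getInt();
    }

    /** Returns the bytes after the header, which hold the Avro-encoded record. */
    static byte[] readPayload(byte[] message) {
        readSchemaId(message); // reuse the header validation
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }
}
```

With the ID in hand, the remaining bytes can be passed to an Avro decoder along with the matching schema.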
Avro deserialize JavaCompute node
This node will:
As with most SerDe clients, the node improves performance by caching and reusing Avro schemas where appropriate.
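The caching idea can be sketched in a few lines. This is only an illustration under my own assumptions, not the code from the sample: `SchemaCache` is a hypothetical name, the schema type is left generic so the sketch has no dependencies, and the `fetcher` function stands in for whatever call retrieves a schema from your registry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Hypothetical cache for illustration only. S is the schema type
// (for example, an Avro Schema object in a real node).
final class SchemaCache<S> {
    private final Map<Integer, S> schemas = new ConcurrentHashMap<>();
    private final IntFunction<S> fetcher;

    /** @param fetcher stand-in for the schema registry lookup, keyed by schema ID */
    SchemaCache(IntFunction<S> fetcher) {
        this.fetcher = fetcher;
    }

    /** Returns the cached schema, calling the fetcher only the first time an ID is seen. */
    S get(int schemaId) {
        // If the fetcher throws, nothing is stored, so a failed lookup
        // is retried on the next message rather than cached.
        return schemas.computeIfAbsent(schemaId, id -> fetcher.apply(id));
    }
}
```

Keying the cache on schema ID means that a topic with thousands of messages and only a handful of schema versions needs only a handful of registry calls.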
If the node cannot deserialize the message data, it emits the raw Kafka message as-is to the alt output terminal, so that it can be reviewed or otherwise processed. Possible reasons why this might happen include:
- The schema that was originally used to serialize the message data has since been deleted from the Schema Registry and is no longer available
- The schema registry is not currently available
- Schema registry credentials provided to the node were rejected
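The fallback behaviour described above can be sketched roughly as follows. This is a plain-Java illustration with hypothetical names (`DeserializeRouter`, `Routed`, `Terminal`) rather than the App Connect API or the sample's code: the `deserializer` function stands in for the schema lookup and Avro decoding, and the returned value records which terminal the message would be sent to.

```java
import java.util.function.Function;

// Hypothetical sketch for illustration only.
final class DeserializeRouter {
    enum Terminal { OUT, ALTERNATE }

    /** The terminal a message should go to, and the body to send there. */
    static final class Routed {
        final Terminal terminal;
        final Object body;

        Routed(Terminal terminal, Object body) {
            this.terminal = terminal;
            this.body = body;
        }
    }

    private DeserializeRouter() {}

    /**
     * Tries to deserialize the raw bytes. On success the decoded object goes
     * to OUT; on any runtime failure the untouched bytes go to ALTERNATE.
     */
    static Routed route(byte[] rawMessage, Function<byte[], Object> deserializer) {
        try {
            return new Routed(Terminal.OUT, deserializer.apply(rawMessage));
        } catch (RuntimeException e) {
            // For example: schema deleted, registry unreachable, credentials rejected.
            return new Routed(Terminal.ALTERNATE, rawMessage);
        }
    }
}
```

Passing the original bytes through unchanged means nothing is lost: a downstream flow can log the message, park it for later, or retry once the registry is reachable again.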
I’ve put a lot more comments in the sample code that explain what the deserialize node needs to do, and how you can adapt it for your own requirements.
I’ve put some of the most obvious aspects (like whether to use a binary or JSON decoder, based on how the original messages were encoded) in a configuration policy.
Other customizations (such as supporting different schema ID handlers, or working with alternate schema registry implementations) will need some minor code changes – but I’ve put comments and placeholders in the sample to show where to start!