
Using IBM Cloud Pak for Integration to configure a Kafka schema registry

By Divya Jha posted Wed May 19, 2021 04:07 AM

  

Co-author - Mohammed Asif Kundgol 



A schema registry is a centralized repository for schemas and metadata. A registry can contain multiple schemas with numerous versions, because schemas evolve over time as you update them to add new structures. If you're already using a schema registry, you can use IBM Cloud Pak for Integration to connect to it and retrieve schemas, so that you can concentrate on the message transfer rather than the structure of the message data.

In IBM Cloud Pak for Integration, the AVRO and JSON schema types are supported, with Backward and Backward_Transitive compatibility settings defined at the global or subject level in the schema registry.
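For illustration, here is a minimal sketch of how you might inspect or change these compatibility levels through a Confluent-compatible registry REST API. The registry URL and subject name are hypothetical:

    import requests

    REGISTRY_URL = "https://schema-registry.example.com:8081"  # hypothetical registry URL

    # Read the global compatibility level, e.g. {"compatibilityLevel": "BACKWARD"}.
    print(requests.get(f"{REGISTRY_URL}/config").json())

    # Set BACKWARD_TRANSITIVE compatibility for a single subject instead of globally.
    requests.put(
        f"{REGISTRY_URL}/config/Leads-value",  # hypothetical subject name
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        json={"compatibility": "BACKWARD_TRANSITIVE"},
    )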

The Kafka connector supports the following schema registry providers:
  • Confluent
  • Apicurio
Using a schema registry with Kafka in IBM Cloud Pak for Integration
You can configure a schema registry with the Kafka connector in IBM Cloud Pak for Integration to use AVRO/JSON schemas to serialize or deserialize Kafka payloads.
The message/wire format complies with the Confluent SerDes (Serializer/Deserializer) format, which consists of a magic byte, followed by the schema ID and the serialized payload.
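As a rough sketch, this is how a consumer could split a message that uses this wire format into its three parts. The framing shown is the standard Confluent layout; the function name is just for illustration:

    import struct

    def split_confluent_frame(message_bytes: bytes):
        """Split a Confluent SerDes-framed Kafka message into its parts."""
        # Byte 0 is the magic byte (0), bytes 1-4 hold the schema ID as a
        # big-endian 32-bit integer, and the rest is the serialized payload.
        magic_byte, schema_id = struct.unpack(">bI", message_bytes[:5])
        payload = message_bytes[5:]
        return magic_byte, schema_id, payload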

Connecting to a schema registry
In addition to the credentials required to connect to Kafka, you need to supply the following details to connect to a schema registry:
  • Schema registry type - Select the schema registry that you want to connect to. IBM Cloud Pak for Integration supports Apicurio and Confluent schema registries. This field defaults to Confluent if you provide a schema registry API URL and do not specify a registry type.
  • Schema registry REST API URL - The REST API URL of the schema registry in the format http[s]://<hostname|ip address>:<port>. 
  • Schema registry username - The username used to connect to the schema registry.
  • Schema registry password - The password used to connect to the schema registry.
  • Schema registry CA certificate - The certificate of the schema registry certificate authority (CA) in PEM format.
             
Figure 1: Image to show the Kafka and schema registry fields

Info: You only need to provide a schema registry username and password if you are connecting to an authenticated schema registry REST API. If not, the schema registry REST API URL on its own will suffice.
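To sanity-check these connection details outside of Designer, you could call the registry's REST API directly. A minimal sketch, assuming a Confluent-compatible registry and hypothetical credentials:

    import requests

    # Hypothetical values matching the connection fields described above.
    REGISTRY_URL = "https://schema-registry.example.com:8081"
    USERNAME = "registry-user"            # omit auth for an unauthenticated registry
    PASSWORD = "registry-password"
    CA_CERT = "/path/to/registry-ca.pem"  # CA certificate in PEM format

    # Listing the subjects confirms that the URL, credentials, and CA
    # certificate are all accepted by the registry.
    response = requests.get(
        f"{REGISTRY_URL}/subjects",
        auth=(USERNAME, PASSWORD),
        verify=CA_CERT,
    )
    response.raise_for_status()
    print(response.json())  # e.g. ["Leads-value"]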

Interacting with a schema registry
The following examples demonstrate how you can interact with a schema registry in IBM Cloud Pak for Integration.

1. Sending messages using a schema registry
To send a message based on a specific schema format, you need to use the Kafka Send message action. The following steps describe how to build an event-driven flow.
  • In the IBM Cloud Pak for Integration Designer dashboard, create an event-driven flow.
  • Enter a name that identifies the purpose of your flow.
  • Select a trigger node.
  • For the action node, select the Kafka account using the schema registry that you want to work with, and then select Kafka > Send message.
  • Select the Kafka topic that you want to work with.
  • Click the dropdown arrow in the Payload field. You are prompted to select a schema format, as shown in the figure below, based on the schema registry that you are logged in with. The schema types available for the schema registry are AVRO or JSON.

Figure 2: Image to show a list of available schema formats

  • Select the Subject name and Schema version from the window that is displayed and click Apply.

Figure 3: Image to show the Schema settings window

  • If there is only one subject name and schema version, it is selected by default.
  • If you want to change the schema, you can do so by clicking Regenerate schema. The Schema settings window is displayed again and you can change your settings to use a different schema.

Figure 4: Image to show the Regenerate schema button
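For reference, the subject name and schema version that you pick in the Schema settings window correspond to entries you can list from the registry yourself. A small sketch against a Confluent-compatible REST API, with a hypothetical registry URL and subject:

    import requests

    REGISTRY_URL = "https://schema-registry.example.com:8081"  # hypothetical
    SUBJECT = "Leads-value"                                    # hypothetical subject name

    # List the versions registered under the subject, e.g. [1, 2, 3].
    versions = requests.get(f"{REGISTRY_URL}/subjects/{SUBJECT}/versions").json()
    print(versions)

    # Fetch the latest version to inspect the schema that would be used to
    # serialize the payload of the Send message action.
    latest = requests.get(f"{REGISTRY_URL}/subjects/{SUBJECT}/versions/latest").json()
    print(latest["version"], latest["id"], latest["schema"])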

                 
2. Consuming messages using a schema registry
To consume messages that use a schema from the registry, you need to use the Kafka New message event node. This enables you to pull in the specified schema so that you can map it from an action node.
                 
  • From the IBM Cloud Pak for Integration Designer dashboard, click New > Event-driven flow to open the flow editor.
  • Enter a name that identifies the purpose of your flow.
  • In the Applications tab, select Kafka.
  • Select the Kafka account using the schema registry that you want to work with and then click New message. The event node is added to the flow and you are prompted to select the Kafka topic that you are listening for messages on.
  • Select the Kafka topic that you want to work with.
  • Click the Select output schema button.

Figure 5: Image to show the Select output schema button

  • Select the output schema by choosing the schema type, subject name, and schema version in the Schema settings window. The schema types available are AVRO or JSON.

Figure 6: Image to show the Schema settings window

  • Click Apply.
  • If you want to change the output schema, you can do so by clicking Change output schema. The Schema settings window is displayed again and you can change your settings to use a different schema.

Figure 7: Image to show the Change output schema button
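Under the covers, decoding a received message comes down to reading the schema ID from the frame and looking the writer schema up in the registry. Here is a rough sketch using the fastavro package; the registry URL is hypothetical, and this is only an illustration of the mechanics rather than how Designer itself is implemented:

    import io
    import json
    import struct

    import requests
    from fastavro import parse_schema, schemaless_reader

    REGISTRY_URL = "https://schema-registry.example.com:8081"  # hypothetical

    def decode_avro_message(message_bytes: bytes):
        """Decode a Confluent-framed AVRO message using its embedded schema ID."""
        # First 5 bytes: magic byte plus big-endian schema ID (see the wire format above).
        _, schema_id = struct.unpack(">bI", message_bytes[:5])

        # Look up the writer schema in the registry by its ID.
        resp = requests.get(f"{REGISTRY_URL}/schemas/ids/{schema_id}")
        resp.raise_for_status()
        writer_schema = parse_schema(json.loads(resp.json()["schema"]))

        # Decode the remaining bytes as a schemaless AVRO record.
        return schemaless_reader(io.BytesIO(message_bytes[5:]), writer_schema)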

Use case scenario
A company wants to build a lead management system to handle leads coming in from various channels with varying levels of interest in its products. The company's marketing team wants to launch tailored marketing campaigns and nurture programs based on the category of the leads generated via various touchpoints. Leads can come from the company's website visitors, showroom walk-ins, social media campaigns, business partners, corporate partnerships, and so on. Because of these wide-ranging touchpoints, the details and structure of the data captured about leads can vary significantly. The leads need to be processed and classified into different buckets based on the marketing criteria, and this information needs to be stored in the company's leads database.

The company has adopted Salesforce as its CRM to capture the leads, and IBM Db2 is the back-end database where the final leads reside after classification. It is this leads data that the marketing and sales teams will use to nurture the leads through further stages of lead conversion.

To achieve this use case, the company adopts IBM Cloud Pak for Integration to integrate the applications it uses: Salesforce, Kafka, and IBM Db2. Leads generated from Salesforce and other sources are synced to Kafka topics using different schemas, based on the source of the leads and the details captured. In the breakdown below we will see how the company's architect designs two event-driven flows to seamlessly integrate Salesforce with IBM Db2 via Kafka, adhering to a specified schema.

The schema used in this scenario, displayed below in Figure 8, is called Leads-events.

Figure 8: The Leads-events schema
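The actual Leads-events schema is the one shown in Figure 8; the field list below is purely illustrative. As a sketch, this is how such an AVRO schema could be registered under the Leads-value subject using a Confluent-compatible REST API:

    import json
    import requests

    REGISTRY_URL = "https://schema-registry.example.com:8081"  # hypothetical

    # Illustrative fields only; the real Leads-events schema is shown in Figure 8.
    leads_events_schema = {
        "type": "record",
        "name": "LeadsEvents",
        "namespace": "com.example.leads",
        "fields": [
            {"name": "leadId", "type": "string"},
            {"name": "firstName", "type": "string"},
            {"name": "lastName", "type": "string"},
            {"name": "email", "type": ["null", "string"], "default": None},
            {"name": "leadSource", "type": "string"},
            {"name": "leadCategory", "type": "string"},
        ],
    }

    # Register the schema so that both event-driven flows can select it
    # from the Schema settings window.
    resp = requests.post(
        f"{REGISTRY_URL}/subjects/Leads-value/versions",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        json={"schema": json.dumps(leads_events_schema), "schemaType": "AVRO"},
    )
    resp.raise_for_status()
    print("Registered schema ID:", resp.json()["id"])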

Figure 9 below displays the first event-driven flow, called WriteLeadsEventsToKafka. This flow uses a Salesforce New lead trigger node and a Kafka Send message action node, and it syncs leads to Kafka based on the AVRO Leads-events schema (displayed in Figure 8). This schema is used to encode the message, and once it has been selected the appropriate Salesforce fields can be mapped to it. So every time a new lead is created, the flow sends a message to the Leads-value topic, which has been selected to store the lead details.

Figure 9: Event-driven flow to capture new leads

Figure 10 below displays the second event-driven flow, called ReadLeadsEventFromKafkaAndWriteToDatabase. This flow has a Kafka New message trigger node and a JDBC Custom query action node. Notice that the Kafka New message node has the same schema selected (Leads-events) as the flow in Figure 9, so that it can decode the messages sent by the first event-driven flow. When a new Kafka message is received on the Leads-value topic, a JDBC Custom query action with an INSERT query is performed on the Salesforce leads table in IBM Db2. This action inserts the lead details into the corresponding table by mapping values from the Leads-events output schema.

Figure 10: Event-driven flow to insert lead details into the Salesforce leads table
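The INSERT performed by the JDBC Custom query action might look something like the sketch below. The table and column names are hypothetical, and in Designer the values are mapped from the Leads-events output schema rather than hard-coded:

    -- Hypothetical Db2 table and columns; values map from the Leads-events output schema.
    INSERT INTO SALESFORCE_LEADS (LEAD_ID, FIRST_NAME, LAST_NAME, EMAIL, LEAD_SOURCE, LEAD_CATEGORY)
    VALUES (?, ?, ?, ?, ?, ?);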

Figure 11 below displays the leads table in IBM Db2 with the data adhering to the Leads-events schema and ready to be used by the marketing and sales teams.

Figure 11: Updated Salesforce leads table


Check out the following video to see how we implemented these flows in IBM Cloud Pak for Integration.

You can also read more about connecting to a Kafka schema registry here: https://www.ibm.com/docs/en/app-connect/11.0.0?topic=kafka-connecting-schema-registry






