webMethods

webMethods

Join this online group to communicate across IBM product users and experts by sharing advice and best practices with peers and staying up to date regarding product enhancements.

 View Only

Kafka JMS How-to using Confluent JMS client 

Mon February 21, 2022 09:26 AM

Note:
If you are trying out in windows, download the Kafka version kafka_2.12-2.8.1 (For other versions there are some permission issues in windows like accessing/writing to the tmp location where the kafka logs are stored). Try removing C:\tmp\kafka-logs directory, sometimes that helps to resolve the issue.

Note:
SoftwareAG has not certified this combination officially.

Follow the steps below to setup a producer/consumer test scenario

  1. Download Apache Kafka

  2. Extract to a location (ex: C:\kafka\kafka_2.12-2.8.1)

  3. Start ZooKeeper service

C:\kafka\kafka_2.12-2.8.1\bin\windows>zookeeper-server-start.bat …/…/config/zookeeper.properties

  1. Start Kafka Broker

C:\kafka\kafka_2.12-2.8.1\bin\windows>kafka-server-start.bat …/…/config/server.properties

  1. Create Topic

C:\kafka\kafka_2.12-2.8.1\bin\windows>kafka-topics.bat --create --topic t1 --bootstrap-server localhost:9092

  1. Download confluent JMS client library with all it’s dependencies, for the demo the following libraries are used
  • jose4j-0.7.9.jar
  • kafka-jms-client-7.0.1.jar
  • gf.javax.jms.jar
  • connect-runtime-2.7.2.jar
  • protobuf-java.jar
  • protobuf-java-util.jar
  • connect-API-3.0.0.jar
  • kafka-clients-2.7.2.jar
  • com.fasterxml.jackson.databind.jar
  • jackson-annotations-2.1.2.jar
  • jackson-core-2.9.9.jar
  • guava-31.0-jre.jar
  1. JNDI configuration

Initial context factory: io.confluent.kafka.jms.KafkaInitialContextFactory
Consider providing all other properties like Topic, client id etc…

  1. We could either use a JNDI properties file (with the below contents and provide the file location in Provider URL section) or directly provide them into the JNDI context (see the sample program below), like
    java.naming.factory.initial=io.confluent.kafka.jms.KafkaInitialContextFactory
    bootstrap.servers=localhost:9092
    topic.t1=t1 #topic destination
    queue.q1=q1 QUEUE destination
    confluent.topic.replication.factor=3 (change based on requirement)
    client.id=my-test-client jms connection client ID

Sample program to test the connection

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class KafkaConnect {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "t1";
private static final String CLIENT_ID = "testClient";

public static void main(String[] args) {

Connection conn = null;
InitialContext _jndiContext = null;

try {
Properties props = new Properties();

props.put("java.naming.factory.initial", "io.confluent.kafka.jms.KafkaInitialContextFactory");

props.put("bootstrap.servers", "localhost:9092");
props.put("topic.t1", "t1");
props.put("queue.q1", "q1");
props.put("confluent.topic.replication.factor", "3");
props.put("client.id", "my-test-client");

// Initialize the JNDI context
Context ctx = new InitialContext(props);
System.out.println("-----Initial Content Initialized------");

// Lookup the connection factory to get a connection
_jndiContext = new InitialContext(props);
ConnectionFactory cf = (ConnectionFactory) _jndiContext.lookup("ConnectionFactory");
System.out.println("-----Connection Factory lookup success-----");

// Start the JMS connection from the factory
conn = cf.createConnection();
conn.start();
System.out.println("-----Connection Started-----" + conn);

//This is just a sample, change the producer and consumer code according to your need
new ProduceAndConsumerMessage(conn, (Destination) _jndiContext.lookup("q1")).sendReceive();

conn.close();
System.out.println("-----Connection Closed-----");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (_jndiContext != null) {
try {
_jndiContext.close();
} catch (NamingException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}
}

class ProduceAndConsumerMessage {

Connection connection;
Destination destination;
javax.jms.MessageProducer producer;
Session session;
javax.jms.MessageConsumer consumer;

public ProduceAndConsumerMessage(Connection connection, Destination destination) throws JMSException {

this.connection = connection;
this.destination = destination;		
}

public void sendReceive() throws JMSException {

try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

TextMessage message = session.createTextMessage();
producer = session.createProducer(destination);
producer.send(destination, message);
System.out.println("Sent message");

//consume
consumer = session.createConsumer(destination);
Message msg = consumer.receive(500);
if (msg != null) {
System.out.println("Received message: " + msg.getJMSMessageID());
}
} finally {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}			
}
}
}

Trying out with Integration Server JMS

  1. For JNDI configuration, provide all properties in “Other Properties” section

  2. While creating the JMS connection, uncheck the “Create Temporary Queue” option as Kafka does not have a concept of a temporary queue.

  3. Use pub.jms:send to send a message to a destination

Note:
Currently the Integration Server JMS consumer implementation uses the javax.jms.Session.createConsumer(Destination, String) JMS API to create a consumer with a message selector, which kafka doesn’t support. So you can’t use JMS triggers for this combination and to add this support contact the product team.

Demo

kafkajms


#Integration-Server-and-ESB
#webMethods

Statistics
0 Favorited
0 Views
0 Files
0 Shares
0 Downloads

Comments

Sun May 05, 2024 09:24 AM

Hi Prasanta.
We are trying to configure Kafka SASL adapter producer connection in 10.15 webMethods and the target system is using Kafka 3.4.*.

We are trying to enable the adapter connection and we get

[ART.118.5042] Adapter Runtime (Connection): Unable to enable connection resource Default:TestKafka.
[ART.118.5063] Adapter Runtime (Connection): Unable to start connection Default:TestKafka: after 1 attempt(s).
[ART.118.5036] Adapter Runtime (Connection): Unable to configure connection manager.
[ADA.507.1001] Resource Connection Exception:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serializer

we have placed the below jars in WmKafkaAdapter/jars folder
kafka-clients-3.4.1.jar kafka-tools-3.4.1.jar kafka_2.13-3.7.0.jar static wm-kafka-v9.jar

|Value Serializer Class com.wm.adapter.wmkafka.idata.IDataSerializer
|Key Serializer Class org.apache.kafka.common.serialization.Serializer
Security Protocol SASL_PLAINTEXT

Are we missing anything else.


#webMethods
#Integration-Server-and-ESB

Tue February 21, 2023 01:36 AM

Wed February 08, 2023 04:35 AM

Hi Malik,

We have actually been able to fix our problem. The issue came from the SSL properties values we were provided which were incorrect.

Thanks and best regards,
Alexis


#webMethods
#Integration-Server-and-ESB

Mon February 06, 2023 10:50 AM

Hi Alexis,

Please provide the complete error stack from error log and check if the URL is well formed. If you are using SSL connection, also include SSL logs.

Thanks


#webMethods
#Integration-Server-and-ESB

Mon February 06, 2023 09:41 AM

Hello,

Thank you for this walkthrough on how to configure Confluent JMS client in Integration Server.

By any chance, have you ever encountered the following error while trying to start the JMS connection ?
com.wm.app.b2b.server.jms.JMSSubsystemException: [ISS.0134.9064] Error creating connection: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient

Best regards,
Alexis


#webMethods
#Integration-Server-and-ESB

Tue March 01, 2022 01:49 AM

Good one @Prasanta_Malik might help many users who are looking to leverage the capabilities of IS and Kafka.

Thanks,
Sree


#webMethods
#Integration-Server-and-ESB