After further investigation, I realized that the Kafka topic had only one partition. As a result, only one Liberty instance in the consumer group was assigned to that partition, which explains the uneven distribution we observed.
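To make that reasoning concrete, here is a minimal sketch of why a single partition leaves the other consumers idle. It is a simplified model, not Kafka's actual assignor: the class `AssignmentSketch`, the method `assign`, and the instance names `liberty-1` to `liberty-3` are all made up for illustration. The only rule it encodes is that each partition is owned by exactly one consumer in the group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

    // Hands out partitions 0..partitionCount-1 to the consumers in turn.
    // Hypothetical helper: real Kafka assignors are more elaborate, but they
    // share the rule that a partition has a single owner within a group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumers.get(p % consumers.size());
            result.get(owner).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> group = List.of("liberty-1", "liberty-2", "liberty-3");
        // One partition: only the first instance gets work.
        System.out.println(assign(group, 1)); // {liberty-1=[0], liberty-2=[], liberty-3=[]}
        // Three partitions: every instance owns one.
        System.out.println(assign(group, 3)); // {liberty-1=[0], liberty-2=[1], liberty-3=[2]}
    }
}
```

With one partition, two of the three instances end up with an empty list, which matches the uneven distribution described above.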
Therefore, the first thing I tried was increasing the number of partitions. However, my emitter application kept sending messages to the same partition. To achieve load distribution on the consumer side, we need to find a way for the message producer to distribute messages across partitions.
The easiest way is to configure the producer with org.apache.kafka.clients.producer.RoundRobinPartitioner, which ignores the record key and cycles through the partitions of the topic.
bootstrap.properties
mp.messaging.outgoing.test.connector=liberty-kafka
mp.messaging.outgoing.test.bootstrap.servers=fyre1:9092
mp.messaging.outgoing.test.group.id=test
mp.messaging.outgoing.test.topic=test
mp.messaging.outgoing.test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.test.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.test.partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
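As a rough illustration of what that partitioner setting buys you, the sketch below cycles a counter over the partition count. It is my own simplified model, not the Kafka source: `RoundRobinSketch` and `nextPartition` are hypothetical names, and the real class also takes partition availability into account.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {

    private final AtomicInteger counter = new AtomicInteger(0);
    private final int partitionCount;

    RoundRobinSketch(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // Ignores any record key and returns the next partition in the cycle.
    int nextPartition() {
        // Mask the sign bit so the value stays non-negative if the counter overflows.
        return (counter.getAndIncrement() & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        RoundRobinSketch partitioner = new RoundRobinSketch(3);
        int[] hits = new int[3];
        for (int i = 0; i < 9; i++) {
            hits[partitioner.nextPartition()]++;
        }
        System.out.println(Arrays.toString(hits)); // [3, 3, 3]
    }
}
```

Nine sends over three partitions land three on each, regardless of what the messages contain.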
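Alternatively, you can keep the default partitioner. In that case Kafka derives the target partition from a hash of the ProducerRecord key, so setting a random key on each message gives a similar spread, provided the keys take enough distinct values to cover all partitions.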
Example1:
@Inject
@Channel("test")
Emitter<ProducerRecord<String, String>> emitter;

@GET
@Path("/send")
public Response sendMessage() {
    String theMsg = "Hello " + new SimpleDateFormat("MMM dd,yyyy HH:mm:ss").format(new Date());
    // Random key ("0" or "1") so that the key hash varies from message to message
    ProducerRecord<String, String> rec = new ProducerRecord<>("", Integer.toString(new Random().nextInt(2)), theMsg);
    emitter.send(rec);
    return Response.ok("Message sent: " + theMsg).build();
}
Example2:
@Incoming("test2")
@Outgoing("test")
public ProducerRecord<String, String> processMessage(String in) {
    String theMsg = "Hello " + new SimpleDateFormat("MMM dd,yyyy HH:mm:ss").format(new Date());
    ProducerRecord<String, String> rec = new ProducerRecord<>("", Integer.toString(new Random().nextInt(2)), theMsg);
    return rec;
}
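One caveat with the random-key approach: `nextInt(2)` only ever produces the keys "0" and "1", so at most two partitions can receive messages, however many the topic has. The sketch below shows the effect. It is only a model: `KeyPartitionSketch`, `partitionFor`, and `partitionsUsed` are hypothetical names, and `String.hashCode()` stands in for the murmur2 hash that Kafka applies to the serialized key, so the actual partition numbers on a real cluster will differ.

```java
import java.util.Set;
import java.util.TreeSet;

public class KeyPartitionSketch {

    // Stand-in for key-based partitioning: hash the key, then take it modulo
    // the partition count. Kafka uses murmur2 on the key bytes instead.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Collects the partitions reached by the keys "0" .. (distinctKeys - 1).
    static Set<Integer> partitionsUsed(int distinctKeys, int partitionCount) {
        Set<Integer> used = new TreeSet<>();
        for (int k = 0; k < distinctKeys; k++) {
            used.add(partitionFor(Integer.toString(k), partitionCount));
        }
        return used;
    }

    public static void main(String[] args) {
        // Two possible keys, six partitions: at most two partitions get traffic.
        System.out.println(partitionsUsed(2, 6));    // [0, 1]
        // A wider key range reaches every partition in this model.
        System.out.println(partitionsUsed(1000, 6)); // [0, 1, 2, 3, 4, 5]
    }
}
```

If the topic has more than two partitions, widening the random range (or using something like a UUID as the key) should give a more even spread.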
Note: Even if you specify the topic name programmatically, the topic name defined in bootstrap.properties will take precedence.
Hope this helps.