WebSphere Application Server & Liberty

 View Only

 How to distribute microprofile reactive messages.

Yoshiki Yamada's profile image
Yoshiki Yamada posted Thu June 05, 2025 06:02 AM
I'm developing a microprofile reactive application running on liberty with mpReactiveMessaging feature.
I was expecting that messages in a Kafka topic would be distributed evenly across multiple Liberty instances with the same group id.
However, all messages were distributed to a specific liberty instance in the group.
How can I distribute the load?
Yoshiki Yamada's profile image
Yoshiki Yamada
After further investigation, I realized that the Kafka topic had only one partition. As a result, only one Liberty instance in the consumer group was assigned to that partition, which explains the uneven distribution we observed.
Therefore, the first thing I tried was increasing the number of partitions. However, my emitter application kept sending messages to the same partition. To achieve load distribution on the consumer side, we need to find a way for the message producer to distribute messages across partitions.

The easiest way would be to use org.apache.kafka.clients.producer.RoundRobinPartitioner.

bootstrap.properties
mp.messaging.outgoing.test.connector=liberty-kafka
mp.messaging.outgoing.test.bootstrap.servers=fyre1:9092
mp.messaging.outgoing.test.group.id=test
mp.messaging.outgoing.test.topic=test
mp.messaging.outgoing.test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.test.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.test.partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

Kafka determines the target partition based on the key of the ProducerRecord. Therefore, setting a random key for each message can help achieve a similar load balancing.

Example1:
@Inject
@Channel("test")
Emitter<ProducerRecord<String, String>> emitter;


@GET
@Path("/send")
public Response sendMessage() {
String theMsg = "Hello " + new SimpleDateFormat("MMM dd,yyyy HH:mm:ss").format(new Date());
ProducerRecord<String, String> rec = new ProducerRecord<>("", Integer.toString( new Random().nextInt(2)), theMsg);
emitter.send(rec);
return Response.ok("Message sent: " + theMsg).build();

Example2:
@Incoming("test2")
@Outgoing("test")
public ProducerRecord<String, String> processMessage(String in) {
String theMsg = "Hello " + new SimpleDateFormat("MMM dd,yyyy HH:mm:ss").format(new Date());
ProducerRecord<String, String> rec = new ProducerRecord<>("", Integer.toString( new Random().nextInt(2)), theMsg);
return rec;
}

Note: Even if you specify the topic name programmatically, the topic name defined in bootstrap.properties will take precedence.


Hope this helps.