This blog post was written in collaboration with @DOGA TAV.
Partitions are a way to take advantage of the scaling and distributed capabilities of Kafka. When more than one partition is defined for a topic, the Kafka broker divides the messages written to the topic across the available partitions, resulting in smaller subsets of data that can greatly improve read throughput by parallelising the consumer clients. To take advantage of having data spread across multiple partitions, the consumer applications should be deployed in a consumer group. A consumer group is a set of consumers sharing a unique group ID, which enables them to read messages from a topic in parallel.
Let's look at an example of a consumer group. You have an IoT platform that collects events from sensors and runs a few post-processing operations before persisting the message in the database. One application reads the message, assigns the correct device group based on the device ID and persists the message in the database; another application extracts the GPS coordinates from the message, runs a reverse geocoding query to convert them to an address, and updates the device entity in the database with its last known address. The two actions don't depend on each other, so we can split them into two independent applications and assign each one its own consumer group: for the device group service we will use the group ID device-group-enrichment, and for the reverse geolocation service we will use the group ID geolocation-enrichment.
Now when a device writes a message to the topic, the Kafka broker will deliver the message to both applications. The IoT devices write continuously, and the two applications are expected to process the messages at different rates. This also means we can independently scale our application instances by adding replicas with the same consumer group ID. In our example the geolocation service is expected to process messages at a much slower rate, because geo-lookup operations are more expensive than a simple database lookup. In this case we can scale the geolocation service to two or more instances and increase the consumption rate.
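As a minimal sketch, an instance of the geolocation service could look like the following; the broker address and the topic name sensor-events are assumptions for illustration, only the group ID comes from our example:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GeolocationEnrichmentConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        // Every instance of this service uses the same group ID, so the broker
        // spreads the topic's partitions across the running instances.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "geolocation-enrichment");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("sensor-events")); // hypothetical topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // The reverse geocoding and database update would happen here.
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

The device group service would look the same, only with its own group ID (device-group-enrichment), which is what makes the two services receive independent copies of the messages.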
The caveat here is that we cannot usefully scale the applications in a consumer group beyond the number of partitions in the topic. Looking at the diagram below, we can see that for a topic with one partition only one instance of the application will be consuming messages. It might look tempting to create a unique consumer group per application instance, but it is important to remember that every consumer group receives its own full copy of the topic's messages. In this case, where we have two consumer groups, message 1 will be received by both consumers, so the work is duplicated rather than shared. This is why we can't use different consumer groups to scale our application.
You can have more consumers in a consumer group than partitions. The redundant consumers will stay idle and won't receive messages until an active consumer goes offline; then the broker will start sending messages to the next available consumer. Keep in mind rebalancing is not instant, so it will take some time until the broker starts sending messages to the new consumer. How quickly a failed consumer is detected is controlled by the consumer properties session.timeout.ms and heartbeat.interval.ms (the heartbeat interval defaults to 3 seconds).
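As a small sketch, these two properties could be added to the consumer configuration from the example above; the values shown are the documented defaults in recent client versions, so check your client's defaults before relying on them:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class LivenessTuning {
    static Properties livenessProps() {
        Properties props = new Properties();
        // How often the consumer sends heartbeats to the group coordinator (default 3 s).
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        // How long the coordinator waits without a heartbeat before declaring the consumer
        // dead and triggering a rebalance (45 s on recent clients, 10 s on older ones).
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
        return props;
    }
}
```

Lowering the session timeout makes failover faster, at the cost of more false rebalances when a consumer is merely slow.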
Now if we want to scale our application, we need more partitions; then we can add more instances of our application, making sure we use the same consumer group ID. If the application is running in Kubernetes, we can scale the pod replicas up. The diagram below shows how messages flow when we have two partitions and two consumers per consumer group. This time both consumers are active and Kafka makes sure that each message is consumed only once per consumer group, allowing our application instances to work in parallel. It is also fine for our consumer group to have fewer instances than available partitions; in this case one instance will be subscribed to multiple partitions and the broker will make sure to deliver the messages from each partition in the correct order. The broker distributes partitions across consumers so that each consumer handles approximately an equal number of partitions, ensuring that processing is spread evenly across consumer processes.
When a topic is split into partitions, there are multiple strategies available to distribute messages across them. Before Kafka 2.4, messages without a message key were distributed round-robin between partitions by default. Since Kafka 2.4 the default for keyless messages is the sticky partitioning strategy, which fills a batch of messages for one partition before switching to another; depending on the use case, this might lead to a temporary imbalance of messages across partitions.
If messages have a key, the default partitioner hashes the key and assigns the message to a partition based on the hash value, so messages with the same key are always written to the same partition. Conceptually, the function looks like this: partition_id = hash(message_key) % partition_count. It is important to note that if a new partition is added to the topic, the broker won't redistribute the old messages, so new messages with a given key might be written to a different partition than the old messages with the same key.
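A minimal producer sketch keyed by device ID could look like this; the broker address, topic name, device ID and payload are illustrative assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SensorEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Using the device ID as the message key means the default partitioner hashes it,
            // so every event from this device lands on the same partition and stays in order.
            String deviceId = "sensor-42";                        // hypothetical device ID
            String payload = "{\"lat\":51.5,\"lon\":-0.12}";      // hypothetical payload
            producer.send(new ProducerRecord<>("sensor-events", deviceId, payload));
        }
    }
}
```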
For greater control, you can write a custom function that determines the partition a message ends up in by implementing the Partitioner interface. Doing so gives you full control over the data distribution across partitions, but you should pay extra attention to the overhead of the partition() function to avoid adding lag to the system. If you need hashing in your custom partitioner, consider using MurmurHash, the default hashing algorithm used by Kafka, because of its strong performance compared to other hashing algorithms.
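Here is a hedged sketch of what such a custom partitioner could look like; the "priority-" device prefix and the routing rule are made up purely to show the shape of the interface:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Illustrative partitioner: pins messages from "priority" devices to partition 0
// and hashes everything else across the remaining partitions.
public class DevicePriorityPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null || numPartitions < 2) {
            return 0; // keyless messages (and single-partition topics) stay on partition 0
        }
        if (key instanceof String && ((String) key).startsWith("priority-")) {
            return 0; // hypothetical priority devices get a dedicated partition
        }
        // Same murmur2 hash the default partitioner uses, spread over partitions 1..n-1.
        return 1 + Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

The partitioner would then be registered on the producer via the partitioner.class property (ProducerConfig.PARTITIONER_CLASS_CONFIG).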
Adding partitions to an existing topic is possible, but can lead to unwanted behavior in some cases. Messages within a partition are delivered in order, so when you use one of the deterministic partitioning techniques described above you can expect messages with the same key to be consumed in the order they were produced. Adding partitions does not redistribute existing messages, so new messages with a given key may land on a different partition than older ones, and the rebalancing can assign different partitions to existing consumers; both can result in messages being consumed out of order. Whether this is a problem depends on your application's use case.
To avoid these issues, you can over-partition the topic to allow for future growth of your application. Remember that you can have fewer consumers than partitions; in that case one consumer will read from multiple partitions. Apache Kafka with ZooKeeper can support up to 200,000 partitions per cluster, and later versions of Kafka (3.1+) running KRaft can potentially support up to 2 million partitions per cluster.
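As an illustrative sketch, a topic could be created with headroom for future consumer scaling using the admin client; the topic name, partition count and replication factor below are assumptions, not recommendations:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateSensorTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            // 12 partitions gives room to grow to 12 consumers per group without
            // ever having to add partitions and disturb key-to-partition mapping.
            NewTopic topic = new NewTopic("sensor-events", 12, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```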
Now that we have a basic understanding of how partitions work let's summarize the benefits of using partitions and outline some of the challenges we might face.
Benefits
- Scalability: Kafka partitions enable horizontal scaling which is essential for achieving high throughput and handling high volumes of data.
- Performance: Being able to process data in parallel with the use of partitions results in lower latencies and improved service performance.
- Fault tolerance: Partitions are replicated across brokers, and in cloud deployments across multiple zones, so if one broker goes down the partition replicas on the remaining brokers keep serving data, ensuring high availability.
- Data locality: Partitioning by key keeps related data together on the same partition, which can be used to minimize data movement.
Challenges
- Partition management: Managing partitions can be a complex task, especially in large-scale deployments.
- Data skew: Uneven data distribution across partitions can lead to performance issues and data hotspots.
- Partition key selection: Choosing the right partition key can be challenging, and a poor choice can lead to data skew and performance issues. At a high level, creating too many partitions (over-partitioning) or too few partitions (under-partitioning) can lead to increased overhead or data skew, and hence decreased performance.
- Rebalancing: Rebalancing partitions can be time-consuming and is very likely to impact performance.
- Monitoring and debugging: Monitoring and debugging partition-related issues can be difficult to tackle due to the distributed nature of the Kafka deployment.