One of the microservices we are currently developing relies on a six-node Kafka cluster running inside Docker containers with identical configurations, managed by AWS Elastic Container Service (ECS).
The cluster hosts 20 topics with a very heterogeneous traffic pattern: the data production rate of the most heavily used topic, for example, exceeds that of the remaining 19 topics combined.
We monitor the health of this cluster using a home-grown monitoring solution called MachZ, which we’ve written about extensively here and here. We use MachZ to store and visualize Kafka’s built-in JMX metrics, collected via Jolokia and jmx2graphite.
One day we noticed that a particular node in the cluster (referred to henceforth as the “bad node”) was running with a higher CPU load than the other nodes (Figure 1). Although it represented only 16% of the cluster’s capacity, it was dramatically degrading the entire cluster’s ability to handle traffic.
Figure 1 - Orange Line = Bad Node
Preliminary Observation: High Network I/O
After some basic triaging, we observed the following:
On the bad node, there was no unusual process consuming extra CPU; all the additional CPU load came from the Kafka process itself.
Network metrics (bytes in/out per second) of the bad node were significantly higher than those of the other nodes in the cluster.
Kafka’s own ‘messages inbound per second’ metric (kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec) was also higher on the bad node than on the other nodes.
Superficially, it seemed that the bad node was accepting more traffic than the other nodes and was therefore experiencing a higher CPU load.
Digging Deeper: Uneven Message Production to the Internal Topic __consumer_offsets
Given these earlier findings, we used the network monitoring tool “iftop” to check exactly where the excessive network traffic was occurring. Comparing the result from the bad node with those from the normal nodes, we found that the bad node was sending a large amount of traffic to two other nodes in the cluster, and that these two nodes ranked second and third in CPU usage.
Based on this additional observation, one might guess that the cause was topic related, since we were using a replication factor of 3 for most of our topics. Was it possible that an abnormal topic was sending most of its traffic to the bad node, instead of distributing it evenly across all nodes?
To check the incoming traffic at the partition and node level for a topic, we took two snapshots of the offsets for each partition of the topic. By comparing the offset increase rates across all topics, we found something interesting. As shown in Figure 2, data production to the topic __consumer_offsets was highly uneven: partition #12 was taking 95% of the traffic, even though there are 50 partitions in total.
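The rate comparison itself is a straightforward diff of the two snapshots. A minimal sketch of the calculation, with made-up offset numbers (a real run would cover all 50 partitions):

```python
from typing import Dict

def production_rates(before: Dict[int, int], after: Dict[int, int],
                     interval_s: float) -> Dict[int, float]:
    """Messages/sec appended to each partition between two offset snapshots."""
    return {p: (after[p] - before[p]) / interval_s for p in before}

# Illustrative numbers only, 60 seconds apart.
snap_1 = {11: 1_000, 12: 50_000, 13: 1_200}
snap_2 = {11: 1_060, 12: 164_000, 13: 1_230}
rates = production_rates(snap_1, snap_2, interval_s=60.0)
print(rates)  # partition 12 dwarfs the others
```

In our case the real numbers showed the same shape: one partition’s rate dwarfed the other 49 combined.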
The __consumer_offsets Topic
A little about this interesting topic: it’s managed by Kafka and was introduced in version 0.9. As the name implies, Kafka uses it to track each consumer’s offset position on the topics it consumes. Before version 0.9, this data was stored in ZooKeeper. Per Kafka’s official documentation:
“When the offset manager receives an OffsetCommitRequest, it appends the request to a special compacted Kafka topic named __consumer_offsets. The offset manager sends a successful offset commit response to the consumer only after all the replicas of the offsets topic receive the offsets….”
After looking at the Kafka source code responsible for producing __consumer_offsets messages, we found the evidence that explains everything we had observed:
For a given __consumer_offsets message that stores the offset of a topic/partition/consumer group, its target partition number is essentially the hash of the consumer group ID:
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
This clarified everything for us! Our most heavily used topic, “thl_raw”, which tracks all database row change events, is consumed by a single consumer group named “RawChangeConsumer”, which is unsurprisingly mapped to partition #12 by Kafka. As a result, all __consumer_offsets messages for this topic were sent to the bad node, the leader of that partition, and then replicated to two other nodes, which were the second and third ranked in the CPU usage list.
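The mapping is easy to verify offline by replicating Java’s String.hashCode. A sketch, assuming the default of 50 partitions for the offsets topic (offsets.topic.num.partitions):

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): h = 31*h + c, with 32-bit signed overflow."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h

def partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Mirror of Kafka's partitionFor: abs(groupId.hashCode) % partition count.
    (Kafka's Utils.abs also guards the Integer.MIN_VALUE edge case.)"""
    return abs(java_string_hash(group_id)) % num_partitions

print(partition_for("RawChangeConsumer"))  # -> 12
print(partition_for("76"))                 # -> 9
```

Running this for our group IDs confirms that “RawChangeConsumer” lands on partition #12, exactly where we saw the traffic concentrate.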
Reproducing The Problem
Let’s take a moment to step back and flesh out the root cause of the problem. Like most of the Kafka broker CPU issues we have experienced, this one was mainly related to the consumer side. A consumer’s topic-consumption status data (its offsets) is stored in an internal topic. The high CPU usage is caused by the traffic of this status data; in other words, it is driven by the frequency at which consumers commit their offsets. The uneven CPU load, in turn, is caused by Kafka’s design, which dictates that all status data for the same consumer group (same group ID) lands in the same single partition.
To confirm our findings, we reproduced the scenario in a testing environment by using the Kafka CLI to launch a consumer configured with a high offset-commit frequency and an explicit consumer group ID.
./kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic test --from-beginning --consumer-property group.id=76 --consumer-property auto.commit.interval.ms=20
Group ID “76” hashes to partition #9, and partition #9 was led by the node 10.19.61.227 in our test cluster. Unsurprisingly, after we executed the command, this node immediately began to incur a much higher CPU load than the other nodes (Figure 3).
Figure 3 - CPU behavior of node 10.19.61.227
Impact Analysis and Next Steps
So it seems the uneven CPU usage was simply a result of Kafka’s design. After understanding how it works under the hood, we realized it is almost impossible to resolve the uneven load on the __consumer_offsets topic in a real-world production environment, unless:
one carefully designs consumer group IDs so that their hashes are evenly distributed among the 50 partitions of the __consumer_offsets topic, AND
one can somehow ensure that all consumers consume messages (and thus commit offsets) at the same rate
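The first condition can at least be checked ahead of time by hashing candidate group IDs the way Kafka does. A sketch using hypothetical group names (the hash mirrors Java’s String.hashCode, as in Kafka’s partitionFor):

```python
from collections import Counter

def partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Kafka-style mapping: abs(Java String.hashCode) % partition count."""
    h = 0
    for ch in group_id:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    if h >= 1 << 31:
        h -= 1 << 32
    return abs(h) % num_partitions

# Hypothetical group IDs -- check how evenly their offsets traffic would spread.
group_ids = [f"service-{i}-consumer" for i in range(20)]
spread = Counter(partition_for(g) for g in group_ids)
print(spread)
```

Even so, this only addresses the hash distribution; the per-group commit rates remain whatever the consumers make them.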
However, how realistic is it to achieve both of the above? The more important question, we believe, is whether this behavior will actually cause issues in real production environments.
To that end, we ran the above test for two days with increasing commit frequency (auto.commit.interval.ms of 20, then 10, then 1), and found that CPU usage stayed roughly constant, with no increase. We believe this is because sending and storing messages does not require much CPU; what does is the internal log-compaction process on this special topic, and its computational complexity is better than linear.
Since the initial draft of this blog post, we have not found any other issues caused by this design. We are currently testing the same scenario on the newly released Kafka 1.0, and will continue to remain active in the Kafka community to discuss the origins of this issue.