We have a lot of data to process quickly and many microservices owned by different teams, which is why we chose Kafka as our main data exchange platform. Over the years of using it, we've run into quite a few interesting situations, and we want to talk about some of them below.
Typical success stories are usually boring. However, it is always fun to read about failures and challenges. And this post is mostly about issues, misunderstandings, and (sometimes) heroic solutions.
Let's start at the very beginning: the connection. When connecting to Kafka, you need to specify the so-called bootstrap servers. Usually, the addresses of all the brokers in the cluster are provided, but it’s enough to specify just a few of them (even one could work). Why?
To answer this question, let's look at how clients connect to Kafka. When connecting, the client specifies the topic and its partition (more on partitions later on). To start writing to or reading from that partition, the client needs to connect to the partition's leader, which is always one of the cluster brokers. The authors of Kafka were kind to us developers and eliminated the need to search for the leader on our own. The client can connect to any broker and request the cluster metadata, which lists the leader of every partition. The client then connects to that leader by itself.
It means that to connect to the cluster successfully, you only need to know the address of just one broker. So, why indicate the whole list?
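One practical reason for the longer list is shown in the toy model below. It is not real client code, and the broker names and the `fake_connect` function are hypothetical. The client only needs metadata from any one live broker, but if its single configured address happens to be down at startup, it cannot learn anything about the cluster. With several addresses, it simply tries the next one.

```python
from typing import Callable, Dict, List


def fetch_metadata(bootstrap_servers: List[str],
                   try_connect: Callable[[str], Dict[str, str]]) -> Dict[str, str]:
    """Return cluster metadata from the first bootstrap broker that answers."""
    errors = []
    for address in bootstrap_servers:
        try:
            # Any live broker can describe the whole cluster,
            # including the leader of every partition.
            return try_connect(address)
        except ConnectionError as exc:
            errors.append(f"{address}: {exc}")
    raise ConnectionError("no bootstrap server reachable: " + "; ".join(errors))


# Hypothetical three-broker cluster in which broker-1 is down.
CLUSTER_METADATA = {"events-0": "broker-2:9092", "events-1": "broker-3:9092"}
LIVE_BROKERS = {"broker-2:9092", "broker-3:9092"}


def fake_connect(address: str) -> Dict[str, str]:
    """Stand-in for a network call: dead brokers refuse the connection."""
    if address not in LIVE_BROKERS:
        raise ConnectionError("connection refused")
    return CLUSTER_METADATA


metadata = fetch_metadata(["broker-1:9092", "broker-2:9092"], fake_connect)
print(metadata["events-1"])  # broker-3:9092
```

With only `["broker-1:9092"]` configured, the same call would fail, even though two brokers of the cluster are perfectly healthy.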
On the one hand, Kafka seems to be dead simple, but on the other, it’s an incredibly complicated thing. Yes, it’s just a service that allows you to write and read bytes, but there are hundreds of different settings that control the transmission and storage of these bytes.
For example, you can set the duration of how long messages should be retained in a topic. Unlike typical message brokers that only transmit data, Kafka stores it. In essence, Kafka is a commit log – a structure where data is appended only to the end, and once Kafka has received the message, it is stored as long as required.
The retention settings, which provide different options, define the “as long as required” part. You can set up messages to be deleted after a certain amount of time or when their total size hits some threshold.
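To make the two limits concrete, here is a simplified per-message model of time-based and size-based retention. In Kafka these correspond to the topic settings `retention.ms` and `retention.bytes`. Real Kafka deletes whole log segments rather than individual messages, so actual deletion is coarser and later than this sketch suggests. The `Message` type is our own illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Message:
    offset: int
    timestamp_ms: int
    size_bytes: int


def apply_retention(log: List[Message], now_ms: int,
                    retention_ms: Optional[int] = None,
                    retention_bytes: Optional[int] = None) -> List[Message]:
    """Drop the oldest messages that violate a time or a size limit."""
    kept = log
    if retention_ms is not None:
        # Messages older than the time limit are removed.
        kept = [m for m in kept if now_ms - m.timestamp_ms <= retention_ms]
    if retention_bytes is not None:
        # Walk from the newest message backwards, keeping as much as fits.
        total = 0
        start = len(kept)
        for i in range(len(kept) - 1, -1, -1):
            if total + kept[i].size_bytes > retention_bytes:
                break
            total += kept[i].size_bytes
            start = i
        kept = kept[start:]
    return kept


log = [Message(0, 1_000, 100), Message(1, 5_000, 100), Message(2, 9_000, 100)]
print([m.offset for m in apply_retention(log, now_ms=10_000, retention_ms=6_000)])    # [1, 2]
print([m.offset for m in apply_retention(log, now_ms=10_000, retention_bytes=150)])  # [2]
```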
Besides straightforward deletion, there is also so-called compaction. Here, Kafka deletes not just old messages but all previous messages with the same key (more on keys later on), keeping only the latest one. This means that messages can disappear from the middle of the topic. Why do we need this?
Compaction saves storage space by removing data we don't need. If every change to an entity is published as a snapshot (the entity's full state after the change), we no longer need the previous versions, because the latest snapshot is enough. Compaction is all about removing those previous versions.
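The effect of compaction on a log can be sketched in a few lines. This is a simplified model: it ignores the fact that real compaction only touches inactive segments, and it ignores tombstones. The order keys are hypothetical. Only the latest record per key survives, and the surviving records keep their original offsets, which is why gaps appear in the middle of the topic.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str, Optional[str]]  # (offset, key, value)


def compact(log: List[Record]) -> List[Record]:
    """Keep only the latest record for each key, preserving original offsets."""
    latest_offset: Dict[str, int] = {}
    for offset, key, _ in log:
        latest_offset[key] = offset  # later records overwrite earlier ones
    return [r for r in log if latest_offset[r[1]] == r[0]]


log = [(0, "order-1", "created"), (1, "order-2", "created"),
       (2, "order-1", "paid"), (3, "order-1", "shipped")]
print(compact(log))  # [(1, 'order-2', 'created'), (3, 'order-1', 'shipped')]
```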
Again, the actual deletion of data happens only in inactive segments and only under certain circumstances. Many configuration parameters control the process, but the point is that data may not be deleted for a long time, and this should be taken into account in your service design.
These tombstones are stored in the topic so that consumers (services that read from Kafka) that reach them understand that the record with this key has been deleted and handle this fact accordingly. Tombstones themselves are also deleted after some time, leaving no trace of the original record in the topic. This delay is configured separately, and it should not be set too short, especially if you do not know all of your topic's consumers. After all, if a tombstone is deleted before some slow-moving consumer has managed to read it, the record will never be deleted for that consumer. It will remain there forever.
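The slow-consumer problem can be reproduced with a small in-memory model. Here, a value of `None` plays the role of a tombstone. The match keys and the helper names are hypothetical. To our knowledge, the tombstone retention period in Kafka is controlled by the topic setting `delete.retention.ms`. In this model, the fast consumer sees the tombstone and drops the key. The slow consumer resumes after both the record and its tombstone have been cleaned up, so it keeps the deleted entry indefinitely.

```python
from typing import Dict, List, Optional, Tuple

# (offset, key, value); a value of None is a tombstone.
Record = Tuple[int, str, Optional[str]]


def read_from(log: List[Record], position: int) -> List[Record]:
    """Return records at or after `position`, like resuming a consumer."""
    return [r for r in log if r[0] >= position]


def apply(state: Dict[str, str], records: List[Record]) -> Dict[str, str]:
    """Fold records into an in-memory key -> value view of the topic."""
    for _, key, value in records:
        if value is None:
            state.pop(key, None)  # tombstone: forget the key
        else:
            state[key] = value
    return state


log = [(0, "match-1", "live"), (1, "match-2", "live"), (2, "match-1", None)]

# A consumer that keeps up sees the tombstone and drops match-1.
fast = apply({}, read_from(log, 0))

# A slow consumer reads offsets 0 and 1, then stops before offset 2.
slow = apply({}, read_from(log, 0)[:2])

# Meanwhile, compaction removes match-1's value and, once the tombstone
# retention period has passed, the tombstone itself.
log_after_cleanup = [(1, "match-2", "live")]
slow = apply(slow, read_from(log_after_cleanup, 2))  # nothing left to read

print(fast)  # {'match-2': 'live'}
print(slow)  # {'match-1': 'live', 'match-2': 'live'}
```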
The process seems to be really thoughtful and clearly described, with no signs of trouble. We built a service that reads the list of current events from a topic and keeps it in memory. There are many sports events, but at some point each of them ends, and after that it can be deleted. For this, we used the previously mentioned tombstone method in a topic with compaction configured.
To understand this, imagine a Kafka topic as a stream, like a file or a buffer in memory. Working with such a stream consists of opening it, specifying the position to read from, and then reading items one by one in a loop. We do not need to tell Kafka that we have successfully read something: if the current call returned a message, the next call will return the one after it.
The position of each message is called its “offset”. Consumers should specify the offset they want to start reading from. It can be given as an absolute value or relative to the beginning or the end of the partition. Most scenarios imply that a restarted service should resume reading from where it stopped last time, and to achieve that, the last read position must be provided when connecting. Kafka has a feature out of the box that makes this simpler.
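Below is a minimal sketch of choosing a starting position. It is a plain in-memory model, not a Kafka client API. The strings `"earliest"` and `"latest"` mirror the values of the consumer setting `auto.offset.reset`, but here they are just arguments to our own hypothetical `resolve_start` function. The partition starts at offset 5 to show that, after retention, the first available offset is not necessarily 0.

```python
from typing import Dict, Iterator, Tuple, Union


def resolve_start(log: Dict[int, str], start: Union[int, str]) -> int:
    """Turn 'earliest', 'latest', or an absolute offset into a read position."""
    first, end = min(log), max(log) + 1  # end = offset of the next message
    if start == "earliest":
        return first
    if start == "latest":
        return end  # only messages written from now on
    if isinstance(start, int) and first <= start <= end:
        return start
    raise ValueError(f"cannot start at {start!r}")


def consume(log: Dict[int, str], start: Union[int, str]) -> Iterator[Tuple[int, str]]:
    """Read messages one by one from the resolved position, in offset order."""
    position = resolve_start(log, start)
    for offset in sorted(log):
        if offset >= position:
            yield offset, log[offset]


# Offsets 0-4 were already removed by retention.
partition_log = {5: "a", 6: "b", 7: "c"}
print(list(consume(partition_log, "earliest")))  # [(5, 'a'), (6, 'b'), (7, 'c')]
print(list(consume(partition_log, 6)))           # [(6, 'b'), (7, 'c')]
```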
The process of storing offsets is called commit. It has nothing to do with confirming that a message has been read or processed. It merely records a starting position for future consumers of that partition. And yes, a consumer can commit any offset, not just the one it has just read.
This means that the frequency of commits does not have to match the frequency of received messages. When the message rate is high, committing after every single message can significantly hurt performance. That is why high-load systems usually commit offsets less often, for example, every few seconds.
This approach may result in some messages being processed more than once. For example, the service restarted after processing message 10 but only managed to commit message 5. As a result, after restarting, it will re-read messages 6-10.
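The scenario above can be simulated directly. This is a minimal sketch in which commits happen every N messages instead of every few seconds, so that the result is deterministic. As in Kafka, the committed value is the offset of the next message to read, so "committed message 5" means a stored value of 6. The function name is our own.

```python
from typing import List, Tuple


def run_until_crash(start: int, crash_after: int, commit_every: int) -> Tuple[List[int], int]:
    """Process offsets from `start`, committing periodically, then 'crash'.

    Returns the processed offsets and the last committed position
    (the offset of the next message to read).
    """
    processed: List[int] = []
    committed = start
    for offset in range(start, start + crash_after):
        processed.append(offset)          # handle the message
        if (offset + 1) % commit_every == 0:
            committed = offset + 1        # periodic commit
    return processed, committed


# First run: processes offsets 0..10, but the last commit happened after offset 5.
first_run, committed = run_until_crash(start=0, crash_after=11, commit_every=6)
# After a restart, the consumer resumes from the committed position.
second_run, _ = run_until_crash(start=committed, crash_after=5, commit_every=6)

replayed = sorted(set(first_run) & set(second_run))
print(committed)  # 6
print(replayed)   # [6, 7, 8, 9, 10]
```

Because of this, handlers downstream of a periodically committing consumer generally need to tolerate duplicates.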
One day, we noticed an invasion of strange consumer groups to our cluster. Consumer groups are usually given meaningful names, indicating the service, product, or team. Such naming helps to identify consumers of a topic. But these strange groups were empty (did not store any offsets) and did not have any details in the name – just opaque GUIDs. So, where did they come from?
When consumers connect using the Assign() method rather than Subscribe(), they get full control over the situation and specify which partitions they want to read from.
And our mysterious new groups turned out to be consumer groups created by a service that used Assign(). But why were there so many of them, and where did the GUID come from?
It turned out that, with the .NET client from the official repository, a GUID is used to name the group. In the vast majority of scenarios that involve a GUID, we need a unique identifier, and any .NET developer has the muscle memory: when they see Guid, they write Guid.NewGuid(), which produces a unique identifier. So did we. As a result, every start of the service created a new consumer group, and the old one was not removed. This was very strange and did not look like “by design” behavior.
However, while studying the examples of consumers with Assign once again, we suddenly realized that they don’t use Guid.NewGuid() but rather new Guid(), which is an entirely different beast. It produces not a unique GUID but the default value consisting of all zeros. So in the library samples, the Guid type was basically used to get a constant that served as the consumer group name. You can see these samples
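The difference is easy to reproduce outside .NET. In this Python sketch, `uuid.uuid4()` plays the role of `Guid.NewGuid()`, and `uuid.UUID(int=0)` plays the role of `new Guid()`, the all-zero value. This is an analogy, not .NET code. The loop stands in for three restarts of a service that names its consumer group this way.

```python
import uuid


def group_id_per_start() -> str:
    """Roughly like Guid.NewGuid(): a fresh random identifier on every call."""
    return str(uuid.uuid4())


# Roughly like `new Guid()`: the all-zero "empty" value, the same every time.
EMPTY_GROUP_ID = str(uuid.UUID(int=0))

# Three restarts of a service with each naming scheme.
groups_with_new_guid = {group_id_per_start() for _ in range(3)}
groups_with_empty_guid = {EMPTY_GROUP_ID for _ in range(3)}

print(len(groups_with_new_guid))    # 3 (a new group on every start)
print(len(groups_with_empty_guid))  # 1
print(EMPTY_GROUP_ID)               # 00000000-0000-0000-0000-000000000000
```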
If you start your journey with Kafka with a book (which is one of the best ways), the work of clients will most likely be described using Java as an example, with a lot of exciting and correct details. For example, you’ll discover that the consumer implements a rather complicated protocol that hides many details, such as working with consumer groups, balancing, and so on.
Librdkafka, in turn, works differently. Its offsets are committed in the background by a dedicated thread. After you call the Commit method, the commit may or may not reach Kafka. What is worse, with default settings, an offset can be committed even though the message has not been processed.
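The sketch below is a simplified model of how "committed but not processed" can happen, not librdkafka's actual code. To our knowledge, librdkafka defaults to `enable.auto.commit=true` and `enable.auto.offset.store=true`. With these settings, an offset is stored as soon as a message is handed to the application, and a background thread periodically commits whatever is stored. A commonly suggested workaround is to set `enable.auto.offset.store=false` and store offsets explicitly after processing (in the .NET client, via `StoreOffset`). The `simulate` function and its worst-case timing of the background commit are our own assumptions.

```python
def simulate(messages: int, crash_at_offset: int, store_on_receipt: bool) -> int:
    """Return the position a restarted consumer would resume from.

    A background "thread" commits whatever offset is currently stored.
    With store_on_receipt=True, the offset is stored as soon as the message
    is received (before processing); otherwise, only after processing.
    """
    stored = 0
    committed = 0
    for offset in range(messages):
        if store_on_receipt:
            stored = offset + 1
        committed = stored          # worst case: auto-commit fires right now
        if offset == crash_at_offset:
            return committed        # crash before processing finishes
        # ... process the message here ...
        if not store_on_receipt:
            stored = offset + 1     # store only after successful processing
    return committed


# Default-like behavior: message 3 is skipped after the restart (at-most-once).
print(simulate(10, crash_at_offset=3, store_on_receipt=True))   # 4
# Store after processing: message 3 is read again (at-least-once).
print(simulate(10, crash_at_offset=3, store_on_receipt=False))  # 3
```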
Our main stack is .NET, but at some point we decided to add some color to our boring life by mixing in a bit (or so it seemed to us back then) of the JVM, namely Scala. Why? Well, Kafka is written in Java and Scala, and the JVM has a higher-level API, Kafka Streams. The difference between these APIs is similar to the difference between reading files in C and in Python.
In the former, you have to mess with opening (and closing) a file, allocating (and freeing) a buffer, loops, and all the other joys of low-level byte handling: tens of lines of code. In the latter, it can all be done with a simple one-liner.
In Kafka Streams, topics are represented as streams that you can read from, join with other streams (topics) by key, or filter with a predicate.
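To give a feel for the style of these operations, here is a rough Python analogy of a key-based join followed by a filter, built with generators. This is not the Kafka Streams API. The "bets" and "match status" data are hypothetical. The analogy also ignores partitioning entirely, whereas in Kafka Streams a key-based join additionally assumes that both topics are partitioned the same way (co-partitioned).

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def filter_stream(stream: Iterable[T], predicate: Callable[[T], bool]) -> Iterator[T]:
    """Keep only the records that satisfy the predicate."""
    return (record for record in stream if predicate(record))


def join_by_key(stream: Iterable[Tuple[str, str]],
                table: Dict[str, str]) -> Iterator[Tuple[str, str, str]]:
    """Inner-join each (key, value) record with the table entry for the same key."""
    for key, value in stream:
        if key in table:
            yield key, value, table[key]


bets = [("match-1", "bet-10"), ("match-2", "bet-5"), ("match-3", "bet-7")]
match_status = {"match-1": "live", "match-3": "finished"}

live_bets = list(filter_stream(join_by_key(bets, match_status),
                               lambda rec: rec[2] == "live"))
print(live_bets)  # [('match-1', 'bet-10', 'live')]
```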
So, we wrote some code, ran it, but it did not work as expected. No errors and no results. We started to dig deeper and found some interesting things.
To understand and appreciate the value of our findings, let's take a closer look at the Kafka concepts of keys and partitions. Messages in Kafka are stored in topics, and topics are split into partitions. Each partition is a kind of shard: the topic's data is split into parts stored on different brokers, which allows more producers and consumers to work with the topic in parallel.
Novices often confuse partitions (shards) with replicas (copies). The difference is that a partition holds only part of a topic's data, while a replica is a full copy of a partition. These two things are not mutually exclusive, and in most cases, topics have several partitions, each with several replicas.
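A simplified placement sketch shows how the two concepts combine. Each partition holds part of the data, and each of its replicas is a full copy of that partition, placed on a different broker. The round-robin rule and the broker names are our own illustration and are not Kafka's exact assignment algorithm.

```python
from typing import Dict, List


def place_replicas(partitions: int, replication_factor: int,
                   brokers: List[str]) -> Dict[int, List[str]]:
    """Spread each partition's replicas over distinct brokers, round-robin."""
    if replication_factor > len(brokers):
        raise ValueError("need at least as many brokers as replicas")
    return {
        p: [brokers[(p + i) % len(brokers)] for i in range(replication_factor)]
        for p in range(partitions)
    }


layout = place_replicas(partitions=3, replication_factor=2, brokers=["b1", "b2", "b3"])
print(layout)  # {0: ['b1', 'b2'], 1: ['b2', 'b3'], 2: ['b3', 'b1']}
```

Losing broker b2 here still leaves a copy of partitions 0 and 1 on other brokers, which is the reliability side. Spreading partitions across brokers is the throughput side.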
Partitions improve throughput, while replicas improve reliability and availability. The throughput gain comes from horizontal scaling of consumers. With the recommended approach of consumer groups, only one consumer in a group can read a given partition at a time. Therefore, the scaling limit for reads is the number of partitions. A consumer group can still have more consumers than partitions, but the extra ones will just sit idle.
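A minimal round-robin assignment illustrates the scaling limit. Kafka has several pluggable assignment strategies (range, round-robin, sticky, and others), and this sketch is not any of them exactly. It only demonstrates the rule that each partition goes to exactly one consumer in the group. The consumer names are hypothetical.

```python
from typing import Dict, List


def assign(partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Give each partition to exactly one consumer in the group, round-robin."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


print(assign(4, ["c1", "c2"]))
# {'c1': [0, 2], 'c2': [1, 3]}
print(assign(4, ["c1", "c2", "c3", "c4", "c5", "c6"]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': [3], 'c5': [], 'c6': []}
```

With four partitions, the fifth and sixth consumers receive nothing to read.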
The logic of partitioning is that data is being assigned to one or another partition according to certain criteria. Of course, it is possible to write to each partition in a round-robin manner, similar to a regular load balancer. Still, this approach is unsuitable for many typical scenarios where messages related to one entity (for example, order modifications) must be processed by the same consumer. The more practical way is a hash function that is applied to the message key and determines what partition that message should be sent to.
Things get complicated here, and once again, thanks to the Kafka developers for making it easier. Every message has to be sent to a specific partition, but clients have a built-in helper mechanism that calculates the appropriate partition for you.
This choice is made using the so-called partitioner. In fact, this is the implementation of some hashing function. And there is even a default partitioner that just works, so developers might not even know that it exists.
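Hash-based partitioning boils down to "hash the key, take the remainder modulo the number of partitions". In the sketch below, `zlib.crc32` serves as a CRC32-style hash. The MD5-based function is a hypothetical stand-in for "some other client's hash" and is not what any Kafka client uses. As far as we know, librdkafka (which the .NET client wraps) defaults to a CRC32-based partitioner, while the Java client uses murmur2. The sketch shows that the same key is placed consistently by one partitioner, while two different partitioners can disagree about where that key belongs.

```python
import hashlib
import zlib


def crc32_partition(key: bytes, num_partitions: int) -> int:
    """Pick a partition from a CRC32 hash of the key."""
    return zlib.crc32(key) % num_partitions


def md5_partition(key: bytes, num_partitions: int) -> int:
    """Hypothetical alternative partitioner: first 4 bytes of MD5(key)."""
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return digest % num_partitions


keys = [f"order-{i}".encode() for i in range(100)]

# The same key always lands in the same partition with a given partitioner...
print(crc32_partition(b"order-1", 6) == crc32_partition(b"order-1", 6))  # True

# ...but two partitioners can send the same key to different partitions.
disagreements = [k for k in keys if crc32_partition(k, 6) != md5_partition(k, 6)]
print(f"{len(disagreements)} of {len(keys)} keys land in different partitions")
```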
Kafka is a fairly old, mature product (it has been around since 2011), and over time some of its APIs have changed and some have been deprecated.
For example, in the beginning, clients connected using the ZooKeeper address (ZooKeeper is a required component of Kafka versions before 2.8.0). Later, they switched to the addresses of Kafka brokers (the bootstrap servers mentioned above). Now the recommended way is to use bootstrap servers, but connecting via ZooKeeper also works and is still supported by some utilities.
Conclusion: Do not use obsolete protocols, or at least do not mix them with the new ones.