[Kafka] Dynamic vs. Static membership
I assume you have some familiarity with Kafka. If you don't, this article is unlikely to be particularly useful for you.
You can get started by reading something from Confluent or a LinkedIn post from 2013. Unsurprisingly, I am not the first to discuss this topic, so you may also want to read a couple of other articles to get a more holistic view.
Consumer Groups
When you start consuming from Kafka, you must specify a group.id. Kafka uses this to determine the context in which you are consuming. That context is required to maintain state within Kafka, such as the consumer offsets, and to allow Kafka to balance partitions across the consumers in the same group.
You may optionally specify a second piece of config: the group.instance.id.
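To make this concrete, here is a minimal sketch of a plain Java consumer setting both values; the broker address, group name, and topic are placeholders, and the group.instance.id line is the optional part:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExampleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Mandatory: the consumer group this consumer belongs to.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
        // Optional: a stable, per-instance identity (static membership).
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "my-service-0");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some.topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}
```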
The obvious next question here is why would we want to do this?
The short answer: to reduce unnecessary re-balancing.
Rebalancing
Say you have deployed a service with 3 replicas consuming from a topic with 6 partitions.
This service runs fine for a while, but at some point one of the machines running the service dies for unexplained reasons (maybe the network card got fried) and you are suddenly down a consumer.
Suddenly, you are no longer consuming data from partitions 2 and 5. Kafka detects this after your configured session.timeout.ms and performs a rebalancing operation. For the sake of simplicity, I am going to assume we are using the default eager rebalancing strategy, where all partitions will be reallocated. There are other options, but they are not needed here.
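Both of these knobs are ordinary consumer settings. Continuing the props object from the sketch above (the values are illustrative, not recommendations):

```java
// How long the broker waits without heartbeats before declaring this consumer dead.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
// RangeAssignor is an eager strategy: every partition in the group is revoked
// and re-assigned on each rebalance.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RangeAssignor");
```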
After this rebalancing, you end up in a configuration with two consumers, each consuming from three partitions.
Great! We have successfully reallocated load on to our remaining compute resources and avoided a total catastrophe. This sounds like exactly what we wanted to achieve.
Frequent Outages
Indeed, this has done exactly what we want in this scenario. Machine failures are, however, relatively rare. If you are working in a modern and reasonably techy business with a full CI/CD setup, you are much more likely to be pulling down consumers because you are deploying an update to your application. You could, quite feasibly, be doing this dozens of times a day.
In particular, let's examine the behaviour we expect if we are to have our application deployed via a rolling update.
This ends up looking quite similar to the scenario where a machine died. We replace one of our consumers at a time with the new version.
This rebalance is actually quite a lot of work. Namely, we have to wait for the connection to the downed consumer to time out (unless the consumer managed to gracefully leave the group). We then need to reorganize our partitions among the remaining, and new, consumers. With an eager strategy, like the default one we are using, this requires all partitions to be revoked and re-assigned.
This process will then repeat every time a step in the rolling update proceeds.
The likely outcome of this is that we suffer from severely hampered throughput as our consumers spend a large amount of their time sorting out who is consuming what. Clearly, the more consumers we have in the group, the more this problem is exacerbated.
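One way to watch this churn in your own group is to pass a ConsumerRebalanceListener when subscribing. The sketch below simply logs what gets revoked and assigned; it reuses the consumer from the earlier sketch and an assumed topic name:

```java
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away during a rebalance.
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called once the new assignment has been handed out.
        System.out.println("Assigned: " + partitions);
    }
};

consumer.subscribe(List.of("some.topic"), listener);
```

During a rolling deploy with an eager strategy, you would expect to see the full revoke/assign cycle logged on every remaining replica each time a step of the update completes.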
We could simply switch to a cooperative rebalancing strategy, but this still requires several rounds of communication and we really didn't need to rebalance at all. We know we are replacing the downed consumer in short order, so why not just allocate its partitions to the new replica?
This is exactly what we achieve with a stable group.instance.id.
Instead of doing a large amount of work shuffling partitions, we just assign the partitions to the new consumer.
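A minimal way to get there, assuming your orchestrator gives each replica some stable ordinal (the REPLICA_ID environment variable below is an assumption, not something Kafka provides), is to derive the instance id from that ordinal and give the broker a session.timeout.ms long enough to cover a restart. Again, this continues the props object from the first sketch:

```java
// Hypothetical: a stable per-replica identity injected by the orchestrator,
// e.g. a StatefulSet ordinal or a fixed hostname.
String replicaId = System.getenv().getOrDefault("REPLICA_ID", "0");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "my-service-" + replicaId);
// Long enough for a replica to restart and reclaim its partitions before the
// broker evicts it; too long, and a genuinely dead consumer holds on to its
// partitions for that whole window.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "120000");
```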
Overachieving
By using static group membership, we have managed to reduce the number of rebalances required when we deploy updates to our applications and have also reduced the amount of state shuffling (for stateful consumers) required each time we deploy.
In doing so, we have actually opened ourselves up to later confusion. There are several triggers which cause Kafka to rebalance partitions among the consumers in a group, and you can find them listed elsewhere. The summary is that a rebalance is triggered if:
- A consumer joins the group
- A consumer leaves the group
- A consumer is idle for too long (timeout)
- Partitions are added to a topic
With our static group membership setup, a new deployment does not trigger any of these conditions, which is why we are able to reduce the number of rebalances. Each new consumer comes up and replaces one of the old consumers by claiming its spot in the consumer group via the maintained group.instance.id.
For the most part, this is exactly what we want. If we are making a logical code change, we do not want Kafka to re-assign all the partitions as the data has not changed, just our response to it.
In some instances, however, this leads to unexpected behaviour. Consider, for example, the case where your deployment changes the topic you are subscribed to. Maybe you are moving on from a now-defunct topic to a new one: say you change your service's configuration to stop subscribing to topic.old and start subscribing to topic.new. If you do not update your group.id or your group.instance.ids, you will find that the assigned partitions do not change at all. Your service will keep consuming from topic.old until you either stop it for longer than the configured session.timeout.ms or change the group.id.
This one is harder to explain but trivially reproducible, so I leave it as an exercise to the reader to go run an example.
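If you want a starting point for that exercise, a rough sketch might look like the following; the broker address, group names, and topics are all placeholders, and the interesting part is what consumer.assignment() prints after you flip the subscription and restart within session.timeout.ms:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaleSubscriptionRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "repro-instance-0");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "120000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Run once against topic.old, stop the process, change this line to
            // topic.new, then restart within session.timeout.ms and compare.
            consumer.subscribe(List.of("topic.old"));
            while (true) {
                consumer.poll(Duration.ofSeconds(1));
                System.out.println("Currently assigned: " + consumer.assignment());
            }
        }
    }
}
```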
I may come back and update this article if I find the time, but I wouldn't bet on it :)