Working with Kafka Offsets¶
A Kafka offset is the position of a message within a partition. A consumer's committed offset records how far that consumer has read in the partition.
Retention and what it means¶
On Aiven Kafka, we retain consumer offsets for a period of 7 days. This is the period recommended by Aiven and the default for Kafka. Due to how longer offset retention affects other parts of Kafka, we do not want to increase this period.
When a consumer group stops consuming messages, its offsets will be retained for the mentioned period.
Kafka decides whether a consumer group has stopped in one of two ways, depending on how partitions are assigned:
Dynamically assigned partitions¶
This is the normal operation when using current Kafka client libraries. In this case, Kafka assigns partitions to the consumers as they connect, and manages group membership. A consumer group is considered empty when there are no connected consumers with assigned partitions.
Once the group is empty, Kafka retains the offsets for 7 days.
Manually assigned partitions¶
When using the assign API, you are responsible for keeping track of consumers.
In this scenario, Kafka uses the time of the last commit to determine offset retention.
Offsets are kept for 7 days after the last commit.
Warning
This means that when using manual assignment on a topic with long periods of inactivity (more than 7 days between messages), you might lose offsets even if your consumer is running and committing offsets as it should.
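The retention arithmetic for manual assignment can be sketched as follows. This is only an illustration of the 7-day rule described above; the class and method names are hypothetical, and the broker removes expired offsets on its own schedule, so the computed time is the earliest point at which offsets become eligible for removal, not the exact moment they disappear.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of any Kafka library.
final class OffsetExpiry {
    // Retention period stated in this document: 7 days.
    static final Duration RETENTION = Duration.ofDays(7);

    // With manual assignment, the retention clock starts at the last commit.
    static Instant expiresAt(Instant lastCommit) {
        return lastCommit.plus(RETENTION);
    }

    // True once the retention period has fully passed since the last commit.
    static boolean isExpired(Instant lastCommit, Instant now) {
        return now.isAfter(expiresAt(lastCommit));
    }

    public static void main(String[] args) {
        Instant lastCommit = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(expiresAt(lastCommit)); // 2024-01-08T00:00:00Z
        System.out.println(isExpired(lastCommit, Instant.parse("2024-01-09T00:00:00Z"))); // true
    }
}
```

A consumer on a quiet topic has nothing new to commit, so the last commit time does not move forward even though the consumer is healthy.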
Do you even need offsets?¶
Some consumers don't need to track offsets at all, and can skip committing them for a slight performance gain.
In these situations, you can set enable.auto.commit=false and simply not commit offsets.
There are two main variations of this scenario:
- Always reading the entire topic from start to end. Set auto.offset.reset=earliest.
- Only caring about fresh messages arriving after the consumer connects. Set auto.offset.reset=latest.
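The two variations above can be sketched as consumer configuration. This is a minimal example, assuming the Java client's convention of passing a java.util.Properties object to the consumer; the class name is hypothetical, and required settings such as bootstrap servers, group id, and deserializers are left out.

```java
import java.util.Properties;

// Hypothetical helper that builds only the offset-related settings.
final class NoCommitConsumerConfig {
    // resetPolicy is "earliest" (read the whole topic) or "latest" (fresh messages only).
    static Properties build(String resetPolicy) {
        Properties props = new Properties();
        // Never commit offsets automatically; the application does not commit either.
        props.setProperty("enable.auto.commit", "false");
        // With no committed offsets, this decides where every run starts.
        props.setProperty("auto.offset.reset", resetPolicy);
        return props;
    }

    public static void main(String[] args) {
        Properties fullReplay = build("earliest");
        Properties freshOnly = build("latest");
        System.out.println(fullReplay.getProperty("auto.offset.reset"));
        System.out.println(freshOnly.getProperty("auto.offset.reset"));
    }
}
```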
Autocommit: When/Why/Why not?¶
When starting out with Kafka, it is common to use the autocommit feature available in the client libraries. This makes it easy to get started, and often provides good enough semantics.
When you use autocommit, the client automatically commits the last received offsets at a set interval, configured using auto.commit.interval.ms.
The implementation ensures that you get "at-least-once" semantics, where the worst case scenario is to reprocess messages received in the interval between last commit and the consumer stopping.
One downside with this mechanism is that you have little control over when offsets are committed.
Autocommit is done before a poll to the server, which means that your consumer needs to ensure that it has completed processing of a message before the next call to poll.
If your consumer processes messages in other threads, you probably need to manage offsets explicitly and not rely on autocommit.
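One way to manage offsets when worker threads finish messages out of order is to commit only up to the first offset that has not yet been processed. The sketch below shows that bookkeeping for a single partition. The class is hypothetical and uses no Kafka API; it relies on the Kafka convention that a committed offset is the offset of the next message to read.

```java
import java.util.TreeSet;

// Hypothetical per-partition tracker, not part of any Kafka library.
final class CompletedOffsetTracker {
    // First offset not yet known to be processed; this is the value that is safe to commit.
    private long nextToCommit;
    // Offsets finished out of order, all greater than nextToCommit.
    private final TreeSet<Long> done = new TreeSet<>();

    CompletedOffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Called by worker threads when a message has been fully processed.
    synchronized void markProcessed(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by the committable offset
        }
        done.add(offset);
        // Advance over every contiguous completed offset.
        while (!done.isEmpty() && done.first() == nextToCommit) {
            done.pollFirst();
            nextToCommit++;
        }
    }

    // Called by the polling thread before it commits.
    synchronized long committableOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        CompletedOffsetTracker tracker = new CompletedOffsetTracker(10);
        tracker.markProcessed(12);
        System.out.println(tracker.committableOffset()); // 10, because 10 and 11 are still in flight
        tracker.markProcessed(10);
        System.out.println(tracker.committableOffset()); // 11
        tracker.markProcessed(11);
        System.out.println(tracker.committableOffset()); // 13
    }
}
```

Committing the value from committableOffset keeps at-least-once behaviour: after a restart, the consumer may see messages again that finished out of order, but it never skips one that was still in flight.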
Managing offsets explicitly¶
The KafkaConsumer exposes two APIs for committing offsets: asynchronous commits using commitAsync, and synchronous commits using commitSync.
From the Confluent documentation:
Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending.
A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately by using asynchronous commits.
In general, asynchronous commits should be considered less safe than synchronous commits, because a failed asynchronous commit is not retried automatically.
Saving offsets elsewhere¶
The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system, so that results and offsets are stored atomically. Another use case is consumers that receive messages very rarely, where consumer inactivity and other incidents might lead to lost offsets once the retention period has passed.
In some cases it might even be beneficial to store offsets in alternative storage even if your messages are not. This will avoid issues with offsets passing beyond the retention threshold, in case of recurring errors or networking issues.
When storing offsets outside Kafka, your consumer needs to pay attention to rebalance events, to ensure correct offset management. In these cases it might be easier to also manage partition assignment explicitly.
Before storing offsets outside Kafka, consult the Kafka documentation on the topic.
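The idea of storing results and offsets atomically can be sketched with an in-memory stand-in for the external store. Everything here is hypothetical: a real application would use a database transaction instead of a synchronized method, and would track one offset per partition. The point is only that the result and the offset change together, so a redelivered message can be recognised and skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for an external store; single partition only.
final class ResultStore {
    // Offset of the next message that should be applied.
    private long nextOffset;
    private final List<String> results = new ArrayList<>();

    ResultStore(long startOffset) {
        this.nextOffset = startOffset;
    }

    // Stores the result and advances the offset in one step.
    // Returns false when the message was already applied (a redelivery).
    synchronized boolean apply(long offset, String result) {
        if (offset < nextOffset) {
            return false;
        }
        results.add(result);
        nextOffset = offset + 1;
        return true;
    }

    // After a restart, the consumer would resume reading from this offset.
    synchronized long nextOffset() {
        return nextOffset;
    }

    synchronized List<String> results() {
        return new ArrayList<>(results);
    }

    public static void main(String[] args) {
        ResultStore store = new ResultStore(0);
        System.out.println(store.apply(0, "a")); // true
        System.out.println(store.apply(0, "a")); // false, redelivered message is skipped
        System.out.println(store.apply(1, "b")); // true
        System.out.println(store.nextOffset()); // 2
    }
}
```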
What to do when you lose your offsets¶
Things go wrong, and you may experience lost offsets even if you've done everything right. In these cases, having a good plan for recovery can be crucial. Depending on your application, there are several paths to recovery, some more complicated than others.
In the best of cases, you can simply start your consumer from either earliest or latest offsets and process normally.
If you can accept reprocessing everything, set auto.offset.reset=earliest.
If you can accept missing a few messages, set auto.offset.reset=latest.
If neither of those is the case, your path becomes more complicated, and it is probably best to set auto.offset.reset=none.
If you need to assess or manually handle the situation before continuing, setting auto.offset.reset to none will make your application fail immediately after offsets are lost.
Recovering from lost offsets is considerably more complicated after your consumer has been doing the wrong thing for an hour.
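The choice between the three auto.offset.reset values can be written down as a small decision function. This is a sketch of the reasoning above, not a recommendation for any particular application; the class and parameter names are hypothetical, and preferring earliest when both options are acceptable is an assumption made here for illustration.

```java
// Hypothetical helper encoding the decision described in this section.
final class ResetPolicy {
    static String choose(boolean canReprocessEverything, boolean canMissMessages) {
        if (canReprocessEverything) {
            return "earliest"; // start over from the beginning of the topic
        }
        if (canMissMessages) {
            return "latest"; // skip ahead and only read new messages
        }
        return "none"; // fail fast so a human can decide where to resume
    }

    public static void main(String[] args) {
        System.out.println(choose(true, false));  // earliest
        System.out.println(choose(false, true));  // latest
        System.out.println(choose(false, false)); // none
    }
}
```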
If you don't want to start at either end, but have a reasonable estimate of where your consumer stopped, you can use the seek API to jump to the wanted offset before starting your consumer.
You can also update consumer offsets using the Kafka command-line tool kafka-consumer-groups.sh.
Aiven has written a short article about its usage, which is a great place to start.
In order to use it you need credentials giving you access to the topic, which you can get using the NAIS CLI.
For other strategies, post a message in #kafka on Slack and ask for help. Several teams have plans and tools for recovery that they can share.
Getting estimates for last offset¶
Finding a good estimate for where your last offset was can be tricky.
One place to go is Prometheus. In our clusters, we have kafka-lag-exporter running. This tracks various offset-related metrics, one of which is the last seen offset for a consumer group.
You can use this query to get offsets for a consumer group: