Kafka

What happens on deploy?

When you deploy an application that requests access to Kafka, Naiserator will create an AivenApplication resource in the cluster. The AivenApplication resource has a name matching the deployed application, and specifies the name of the secret to generate. Naiserator will request that a secret with this name be used in the deployment.

When an AivenApplication resource is created or updated, Aivenator will create a new service user and generate credentials. These credentials are then inserted into the requested secret and used in the deployment.

If there is a problem generating the secret, your deployment might fail. In that case, Aivenator will update the status part of the AivenApplication resource with further information about the problem.

Application design guidelines

Authentication and authorization

The NAIS platform will generate new credentials when your application is deployed. Kafka requires TLS client certificates for authentication. Make sure your Kafka and/or TLS library can do client certificate authentication, and that you can specify a custom CA certificate for server validation.
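
A minimal sketch of such a configuration with the Java client, assuming the generated secret is exposed as environment variables named KAFKA_BROKERS, KAFKA_KEYSTORE_PATH, KAFKA_TRUSTSTORE_PATH and KAFKA_CREDSTORE_PASSWORD (verify the exact names and keystore formats against the secret your application actually receives):

    import java.util.Properties;

    class KafkaConnectionProps {
        // Builds client properties from credentials injected by the platform.
        // Environment variable names and keystore formats are assumptions;
        // check the contents of the generated secret.
        static Properties fromEnv() {
            Properties props = new Properties();
            props.put("bootstrap.servers", System.getenv("KAFKA_BROKERS"));
            props.put("security.protocol", "SSL");
            // Client certificate used for authentication.
            props.put("ssl.keystore.type", "PKCS12");
            props.put("ssl.keystore.location", System.getenv("KAFKA_KEYSTORE_PATH"));
            props.put("ssl.keystore.password", System.getenv("KAFKA_CREDSTORE_PASSWORD"));
            // Custom CA certificate used to validate the brokers.
            props.put("ssl.truststore.type", "JKS");
            props.put("ssl.truststore.location", System.getenv("KAFKA_TRUSTSTORE_PATH"));
            props.put("ssl.truststore.password", System.getenv("KAFKA_CREDSTORE_PASSWORD"));
            return props;
        }
    }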

Readiness and liveness

Making proper use of liveness and readiness probes can help with many situations. If producing or consuming Kafka messages is a vital part of your application, you should consider failing one or both probes if you have trouble with Kafka connectivity. Depending on your application, failing liveness might be the proper course of action. This will make sure your application is restarted when it is experiencing problems, which might help.

In other cases, failing just the readiness probe will allow your application to continue running, attempting to move forward without being killed. Failing readiness will be most helpful during deployment, where the old instances will keep running until the new ones are ready. If the new instances are not able to connect to Kafka, keeping the old ones running until the problem is resolved will allow your application to continue working.
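
As a rough illustration (the class and wiring are hypothetical, not something the platform provides), a consumer loop can record whether Kafka is reachable and let the readiness endpoint report it:

    import java.util.concurrent.atomic.AtomicBoolean;

    class KafkaHealth {
        // Hypothetical sketch: the consumer loop flips this flag, and the
        // application's readiness handler returns 200 when it is true and
        // 503 when it is false. Wire it into whatever HTTP framework you use.
        private final AtomicBoolean kafkaReady = new AtomicBoolean(false);

        void onPollSuccess() { kafkaReady.set(true); }
        void onPollFailure() { kafkaReady.set(false); }

        boolean isReady() { return kafkaReady.get(); }
    }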

Working with Kafka Offsets

Retention and what it means

On Aiven Kafka, we retain consumer offsets for a period of 7 days. This is the period recommended by Aiven and the default for Kafka. Due to how longer offset retention affects other parts of Kafka, we do not want to increase this period.

When a consumer group stops consuming messages, its offsets will be retained for the mentioned period.

Kafka decides whether a consumer group has stopped consuming in one of two ways, depending on how partitions are assigned:

Dynamically assigned partitions

This is the normal operation when using current Kafka client libraries. In this case, Kafka assigns partitions to the consumers as they connect, and manages group membership. A consumer group is considered empty when there are no connected consumers with assigned partitions.

Once the group is empty, Kafka retains the offsets for 7 days.
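
As a minimal Java sketch (the topic name is a placeholder, and the consumer is assumed to be configured with connection settings and deserializers), dynamic assignment is what you get when you subscribe and let Kafka manage the group:

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class DynamicAssignmentExample {
        // subscribe() lets Kafka assign partitions and track group membership.
        static void run(KafkaConsumer<String, String> consumer) {
            consumer.subscribe(List.of("my.team.my-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // process(record) ...
                }
            }
        }
    }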

Manually assigned partitions

When using the assign API you are responsible for keeping track of consumers. In this scenario, Kafka uses the time of the last commit to determine offset retention.

Offsets are kept for 7 days after the last commit.

Warning

This means that when using manual assignment on a topic with long periods of inactivity (more than 7 days between messages), you might lose offsets even if your consumer is running and committing offsets as it should.
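
For comparison, a minimal sketch of manual assignment (topic name and partition are placeholders); here it is the commits themselves that keep the offsets alive:

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class ManualAssignmentExample {
        // assign() skips group management entirely, so offset retention is
        // based on the time of the last commit rather than group membership.
        static void run(KafkaConsumer<String, String> consumer) {
            consumer.assign(List.of(new TopicPartition("my.team.my-topic", 0)));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // process(record) ...
                }
                consumer.commitSync();
            }
        }
    }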

Do you even need offsets?

Some scenarios don't actually need to track offsets, and you can consider disabling the feature for a slight performance gain. In these situations, set enable.auto.commit=false and simply never commit offsets, as sketched after the list below.

There are two main variations of this scenario:

  1. Always reading the entire topic from start to end. Set auto.offset.reset=earliest.
  2. Only caring about fresh messages arriving after the consumer connects. Set auto.offset.reset=latest.
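
A minimal sketch of the relevant configuration using the Java ConsumerConfig constants (combine with your connection settings):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    class NoOffsetCommitConfig {
        // Configuration for a consumer that never commits offsets.
        static Properties props() {
            Properties props = new Properties();
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // "earliest": always reprocess the whole topic from the start.
            // "latest": only read messages arriving after the consumer connects.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
    }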

Autocommit: When/Why/Why not?

When starting out with Kafka, it is common to use the autocommit feature available in the client libraries. This makes it easy to get started, and often provides good enough semantics.

When you use autocommit, the client will automatically commit the last received offsets at a set interval, configured using auto.commit.interval.ms. The implementation ensures that you get "at-least-once" semantics, where the worst case scenario is to reprocess messages received in the interval between last commit and the consumer stopping. One downside with this mechanism is that you have little control over when offsets are committed.

Autocommit is done before a poll to the server, which means that your consumer needs to ensure that it has completed processing of a message before the next call to poll. If your consumer processes messages in other threads, you probably need to manage offsets explicitly rather than rely on autocommit.
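
For reference, a sketch of the autocommit configuration with the Java client (the five second interval is just an example value):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    class AutoCommitConfig {
        // With autocommit enabled, offsets from the previous poll() are
        // committed during a later poll(), once the interval has elapsed.
        static Properties props() {
            Properties props = new Properties();
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
            return props;
        }
    }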

Managing offsets explicitly

The KafkaConsumer exposes two APIs for committing offsets: asynchronous commits using commitAsync, and synchronous commits using commitSync.

From the Confluent documentation:

Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending.


A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately by using asynchronous commits.


In general, asynchronous commits should be considered less safe than synchronous commits.
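
A common way to balance the two, sketched below with a placeholder topic name, is to use commitAsync in the normal processing loop for throughput, and a final commitSync on shutdown so the last offsets are persisted:

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class ExplicitCommitExample {
        static void run(KafkaConsumer<String, String> consumer) {
            consumer.subscribe(List.of("my.team.my-topic"));
            try {
                while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                        // process(record) ...
                    }
                    // Asynchronous commit keeps the loop moving; a failed commit
                    // is superseded by the next one.
                    consumer.commitAsync();
                }
            } finally {
                try {
                    // Synchronous commit on the way out, so the final offsets are not lost.
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
            }
        }
    }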

Saving offsets elsewhere

The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offsets and the results of the consumption in the same system, so that results and offsets are stored atomically. Another use case is consumers that receive messages very rarely, where consumer inactivity and other incidents might lead to lost offsets because the retention period runs out.

In some cases it might even be beneficial to store offsets in alternative storage even if your results are not kept there. This avoids issues with offsets passing the retention threshold in the case of recurring errors or networking issues.

When storing offsets outside Kafka, your consumer needs to pay attention to rebalance events, to ensure correct offset management. In these cases it might be easier to also manage partition assignment explicitly.

Before storing offsets outside Kafka, consult the Kafka documentation on the topic.
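
A rough sketch of the moving parts, assuming a hypothetical OffsetStore abstraction over your own storage (for instance the same database that stores your results) and a placeholder topic name:

    import java.util.Collection;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class ExternalOffsetExample {
        // OffsetStore is hypothetical; it represents wherever you keep offsets.
        interface OffsetStore {
            long nextOffsetFor(TopicPartition partition);
        }

        static void subscribe(KafkaConsumer<String, String> consumer, OffsetStore store) {
            consumer.subscribe(List.of("my.team.my-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Offsets are persisted together with the results, so there is
                    // nothing extra to flush in this sketch.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Resume each newly assigned partition from the offset stored
                    // outside Kafka.
                    for (TopicPartition partition : partitions) {
                        consumer.seek(partition, store.nextOffsetFor(partition));
                    }
                }
            });
        }
    }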

What to do when you lose your offsets

Shit happens, and you may experience lost offsets even if you've done everything right. In these cases, having a good plan for recovery can be crucial. Depending on your application, there are several paths to recovery, some more complicated than others.

In the best of cases, you can simply start your consumer from either earliest or latest offsets and process normally. If you can accept reprocessing everything, set auto.offset.reset=earliest. If you can accept missing a few messages, set auto.offset.reset=latest. If neither of those are the case, your path becomes more complicated, and it is probably best to set auto.offset.reset=none.

If you need to assess or manually handle the situation before continuing, setting auto.offset.reset to none will make your application fail immediately after offsets are lost. Trying to recover from lost offsets is considerably more complicated after your consumer has been doing the wrong thing for an hour.

If you don't want to start at either end, but have a reasonable estimate of where your consumer stopped, you can use the seek API to jump to the desired offset before starting your consumer.
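
As an illustration (topic, partition and offset are placeholders for your own estimate), seeking can be done once the partition is assigned:

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class SeekToEstimateExample {
        // Jump to an estimated offset before resuming normal consumption.
        static void run(KafkaConsumer<String, String> consumer) {
            TopicPartition partition = new TopicPartition("my.team.my-topic", 0);
            consumer.assign(List.of(partition));
            consumer.seek(partition, 123456L);
            // Continue polling and processing as normal from here.
            consumer.poll(Duration.ofSeconds(1));
        }
    }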

You can also update consumer offsets using the Kafka command-line tool kafka-consumer-groups.sh. Aiven has written a short article about its usage, which is a great place to start. In order to use it you need credentials giving you access to the topic, which you can get using the NAIS CLI.

For other strategies, post a message in #kafka on Slack and ask for help. Several teams have plans and tools for recovery that they can share.

Getting estimates for last offset

Finding a good estimate for where your last offset was can be tricky.

One place to go is Prometheus. In our clusters, we have kafka-lag-exporter running. This tracks various offset-related metrics, one of which is the last seen offset for a consumer group.

You can use this query to get offsets for a consumer group:

max(kafka_consumergroup_group_offset{group="spedisjon-v1"}) by (topic, partition)

FAQ/Troubleshooting

Why do I have to specify a pool name if there is only nav-dev and nav-prod?

Custom pools might be added in the future, so this is done to avoid changing that part of the API.

I can't produce/consume on my topic, with an error message like "topic not found". What's wrong?

You need to use the fully qualified name; check the .status.fullyQualifiedName field in your Topic resource.

I can't produce/consume on my topic, with an error message like "not authorized". What's wrong?

Make sure you added the application to .spec.acl in your topic.yaml.

I get the error MountVolume.SetUp failed for volume "kafka-credentials" : secret ... not found

Check the status of the AivenApplication resource created by Naiserator to look for errors.

Are Schemas backed up?

Aiven makes backups of configuration and schemas every 3 hours, but no topic data is backed up by default. See the Aiven documentation for more details.