Configure Kafka for High Durability


What is Durability in Kafka

Durability in Kafka is all about reducing the likelihood of losing a message.

Confluent Cloud enforces a replication factor of 3 to ensure data durability [1].

Producer acks


Producers control the durability of messages written to Kafka brokers using the acks config parameter. Although the acks configuration can also be used to optimize throughput and latency, it is primarily a durability setting. acks=all offers the strongest durability guarantee: the leader broker waits for all ISRs to acknowledge that they have saved the message (in other words, the leader waits for the message to get "committed" and become available for reads by consumers), and only then returns an acknowledgement to the Producer application. Therefore, the message won't be lost as long as at least one ISR remains alive.

The number of ISRs that must acknowledge the saving of a message is defined by min.insync.replicas (discussed in detail below).

More details on acks config - https://krishnakrmahto.com/configure-kafka-for-high-throughput#heading-producer-acks-for-production-throughput
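
As a quick illustration, here is a minimal sketch of producer properties focused on acks=all. The bootstrap address and serializers are assumed placeholders, not anything prescribed by the references:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableAcksConfig {
    // Minimal producer properties for durability; the bootstrap address and
    // serializers below are assumptions made only to keep the example runnable.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the leader responds only after the message is committed,
        // i.e. acknowledged by the in-sync replicas.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }
}
```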

Trade-offs

https://krishnakrmahto.com/configure-kafka-for-high-throughput#heading-trade-offs-3

Minimum In-sync Replicas

  • The replica brokers of a partition that are "in-sync" with the leader are called in-sync replicas (ISRs). A produced message is replicated across all ISRs before it is made available for reads by the consumer applications.

  • When a Producer has set acks=all, minimum in-sync replicas is the minimum number of ISRs that must acknowledge the saving of a message before the leader broker returns a success response to the producer's produce request. This is set through the min.insync.replicas broker/topic configuration.

  • Default is min.insync.replicas=1, which means the leader returns a response as soon as it saves the message without waiting for acknowledgements from the ISRs. So there is a chance that we may lose messages if the leader broker goes down before the followers have completed replicating the messages.

  • Moreover, a broker acknowledges messages that have not necessarily been persisted to disk yet - Kafka relies on replication across the followers for durability. The idea behind this is that having three machines in separate racks or availability zones, each with a copy of the data, is safer than writing the messages to disk on the leader, because simultaneous failures in two different racks or zones are very unlikely [3].

  • Therefore, setting min.insync.replicas to a higher value in tandem with the Producer config acks=all ensures that a message is replicated to more than one ISR before it is acknowledged, which strengthens the durability guarantee (see the example after this list).

  • min.insync.replicas should not be set equal to the replication factor of the topic, for the reason discussed in the Durability vs Write Availability trade-off below.

  • Note that setting min.insync.replicas > 1 is effective only when the Producer acks config is set to acks=all (as discussed in the points above).

  • For use cases that can tolerate occasional loss of messages, it is not recommended to increase this configuration from the default of 1 [3].
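
As referenced in the list above, here is a minimal sketch of creating a topic with a replication factor of 3 and min.insync.replicas=2 using the Kafka AdminClient. The topic name "orders", the partition count, and the bootstrap address are assumptions made for illustration:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDurableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed bootstrap address; replace with your cluster's.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3, and min.insync.replicas=2:
            // with acks=all, every write must reach at least 2 of the 3 replicas.
            NewTopic topic = new NewTopic("orders", 3, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```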

Trade-offs

Durability vs Write Availability
  • A leader can remove a follower from the ISR list, or a broker can itself go down. If the number of available ISRs of a topic partition falls below the configured min.insync.replicas, the leader broker will not accept any writes to the partition, which surfaces in the Producer as an exception (typically NotEnoughReplicasException; see the sketch after this list). In other words, the topic partition becomes unavailable for writes.

  • So as the value of min.insync.replicas is increased, the cluster can tolerate fewer unavailable ISRs before writes are rejected. For example, if the number of ISRs = 3 and min.insync.replicas=1, we can tolerate unavailability of 2 ISRs, whereas with min.insync.replicas=2 we can tolerate unavailability of only 1 ISR without the leader rejecting produce requests.

  • The maximum possible number of ISRs of a partition equals the replication factor of the topic. However, setting min.insync.replicas = replication factor is undesirable because the partition then becomes unavailable for writes when even a single replica goes down (the loss of even one ISR drops the ISR count below the configured minimum).

  • In short, increasing min.insync.replicas increases durability of messages of a topic, but weakens the write availability of a partition.
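
The sketch below shows one way a producer application might observe this write unavailability. The producer retries retriable errors internally, so the callback typically sees NotEnoughReplicasException only after delivery.timeout.ms has expired. The topic name and bootstrap address are assumptions:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.serialization.StringSerializer;

public class WriteAvailabilityDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // required for min.insync.replicas to matter

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "key", "value"), (metadata, exception) -> {
                if (exception instanceof NotEnoughReplicasException) {
                    // Fewer ISRs than min.insync.replicas: the partition is
                    // temporarily unavailable for writes; alert or back off here.
                    System.err.println("Write rejected: " + exception.getMessage());
                } else if (exception != null) {
                    exception.printStackTrace();
                }
            });
            producer.flush();
        }
    }
}
```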

Durability vs Read Availability
  • Consumers can read from a topic as long as the leader broker of the partition is available, and the leader is always an ISR. Therefore, reads are not affected by the value of min.insync.replicas.
Durability vs Producer Throughput
Durability vs Producer Latency

Producer Retries

  • Producers can retry sending messages to ensure no data is lost if sending messages to the broker fails.

  • The number of retries is controlled by the retries and delivery.timeout.ms producer configurations.

  • It is recommended to keep retries at its default value (Integer.MAX_VALUE) and tune delivery.timeout.ms instead to control the retry behaviour [2] (see the sketch after this list).

  • delivery.timeout.ms should be set as the upper bound for the total time between sending a message and receiving an acknowledgement from the broker, which should also reflect the business requirement of how long a message is valid for [1].
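
A minimal sketch of these retry-related settings. The 120-second delivery timeout and 30-second request timeout are assumed values; derive them from your own business requirement for how long a message remains worth delivering:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class RetryConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);    // keep the default
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);  // total time for send + retries + ack
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);    // per-request wait before retrying
        return props;
    }
}
```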

Things to consider while Configuring Producer Retries

Message Duplication

If a Producer application does not receive a response from the leader broker within a specified interval (controlled by the Producer configuration request.timeout.ms), the producer retries sending the message. The original message, however, might have been persisted by the broker even though its acknowledgement never reached the producer, so the retry results in a duplicate message.

Message Ordering

Multiple send requests might be in flight at the same time (controlled by the Producer configuration max.in.flight.requests.per.connection). If an older request fails and is retried, it may be stored in the broker after newer messages have already been stored, breaking the original ordering.

Preventing Message Duplication and Message Ordering Issues - Idempotent Producer

  • Both of the issues are addressed by configuring the Producer as idempotent. This is controlled by the enable.idempotence=true Producer config.

  • Along with setting enable.idempotence to true, the Producer configuration must also satisfy the following (a configuration sketch follows at the end of this section):

    1. max.in.flight.requests.per.connection (<= 5)

    2. retries > 0

    3. acks = all

  • An Idempotent Producer is assigned a Producer ID by the broker during producer initialization.

  • Every message sent by an Idempotent Producer is assigned a monotonically increasing sequence number, starting at 0 (zero), on a per topic-partition basis.

  • The leader broker stores the sequence numbers it has already received from every Producer ID. The broker uses these stored sequence numbers to determine whether a message is:

    1. new → the sequence number of the message = the last sequence number the broker had seen + 1.

    2. duplicate → the sequence number of the message is <= the last sequence number the broker had seen.

    3. out-of-order → the sequence number of the message is > the last sequence number the broker had seen + 1.

  • If a Producer retries sending a message with a duplicate sequence number, it receives a DUPLICATE_SEQUENCE_NUMBER error from the broker, and the Producer simply treats the message as successfully sent.

  • If a broker receives out-of-order sequence numbers, it returns an OUT_OF_ORDER_SEQUENCE_NUMBER error to the Producer for each such message. The Producer then re-queues those messages into its buffer in sequence order so that they can be sent again.
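
Putting the above together, here is a minimal sketch of an idempotent producer. The topic name and bootstrap address are assumptions; note that Kafka clients 3.0+ already enable idempotence and acks=all by default:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Idempotence plus the constraints it requires.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);         // retries > 0
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // must be <= 5

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "key", "value"));
            producer.flush();
        }
    }
}
```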

Committing Consumer Offsets

  • Consumer offsets track which messages have been consumed by a consumer.

  • Handling unexpected consumer failures is important to make sure that messages don't get lost when they are consumed/processed. This is not an absolute threat to the durability of messages stored in the broker, but the messages effectively cease to exist for the consumer if its offsets are mis-committed during consumer failures.

  • So it is advised to avoid situations where a consumer application commits an offset, starts processing a message, and then fails unexpectedly before it finishes. In such cases, the consumer that replaces the failed one starts processing from offsets greater than the committed offsets, so the partially processed message is effectively skipped.

  • By default, a consumer commits offsets automatically (controlled by the Consumer config enable.auto.commit) during the poll() request at fixed intervals of 5 seconds (controlled by the Consumer config auto.commit.interval.ms). This offset commit is performed at the time of the consumer metadata update (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#poll(org.apache.kafka.common.utils.Timer, boolean)), before new consumer records are fetched from the broker. Therefore, the default auto-commit strategy commits only the offsets that were returned, and presumably processed, up to the last poll. This causes no side effects for the durability of messages, although it can cause the consumer to process a message more than once if it fails with offsets that were processed but not yet committed.

  • We can disable auto-commit by setting enable.auto.commit to false and commit offsets manually by calling commitSync() or commitAsync(). As discussed above, when committing manually we should avoid committing offsets before the corresponding messages have been processed (a minimal sketch follows below).
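
A minimal process-then-commit sketch with auto-commit disabled. The group id, topic name, and bootstrap address are assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ProcessThenCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-processor");        // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           // take control of commits

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // finish processing first...
                }
                consumer.commitSync(); // ...then commit, so a crash causes re-processing, not loss
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```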

References

  1. Optimizing for Durability | Confluent Documentation

  2. https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_retries

  3. Kafka - The Definitive Guide

  4. If a claim made in this doc is not covered by the references above, it may have been taken directly from the Kafka code base - https://github.com/apache/kafka
