Configure Kafka for High Throughput

Number of Partitions

The topic partition is the unit of parallelism in Kafka. Increasing the number of partitions of a topic can therefore increase its throughput.

How to determine number of partitions for high throughput per topic

  1. Measure the production throughput on a single partition - t_production.

  2. Measure the consumption throughput on a single partition - t_consumption.

  3. If the target throughput is t, then the minimum number of partitions required = max(t / t_production, t / t_consumption).
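The three steps above can be sketched as a small calculation. This is a minimal illustration only: the function name and the example measurements are hypothetical, and the result is rounded up because a partition count must be a whole number.

```python
import math

def min_partitions(target, produce_per_partition, consume_per_partition):
    """Return max(t / t_production, t / t_consumption), rounded up.

    All three arguments must use the same unit, for example MB/sec.
    """
    if produce_per_partition <= 0 or consume_per_partition <= 0:
        raise ValueError("per-partition throughput must be positive")
    needed = max(target / produce_per_partition,
                 target / consume_per_partition)
    return math.ceil(needed)

# Hypothetical measurements: 10 MB/sec produce and 20 MB/sec consume
# per partition, with a target of 100 MB/sec for the topic.
print(min_partitions(100, 10, 20))
```

Whichever side is slower per partition (producing or consuming) determines the count.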

High throughput per partition

  1. High throughput per partition can be achieved on the producer side by tuning the following configurations (all except max.in.flight.requests.per.connection are discussed in later sections):

    1. batch.size

    2. linger.ms

    3. compression.type

    4. acks

    5. max.in.flight.requests.per.connection → This limit applies to a connection, not to a single partition. However, it helps increase throughput per partition on average.

  2. In general, one can produce at tens of MB/sec on just a single partition, as shown in this benchmark.

  3. On the consumer side, the throughput is often dependent on how fast the consumer logic executes in the Kafka client application.
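As a rough sketch, the producer settings listed above could be collected like this. The configuration names are real Kafka producer configs; every value is only an illustrative starting point and should be validated by measurement. The helper `to_properties` is a hypothetical convenience for printing.

```python
# Illustrative values only; tune each one against your own workload.
THROUGHPUT_PRODUCER_CONFIG = {
    "batch.size": 131072,          # bytes per partition batch (assumed value)
    "linger.ms": 20,               # wait up to 20 ms for a batch to fill (assumed value)
    "compression.type": "lz4",     # assumed codec choice
    "acks": "1",                   # leader-only acknowledgement
    "max.in.flight.requests.per.connection": 5,
}

def to_properties(config):
    """Render a config dict in Java .properties style, one key=value per line."""
    return "\n".join(f"{key}={value}" for key, value in sorted(config.items()))

print(to_properties(THROUGHPUT_PRODUCER_CONFIG))
```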

Increasing number of partitions over time

  • We can increase the number of partitions at a later point, but this can change the partition assignment of keyed messages produced thereafter. With a fixed number of partitions, messages with the same key are guaranteed to go to the same partition, and messages in a partition are always delivered to consumers in order. This guarantee can be critical for some client applications, so we need to evaluate whether our application depends on it (ideally at the outset).

  • Confluent recommends a better-safe-than-sorry approach - over-partitioning. This means choosing the number of partitions based on the throughput expected in the future.

  • Even then, we may need to increase the number of partitions at some point. In that case there will be a period, depending on log retention, when messages with the same key are present in two partitions (newer messages are no longer assigned to the same partition as older messages with the same key). Alternatively, we can accept a significant downtime: bring down all producer clients, make sure that all messages in the topic have been consumed, and only then increase the partition count.
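The effect of a partition-count change on keyed messages can be illustrated with a toy partitioner. This sketch assumes a hash-modulo scheme and uses `zlib.crc32` purely as a stand-in hash; Kafka's default partitioner hashes the key bytes with murmur2, so the actual partition numbers will differ. The key names are hypothetical.

```python
import zlib

def partition_for(key, num_partitions):
    """Toy keyed partitioner: hash(key) mod partition count.

    crc32 is a stand-in hash for illustration, not what Kafka uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions

keys = [f"user-{i}" for i in range(1000)]

# Compare where each key lands before and after growing from 6 to 8 partitions.
moved = [k for k in keys if partition_for(k, 6) != partition_for(k, 8)]
print(f"{len(moved)} of {len(keys)} keys map to a different partition")
```

Any key in `moved` would have its older messages in one partition and its newer messages in another until retention removes the older ones.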

Trade-offs

Taking care of OS File handle limit with increasing partitions

  • A Kafka broker has a directory for every partition of a topic. The directory contains log segment files, and each log segment file is accompanied by an index file. The broker opens a file handle for each index file and for its corresponding log segment file.

  • Therefore, increasing the number of partitions would also mean the broker will be opening a higher number of file handles. We may have to take care of the maximum limit imposed by the OS on the allowed number of file handles as we go on increasing the number of partitions.

  • Confluent reports having seen production Kafka clusters running with 30,000 open file handles per broker.
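A back-of-the-envelope estimate of open file handles follows from the description above. This sketch assumes two files per segment (log plus index), as the text describes; the function name and the example numbers are hypothetical.

```python
def estimated_open_files(partitions_on_broker, segments_per_partition,
                         files_per_segment=2):
    """Rough count of file handles a broker holds for partition data.

    files_per_segment=2 follows the text above: one log segment file
    plus one index file.
    """
    return partitions_on_broker * segments_per_partition * files_per_segment

# Hypothetical broker: 2,000 partition replicas with 5 segments each.
needed = estimated_open_files(2000, 5)
os_limit = 65536  # assumed per-process limit; check the real value on the broker host
print(needed, "handles estimated; limit is", os_limit)
```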

Increasing the number of partitions may decrease availability of Kafka cluster

  • Every partition in Kafka has a leader replica and a set of follower replicas. All read and write requests are served from the leader replica which lives in one of the brokers in the cluster.

  • So when a broker fails/shuts down abruptly, every partition whose leader lived on the failed broker becomes temporarily unavailable. The affected partitions become available again for serving client requests when the Controller broker detects the failure and reassigns partition leadership to one of their available replicas. If it is the Controller broker itself that goes down, then there is an added delay of Controller failover and initialization (happens automatically through Zookeeper).

  • When a broker is shut down gracefully, the Controller moves the partition leadership for one partition at a time. Moving a single leader takes only a few milliseconds, hence the unavailability is limited to a very small duration from a client application's perspective.

  • Simply put, increasing partitions increases the average number of partition leaders per broker, which can increase the total read and write unavailability of partitions during broker failures and/or graceful shutdowns.

  • Ungraceful shutdowns/failures are less likely than graceful shutdowns. However, if availability is paramount even in such rare cases, Confluent recommends limiting the number of partitions per broker to 2,000 to 4,000, and the total number of partitions in a cluster to the low tens of thousands.

Increasing the number of partitions may also increase end-to-end latency

  • The end-to-end latency is measured as the time duration from the point when the message is produced till the point it is available for the consumers to read [2].

  • A produced message is considered committed only after it has been written on all in-sync replica partitions.

  • A message is available to be read by consumers only after it has been committed.

  • So, end-to-end latency can be significantly affected if the time to "commit" a message is high.

  • How does increasing the number of partitions increase the time to commit a message?

    • By default, every follower broker uses a single thread to replicate data from another broker (number of threads replicating from source broker is controlled by num.replica.fetchers broker config).

    • Increasing the number of partitions means adding more leader and follower partitions (and hence, more in-sync replicas) per broker.

    • Therefore, each broker in the cluster will have more follower partitions that replicate data.

    • More partitions can increase the latency of "committing" a message which gets added to the total end-to-end latency of a message.

  • According to Confluent, replicating 1000 partitions from one broker to another can add about 20 ms latency (one broker had all the 1000 leader partitions and the other broker had all 1000 follower partitions).

  • The replication load per broker is lower on larger Kafka clusters, because partition replicas are spread across more brokers, which brings down the number of follower partitions per broker. Thus, end-to-end latency is effectively lower in larger clusters.

  • A rough formula for the added latency l (in ms), given number of brokers = b, replication factor = r and total number of partitions = p_total, taking Confluent's observation above of about 20 ms for replicating 1000 partitions as the reference: l = (p_total / b) * (r - 1) * (20 / 1000) ms, where (p_total / b) * (r - 1) is the number of follower partitions per broker that replicate their corresponding leader partitions.
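The rough latency estimate can be worked through in code. This sketch takes the number of follower partitions per broker as (p_total / b) * (r - 1) and scales it by the 20 ms per 1000 partitions reference; the function name and the example cluster are hypothetical, and the result is an approximation rather than a prediction.

```python
def replication_latency_ms(total_partitions, brokers, replication_factor,
                           ms_per_1000_followers=20.0):
    """Approximate added commit latency from follower replication."""
    followers_per_broker = (total_partitions / brokers) * (replication_factor - 1)
    return followers_per_broker * (ms_per_1000_followers / 1000.0)

# Hypothetical cluster: 10,000 partitions, 10 brokers, replication factor 3.
print(replication_latency_ms(10000, 10, 3))

# Same partitions on twice as many brokers: the estimate halves.
print(replication_latency_ms(10000, 20, 3))
```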

Increasing the number of partitions may require more memory on client applications

  • Producer application

    • Producer applications buffer messages per partition, up to a configurable time or size (linger.ms/batch.size), before sending a batch of messages off to the brokers.

    • A higher number of partitions means the producer has to buffer messages for the newly added partitions as well. It is therefore possible to exceed the limit set for producer buffer memory - buffer.memory.

    • When buffer.memory is exhausted, the producer blocks new sends until memory frees up. If the configurable timeout (max.block.ms) passes first, it throws an exception on the same thread where "send" was called.

    • Therefore, we may have to reconfigure buffer.memory with a larger size. Confluent's recommendation - "As a rule of thumb, to achieve good throughput, one should allocate at least a few tens of KB per partition being produced in the producer and adjust the total amount of memory if the number of partitions increases significantly."

  • Consumer application

    • Consumer applications also request messages from Kafka brokers in batches. The requested batch size is controlled by fetch.max.bytes and max.partition.fetch.bytes.

    • There are broker as well as topic configurations that restrict the maximum allowable batch size (Topic config - max.message.bytes, Broker config - message.max.bytes). Increasing the number of partitions risks back pressure; we may have to revisit the aforementioned consumer as well as broker/topic configurations.

    • This memory issue may be of concern only in non-realtime consumer applications though.
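For the producer side, the rule of thumb quoted above can be turned into a quick sizing check. This is a sketch under stated assumptions: 32 KB per partition is one reading of "a few tens of KB", the floor of 33554432 bytes is the producer's default buffer.memory, and the function name is hypothetical.

```python
DEFAULT_BUFFER_MEMORY = 33554432  # 32 MB, the producer default for buffer.memory

def suggested_buffer_memory(partitions_produced_to, kb_per_partition=32):
    """Suggest a buffer.memory value in bytes.

    Never goes below the default; grows with the partition count.
    """
    needed = partitions_produced_to * kb_per_partition * 1024
    return max(DEFAULT_BUFFER_MEMORY, needed)

print(suggested_buffer_memory(100))    # small topic: the default is enough
print(suggested_buffer_memory(2000))   # many partitions: more memory suggested
```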

Batching Messages

Batching messages is another way to improve the throughput of a Kafka cluster. However, it trades off against latency.

Batching on Producer Side

  • Batching on the producer side can be tuned with the linger.ms and batch.size producer configurations.

  • A batch is sent after linger.ms regardless of whether batch.size worth of messages has accumulated. Similarly, as soon as batch.size is filled, the batch is sent to the Kafka broker regardless of whether linger.ms has passed.

  • The default for linger.ms is 0, which ideally means the Kafka producer sends a message as soon as it gets it. However, a value of 0 does not mean batching is disabled: the producer always sends messages in batches, although a batch may contain only one message. If messages are passed to the producer faster than it sends them to the cluster, a batch may have more than one message even with linger.ms = 0.
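The "whichever comes first" rule for linger.ms and batch.size can be illustrated with a toy simulation for a single partition. This is a simplified model, not the producer's actual implementation: it ignores the sender thread being busy (which is what lets batches grow beyond one message at linger.ms = 0), and the function name and sample arrivals are hypothetical.

```python
def simulate_batches(arrivals, linger_ms, batch_size):
    """arrivals: list of (arrival_ms, size_bytes), sorted by time.

    Returns a list of (send_time_ms, [message sizes]) per batch.
    """
    batches = []
    current, current_bytes, opened_at = [], 0, None
    for t, size in arrivals:
        # The open batch's linger time expired before this message arrived.
        if current and t >= opened_at + linger_ms:
            batches.append((opened_at + linger_ms, current))
            current, current_bytes, opened_at = [], 0, None
        # This message does not fit: the open batch is full, send it now.
        if current and current_bytes + size > batch_size:
            batches.append((t, current))
            current, current_bytes, opened_at = [], 0, None
        if not current:
            opened_at = t  # first message opens a new batch
        current.append(size)
        current_bytes += size
    if current:
        # The last batch goes out when its linger time expires.
        batches.append((opened_at + linger_ms, current))
    return batches

arrivals = [(0, 400), (2, 400), (4, 400), (30, 400)]
for send_time, sizes in simulate_batches(arrivals, linger_ms=10, batch_size=1000):
    print(send_time, sizes)
```

In this run the first batch is sent because it is full, while the later ones are sent because their linger time ran out.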

Batching on Consumer Side (during Consumer Fetching)

  • Consumers also fetch messages in batches. The fetch.min.bytes consumer configuration controls the minimum amount of data that the server should return for a fetch request. If less data than the specified size is available, the broker waits until more messages arrive. This reduces the back-and-forth request-response load on both the consumer and the broker. This configuration is also useful when the cluster has to handle requests from a large number of consumers.

  • fetch.max.wait.ms is the maximum amount of time that the broker waits for fetch.min.bytes to finish accumulating after which the messages are returned to the consumer regardless of the size.

  • There is also a max.poll.records consumer configuration that works purely on the consumer side and has nothing to do with the fetch request sent to the broker (unlike fetch.min.bytes and fetch.max.wait.ms, which tell the broker when it should return the records). The consumer caches the records from each fetch request and returns at most max.poll.records records for every poll() call. So max.poll.records has no influence on the consumption throughput.
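The consumer-side caching behind max.poll.records can be sketched with a toy model. `ToyConsumer` and its `fetch_source` callback are hypothetical stand-ins for the real client and broker; the point is only that several poll() calls can be served from one fetch.

```python
from collections import deque

class ToyConsumer:
    """Caches fetched records and hands out at most max_poll_records per poll()."""

    def __init__(self, fetch_source, max_poll_records):
        self._fetch_source = fetch_source  # callable returning one fetch response (a list)
        self._cache = deque()
        self.max_poll_records = max_poll_records
        self.fetch_count = 0

    def poll(self):
        # Only go back to the "broker" when the local cache is empty.
        if not self._cache:
            self._cache.extend(self._fetch_source())
            self.fetch_count += 1
        n = min(self.max_poll_records, len(self._cache))
        return [self._cache.popleft() for _ in range(n)]

# One fetch response containing 10 records, then nothing more.
responses = iter([list(range(10))])
consumer = ToyConsumer(lambda: next(responses, []), max_poll_records=4)

polled = [consumer.poll() for _ in range(3)]
print(polled, "fetch requests:", consumer.fetch_count)
```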

Trade-offs

Producer Throughput vs Producer Latency

  • Batching on the producer side for higher producer throughput trades off against higher producer latency, because messages are not sent as soon as they are ready to be sent.

  • The latency could manifest as an issue in low load conditions where each message gets delayed until the batch.size fills up, or linger.ms finishes - whichever occurs first.

Consumer Throughput vs Consumer Latency

  • Batching on Consumer side for higher consumption throughput trades off with higher consumption latency because a fetch request will not be served with new messages by the broker until it has enough messages to fulfil the size requested through fetch.min.bytes, or until the expiration of wait time fetch.max.wait.ms.

  • As with the producer, the added latency could manifest as an issue when the broker has little message traffic to serve to the consumers.

Compression

  • Compression is possible at both the producer level and the topic level. Compressing on the producer has the advantage of sending fewer bytes than the uncompressed equivalent, which helps throughput.

  • Compression is controlled by compression.type Producer configuration (Note: there is also a configuration at broker-level as well as topic-level by the same name). Compression utilizes CPU cycles, but can potentially reduce network bandwidth usage when done on the Producer side.

Trade-offs

CPU Cycles vs Network Bandwidth

  • Enabling compression results in more CPU cycles, less network bandwidth.

Producer Throughput vs Producer Latency

  • Compression on the producer can increase producer throughput. It can also increase the latency of sending a message to the broker, because it adds CPU cycles.

  • However, since compression uses less network bandwidth, a codec that performs well while keeping CPU usage fair can, in theory, help reduce latency as well.
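The CPU versus bandwidth trade-off can be observed with a small experiment outside Kafka. This sketch compresses a batch of hypothetical JSON messages with Python's gzip module (gzip is one of the codecs compression.type accepts) and reports bytes saved and time spent; the numbers depend entirely on the payload and machine.

```python
import gzip
import json
import time

def compress_batch(messages):
    """Return (raw_bytes, compressed_bytes, elapsed_ms) for one batch."""
    raw = "\n".join(json.dumps(m) for m in messages).encode("utf-8")
    start = time.perf_counter()
    compressed = gzip.compress(raw)
    elapsed_ms = (time.perf_counter() - start) * 1000
    return len(raw), len(compressed), elapsed_ms

# Hypothetical, highly repetitive event payloads.
batch = [{"user_id": i, "event": "page_view", "path": "/home"} for i in range(500)]
raw_bytes, gz_bytes, cpu_ms = compress_batch(batch)
print(f"raw={raw_bytes} B, gzip={gz_bytes} B, time={cpu_ms:.2f} ms")
```

Compressing a whole batch at once is what makes similar messages compress well, which is one reason batching and compression tend to be tuned together.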

Producer acks for 'Production' Throughput

  • Produced messages are sent to the leader broker of the partition. Unless acks is set to 0 (in which case the producer does not wait for any acknowledgement from the broker at all), the producer then waits for a response from the leader broker confirming that the batch has been saved before it proceeds to the next batch.

  • The sooner the leader broker returns a response to the producer, the sooner the producer can send the next batch which should result in higher throughput.

  • The acks producer configuration specifies which in-sync replicas (ISRs) must have acknowledged a write before the leader broker returns a response to the producer.

  • Setting acks=1 makes the leader broker return a response as soon as it saves the message in its own local log, without waiting for an acknowledgement from the other ISRs.

  • acks=all makes the leader broker return a response only after all ISRs have sent acknowledgements. The min.insync.replicas broker/topic configuration sets the minimum number of in-sync replicas that must be available for such a write to succeed.

  • Configuring acks for high throughput trades off against durability. Going through the values of acks in the order (0, 1, all), throughput should decrease while the durability guarantee increases.
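The three acks settings described above can be summarised in a small model. This is an illustrative sketch, not broker code: the function name is hypothetical, and it assumes the ISR set includes the leader itself.

```python
def follower_acks_awaited(acks, isr_size):
    """How many follower acknowledgements are awaited before the
    producer can treat a batch as sent.

    isr_size counts the leader as a member of the in-sync replica set.
    """
    if acks == "0":
        return 0  # producer does not wait for any broker response
    if acks == "1":
        return 0  # leader responds after its own local write
    if acks in ("all", "-1"):
        return max(isr_size - 1, 0)  # every other in-sync replica
    raise ValueError(f"unsupported acks value: {acks!r}")

for setting in ("0", "1", "all"):
    print(setting, follower_acks_awaited(setting, isr_size=3))
```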

Trade-offs

Durability vs Producer Throughput

  • The lower the value of acks, the weaker the durability guarantee but the higher the producer throughput.

Durability vs Producer Latency

  • The lower the value of acks, the weaker the durability guarantee but also the lower the producer latency.

Durability vs End-to-end Latency

  • End-to-end latency is independent of the value of acks, because a message is available for consumers to read only when a message is replicated across all ISRs for consistency. Therefore if end-to-end latency is of more concern rather than the Producer latency, then there is no latency vs durability trade-off to make.

Memory Allocation

  • Kafka producers use a configurable amount of memory to store unsent message batches. This memory is controlled by the buffer.memory producer configuration.

  • When this buffer memory is full and a new send request arrives, the producer waits up to max.block.ms for memory to free up. If it has to wait longer than max.block.ms, the producer throws an exception on the same thread where the "send" request was made.

  • We can tune buffer.memory considering batch.size, linger.ms and partition count to enable better utilisation of bandwidth across all producers and brokers.

References

Main article - Optimizing for Throughput | Confluent Documentation

  1. How to Choose the Number of Topics/Partitions in a Kafka Cluster?

  2. Kafka: The Definitive Guide

  3. Kafka Producer/Consumer/Topic Configuration documentation (the Confluent docs website is easier to follow, with a better UI, although the content is the same as the Apache Kafka documentation)

  4. If references to claims made in this doc are not found in the reference sources mentioned above, then it may have been inferred directly from the Kafka code base - github.com/apache/kafka

Did you find this article valuable?

Support Krishna Kumar Mahto by becoming a sponsor. Any amount is appreciated!