
Best Practices for Apache Kafka running on Kubernetes with Portworx


The following sections offer best practices and guidelines to follow when running Kafka on Kubernetes with Portworx.  This is not an exhaustive list; these recommendations are based on our internal testing of the application and document what we feel offers the best experience, whether performance, stability, or availability, when running Kafka on Kubernetes.

Backend Storage

Because Portworx is a software-defined storage solution that provides container-granular storage, you can use any backing store: directly attached NVMe/SSD or HDD storage, an on-prem SAN such as Pure Storage FlashArray, or cloud-based block storage such as Amazon EBS or Azure Disk.  Since Kafka is all about high performance, higher throughput, and lower latency, storage on a flash medium should be the preferred choice.

You need at least three worker nodes in your Portworx cluster so that Portworx can replicate data across nodes.

Evaluate the flash storage based on your throughput needs. There are pros and cons to locally attached NVMe/SSD drives compared to enterprise all-flash storage.  The direct-attached storage model offers higher aggregate bandwidth as each server with locally attached NVMe drives is added to the cluster, but for higher availability of the data you have to mirror the data between the drives within the server, and it does not offer additional data services such as encryption or compression.

Meanwhile, Pure Storage FlashArray//X is the first mainstream, 100% NVMe, enterprise-class all-flash array.  It offers high performance with built-in data services such as encryption, data reduction through deduplication and compression, data protection through RAID-HA, and non-disruptive upgrades.  Determine the right FlashArray model based on your application bandwidth requirements, data retention, and workload profile.

Broker Pod Availability

The advantage of running applications like Kafka on containers is the app-level abstraction.  This enables container orchestration platforms like Kubernetes to scale them as needed and manage container failures seamlessly.  If a Kafka broker pod fails in Kubernetes due to a node failure, Kubernetes with the help of Portworx can respawn the broker pod on a node that has a Portworx replica.  This is enabled by the Portworx platform's storage-aware scheduler, STORK (STorage Orchestrator Runtime for Kubernetes).  STORK performs health monitoring of Portworx services, kubelet, and other components of the system, and when a broker failure is detected, it reacts faster than kube-scheduler in rescheduling the broker pod to a healthy node.

When a failed broker is respawned on a new node with access to the original volume, the broker already has all the topics and partitions it needs to join the list of in-sync replicas.  Hence, the broker does not need to replicate huge amounts of data from the leader across the network; it only needs to catch up with the offsets that were added since it went offline.

This feature allows the Kafka broker pod to be available within a short period of time (the time STORK takes to reschedule the pod), as opposed to hours or days in a non-container world, especially when the Kafka broker fails permanently.

The broker pod's availability time after a failure is generally around three minutes or more: STORK scans every two minutes (configurable) for node failures, and once it finds one, it waits a minute before attaching the volume to another node.  To reduce this time by half, you can change the --health-monitor-interval parameter from its default of 120 seconds to 30 seconds.

To change the setting, run the following command to edit the STORK deployment:

kubectl edit deploy stork -n kube-system

In the editor, change the value of --health-monitor-interval to 30 and save the changes.  You should see the following confirmation:

deployment.apps/stork edited
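
To verify the new value without opening the editor, you can print the container arguments from the deployment.  The command below is a minimal sketch; it assumes STORK is deployed in the kube-system namespace and that the interval is passed as a container argument (it may appear under command rather than args in some installations):

kubectl get deploy stork -n kube-system -o jsonpath='{.spec.template.spec.containers[0].args}'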

Unless the default broker respawn time of three minutes or more is acceptable for your environment, consider changing this setting.

Note: In the absence of STORK, the default time for the kube-scheduler to reschedule a pod after a node failure is over 5 minutes, as the pod-eviction-timeout within the kube-controller-manager is 5 minutes.

Kafka Replication with Portworx

Kafka, designed as a distributed and horizontally scalable streaming system, relies on the replication factor of topic partitions to keep data available if a broker fails.  For production readiness, you should never run Kafka with the default replication factor of 1, even when running the application on Kubernetes with Portworx.  The general recommendation is a replication factor of 2 or 3, which allows one or two broker failures, respectively, while still providing access to the data.
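
For example, with Kafka 2.2 or later you can create a topic with a replication factor of 3 using the standard kafka-topics tool; the topic name, partition count, and bootstrap address below are illustrative:

kafka-topics.sh --create \
  --topic kafka-best-practices-demo \
  --partitions 12 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092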

Portworx enables high availability of application data through replication at the storage level.  Portworx allows you to set the replication factor between 1 and 3, where 3 offers the highest level of data protection and availability.

Given that both Kafka and Portworx provide replication, what is the ideal replication setting to run Kafka on Kubernetes with Portworx?

| Kafka RF* | PX Repl | Kafka Availability (Pod Failure & Reschedule) | Kafka Availability (Storage Node Failure) | Broker Recovery & Rebalance Time (Pod Failure & Reschedule) | Broker Recovery & Rebalance Time (Storage Node Failure) | Space Usage |
|---|---|---|---|---|---|---|
| 3 | 1 | Can sustain two failures | Can sustain two failures | Seconds | Minutes/Hours | High |
| 2 | 2 | Can sustain one failure | Can sustain one failure | Seconds | Seconds | Higher |
| 3 | NA** | NA** | Can sustain two failures | NA** | Hours/Days | High |

RF* = Replication factor
** Portworx and Kubernetes were not used in this scenario; Kafka was set up with direct-attached storage

Based on our testing of all replication factor combinations across Kafka and Portworx, the main decisions when architecting Kafka on Kubernetes center around availability, recovery times, and space usage.  Our testing revealed two options that are ideal for running Kafka, depending on your business requirements around tolerance to failures.

Option 1: Fast Recoverability

Both Portworx replication and Kafka replication set to two.

With replication set to two, Portworx replicates the volume at the storage layer, which enables quick recovery from pod-, node-, network-, and disk-level failures by quickly respawning the failed broker on another Kubernetes node with the original volume attached.  During the short period when the pod is rescheduled, Kafka's replication factor of two handles the reassignment of leadership.  Portworx reduces the overall broker pod failover time through STORK and brings the broker back with its original data, which significantly speeds up the failed broker's synchronization with its surviving replica and eliminates the need for Kafka admins to perform any partition rebuild over the network after hard node failures.  This option offers quicker recoverability, better availability, resilience, and operational efficiency gains at the cost of some additional storage.

Option 2: Kafka standard configuration

Kafka replication at three and Portworx replication at one.

This is the standard configuration most Kafka users are used to. It is equivalent to running Kafka in a non-Kubernetes environment with Kafka replication at three, but with some additional benefits.  With this option, during a pod failure or the failure of a node that does not hold the Portworx replica, Portworx can still reattach its virtual volume from a surviving node to the respawned broker pod on another node.  However, if the node that fails owns the Portworx replica, the broker will be unavailable until recovery occurs.  During this time, Kafka is still operational as it has two other replicas, and when the failed node comes back, the respawned broker can synchronize the data from the other Kafka brokers.

It is still operationally better to have Portworx with a single replica than to have storage directly on the hosts as in the traditional setup, because Portworx can move and reattach its virtual volumes to any other Portworx node as long as an available replica exists, even if the failed node running the broker is unavailable.  This is not possible with directly attached storage.

Producer Settings

The producer configurations in Kafka can play a major role in achieving certain key characteristics like message ordering, message durability, overall throughput and performance.  While there are various producer configurations, we will focus on the following based on our internal testing.

Message Durability

Message durability in Kafka is controlled through the acks setting, which supports three values: 0, 1, and all (-1). An ack is an acknowledgement that the producer gets from a Kafka broker to confirm that the message has been successfully committed to that broker.  The acks config is the number of acknowledgements the producer needs to receive before considering a commit successful.

acks=0:  You should never set acks to 0 in a production environment as the producer does not wait for a response and assumes the write is successful once the request is sent out.

acks=1: This is the default setting. The producer waits for an ack and considers the commit successful when it receives an ack from the leader broker, but the leader does not wait for confirmation from all replicas. The trade-off with this setting is lower durability for better performance.

acks=all: This setting also makes the producer wait for the ack; in this case the leader waits for the full set of in-sync replicas to acknowledge the message before considering it committed.  This gives the best available guarantee that the record will not be lost as long as at least one in-sync replica remains alive.  The trade-off with this setting is lower throughput and higher latency for better durability, as the leader broker has to wait for acknowledgements from the replicas before responding to the producer.   Our recommendation is to use acks=all if you want the highest available guarantee for your data.
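
As a reference, a durability-oriented producer configuration might contain the following entries.  This is a sketch based on the discussion above; min.insync.replicas is a broker/topic-level setting mentioned here only for context:

# producer.properties (durability-oriented sketch)
acks=all
# With acks=all, the broker/topic setting min.insync.replicas controls how many
# in-sync replicas must acknowledge a write before it is considered successful.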

Message Throughput

To optimize for throughput, producers need to send as much data as they can at a faster rate.  If a producer is sending several messages to the same partition in a broker, they can be sent as a batch.  Generally, a smaller batch leads to more requests and queuing, which can result in higher latency.  Increasing the batch size, along with the time spent waiting for the batch to fill up with messages, can yield higher throughput and reduce load on producers.  It also reduces the CPU overhead on the brokers that would otherwise be spent processing a constant stream of small requests.  The following are some of the key configurations that play a vital role in achieving higher throughput.

batch.size: This allows the producer to batch messages up to the configured size before sending them to the broker.  The default batch.size is 16384 bytes.  If your message size is larger than batch.size, you should increase batch.size for better throughput and improved latency.

linger.ms: This allows the producer to wait up to this amount of time (in milliseconds) before sending data to the broker. The default setting is 0, meaning no delay.  The trade-off with this parameter is the added latency.

compression.type: The compression type for the data generated by the producer.  The default value is none (no compression).  Valid values are none, gzip, snappy, lz4, or zstd.  If compression is enabled, the producer compresses the completed batch, so more data can be sent to the broker.  Compression helps to send more payload for the same batch size, which again can improve throughput.

Use of compression comes with additional CPU cycles at the producer level, and the compression ratio varies by codec.  gzip offers the best compression but comes with very high CPU usage.  On the other hand, lz4 uses the lowest CPU but has a lower compression ratio than every other compression type.   snappy and zstd use a moderate level of CPU and provide a medium compression ratio compared to the others.  For performance reasons we do not recommend using gzip.  You can choose between lz4 and snappy based on your requirements.

To get better throughput, update both the batch.size and linger.ms parameters.  Increasing batch.size without changing linger.ms from 0 might not yield the throughput you expect, and at the same time might add more load to both producers and brokers by sending too many small requests.  If you want higher throughput at some added latency, we recommend that you increase batch.size and linger.ms and enable compression at the producer level.

There is no one specific setting of batch.size and linger.ms that anyone can recommend.  The right batching depends on various factors like the type of data, its compressibility, and the available CPU and network resources.  Hence, we recommend testing with your own data to identify the right settings for these parameters to achieve higher throughput.

socket.send.buffer.bytes: The size of the socket send buffer.  The default is 100KB.  For higher-throughput environments this might not be sufficient, and we recommend increasing it.  You can consider increasing this to 8MB or 16MB if you have high-bandwidth networks and enough memory; if not, you can set it as low as 1MB.

buffer.memory: The total memory (in bytes) that the producer can use to buffer records waiting to be sent to the broker. The default value is 33,554,432 bytes, or 32MB.  If you have a lot of partitions, you might want to increase this setting.
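
Bringing the throughput-related settings together, a producer configuration tuned for batching might look like the sketch below.  The values are illustrative starting points and should be validated against your own workload:

# producer.properties (throughput-oriented sketch, illustrative values)
batch.size=131072        # 128KB batches instead of the 16KB default
linger.ms=10             # wait up to 10ms for batches to fill
compression.type=lz4     # low CPU overhead, moderate compression ratio
buffer.memory=67108864   # 64MB of buffering for producers with many partitions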

Consumer Settings

Like the batching of messages with producers, Kafka batches messages for consumers to achieve higher throughput.  

socket.receive.buffer.bytes: The size of the socket receive buffer.  The default is 100KB (102400 bytes), which might not be sufficient if you have a higher volume of data to be published and read.  You can consider increasing this to 8MB or 16MB if you have high-bandwidth networks and enough memory; if not, you can go down to 1MB.

fetch.min.bytes: This parameter sets the minimum amount of data the broker should return for a consumer fetch request. Increasing this reduces the number of fetch requests made to the broker, reducing the broker CPU overhead for each fetch.
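
A consumer configuration reflecting the settings above might look like the sketch below.  fetch.max.wait.ms is not covered above but is commonly tuned together with fetch.min.bytes to cap the added latency; the values are illustrative:

# consumer.properties (throughput-oriented sketch, illustrative values)
fetch.min.bytes=1048576   # wait for roughly 1MB per fetch instead of the 1-byte default
fetch.max.wait.ms=500     # maximum time the broker waits to accumulate fetch.min.bytes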

Broker Settings

compression.type: The compression type that the broker applies to data.  Valid values are none, gzip, snappy, lz4, zstd, and producer.  The value 'producer' means the broker retains the original compression codec set by the producer. Our recommendation is to set this value to 'producer' so the producer takes responsibility for compressing the data, leaving broker CPUs available for other critical functions.

Partitions: The topic partition is the unit of parallelism in Kafka.  Both partition writes (through producer/broker) and reads (through consumer) can be done fully in parallel.  In general, the more partitions, the higher the throughput you can achieve.  There are disadvantages to having too many partitions: more partitions may increase unavailability and also require more open file handles.  There is no magic number for the ideal partition count, so determine it based on your throughput requirements.
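
On the broker side, the corresponding server.properties entries might look like the sketch below; num.partitions only sets the default for automatically created topics, and the value shown is illustrative:

# server.properties (broker-side sketch, illustrative values)
compression.type=producer   # retain the compression codec chosen by the producer
num.partitions=12           # default partition count for auto-created topics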

You can find more guidance on ideal settings for producers, consumers, and brokers in the Apache Kafka documentation.

Portworx Best Practices

Please follow the Production Readiness document for all the prerequisites and best practices to run Portworx.

Volume Management

Portworx volumes are thin-provisioned by default.  Ensure that volume capacity is monitored and that necessary actions are taken through PX-Autopilot to avoid any outage to stateful applications like Kafka.

High Availability of stateful applications

Applications like Kafka that need to be highly available and resistant to worker node failures (CPU, memory, drive, or power) should use Portworx replicated volumes.

Portworx allows up to 3 replicated copies of a volume, and a replication setting of two is generally recommended. Portworx also offers a rack parameter that can be used to place replicated volumes across failure domains.

StorageClass

Portworx allows administrators to pick and choose or create "classes" of storage it offers.  The following example creates a StorageClass for PVCs with a replication factor of 2 and high IO priority.

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: px-storageclass-kafka
provisioner: kubernetes.io/portworx-volume
allowVolumeExpansion: true
parameters:
  repl: "2"
  priority_io: "high"


For all the parameters supported by Portworx, please check this table in the "Dynamic Provisioning of PVCs" topic.
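
To consume this StorageClass, a PersistentVolumeClaim might look like the following sketch; the claim name and requested size are illustrative:

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: px-kafka-pvc
spec:
  storageClassName: px-storageclass-kafka
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi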