High Performance Kafka Producer on AWS

2015/03/02

#kafka #queue #clojure

Context

The modern analytics stack uses some sort of low-latency data bus to power both real-time and batch pipelines. In this post we take a closer look at Apache Kafka, which can handle billions of events every hour on a few nodes of commodity hardware. It tolerates outages on both the producer and the consumer side, and even in the service itself (a Kafka broker going down). All of this makes it a great fit for analytics, especially for real-time streaming systems. The newly rewritten kafka-clients library just got released; it contains both the producer and the consumer client, but the latter is not implemented yet. Let’s have a look at the new configuration and the code.

Kafka Producer

The producer, as the name suggests, sends data to the brokers. It can be operated in sync and async modes. Sync mode is much slower but guarantees durability for the data, while async mode is extremely fast but might lose a small percentage of data in case of a node outage. The performance test will cover both. The producer's send() method has a few versions; the simplest just sends a message without a key. The message is a ProducerRecord<K, V> object that carries the partitioning information.

public final class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final K key;
    private final V value;
    // ...
}

If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.

source
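
The three rules in the quote can be sketched in a few lines of plain Java. This is a minimal illustration and not Kafka's actual partitioner: the class name PartitionChoiceSketch, the method choose and the use of Arrays.hashCode as the key hash are assumptions made for the example, and the real client may hash keys differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: mirrors the three quoted rules, not Kafka's real partitioner.
final class PartitionChoiceSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // partition and key may be null, as in ProducerRecord.
    int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null) {
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition: " + partition);
            }
            return partition; // rule 1: an explicit, valid partition wins
        }
        if (key != null) {
            // rule 2: hash of the key (Arrays.hashCode is an assumption of this sketch)
            return Math.floorMod(Arrays.hashCode(key), numPartitions);
        }
        // rule 3: no key and no partition, so rotate through the partitions
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionChoiceSketch chooser = new PartitionChoiceSketch();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(chooser.choose(3, key, 5));     // explicit partition
        System.out.println(chooser.choose(null, key, 5));  // chosen from the key hash
        System.out.println(chooser.choose(null, null, 5)); // round-robin
    }
}
```

The practical consequence is that records sharing a key always land on the same partition, while keyless records spread evenly across all of them.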

Asynchronous mode is implemented in the producer using Java Futures. That makes it easy for both use cases to get the behavior they want. Consider the following:

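// init: props, topicName and payload are defined elsewhere
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);

// async: send() returns a Future immediately
producer.send(record);
// sync: block on the Future until the send completes or 250 ms pass
producer.send(record).get(250, TimeUnit.MILLISECONDS);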
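
Blocking on the Future can end in three ways: the send completes, the timeout expires, or the send fails. The sketch below shows that handling with nothing but the JDK, assuming a CompletableFuture as a stand-in for the Future returned by send(); the class SyncSendSketch, the Outcome enum and the method awaitSend are hypothetical names introduced for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-in for a synchronous send: any Future behaves the same way under get(timeout).
final class SyncSendSketch {
    enum Outcome { ACKED, TIMED_OUT, FAILED }

    // Blocks for at most timeoutMs and maps the possible results to an Outcome.
    static Outcome awaitSend(Future<?> pending, long timeoutMs) {
        try {
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.ACKED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT; // no answer in time; the caller may retry
        } catch (ExecutionException e) {
            return Outcome.FAILED;    // the send itself failed; e.getCause() has the reason
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> acked = CompletableFuture.completedFuture(42L); // made-up offset
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        CompletableFuture<Long> stuck = new CompletableFuture<>(); // never completes

        System.out.println(awaitSend(acked, 250));
        System.out.println(awaitSend(failed, 250));
        System.out.println(awaitSend(stuck, 50));
    }
}
```

Returning an outcome instead of throwing keeps the retry-or-propagate decision with the caller.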

What happens when a send fails can easily be decided in the application layer (retry, propagate the error, etc.). I left out the callback that can also be passed to send(). It comes in really handy for monitoring how long a request takes, or if you would like to retry the send or save the message to local disk for further processing. We can extend the previous code with a simple callback:

long sendStart = System.currentTimeMillis();
Callback callback = mycallback.nextCompletion(sendStart, payload.length);
producer.send(record, callback);

This maintains the asynchronous nature of the method but gives us an opportunity to at least gather statistics (latency, size, status) about the outcome.
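
As a rough idea of what such a statistics-gathering callback might look like, here is a JDK-only sketch. SendCallback is a local stand-in with the same shape as a completion callback (metadata on success, exception on failure), and the body of nextCompletion is an assumption; the snippet above only shows how it is called.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a stats collector; SendCallback is a stand-in, not the Kafka interface itself.
final class SendStatsSketch {
    interface SendCallback {
        void onCompletion(Object metadata, Exception exception);
    }

    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    private final AtomicLong bytesSent = new AtomicLong();
    private final AtomicLong totalLatencyMs = new AtomicLong();

    // Returns a callback bound to one send; it records latency, size and status.
    SendCallback nextCompletion(long sendStartMs, int payloadBytes) {
        return (metadata, exception) -> {
            totalLatencyMs.addAndGet(System.currentTimeMillis() - sendStartMs);
            if (exception == null) {
                succeeded.incrementAndGet();
                bytesSent.addAndGet(payloadBytes);
            } else {
                failed.incrementAndGet();
            }
        };
    }

    long succeeded() { return succeeded.get(); }
    long failed() { return failed.get(); }
    long bytesSent() { return bytesSent.get(); }

    // Average over all completed sends, successful or not.
    double averageLatencyMs() {
        long completed = succeeded.get() + failed.get();
        return completed == 0 ? 0.0 : (double) totalLatencyMs.get() / completed;
    }

    public static void main(String[] args) {
        SendStatsSketch stats = new SendStatsSketch();
        long start = System.currentTimeMillis();
        stats.nextCompletion(start, 512).onCompletion("metadata", null);               // success
        stats.nextCompletion(start, 512).onCompletion(null, new RuntimeException("x")); // failure
        System.out.println(stats.succeeded() + " ok, " + stats.failed() + " failed, "
                + stats.bytesSent() + " bytes, avg " + stats.averageLatencyMs() + " ms");
    }
}
```

Atomic counters are used because completion callbacks typically run on a different thread than the one calling send().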

New Kafka Producer Configuration

The configurations available with the new producer are the following:

{
    :bootstrap.servers                      "localhost:9092"
    :metadata.fetch.timeout.ms              "1000"
    :metadata.max.age.ms                    "100000"
    :batch.size                             "256"
    :buffer.memory                          "1024000" ; 1000 * 1024
    :acks                                   "-1"
    :timeout.ms                             "250"
    :linger.ms                              "0"
    :client.id                              "test-client"
    :send.buffer.bytes                      "102400" ; 100 * 1024
    :receive.buffer.bytes                   "102400" ; 100 * 1024
    :max.request.size                       "5000000"
    :reconnect.backoff.ms                   "100"
    :block.on.buffer.full                   "true"
    :retries                                "3"
    :retry.backoff.ms                       "100"
    :compression.type                       "snappy"
    :metrics.sample.window.ms               ""
    :metrics.num.samples                    ""
    :metric.reporters                       ""
    :max.in.flight.requests.per.connection  "1024"
    :key.serializer                         "org.apache.kafka.common.serialization.ByteArraySerializer"
    :value.serializer                       "org.apache.kafka.common.serialization.ByteArraySerializer"
 }

The configuration has to be tailored to each use case; each producer may need an entirely different set of parameters depending on its role in the pipeline. The new org.apache.kafka.common namespace contains the serialization libraries; String and ByteArray are the currently existing types. The entire config is much cleaner and more logical than in the previous version. The full description of each parameter is available here.
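
From Java, the same settings would typically be supplied as a java.util.Properties object, which is what the props variable in the earlier snippet stands for. The sketch below copies a subset of the keys and values from the map above; the class ProducerConfigSketch and the method durableProps are hypothetical names, and the chosen subset is only an example, not a recommendation.

```java
import java.util.Properties;

// Builds a Properties object from a subset of the settings listed in the map above.
final class ProducerConfigSketch {
    static Properties durableProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "-1");               // value taken from the map above
        props.setProperty("retries", "3");
        props.setProperty("retry.backoff.ms", "100");
        props.setProperty("linger.ms", "0");
        props.setProperty("compression.type", "snappy");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = durableProps("localhost:9092");
        // Print the settings in a stable order for inspection.
        props.stringPropertyNames().stream().sorted()
             .forEach(name -> System.out.println(name + " = " + props.getProperty(name)));
    }
}
```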

Benchmarking Kafka

For the benchmark we are going to use a slightly modified version of ProducerPerformance. I would like to implement the synchronous send shown above and measure the performance that way too.
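
To make the idea of measuring synchronous sends concrete, here is a minimal timing loop. It is not the ProducerPerformance code: SyncBenchSketch, Result and run are hypothetical names, and the blockingSend argument stands in for a call such as a send followed by get(), replaced in main by a short sleep.

```java
// Times a blocking operation per call and reports throughput and latency.
final class SyncBenchSketch {
    static final class Result {
        final int records;
        final long elapsedNanos;
        final long totalLatencyNanos;
        final long maxLatencyNanos;

        Result(int records, long elapsedNanos, long totalLatencyNanos, long maxLatencyNanos) {
            this.records = records;
            this.elapsedNanos = elapsedNanos;
            this.totalLatencyNanos = totalLatencyNanos;
            this.maxLatencyNanos = maxLatencyNanos;
        }

        double recordsPerSecond() { return records / (Math.max(1L, elapsedNanos) / 1e9); }
        double avgLatencyMs() { return records == 0 ? 0.0 : totalLatencyNanos / 1e6 / records; }
        double maxLatencyMs() { return maxLatencyNanos / 1e6; }
    }

    // Calls blockingSend 'records' times, timing each call individually.
    static Result run(int records, Runnable blockingSend) {
        long total = 0;
        long max = 0;
        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            long t0 = System.nanoTime();
            blockingSend.run();
            long latency = System.nanoTime() - t0;
            total += latency;
            max = Math.max(max, latency);
        }
        return new Result(records, System.nanoTime() - start, total, max);
    }

    public static void main(String[] args) {
        // The sleep is a placeholder for a real blocking send.
        Result r = run(20, () -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.printf("%.1f records/s, avg %.3f ms, max %.3f ms%n",
                r.recordsPerSecond(), r.avgLatencyMs(), r.maxLatencyMs());
    }
}
```

Because every call waits for the previous one to finish, throughput in this mode is bounded by the per-send latency, which is why the sync numbers are expected to be far lower than the async ones.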

Hardware

I used 5 servers for Kafka (3 Zookeepers and 5 brokers) on m3.large (vCPU: 2, mem: 7.5G) with a RAID 10 array of 4 x 512G EBS volumes. I think it is not worth the hassle to use RAID arrays with EBS, but I wanted to try it out in case a client asks for it. The IO characteristics of the array are better than those of a single volume, but a rebuild introduces downtime for the node, or potentially a period of limited performance, that might be worse than a complete node outage. The question is which is better: letting Kafka re-replicate the data that resided on a dead node to other nodes, or adding a layer that does a similar kind of recovery at the single-node level. I was able to achieve 30-40% IOWait with 3 nodes running the ProducerPerformance code.

It requires a little more investigation to find the best combination of node and EBS types, but this depends on the volume, the latency requirements and the typical message size. Often there is more than one Kafka cluster when the requirements differ that much. Dedicating a cluster to a single set of requirements works best.

Results

[Figures: Kafka throughput (three charts) and Kafka latency (two charts)]

The test details can be found here, along with other test results. Parameters for ProducerPerformance are included as a header for each run.

[Example log](https://github.com/l1x/kafka-bench/blob/master/ec2-52-11-124-236.us-west-2.compute.amazonaws.com/log.5)

These results are just a quick peek into Kafka’s performance profile, which can be tuned for a particular use case.