Simple Kafka Consumer In Clojure

2014/05/28

#kafka #queue #clojure

Kafka

Kafka is a high performance publish-subscribe messaging system, implemented as a distributed commit log. The message streams are organized into topics, and topics are broken down further into partitions.

[Figure: Kafka Anatomy]

Kafka is suitable for realtime applications and uses ZooKeeper as its strong consistency provider. The high level APIs are pretty easy to use but often misunderstood. I spent some time trying to figure out why the libraries out there did not work the way I expected, and after a while I realized it was better to write a very simple library that wraps just the functionality I need.

Shovel

Shovel is a minimal wrapper around the Kafka client APIs. The code is mostly documented and type hinted for better performance. By default Kafka tolerates any service outage on the consumer side, meaning the consumer resumes operation when the broker comes back; the producer, however, simply throws a connection refused exception. This behavior lets us consume messages with a simple blocking stream that blocks execution when there are no new messages or the broker is down. Let's have a closer look at how the consumer works.

Consumer

The Kafka consumer consists of a few things. First we need a ConsumerConnector to connect to the broker. I am using a hashmap for the configuration and convert it to java.util.Properties to create a ConsumerConfig, from which the ConsumerConnector is created and returned.
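
Note that hashmap-to-properties (and the uuid helper used further down) are not shown in this post. A minimal sketch of them, together with the imports these snippets assume against Kafka 0.8's high-level API, could look like this; the namespace name is my guess, not necessarily Shovel's actual layout:

(ns shovel.consumer
  (:require [clojure.core.async :as async])
  (:import [kafka.consumer Consumer ConsumerConfig KafkaStream]
           [kafka.javaapi.consumer ConsumerConnector]
           [kafka.message MessageAndMetadata]
           [java.util Properties UUID]))

(defn- hashmap-to-properties
  "converts a map of string keys and values to java.util.Properties"
  ^Properties [h]
  (doto (Properties.)
    (.putAll h)))

(defn- uuid
  "returns a random UUID string, used to tag consumer threads"
  []
  (str (UUID/randomUUID)))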

(defn consumer-connector
  "returns a ConsumerConnector that can be used to create consumer streams"
  ^ConsumerConnector [^clojure.lang.PersistentArrayMap h]
  (let [config (ConsumerConfig. (hashmap-to-properties h))]
    (Consumer/createJavaConsumerConnector config)))

The ConsumerConnector can be used to get the message streams: createMessageStreams returns a map of topic to lists of streams, and we pick out the list (a java.util.ArrayList) for our topic.

(defn message-streams
  "returning the message-streams with a certain topic and thread-pool-size
  message-streams can be processed in threads with simple blocking on empty queue"
  ^java.util.ArrayList [^ConsumerConnector consumer ^String topic ^Integer thread-pool-size]
  (.get (.createMessageStreams consumer {topic thread-pool-size}) topic))
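
Wiring these together, a consumer could be created like this; the ZooKeeper address, group id, and topic are placeholder values (auto.offset.reset set to smallest starts from the earliest available offset):

;; example configuration; all values are placeholders
(def consumer-config
  {"zookeeper.connect"  "localhost:2181"
   "group.id"           "shovel-consumer"
   "auto.offset.reset"  "smallest"
   "auto.commit.enable" "true"})

(def connector (consumer-connector consumer-config))
;; one stream per consumer thread
(def streams (message-streams connector "test-topic" (int 2)))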

The message streams are then consumed by a simple iterator. There is the Kafka way of doing this, but there is also a more idiomatic Clojure way; the latter is how I implemented it.

(defn default-iterator
  "processing all streams in a thread and printing the message field for each message"
  [^java.util.ArrayList streams]
  (let [c (async/chan)]
    ;; create a thread for each stream
    (doseq [^kafka.consumer.KafkaStream stream streams]
      (let [uuid (uuid)]
        (async/thread
          ;; iterating over the stream blocks when there are no new messages
          (doseq [^kafka.message.MessageAndMetadata message stream]
            (println (str "uuid: " uuid " :: " (String. (nth (message-to-vec message) 4)))))
          ;; signal once the stream's iterator is exhausted; the put must happen
          ;; after the doseq, since putting its nil return value on a channel throws
          (async/>!! c :done))))
    ;; read the channel forever, keeping the caller blocked
    (while true
      (async/<!! c))))
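
With the connector and streams from the earlier sketch, starting the consumer is a single blocking call:

(default-iterator streams)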

Each message is a kafka.message.MessageAndMetadata that can be processed in the following way:

(defn- message-to-vec
  "returns a vector of all of the message fields"
  [^kafka.message.MessageAndMetadata message]
  [(.topic message) (.offset message) (.partition message) (.key message) (.message message)])

The key and the message are byte arrays that can easily be converted to strings.
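
For instance, assuming message is bound to a MessageAndMetadata with a UTF-8 encoded payload:

;; read the payload bytes back as a UTF-8 string
(String. ^bytes (.message message) "UTF-8")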

Producer

A Kafka producer is set up similarly to a consumer: there is a producer connector that can be used to send messages.
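
The producer snippets below log with info, presumably from clojure.tools.logging, and use Kafka 0.8's producer classes; the imports would look roughly like this (again, the namespace name is my assumption):

(ns shovel.producer
  (:require [clojure.tools.logging :refer [info]])
  (:import [kafka.javaapi.producer Producer]
           [kafka.producer KeyedMessage ProducerConfig]))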

(defn producer-connector
  [^clojure.lang.PersistentArrayMap h]
  (info "fn: producer-connector" " config: " h)
  (let [config (ProducerConfig. (hashmap-to-properties h))]
    (Producer. config)))

(defn message
  [topic key value]
  (info "fn: message" " topic: " topic)
  (KeyedMessage. topic key value))

(defn produce
  [^Producer producer ^KeyedMessage message]
  (info "fn: produce message: " message)
  (.send producer message))
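
Putting it together, a message can be sent like this; the broker address, topic, key, and payload are placeholder values, and StringEncoder is just one possible serializer choice:

;; example configuration; all values are placeholders
(def producer-config
  {"metadata.broker.list"  "localhost:9092"
   "serializer.class"      "kafka.serializer.StringEncoder"
   "request.required.acks" "1"})

(def p (producer-connector producer-config))
(produce p (message "test-topic" "some-key" "hello from shovel"))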

Credits

Kudos go to @nikore.