The only thing that’s even better than Apache Kafka and Apache Storm is to use the two tools in combination. Unfortunately, their integration can and is still a pretty challenging task, at least judged by the many discussion threads on the respective mailing lists. In this post I am introducing kafka-storm-starter, which contains many code examples that show you how to integrate Apache Kafka 0.8+ with Apache Storm 0.9+, while using Apache Avro as the data serialization format. I will also briefly summarize the current state of their integration on a high level to give you additional context of where the two projects are headed in this regard.

kafka-storm-starter is available at kafka-storm-starter on GitHub.

State of the (integration) game

For the lazy reader here’s the TL;DR version of Kafka and Storm integration:

  • You can indeed integrate Kafka 0.8.1.1 (latest stable) and Storm 0.9.1-incubating (latest stable). I mention this explicitly only to clear up any confusion whatsoever that may have resulted from you reading the mailing lists.
  • The Kafka/Storm integration is, at this time, still more complicated and error prone than it should be. For this reason I released the code project kafka-storm-starter (more details below), which should answer most questions you may have when setting out to connect Storm to Kafka for both reading and writing data. As such kafka-storm-starter can serve as a bootstrapping template to build your own real-time data processing pipelines with Kafka and Storm.
  • In the Storm project we are actively working on closing this integration gap. For instance, we have recently merged the most popular Kafka spout into the core Storm project. This Kafka spout will be included in the next version of Storm, 0.9.2-incubating, which is just around the corner. And the spout is now compatible with the latest Kafka 0.8.1.1. Kudos to P. Taylor Goetz of HortonWorks for acting as the initial sponsor of the storm-kafka component! For more information see external/storm-kafka in the Storm code base.
  • The Kafka project is working on an improved, consolidated consumer API for Kafka 0.9. Take a look at the respective discussions in the kafka-user and kafka-dev mailing lists. The Kafka 0.9 Consumer Rewrite Design document is also worth a read. Moving forward this API initiative should simplify interaction with Kafka in general and integration with storm-kafka in particular.

kafka-storm-starter

Overview and quick start

A few days ago I released kafka-storm-starter as a means to jumpstart developers interested in integrating Kafka 0.8 and Storm 0.9. Without further ado let’s take a first quick look.

Before we start we must grab the latest version of the code, which is implemented in Scala 2.10:

$ git clone https://github.com/miguno/kafka-storm-starter.git
$ cd kafka-storm-starter

We begin the tour by running the test suite:

$ ./sbt test

Notably this command will run end-to-end tests of Kafka, Storm, and Kafka/Storm integration. See this shortened version of the test output:

[...other tests removed...]

[info] KafkaSpec:
[info] Kafka
[info] - should synchronously send and receive a Tweet in Avro format
[info]   + Given a ZooKeeper instance
[info]   + And a Kafka broker instance
[info]   + And some tweets
[info]   + And a single-threaded Kafka consumer group
[info]   + When I start a synchronous Kafka producer that sends the tweets in Avro binary format
[info]   + Then the consumer app should receive the tweets
[info] - should asynchronously send and receive a Tweet in Avro format
[info]   + Given a ZooKeeper instance
[info]   + And a Kafka broker instance
[info]   + And some tweets
[info]   + And a single-threaded Kafka consumer group
[info]   + When I start an asynchronous Kafka producer that sends the tweets in Avro binary format
[info]   + Then the consumer app should receive the tweets
[info] StormSpec:
[info] Storm
[info] - should start a local cluster
[info]   + Given no cluster
[info]   + When I start a LocalCluster instance
[info]   + Then the local cluster should start properly
[info] - should run a basic topology
[info]   + Given a local cluster
[info]   + And a wordcount topology
[info]   + And the input words alice, bob, joe, alice
[info]   + When I submit the topology
[info]   + Then the topology should properly count the words
[info] KafkaStormSpec:
[info] Feature: AvroDecoderBolt[T]
[info]   Scenario: User creates a Storm topology that uses AvroDecoderBolt
[info]     Given a ZooKeeper instance
[info]     And a Kafka broker instance
[info]     And a Storm topology that uses AvroDecoderBolt and that reads tweets from topic testing-input and writes them as-is to topic testing-output
[info]     And some tweets
[info]     And a synchronous Kafka producer app that writes to the topic testing-input
[info]     And a single-threaded Kafka consumer app that reads from topic testing-output
[info]     And a Storm topology configuration that registers an Avro Kryo decorator for Tweet
[info]     When I run the Storm topology
[info]     And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka
[info]     Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology
[info] Feature: AvroScheme[T] for Kafka spout
[info]   Scenario: User creates a Storm topology that uses AvroScheme in Kafka spout
[info]     Given a ZooKeeper instance
[info]     And a Kafka broker instance
[info]     And a Storm topology that uses AvroScheme and that reads tweets from topic testing-input and writes them as-is to topic testing-output
[info]     And some tweets
[info]     And a synchronous Kafka producer app that writes to the topic testing-input
[info]     And a single-threaded Kafka consumer app that reads from topic testing-output
[info]     And a Storm topology configuration that registers an Avro Kryo decorator for Tweet
[info]     When I run the Storm topology
[info]     And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka
[info]     Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology
[info] Run completed in 21 seconds, 852 milliseconds.
[info] Total number of tests run: 25
[info] Suites: completed 8, aborted 0
[info] Tests: succeeded 25, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 22 s, completed May 23, 2014 12:31:09 PM

We finish the tour by launching the KafkaStormDemo application:

$ ./sbt run

This demo starts in-memory instances of ZooKeeper, Kafka, and Storm. It then runs a demo Storm topology that connects to and reads from the Kafka instance.

You will see output similar to the following (some parts removed to improve readability):

7031 [Thread-19] INFO  backtype.storm.daemon.worker - Worker 3f7f1a51-5c9e-43a5-b431-e39a7272215e for storm kafka-storm-starter-1-1400839826 on daa60807-d440-4b45-94fc-8dd7798453d2:1027 has finished loading
7033 [Thread-29-kafka-spout] INFO  storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}}
7050 [Thread-29-kafka-spout] INFO  backtype.storm.daemon.executor - Opened spout kafka-spout:(1)
7051 [Thread-29-kafka-spout] INFO  backtype.storm.daemon.executor - Activating spout kafka-spout:(1)
7051 [Thread-29-kafka-spout] INFO  storm.kafka.ZkCoordinator - Refreshing partition manager connections
7065 [Thread-29-kafka-spout] INFO  storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}}
7066 [Thread-29-kafka-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition managers: []
7066 [Thread-29-kafka-spout] INFO  storm.kafka.ZkCoordinator - New partition managers: [Partition{host=127.0.0.1:9092, partition=0}]
7083 [Thread-29-kafka-spout] INFO  storm.kafka.PartitionManager - Read partition information from: /kafka-spout/kafka-storm-starter/partition_0  --> null
7100 [Thread-29-kafka-spout] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
7105 [Thread-29-kafka-spout] INFO  storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 18
7106 [Thread-29-kafka-spout] INFO  storm.kafka.ZkCoordinator - Finished refreshing
7126 [Thread-29-kafka-spout] INFO  storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0}
7126 [Thread-29-kafka-spout] INFO  storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527
9128 [Thread-29-kafka-spout] INFO  storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0}
9129 [Thread-29-kafka-spout] INFO  storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527

At this point Storm is connected to Kafka (more precisely: to the testing topic in Kafka). The last few lines from above – “Committing offset …” — will be repeated again and again, because a) this demo Storm topology only reads from the Kafka topic but it does nothing to the data that was read and b) because we are not sending any data to the Kafka topic.

Note: This example will actually run two in-memory instances of ZooKeeper: the first (listening at 127.0.0.1:2181/tcp) is used by the Kafka instance, the second (listening at 127.0.0.1:2000/tcp) is automatically started and used by the in-memory Storm cluster. This is because, when running in local aka in-memory mode, Storm does not allow you to reconfigure or disable its own ZooKeeper instance.

To stop the demo application you must kill or Ctrl-C the process in the terminal.

You can use KafkaStormDemo as a starting point to create your own, “real” Storm topologies that read from a “real” Kafka, Storm, and ZooKeeper infrastructure. An easy way to get started with such an infrastructure is by deploying Kafka, Storm, and ZooKeeper via a tool such as Wirbelsturm.

Features

I showcase the following features in kafka-storm-starter. Note that I focus on showcasing, and not necessarily on “production ready”.

  • How to integrate Kafka and Storm.
  • How to use Avro with Kafka and Storm for serializing and deserializing the data payload. For this I leverage Twitter Bijection and Twitter Chill.
  • Kafka standalone code examples
    • KafkaProducerApp: A simple Kafka producer app for writing Avro-encoded data into Kafka. KafkaSpec puts this producer to use and shows how to use Twitter Bijection to Avro-encode the messages being sent to Kafka.
    • KafkaConsumerApp: A simple Kafka consumer app for reading Avro-encoded data from Kafka. KafkaSpec puts this consumer to use and shows how to use Twitter Bijection to Avro-decode the messages being read from Kafka.
  • Storm standalone code examples
    • AvroDecoderBolt[T]: An AvroDecoderBolt[T <: org.apache.avro.specific.SpecificRecordBase] that can be parameterized with the type of the Avro record T it will deserialize its data to (i.e. no need to write another decoder bolt just because the bolt needs to handle a different Avro schema).
    • AvroScheme[T]: An AvroScheme[T <: org.apache.avro.specific.SpecificRecordBase] scheme, i.e. a custom backtype.storm.spout.Scheme to auto-deserialize a spout’s incoming data. The scheme can be parameterized with the type of the Avro record T it will deserializes its data to (i.e. no need to write another scheme just because the scheme needs to handle a different Avro schema).
      • You can opt to configure a spout (such as the Kafka spout) with AvroScheme if you want to perform the Avro decoding step directly in the spout instead of placing an AvroDecoderBolt after the Kafka spout. You may want to profile your topology which of the two approaches works best for your use case.
    • TweetAvroKryoDecorator: A custom backtype.storm.serialization.IKryoDecorator, i.e. a custom Kryo serializer for Storm.
      • Unfortunately we have not figured out a way to implement a parameterized AvroKryoDecorator[T] variant yet. (A “straight-forward” approach we tried – similar to the other parameterized components – compiled fine but failed at runtime when running the tests). Code contributions are welcome!
  • Kafka and Storm integration
    • AvroKafkaSinkBolt[T]: An AvroKafkaSinkBolt[T <: org.apache.avro.specific.SpecificRecordBase] that can be parameterized with the type of the Avro record T it will serialize its data to before sending the encoded data to Kafka (i.e. no need to write another Kafka sink bolt just because the bolt needs to handle a different Avro schema).
    • Storm topologies that read Avro-encoded data from Kafka: KafkaStormDemo and KafkaStormSpec
    • A Storm topology that writes Avro-encoded data to Kafka: KafkaStormSpec
  • Unit testing
  • Integration testing
    • KafkaSpec: Tests for Kafka, which launch and run against in-memory instances of Kafka and ZooKeeper.
    • StormSpec: Tests for Storm, which launch and run against in-memory instances of Storm and ZooKeeper.
    • KafkaStormSpec: Tests for integrating Storm and Kafka, which launch and run against in-memory instances of Kafka, Storm, and ZooKeeper.

Interested in more?

All the gory details are available at kafka-storm-starter. Apart from the code and build script (sbt) I provide information about how to create Cobertura code coverage reports, to package the code, to create Java “sources” and “javadoc” jars, to generate API docs, to integrate with Jenkins CI and TeamCity build servers, and to set up kafka-storm-starter as a project in IntelliJ IDEA and Eclipse.

Moving forward my plan is to keep kafka-storm-starter up to date with the latest versions of Kafka and Storm. The next version of Storm, 0.9.2, will already simplify the current setup quite a lot. Of course I welcome any code, docs, or similar contributions you may have.

The quest to get there

Just for the historical record here are some of the gotchas that are addressed by kafka-storm-starter, i.e. problems you do not need to solve yourself anymore:

  • Figuring out which Kafka spout in Storm 0.9 works with the latest Kafka 0.8 version. A lot of people tried in vain to use a Kafka spout built for Kafka 0.7 to read from Kafka 0.8. Others didn’t know how to use the available Kafka 0.8 spouts in their code, and so on. In the case of kafka-storm-starter I opted to go with the spout created by wurstmeister, primarily because this spout will soon by the “official” Kafka spout maintained by the Storm project. Unfortunately the latest version of the spout was/is not available in a public Maven repository, so I had take care of that, too, until Storm 0.9.2 will provide the official version.
  • Resolving version conflicts between the various software packages. For instance, Storm 0.9.1 has a transitive dependency on Kryo 2.17 because Storm depends on an old version of Carbonite. This causes problems when trying to use Twitter Bijection or Twitter Chill, because those require a newer version of Kryo. (Apart from that Kryo 2.21 also fixes data corruption issues, so you do want the newer version.) To address this issue I filed STORM-263, which is included in upcoming Storm 0.9.2. Thanks to Sam Ritchie, the maintainer of Carbonite, and everyone else involved to get the patch included. Another example is that you must exclude javax.jms:jms (and a few others) when including Kafka into your build dependencies. Or how to handle Netflix (now: Apache) Curator conflicts.
  • Understanding the various conflicting ZooKeeper versions, and picking a version to go with. Right now Storm and Kafka still prefer very old 3.3.x versions of ZooKeeper, whereas in practice many people run 3.4.x in their infrastructure (e.g. because ZooKeeper 3.4.x is already deployed alongside other infrastructure pieces such as Hadoop clusters when using commercial Hadoop distributions).
  • How to write unit tests for Storm topologies. A lot of people seem to find references to TestingApiDemo.java while searching the Internet but struggle with extracting these examples out of the Storm code base and merging them into their own project.
  • How to write Storm topologies in a way that you can parameterize its components (bolts etc.) with the Avro record type T, so that you don’t need to write a new bolt only because your Avro schema changes. The goal of this code is to show how you can improve the developer/user experience by providing ready-to-use functionality, in this case with regards to (Avro) serialization/deserialization. To tackle this you must understand Storm’s serialization system as well as its run-time behavior.
    • While doing that I discovered a (known) Scala bug when I tried to use TypeTag instead of deprecated Manifest to implement e.g. AvroDecoderBolt[T], see SI-5919. This bug is still not fixed in the latest Scala 2.11.1, by the way.
  • How to write end-to-end Kafka->Storm->Kafka tests.
  • And so on…

Conclusion

I hope you find kafka-storm-starter useful to bootstrap your own Kafka/Storm application. In the Storm community we are actively working on improving and simplifying the Kafka/Storm integration, so please stay tuned and, above all, thanks for your patience. The upcoming 0.9.2 version of Storm is already a first step in the right direction by bundling a Kafka spout that works with the latest stable version of Kafka (0.8.1.1 at the time of this writing).

Now where to go once you have your Kafka and Storm code ready? At this point you can then use a tool such as Wirbelsturm and its associated Puppet modules to deploy production Kafka and Storm clusters and run your own real-time data processing pipelines at scale.

Interested in more? You can subscribe to this blog and follow me on Twitter.