Integrating Kafka and Storm: Code Examples and State of the Game
The only thing that’s even better than Apache Kafka or Apache Storm is using the two tools in combination. Unfortunately, integrating them can be, and still is, a pretty challenging task, at least judging by the many discussion threads on the respective mailing lists. In this post I introduce kafka-storm-starter, which contains many code examples that show you how to integrate Apache Kafka 0.8+ with Apache Storm 0.9+, using Apache Avro as the data serialization format. I will also briefly summarize the current state of their integration at a high level to give you additional context on where the two projects are headed in this regard.
State of the (integration) game
For the lazy reader here’s the TL;DR version of Kafka and Storm integration:
- You can indeed integrate Kafka 0.8.1.1 (latest stable) and Storm 0.9.1-incubating (latest stable). I mention this explicitly only to clear up any confusion whatsoever that may have resulted from you reading the mailing lists.
- The Kafka/Storm integration is, at this time, still more complicated and error prone than it should be. For this reason I released the code project kafka-storm-starter (more details below), which should answer most questions you may have when setting out to connect Storm to Kafka for both reading and writing data. As such kafka-storm-starter can serve as a bootstrapping template to build your own real-time data processing pipelines with Kafka and Storm.
- In the Storm project we are actively working on closing this integration gap. For instance, we have recently merged the most popular Kafka spout into the core Storm project. This Kafka spout will be included in the next version of Storm, 0.9.2-incubating, which is just around the corner. And the spout is now compatible with the latest Kafka 0.8.1.1. Kudos to P. Taylor Goetz of Hortonworks for acting as the initial sponsor of the storm-kafka component! For more information see external/storm-kafka in the Storm code base.
- The Kafka project is working on an improved, consolidated consumer API for Kafka 0.9. Take a look at the respective discussions in the kafka-user and kafka-dev mailing lists. The Kafka 0.9 Consumer Rewrite Design document is also worth a read. Moving forward this API initiative should simplify interaction with Kafka in general and integration with storm-kafka in particular.
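To make the storm-kafka integration mentioned above more concrete, here is a minimal Scala sketch of configuring the wurstmeister/storm-kafka spout. Note this is an illustrative example, not code from kafka-storm-starter itself; the ZooKeeper address, topic name, ZooKeeper root path, and spout id are placeholder values.

```scala
import backtype.storm.spout.SchemeAsMultiScheme
import backtype.storm.topology.TopologyBuilder
import storm.kafka.{KafkaSpout, SpoutConfig, StringScheme, ZkHosts}

// The ZooKeeper ensemble that the Kafka brokers register themselves with
val zkHosts = new ZkHosts("127.0.0.1:2181")

// Topic, ZooKeeper root path, and spout id are placeholders; the latter two
// determine where the spout stores its consumed offsets in ZooKeeper.
val spoutConfig = new SpoutConfig(zkHosts, "testing-input", "/kafka-spout", "my-spout-id")
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)

val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1)
```

From here you would wire bolts downstream of `kafka-spout` via `builder.setBolt(...)` as in any other Storm topology.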
kafka-storm-starter
Overview and quick start
A few days ago I released kafka-storm-starter as a means to jumpstart developers interested in integrating Kafka 0.8 and Storm 0.9. Without further ado let’s take a first quick look.
Before we start we must grab the latest version of the code, which is implemented in Scala 2.10:
$ git clone https://github.com/miguno/kafka-storm-starter.git
$ cd kafka-storm-starter
We begin the tour by running the test suite:
$ ./sbt test
Notably this command will run end-to-end tests of Kafka, Storm, and Kafka/Storm integration. See this shortened version of the test output:
[...other tests removed...]
[info] KafkaSpec:
[info] Kafka
[info] - should synchronously send and receive a Tweet in Avro format
[info] + Given a ZooKeeper instance
[info] + And a Kafka broker instance
[info] + And some tweets
[info] + And a single-threaded Kafka consumer group
[info] + When I start a synchronous Kafka producer that sends the tweets in Avro binary format
[info] + Then the consumer app should receive the tweets
[info] - should asynchronously send and receive a Tweet in Avro format
[info] + Given a ZooKeeper instance
[info] + And a Kafka broker instance
[info] + And some tweets
[info] + And a single-threaded Kafka consumer group
[info] + When I start an asynchronous Kafka producer that sends the tweets in Avro binary format
[info] + Then the consumer app should receive the tweets
[info] StormSpec:
[info] Storm
[info] - should start a local cluster
[info] + Given no cluster
[info] + When I start a LocalCluster instance
[info] + Then the local cluster should start properly
[info] - should run a basic topology
[info] + Given a local cluster
[info] + And a wordcount topology
[info] + And the input words alice, bob, joe, alice
[info] + When I submit the topology
[info] + Then the topology should properly count the words
[info] KafkaStormSpec:
[info] Feature: AvroDecoderBolt[T]
[info] Scenario: User creates a Storm topology that uses AvroDecoderBolt
[info] Given a ZooKeeper instance
[info] And a Kafka broker instance
[info] And a Storm topology that uses AvroDecoderBolt and that reads tweets from topic testing-input and writes them as-is to topic testing-output
[info] And some tweets
[info] And a synchronous Kafka producer app that writes to the topic testing-input
[info] And a single-threaded Kafka consumer app that reads from topic testing-output
[info] And a Storm topology configuration that registers an Avro Kryo decorator for Tweet
[info] When I run the Storm topology
[info] And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka
[info] Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology
[info] Feature: AvroScheme[T] for Kafka spout
[info] Scenario: User creates a Storm topology that uses AvroScheme in Kafka spout
[info] Given a ZooKeeper instance
[info] And a Kafka broker instance
[info] And a Storm topology that uses AvroScheme and that reads tweets from topic testing-input and writes them as-is to topic testing-output
[info] And some tweets
[info] And a synchronous Kafka producer app that writes to the topic testing-input
[info] And a single-threaded Kafka consumer app that reads from topic testing-output
[info] And a Storm topology configuration that registers an Avro Kryo decorator for Tweet
[info] When I run the Storm topology
[info] And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka
[info] Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology
[info] Run completed in 21 seconds, 852 milliseconds.
[info] Total number of tests run: 25
[info] Suites: completed 8, aborted 0
[info] Tests: succeeded 25, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 22 s, completed May 23, 2014 12:31:09 PM
We finish the tour by launching the KafkaStormDemo application:
$ ./sbt run
This demo starts in-memory instances of ZooKeeper, Kafka, and Storm. It then runs a demo Storm topology that connects to and reads from the Kafka instance.
You will see output similar to the following (some parts removed to improve readability):
7031 [Thread-19] INFO backtype.storm.daemon.worker - Worker 3f7f1a51-5c9e-43a5-b431-e39a7272215e for storm kafka-storm-starter-1-1400839826 on daa60807-d440-4b45-94fc-8dd7798453d2:1027 has finished loading
7033 [Thread-29-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}}
7050 [Thread-29-kafka-spout] INFO backtype.storm.daemon.executor - Opened spout kafka-spout:(1)
7051 [Thread-29-kafka-spout] INFO backtype.storm.daemon.executor - Activating spout kafka-spout:(1)
7051 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections
7065 [Thread-29-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}}
7066 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: []
7066 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - New partition managers: [Partition{host=127.0.0.1:9092, partition=0}]
7083 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Read partition information from: /kafka-spout/kafka-storm-starter/partition_0 --> null
7100 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
7105 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 18
7106 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing
7126 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0}
7126 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527
9128 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0}
9129 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527
At this point Storm is connected to Kafka (more precisely: to the testing topic in Kafka). The last few lines from above (“Committing offset …”) will be repeated again and again, because a) this demo Storm topology only reads from the Kafka topic but does nothing with the data it reads, and b) we are not sending any data to the Kafka topic.
To stop the demo application you must kill or Ctrl-C the process in the terminal.
You can use KafkaStormDemo as a starting point to create your own, “real” Storm topologies that read from a “real” Kafka, Storm, and ZooKeeper infrastructure. An easy way to get started with such an infrastructure is by deploying Kafka, Storm, and ZooKeeper via a tool such as Wirbelsturm.
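For reference, running a topology in-process the way the demo does boils down to something like the following sketch (topology name, timings, and the omitted spout/bolt wiring are illustrative, not the exact contents of KafkaStormDemo):

```scala
import backtype.storm.{Config, LocalCluster}
import backtype.storm.topology.TopologyBuilder

// Build a topology; spout/bolt wiring omitted here for brevity
val builder = new TopologyBuilder
// builder.setSpout(...); builder.setBolt(...)

val topologyConfig = new Config
topologyConfig.setNumWorkers(1)

// LocalCluster runs an in-process Storm cluster, handy for demos and tests
val cluster = new LocalCluster
cluster.submitTopology("my-demo-topology", topologyConfig, builder.createTopology())
Thread.sleep(10000) // let the topology run for a while
cluster.killTopology("my-demo-topology")
cluster.shutdown()
```

In a production deployment you would replace `LocalCluster` with `StormSubmitter.submitTopology(...)` against a real cluster.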
Features
kafka-storm-starter showcases the following features. Note that the focus is on showcasing, not necessarily on being “production ready”.
- How to integrate Kafka and Storm.
- How to use Avro with Kafka and Storm for serializing and deserializing the data payload. For this I leverage Twitter Bijection and Twitter Chill.
- Kafka standalone code examples
  - KafkaProducerApp: A simple Kafka producer app for writing Avro-encoded data into Kafka. KafkaSpec puts this producer to use and shows how to use Twitter Bijection to Avro-encode the messages being sent to Kafka.
  - KafkaConsumerApp: A simple Kafka consumer app for reading Avro-encoded data from Kafka. KafkaSpec puts this consumer to use and shows how to use Twitter Bijection to Avro-decode the messages being read from Kafka.
- Storm standalone code examples
  - AvroDecoderBolt[T]: An `AvroDecoderBolt[T <: org.apache.avro.specific.SpecificRecordBase]` that can be parameterized with the type of the Avro record `T` it will deserialize its data to (i.e. no need to write another decoder bolt just because the bolt needs to handle a different Avro schema).
  - AvroScheme[T]: An `AvroScheme[T <: org.apache.avro.specific.SpecificRecordBase]` scheme, i.e. a custom `backtype.storm.spout.Scheme` to auto-deserialize a spout’s incoming data. The scheme can be parameterized with the type of the Avro record `T` it will deserialize its data to (i.e. no need to write another scheme just because the scheme needs to handle a different Avro schema).
    - You can opt to configure a spout (such as the Kafka spout) with `AvroScheme` if you want to perform the Avro decoding step directly in the spout instead of placing an `AvroDecoderBolt` after the Kafka spout. You may want to profile your topology to see which of the two approaches works best for your use case.
  - TweetAvroKryoDecorator: A custom `backtype.storm.serialization.IKryoDecorator`, i.e. a custom Kryo serializer for Storm.
    - Unfortunately we have not figured out a way to implement a parameterized `AvroKryoDecorator[T]` variant yet. (A “straight-forward” approach we tried, similar to the other parameterized components, compiled fine but failed at runtime when running the tests.) Code contributions are welcome!
An
- Kafka and Storm integration
  - AvroKafkaSinkBolt[T]: An `AvroKafkaSinkBolt[T <: org.apache.avro.specific.SpecificRecordBase]` that can be parameterized with the type of the Avro record `T` it will serialize its data to before sending the encoded data to Kafka (i.e. no need to write another Kafka sink bolt just because the bolt needs to handle a different Avro schema).
  - Storm topologies that read Avro-encoded data from Kafka: KafkaStormDemo and KafkaStormSpec
  - A Storm topology that writes Avro-encoded data to Kafka: KafkaStormSpec
- Unit testing
  - AvroDecoderBoltSpec
  - AvroSchemeSpec
  - And more under src/test/scala
- Integration testing
  - KafkaSpec: Tests for Kafka, which launch and run against in-memory instances of Kafka and ZooKeeper.
  - StormSpec: Tests for Storm, which launch and run against in-memory instances of Storm and ZooKeeper.
  - KafkaStormSpec: Tests for integrating Storm and Kafka, which launch and run against in-memory instances of Kafka, Storm, and ZooKeeper.
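To illustrate the Twitter Bijection approach to Avro serialization mentioned in the feature list, here is a hedged sketch of encoding and decoding a record. It assumes `Tweet` is an Avro-generated class extending `org.apache.avro.specific.SpecificRecordBase` (as in kafka-storm-starter); the variable names are mine, and the exact API shape may differ slightly between Bijection versions.

```scala
import scala.util.Try
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs

// Derive a binary Injection (an invertible conversion) for the Avro type
implicit val specificAvroBinaryInjection: Injection[Tweet, Array[Byte]] =
  SpecificAvroCodecs.toBinary[Tweet]

val tweet: Tweet = ??? // construct or receive a Tweet instance

// Encode: Tweet -> Array[Byte], ready to be sent to Kafka
val bytes: Array[Byte] = Injection[Tweet, Array[Byte]](tweet)

// Decode: Array[Byte] -> Try[Tweet], e.g. for messages read from Kafka
val decoded: Try[Tweet] = Injection.invert[Tweet, Array[Byte]](bytes)
```

Because `Injection` is parameterized with the record type, the same pattern serves any Avro schema, which is exactly what makes bolts like `AvroDecoderBolt[T]` reusable.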
Interested in more?
All the gory details are available at kafka-storm-starter. Apart from the code and build script (sbt) I provide information about how to create Cobertura code coverage reports, to package the code, to create Java “sources” and “javadoc” jars, to generate API docs, to integrate with Jenkins CI and TeamCity build servers, and to set up kafka-storm-starter as a project in IntelliJ IDEA and Eclipse.
Moving forward my plan is to keep kafka-storm-starter up to date with the latest versions of Kafka and Storm. The next version of Storm, 0.9.2, will already simplify the current setup quite a lot. Of course I welcome any code, docs, or similar contributions you may have.
The quest to get there
Just for the historical record here are some of the gotchas that are addressed by kafka-storm-starter, i.e. problems you do not need to solve yourself anymore:
- Figuring out which Kafka spout in Storm 0.9 works with the latest Kafka 0.8 version. A lot of people tried in vain to use a Kafka spout built for Kafka 0.7 to read from Kafka 0.8. Others didn’t know how to use the available Kafka 0.8 spouts in their code, and so on. In the case of kafka-storm-starter I opted to go with the spout created by wurstmeister, primarily because this spout will soon be the “official” Kafka spout maintained by the Storm project. Unfortunately the latest version of the spout was/is not available in a public Maven repository, so I had to take care of that, too, until Storm 0.9.2 provides the official version.
  - Alternatively you can try the Kafka spout of HolmesNL, developed by Mattijs Ugen. I don’t want to go into the differences from the wurstmeister spout in detail, but essentially the wurstmeister spout uses the Simple Consumer API of Kafka 0.8 whereas Mattijs’ spout uses the High Level Consumer API.
- Resolving version conflicts between the various software packages. For instance, Storm 0.9.1 has a transitive dependency on Kryo 2.17 because Storm depends on an old version of Carbonite. This causes problems when trying to use Twitter Bijection or Twitter Chill, because those require a newer version of Kryo. (Apart from that, Kryo 2.21 also fixes data corruption issues, so you do want the newer version.) To address this issue I filed STORM-263, which is included in the upcoming Storm 0.9.2. Thanks to Sam Ritchie, the maintainer of Carbonite, and everyone else involved in getting the patch included. Another example is that you must exclude `javax.jms:jms` (and a few others) when including Kafka in your build dependencies, or figure out how to handle Netflix (now: Apache) Curator conflicts.
- Understanding the various conflicting ZooKeeper versions, and picking a version to go with. Right now Storm and Kafka still prefer very old 3.3.x versions of ZooKeeper, whereas in practice many people run 3.4.x in their infrastructure (e.g. because ZooKeeper 3.4.x is already deployed alongside other infrastructure pieces such as Hadoop clusters when using commercial Hadoop distributions).
- How to write unit tests for Storm topologies. A lot of people seem to find references to TestingApiDemo.java while searching the Internet but struggle with extracting these examples out of the Storm code base and merging them into their own project.
- How to write Storm topologies in a way that lets you parameterize their components (bolts etc.) with the Avro record type `T`, so that you don’t need to write a new bolt only because your Avro schema changes. The goal of this code is to show how you can improve the developer/user experience by providing ready-to-use functionality, in this case with regards to (Avro) serialization/deserialization. To tackle this you must understand Storm’s serialization system as well as its run-time behavior.
  - While doing that I discovered a (known) Scala bug when I tried to use `TypeTag` instead of the deprecated `Manifest` to implement e.g. `AvroDecoderBolt[T]`; see SI-5919. This bug is still not fixed in the latest Scala 2.11.1, by the way.
- How to write end-to-end Kafka->Storm->Kafka tests.
- And so on…
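In sbt, the dependency exclusions described above look roughly like the following build.sbt fragment. This is illustrative only: the exact set of exclusions you need depends on your Kafka version and the rest of your dependency tree, though `javax.jms:jms`, the `com.sun.jdmk`/`com.sun.jmx` JMX artifacts, and Kafka’s bundled ZooKeeper are the usual suspects.

```scala
// build.sbt fragment (illustrative; versions match those discussed in this post)
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
    exclude("org.apache.zookeeper", "zookeeper"),
  // "provided" keeps storm-core out of your fat jar, since the Storm cluster
  // already ships it on the classpath
  "org.apache.storm" % "storm-core" % "0.9.1-incubating" % "provided"
)
```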
Conclusion
I hope you find kafka-storm-starter useful to bootstrap your own Kafka/Storm application. In the Storm community we are actively working on improving and simplifying the Kafka/Storm integration, so please stay tuned and, above all, thanks for your patience. The upcoming 0.9.2 version of Storm is already a first step in the right direction by bundling a Kafka spout that works with the latest stable version of Kafka (0.8.1.1 at the time of this writing).
Now, where to go once you have your Kafka and Storm code ready? You can then use a tool such as Wirbelsturm and its associated Puppet modules to deploy production Kafka and Storm clusters and run your own real-time data processing pipelines at scale.