In this article I describe how to install, configure and run a multi-broker Apache Kafka 0.8 (trunk) cluster on a single machine. The final setup consists of one local ZooKeeper instance and three local Kafka brokers. We will test-drive the setup by sending messages to the cluster via a console producer and receiving those messages via a console consumer. I will also describe how to build Kafka for Scala 2.9.2, which makes it much easier to integrate Kafka with other Scala-based frameworks and tools that require Scala 2.9 instead of Kafka’s default Scala 2.8.
What we want to do
Here is an overview of what we want to do:
- Build Kafka 0.8-trunk for Scala 2.9.2.
- I also provide instructions for the default 2.8.0, just in case.
- Use a single machine for this Kafka setup.
- Run 1 ZooKeeper instance on that machine.
- Run 3 Kafka brokers on that machine.
- Create a Kafka topic called “zerg.hydra” and send/receive messages for that topic via the console. The topic will be configured to use 3 partitions and 2 replicas per partition.
The purpose of this article is not to present a production-ready configuration of a Kafka cluster. However, it should get you started with using Kafka as a distributed messaging system in your own infrastructure.
Background: Why Kafka and Scala 2.9?
Personally I’d like to use Scala 2.9.2 for Kafka (which is still built for Scala 2.8.0 by default as of today) because many related software packages that are of interest to me (such as Finagle, Kestrel) are based on Scala 2.9. Also, the current versions of many development and build tools (e.g. IDEs, sbt) for Scala require at least version 2.9. If you are working in a similar environment you may want to build Kafka for Scala 2.9 just like I did; otherwise you can expect to run into issues such as Scala version conflicts.
Option 1 (preferred): Kafka 0.8-trunk with Scala 2.9.2
Unfortunately the current trunk of Kafka has problems building against Scala 2.9.2 out of the box. I created a fork of Kafka 0.8-trunk that includes the required fix (a change of one file) in the branch “scala-2.9.2”. The fix ties the Scala version used by Kafka’s shell scripts to 2.9.2 instead of 2.8.0.
The following instructions will use this fork to download, build and install Kafka for Scala 2.9.2:
$ cd $HOME
$ git clone firstname.lastname@example.org:miguno/kafka.git
$ cd kafka
# this branch includes a patched bin/kafka-run-class.sh for Scala 2.9.2
$ git checkout -b scala-2.9.2 remotes/origin/scala-2.9.2
$ ./sbt update
$ ./sbt "++2.9.2 package"
Option 2: Kafka 0.8-trunk with Scala 2.8.0
If you are fine with Scala 2.8 you can build and install Kafka as follows.
$ cd $HOME
$ git clone email@example.com:apache/kafka.git
$ cd kafka
$ ./sbt update
$ ./sbt package
Configuring and running Kafka
Unless noted otherwise all commands below assume that you are in the top level directory of your Kafka installation.
If you followed the instructions above, this directory is
Configure your OS
For Kafka 0.8 it is recommended to increase the maximum number of open file handles because, due to changes in 0.8, Kafka will keep more file handles open than in 0.7. The exact number depends on your usage patterns, of course, but on the Kafka mailing list the ballpark figure “tens of thousands” was shared:
In Kafka 0.8, we keep the file handles for all segment files open until they are garbage collected. Depending on the size of your cluster, this number can be pretty big. Few 10 K or so.
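Before raising the limit it can help to check what the current process is actually allowed to open. The following is a minimal Python sketch, assuming a Unix-like system where the standard resource module is available. The function names are made up for this illustration, and the target value is simply the 98,304 used as the example figure in this section. Note that the script reports the limits of the user who runs it, which is not necessarily the user running the Kafka daemons.

```python
import resource

# Target taken from this article's example; adjust it to your own workload.
TARGET_OPEN_FILES = 98304


def open_file_limits():
    """Return the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def needs_increase(soft_limit, target=TARGET_OPEN_FILES):
    """Return True if the soft limit is finite and below the target."""
    if soft_limit == resource.RLIM_INFINITY:
        return False
    return soft_limit < target


soft, hard = open_file_limits()
print("soft limit:", soft, "hard limit:", hard)
if needs_increase(soft):
    print("soft limit is below", TARGET_OPEN_FILES)
```

Running it as the same user that will start the brokers gives the most meaningful result.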
For instance, to increase the maximum number of open file handles for the user kafka to 98,304 (change kafka to whatever user you are running the Kafka daemons with; this can be your own user account, of course) you must add the following line to
Kafka ships with a reasonable default ZooKeeper configuration for our simple use case. The following command launches a local ZooKeeper instance.
By default the ZooKeeper server will listen on
Configure and start the Kafka brokers
We will create 3 Kafka brokers, whose configurations are based on the default config/server.properties. Apart from the settings below the configurations of the brokers are identical.
The first broker:
Edit config/server1.properties (a copy of the default config/server.properties) and replace the existing config values as follows:
broker.id=1
port=9092
log.dir=/tmp/kafka-logs-1
The second broker:
Edit config/server2.properties (a copy of the default config/server.properties) and replace the existing config values as follows:
broker.id=2
port=9093
log.dir=/tmp/kafka-logs-2
The third broker:
Edit config/server3.properties (a copy of the default config/server.properties) and replace the existing config values as follows:
broker.id=3
port=9094
log.dir=/tmp/kafka-logs-3
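Since the three files differ only in these three settings, they can also be derived mechanically. The following Python sketch shows one way to do that, under the assumption that the numbering scheme above (broker.id = n, port = 9091 + n, log.dir = /tmp/kafka-logs-n) is what you want. The function names and the sample base text are hypothetical; the sample is a stand-in, not the real contents of config/server.properties.

```python
def broker_overrides(broker_number):
    """Settings that differ per broker, following this article's scheme."""
    return {
        "broker.id": str(broker_number),
        "port": str(9091 + broker_number),
        "log.dir": "/tmp/kafka-logs-%d" % broker_number,
    }


def apply_overrides(base_text, overrides):
    """Replace overridden 'key=value' lines; append keys that were not present."""
    remaining = dict(overrides)
    result = []
    for line in base_text.splitlines():
        stripped = line.strip()
        key = stripped.split("=", 1)[0].strip()
        is_setting = "=" in stripped and not stripped.startswith("#")
        if is_setting and key in remaining:
            result.append("%s=%s" % (key, remaining.pop(key)))
        else:
            result.append(line)  # comments and other settings stay untouched
    for key in sorted(remaining):
        result.append("%s=%s" % (key, remaining[key]))
    return "\n".join(result) + "\n"


# Stand-in for the default file (assumption, not the real default contents).
SAMPLE_BASE = "broker.id=0\nport=9092\nlog.dir=/tmp/kafka-logs\n"

for n in (1, 2, 3):
    print("--- server%d.properties ---" % n)
    print(apply_overrides(SAMPLE_BASE, broker_overrides(n)), end="")
```

To use it for real you would read config/server.properties instead of the sample text and write each result to config/serverN.properties.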
Now you can start each Kafka broker in a separate console:
Here is a summary of the configured network interfaces and ports that the brokers will listen on:
        Broker 1      Broker 2      Broker 3
----------------------------------------------
Kafka   *:9092/tcp    *:9093/tcp    *:9094/tcp
JMX     *:9999/tcp    *:10000/tcp   *:10001/tcp
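If you want to verify that the brokers are really listening on these ports, a plain TCP connection attempt is enough. The sketch below uses only Python's standard socket module; the function names are hypothetical, and it assumes the brokers run on localhost with exactly the ports listed in the table.

```python
import socket

# Ports from the summary table: Kafka and JMX per broker.
EXPECTED_PORTS = {
    1: {"kafka": 9092, "jmx": 9999},
    2: {"kafka": 9093, "jmx": 10000},
    3: {"kafka": 9094, "jmx": 10001},
}


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_brokers(host="localhost"):
    """Map each (broker, service) pair to whether its port accepts connections."""
    return {
        (broker, service): is_listening(host, port)
        for broker, ports in EXPECTED_PORTS.items()
        for service, port in ports.items()
    }


for (broker, service), up in sorted(check_brokers().items()):
    print("broker %d %-5s %s" % (broker, service, "up" if up else "down"))
```

A successful connection only tells you that something is listening on the port, not that it is a healthy Kafka broker.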
Excursus: Topics, partitions and replication in Kafka
In a nutshell Kafka partitions incoming messages for a topic, and assigns those partitions to the available Kafka brokers. The number of partitions is configurable and can be set per-topic and per-broker.
First the stream [of messages] is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order.
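To make the idea of producer-side partitioning concrete, here is a small in-memory Python sketch. It is not a Kafka client and does not reproduce Kafka's own partitioner: the hash function (CRC32), the class ToyTopic and all method names are assumptions chosen for illustration. It shows the two properties described above: the producer decides which partition a message goes to, and messages within a partition keep their arrival order.

```python
import zlib
from collections import defaultdict


def choose_partition(key, num_partitions):
    """Map a message key to a partition with a stable hash (illustrative scheme)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


class ToyTopic:
    """In-memory stand-in for a partitioned topic; not a Kafka client."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.partitions = defaultdict(list)

    def send(self, key, message):
        """Append the message to the partition chosen for its key."""
        partition = choose_partition(key, self.num_partitions)
        self.partitions[partition].append(message)  # arrival order is kept
        return partition

    def read(self, partition):
        """Return the messages of one partition in the order they arrived."""
        return list(self.partitions[partition])


topic = ToyTopic(num_partitions=3)
first = topic.send("user-1", "Hello, world!")
second = topic.send("user-1", "Rock: Nerf Paper. Scissors is fine.")
print(first == second)    # same key, same partition
print(topic.read(first))  # messages come back in arrival order
```

Messages with different keys may land in different partitions, and no ordering is implied across partitions.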
A new feature of Kafka 0.8 is that those partitions will now be replicated across Kafka brokers to make the cluster more resilient against host failures:
Partitions are now replicated. Previously the topic would remain available in the case of server failure, but individual partitions within that topic could disappear when the server hosting them stopped. If a broker failed permanently any unconsumed data it hosted would be lost. Starting with 0.8 all partitions have a replication factor and we get the prior behavior as the special case where replication factor = 1. Replicas have a notion of committed messages and guarantee that committed messages won’t be lost as long as at least one replica survives. Replica are byte-for-byte identical across replicas.
Producer and consumer are replication aware. When running in sync mode, by default, the producer send() request blocks until the messages sent is committed to the active replicas. As a result the sender can depend on the guarantee that a message sent will not be lost. Latency sensitive producers have the option to tune this to block only on the write to the leader broker or to run completely async if they are willing to forsake this guarantee. The consumer will only see messages that have been committed.
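The notion of a committed message can be illustrated with a toy model. The Python sketch below is a deliberate simplification and an assumption on my part: it treats every replica as active and calls a message committed once all replicas have it, which is only an approximation of what Kafka does internally. The class and method names are hypothetical.

```python
class ToyPartition:
    """Toy model of one replicated partition; replica IDs are broker IDs."""

    def __init__(self, replica_ids):
        self.leader = replica_ids[0]  # first replica acts as the leader
        self.logs = {replica_id: [] for replica_id in replica_ids}

    def append_to_leader(self, message):
        """A producer write always goes to the leader first."""
        self.logs[self.leader].append(message)

    def replicate_to(self, follower_id):
        """Copy everything the follower is missing from the leader's log."""
        leader_log = self.logs[self.leader]
        follower_log = self.logs[follower_id]
        follower_log.extend(leader_log[len(follower_log):])

    def committed(self):
        """Messages present on every replica, i.e. what a consumer may see."""
        committed_count = min(len(log) for log in self.logs.values())
        return self.logs[self.leader][:committed_count]


partition = ToyPartition([1, 2])
partition.append_to_leader("Hello, world!")
print(partition.committed())  # [] because broker 2 does not have the message yet
partition.replicate_to(2)
print(partition.committed())  # ['Hello, world!']
```

In this model a producer that blocks until the message is committed corresponds to waiting until committed() contains the message, while a latency-sensitive producer would return right after append_to_leader().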
The following diagram illustrates the relationship between topics, partitions and replicas.
Logically this relationship is very similar to how Hadoop manages blocks and replication in HDFS.
When a topic is created in Kafka 0.8, Kafka determines how each replica of a partition is mapped to a broker. In general Kafka tries to spread the replicas across all brokers (source). Messages are first sent to the first replica of a partition (i.e. to the current “leader” broker of that partition) before they are replicated to the remaining brokers. Message producers may choose from different strategies for sending messages (e.g. synchronous mode, asynchronous mode). Producers discover the available brokers in a cluster and the number of partitions on each, by registering watchers in ZooKeeper.
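As a rough illustration of spreading replicas across brokers, the following Python sketch assigns replicas round-robin. This is a simplified scheme I am assuming for illustration; Kafka's actual placement logic is more involved, and the function name is hypothetical. The first broker in each list plays the role of the first replica, i.e. the initial leader.

```python
def assign_replicas(broker_ids, num_partitions, replication_factor):
    """Assign each partition's replicas to consecutive brokers, round-robin."""
    if replication_factor > len(broker_ids):
        raise ValueError("replication factor cannot exceed the number of brokers")
    assignment = {}
    for partition in range(num_partitions):
        assignment[partition] = [
            broker_ids[(partition + offset) % len(broker_ids)]
            for offset in range(replication_factor)
        ]
    return assignment


# 3 brokers, 3 partitions, 2 replicas per partition.
print(assign_replicas([1, 2, 3], num_partitions=3, replication_factor=2))
# {0: [1, 2], 1: [2, 3], 2: [3, 1]}
```

With this scheme every broker hosts two replicas and leads exactly one partition, and no broker holds two replicas of the same partition.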
If you wonder how to configure the number of partitions per topic/broker, here’s feedback from LinkedIn developers:
At LinkedIn, some of the high volume topics are configured with more than 1 partition per broker. Having more partitions increases I/O parallelism for writes and also increases the degree of parallelism for consumers (since partition is the unit for distributing data to consumers). On the other hand, more partitions adds some overhead: (a) there will be more files and thus more open file handlers; (b) there are more offsets to be checkpointed by consumers which can increase the load of ZooKeeper. So, you want to balance these tradeoffs.
Create a Kafka topic
In Kafka 0.8, there are 2 ways of creating a new topic:
- Turn on the auto.create.topics.enable option on the broker. When the broker receives the first message for a new topic, it creates that topic with
- Use the admin command
- Note: In the Kafka 0.8.0 release use the admin command
kafka-topics.sh was removed in the release version and split into
We will use the latter approach. The following command creates a new topic “zerg.hydra”. The topic is configured to use 3 partitions and a replication factor of 2. Note that in a production setting we’d rather set the replication factor to 3, but a value of 2 is better for illustrative purposes (i.e. we intentionally use different values for the number of partitions and replications to better see the effects of each setting).
This has the following effects:
- Kafka will create 3 logical partitions for the topic.
- Kafka will create a total of two replicas (copies) per partition. For each partition it will pick two brokers that will host those replicas. For each partition Kafka will elect a “leader” broker.
Ask Kafka for a list of available topics. The list should include the new
You can also inspect the configuration of the topic as well as the currently assigned brokers per partition and replica. Because a broker can only host a single replica per partition, Kafka has opted to use a broker’s ID also as the corresponding replica’s ID.
In this example output the first broker (with broker.id = 1) happens to be the designated leader for partition 0 at the moment. Similarly, the second and third brokers are the leaders for partitions 1 and 2, respectively.
The following diagram illustrates the setup (and also includes the producer and consumer that we will run shortly).
You can also inspect the local filesystem to see how the --describe output above matches actual files. By default Kafka persists topics as “log files” (Kafka terminology) in the
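A quick way to see which partitions of a topic a broker hosts is to list its log directory. The Python sketch below assumes that each partition is stored in a subdirectory named after the topic and the partition number (for example zerg.hydra-0) and that the brokers use the /tmp/kafka-logs-N directories configured earlier. The function name is hypothetical.

```python
import os
import re


def partition_dirs(log_dir, topic):
    """Return sorted partition numbers that have a '<topic>-<n>' directory in log_dir."""
    pattern = re.compile(r"^%s-(\d+)$" % re.escape(topic))
    found = []
    for name in os.listdir(log_dir):
        match = pattern.match(name)
        if match and os.path.isdir(os.path.join(log_dir, name)):
            found.append(int(match.group(1)))
    return sorted(found)


for n in (1, 2, 3):
    log_dir = "/tmp/kafka-logs-%d" % n
    if os.path.isdir(log_dir):  # skip brokers that have not been started yet
        print(log_dir, partition_dirs(log_dir, "zerg.hydra"))
```

With 3 partitions and a replication factor of 2 you would expect six partition directories in total across the three brokers.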
Start a producer
Start a console producer in
Example producer output:
[...] INFO Verifying properties (kafka.utils.VerifiableProperties)
[...] INFO Property broker.list is overridden to localhost:9092,localhost:9093,localhost:9094 (...)
[...] INFO Property compression.codec is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property key.serializer.class is overridden to kafka.serializer.StringEncoder (...)
[...] INFO Property producer.type is overridden to sync (kafka.utils.VerifiableProperties)
[...] INFO Property queue.buffering.max.messages is overridden to 10000 (...)
[...] INFO Property queue.buffering.max.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[...] INFO Property queue.enqueue.timeout.ms is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property request.required.acks is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property request.timeout.ms is overridden to 1500 (kafka.utils.VerifiableProperties)
[...] INFO Property send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[...] INFO Property serializer.class is overridden to kafka.serializer.StringEncoder (...)
You can now enter new messages, one per line. Here we enter two messages “Hello, world!” and “Rock: Nerf Paper. Scissors is fine.”:
Hello, world!
Rock: Nerf Paper. Scissors is fine.
After the messages are produced, you should see the data being replicated to the three log directories for each of the broker instances, i.e.
Start a consumer
Start a console consumer that reads messages in zerg.hydra from the beginning (in a production setting you would usually NOT want to add the
The consumer will see a new message whenever you enter a message in the producer above.
Example consumer output:
<snipp>
[...] INFO [console-consumer-28434_panama.local-1363174829799-954ed29e], Connecting to zookeeper instance at localhost:2181 ...
[...] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[...] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT ...
[...] INFO Client environment:host.name=192.168.0.153 (org.apache.zookeeper.ZooKeeper)
<snipp>
[...] INFO Fetching metadata with correlation id 0 for 1 topic(s) Set(zerg.hydra) (kafka.client.ClientUtils$)
[...] INFO Connected to 192.168.0.153:9092 for producing (kafka.producer.SyncProducer)
[...] INFO Disconnecting from 192.168.0.153:9092 (kafka.producer.SyncProducer)
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-3], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 2, initOffset -1 to broker 3 with fetcherId 0 ...
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-2], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 1, initOffset -1 to broker 2 with fetcherId 0 ...
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-1], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 0, initOffset -1 to broker 1 with fetcherId 0 ...
And at the end of the output you will see the following messages:
Hello, world!
Rock: Nerf Paper. Scissors is fine.
A note when using Kafka with Storm
The maximum parallelism you can have on a KafkaSpout is the number of partitions of the corresponding Kafka topic. The following question-answer thread (I slightly modified the original text for clarification purposes) is from the Storm user mailing list, but presumably refers to Kafka pre-0.8, i.e. to the time before the replication feature was added:
Question: Suppose the number of Kafka partitions per broker is configured as 1 and the number of hosts is 2. If we set the spout parallelism as 10, then how does Storm handle the difference between the number of Kafka partitions and the number of spout tasks? Since there are only 2 partitions, does every other spout task (greater than first 2) not read the data or do they read the same data?
Answer (by Nathan Marz): The remaining 8 (= 10 - 2) spout tasks wouldn’t read any data from the Kafka topic.
My current understanding is that the number of partitions (i.e. regardless of replicas) is still the limiting factor for the parallelism of a KafkaSpout. Why? Because Kafka does not allow consumers to read from replicas other than the (replica of the) leader of a partition, which simplifies concurrent access to data in Kafka.
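The counting argument can be spelled out in a few lines of Python. The sketch below only illustrates the arithmetic; it assumes each partition is read by exactly one spout task and uses a round-robin distribution that I made up for this example, which is not necessarily how KafkaSpout distributes partitions internally. The function names are hypothetical.

```python
def assign_partitions_to_tasks(num_partitions, num_tasks):
    """Give each partition to exactly one task, round-robin; extra tasks get nothing."""
    assignment = {task: [] for task in range(num_tasks)}
    for partition in range(num_partitions):
        assignment[partition % num_tasks].append(partition)
    return assignment


def idle_tasks(num_partitions, num_tasks):
    """Count the tasks that end up without any partition to read from."""
    assignment = assign_partitions_to_tasks(num_partitions, num_tasks)
    return sum(1 for partitions in assignment.values() if not partitions)


# The mailing list example: 2 partitions in total, spout parallelism of 10.
print(idle_tasks(num_partitions=2, num_tasks=10))  # 8
```

For the zerg.hydra topic with its 3 partitions, any spout parallelism above 3 would leave tasks idle in this model, regardless of the replication factor.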
A note when using Kafka with Hadoop
LinkedIn has published their Kafka->HDFS pipeline named Camus. It is a MapReduce job that does distributed data loads out of Kafka.
Where to go from here
Automated deployment of Kafka clusters:
- puppet-kafka – a Puppet module I wrote to deploy Kafka 0.8+ clusters
The following documents provide plenty of information about Kafka that goes way beyond what I covered in this article: