Michael G. Noll

Applied Research. Big Data. Distributed Systems. Open Source.

Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node

In this article I describe how to install, configure and run a multi-broker Apache Kafka 0.8 (trunk) cluster on a single machine. The final setup consists of one local ZooKeeper instance and three local Kafka brokers. We will test-drive the setup by sending messages to the cluster via a console producer and receiving those messages via a console consumer. I will also describe how to build Kafka for Scala 2.9.2, which makes it much easier to integrate Kafka with other Scala-based frameworks and tools that require Scala 2.9 instead of Kafka’s default Scala 2.8.

Update Mar 2014: I have released Wirbelsturm, a Vagrant and Puppet based tool to perform 1-click local and remote deployments, with a focus on big data related infrastructure such as Apache Kafka and Apache Storm. Thanks to Wirbelsturm you don’t need to follow this tutorial to manually install and configure a Kafka cluster. Use Wirbelsturm to fire up a Kafka cluster, then come back to this tutorial and follow the usage examples. You can find Wirbelsturm on GitHub.
Update Feb 2014: I have released a Puppet module to easily deploy Kafka clusters: puppet-kafka. This module automates most of the manual tasks listed in this article. You can find puppet-kafka on GitHub.

What we want to do

Here is an overview of what we want to do:

  • Build Kafka 0.8-trunk for Scala 2.9.2.
    • I also provide instructions for the default 2.8.0, just in case.
  • Use a single machine for this Kafka setup.
  • Run 1 ZooKeeper instance on that machine.
  • Run 3 Kafka brokers on that machine.
  • Create a Kafka topic called “zerg.hydra” and send/receive messages for that topic via the console. The topic will be configured to use 3 partitions and 2 replicas per partition.

The purpose of this article is not to present a production-ready configuration of a Kafka cluster. However, it should get you started with using Kafka as a distributed messaging system in your own infrastructure.

Installing Kafka

Background: Why Kafka and Scala 2.9?

Personally I’d like to use Scala 2.9.2 for Kafka – which is still built for Scala 2.8.0 by default as of today – because many related software packages that are of interest to me (such as Finagle, Kestrel) are based on Scala 2.9. Also, the current versions of many development and build tools (e.g. IDEs, sbt) for Scala require at least version 2.9. If you are working in a similar environment you may want to build Kafka for Scala 2.9 just like I did – otherwise you can expect to run into issues such as Scala version conflicts.

Option 1 (preferred): Kafka 0.8-trunk with Scala 2.9.2

Unfortunately the current trunk of Kafka has problems building against Scala 2.9.2 out of the box. I created a fork of Kafka 0.8-trunk that includes the required fix (a change of one file) in the branch “scala-2.9.2”. The fix ties the Scala version used by Kafka’s shell scripts to 2.9.2 instead of 2.8.0.

The following instructions will use this fork to download, build and install Kafka for Scala 2.9.2:

$ cd $HOME
$ git clone git@github.com:miguno/kafka.git
$ cd kafka
# this branch includes a patched bin/kafka-run-class.sh for Scala 2.9.2
$ git checkout -b scala-2.9.2 remotes/origin/scala-2.9.2
$ ./sbt update
$ ./sbt "++2.9.2 package"

Option 2: Kafka 0.8-trunk with Scala 2.8.0

If you are fine with Scala 2.8 you need to build and install Kafka as follows.

$ cd $HOME
$ git clone git@github.com:apache/kafka.git
$ cd kafka
$ ./sbt update
$ ./sbt package

Configuring and running Kafka

Unless noted otherwise all commands below assume that you are in the top level directory of your Kafka installation. If you followed the instructions above, this directory is $HOME/kafka/.

Configure your OS

For Kafka 0.8 it is recommended to increase the maximum number of open file handles, because Kafka 0.8 keeps more file handles open than 0.7 did. The exact number depends on your usage patterns, of course, but on the Kafka mailing list the ballpark figure “tens of thousands” was shared:

In Kafka 0.8, we keep the file handles for all segment files open until they are garbage collected. Depending on the size of your cluster, this number can be pretty big. Few 10 K or so.

For instance, to increase the maximum number of open file handles for the user kafka to 98,304, add the following line to /etc/security/limits.conf. Change kafka to whatever user runs the Kafka daemons; this can be your own user account, of course.

/etc/security/limits.conf
kafka    -    nofile    98304
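
To see which limits are actually in effect, you can ask the shell directly. The following is a minimal sketch, assuming you run it as the same user that starts the Kafka daemons and in a fresh login session (changes to limits.conf normally only apply to sessions started after the edit). The target value is simply the example figure from above.

```shell
#!/bin/sh
# Report the open-file limits of the current shell and compare the soft
# limit against a target value (98304 is the example figure used above).
target=98304
soft=$(ulimit -Sn)   # soft limit: what processes started from this shell get
hard=$(ulimit -Hn)   # hard limit: the ceiling the soft limit may be raised to
echo "open files: soft=${soft} hard=${hard}"

if [ "$soft" = "unlimited" ] || [ "$soft" -ge "$target" ]; then
    echo "soft limit is at least ${target}"
else
    echo "soft limit is below ${target}"
fi
```

If the soft limit is still below the target after logging in again, the limits.conf entry is probably not being picked up for that user.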

Start ZooKeeper

Kafka ships with a reasonable default ZooKeeper configuration for our simple use case. The following command launches a local ZooKeeper instance.

Start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

By default the ZooKeeper server will listen on *:2181/tcp.

Configure and start the Kafka brokers

We will create 3 Kafka brokers, whose configurations are based on the default config/server.properties. Apart from the settings below the configurations of the brokers are identical.

The first broker:

Create the config file for broker 1
$ cp config/server.properties config/server1.properties

Edit config/server1.properties and replace the existing config values as follows:

broker.id=1
port=9092
log.dir=/tmp/kafka-logs-1

The second broker:

Create the config file for broker 2
$ cp config/server.properties config/server2.properties

Edit config/server2.properties and replace the existing config values as follows:

broker.id=2
port=9093
log.dir=/tmp/kafka-logs-2

The third broker:

Create the config file for broker 3
$ cp config/server.properties config/server3.properties

Edit config/server3.properties and replace the existing config values as follows:

broker.id=3
port=9094
log.dir=/tmp/kafka-logs-3
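
The three copy-and-edit steps above can also be scripted. Here is a rough sketch, not part of Kafka itself: the helper name make_broker_config is made up for this example, and it assumes that config/server.properties contains uncommented broker.id, port and log.dir lines. It derives the port from the broker ID (9091 + ID), which matches the values used above.

```shell
#!/bin/sh
# Derive a per-broker config from a template by rewriting three keys.
# Usage: make_broker_config TEMPLATE BROKER_ID > OUTPUT
# (hypothetical helper for this tutorial, not a Kafka tool)
make_broker_config() {
    template=$1
    bid=$2
    bport=$((9091 + bid))     # broker 1 -> 9092, 2 -> 9093, 3 -> 9094
    sed -e "s|^broker\.id=.*|broker.id=${bid}|" \
        -e "s|^port=.*|port=${bport}|" \
        -e "s|^log\.dir=.*|log.dir=/tmp/kafka-logs-${bid}|" \
        "$template"
}

# Only write the real files when run from the top-level Kafka directory.
if [ -f config/server.properties ]; then
    for n in 1 2 3; do
        make_broker_config config/server.properties "$n" > "config/server${n}.properties"
    done
fi
```

Lines that do not match one of the three keys are copied through unchanged, so the generated files stay identical to the default configuration apart from these settings.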

Now you can start each Kafka broker in a separate console:

Start the first broker in its own terminal session
$ env JMX_PORT=9999  bin/kafka-server-start.sh config/server1.properties

Start the second broker in its own terminal session
$ env JMX_PORT=10000 bin/kafka-server-start.sh config/server2.properties

Start the third broker in its own terminal session
$ env JMX_PORT=10001 bin/kafka-server-start.sh config/server3.properties

Here is a summary of the configured network interfaces and ports that the brokers will listen on:

        Broker 1     Broker 2      Broker 3
----------------------------------------------
Kafka   *:9092/tcp   *:9093/tcp    *:9094/tcp
JMX     *:9999/tcp   *:10000/tcp   *:10001/tcp

Excursus: Topics, partitions and replication in Kafka

In a nutshell Kafka partitions incoming messages for a topic, and assigns those partitions to the available Kafka brokers. The number of partitions is configurable and can be set per-topic and per-broker.

First the stream [of messages] is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order.

A new feature of Kafka 0.8 is that those partitions will now be replicated across Kafka brokers to make the cluster more resilient against host failures:

Partitions are now replicated. Previously the topic would remain available in the case of server failure, but individual partitions within that topic could disappear when the server hosting them stopped. If a broker failed permanently any unconsumed data it hosted would be lost. Starting with 0.8 all partitions have a replication factor and we get the prior behavior as the special case where replication factor = 1. Replicas have a notion of committed messages and guarantee that committed messages won’t be lost as long as at least one replica survives. Replica are byte-for-byte identical across replicas.

Producer and consumer are replication aware. When running in sync mode, by default, the producer send() request blocks until the messages sent is committed to the active replicas. As a result the sender can depend on the guarantee that a message sent will not be lost. Latency sensitive producers have the option to tune this to block only on the write to the leader broker or to run completely async if they are willing to forsake this guarantee. The consumer will only see messages that have been committed.

The following diagram illustrates the relationship between topics, partitions and replicas.

The relationship between topics, partitions and replicas in Kafka.

Logically this relationship is very similar to how Hadoop manages blocks and replication in HDFS.

When a topic is created in Kafka 0.8, Kafka determines how each replica of a partition is mapped to a broker. In general Kafka tries to spread the replicas across all brokers (source). Messages are first sent to the first replica of a partition (i.e. to the current “leader” broker of that partition) before they are replicated to the remaining brokers. Message producers may choose from different strategies for sending messages (e.g. synchronous mode, asynchronous mode). Producers discover the available brokers in a cluster and the number of partitions on each, by registering watchers in ZooKeeper.

If you wonder how to configure the number of partitions per topic/broker, here’s feedback from LinkedIn developers:

At LinkedIn, some of the high volume topics are configured with more than 1 partition per broker. Having more partitions increases I/O parallelism for writes and also increases the degree of parallelism for consumers (since partition is the unit for distributing data to consumers). On the other hand, more partitions adds some overhead: (a) there will be more files and thus more open file handlers; (b) there are more offsets to be checkpointed by consumers which can increase the load of ZooKeeper. So, you want to balance these tradeoffs.
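
To get a feeling for the "more open file handles" overhead, here is a back-of-the-envelope sketch. It assumes two files per log segment (one .log and one .index file, as seen in the log directories of this setup) and ignores everything else a broker keeps open, such as network sockets, so treat the result as a rough lower bound. The helper name and the "busy broker" numbers are made up for illustration.

```shell
#!/bin/sh
# Rough lower bound on the segment file handles held open by one broker.
# Usage: estimate_handles REPLICAS_ON_BROKER SEGMENTS_PER_REPLICA
# (hypothetical helper; the formula is an assumption, not taken from Kafka)
estimate_handles() {
    files_per_segment=2    # one .log and one .index file per segment
    echo $(( $1 * $2 * files_per_segment ))
}

estimate_handles 2 1       # this tutorial: 2 replicas per broker, 1 segment each
estimate_handles 500 20    # made-up numbers for a busy broker
```

The first call prints 4 and the second prints 20000, which is in the same ballpark as the "few 10 K" figure quoted in the section on configuring your OS.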

Create a Kafka topic

In Kafka 0.8, there are 2 ways of creating a new topic:

  1. Turn on the auto.create.topics.enable option on the broker. When the broker receives the first message for a new topic, it creates that topic with num.partitions and default.replication.factor.
  2. Use the admin command bin/kafka-topics.sh.
    • Note: In the Kafka 0.8.0 release use the admin command bin/kafka-create-topic.sh. kafka-topics.sh was removed in the release version and split into kafka-create-topic.sh and kafka-list-topic.sh.

We will use the latter approach. The following command creates a new topic “zerg.hydra”. The topic is configured to use 3 partitions and a replication factor of 2. Note that in a production setting we’d rather set the replication factor to 3, but a value of 2 is better for illustrative purposes (i.e. we intentionally use different values for the number of partitions and replications to better see the effects of each setting).

Create the “zerg.hydra” topic
$ bin/kafka-topics.sh --zookeeper localhost:2181 \
    --create --topic zerg.hydra --partitions 3 --replication-factor 2
Note: For the Kafka 0.8.0 release you must use the command: $ bin/kafka-create-topic.sh --zookeeper localhost:2181 --partition 3 --replica 2 --topic zerg.hydra

This has the following effects:

  • Kafka will create 3 logical partitions for the topic.
  • Kafka will create a total of two replicas (copies) per partition. For each partition it will pick two brokers that will host those replicas. For each partition Kafka will elect a “leader” broker.

Ask Kafka for a list of available topics. The list should include the new zerg.hydra topic:

List the available topics in the Kafka cluster
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
<snipp>
zerg.hydra
</snipp>
Note: For the Kafka 0.8.0 release you must use the command: $ bin/kafka-list-topic.sh --zookeeper localhost:2181

You can also inspect the configuration of the topic as well as the currently assigned brokers per partition and replica. Because a broker can only host a single replica per partition, Kafka has opted to use a broker’s ID also as the corresponding replica’s ID.

Describe the “zerg.hydra” topic
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic zerg.hydra
<snipp>
zerg.hydra
    configs:
    partitions: 3
        partition 0
        leader: 1 (192.168.0.153:9092)
        replicas: 1 (192.168.0.153:9092), 2 (192.168.0.153:9093)
        isr: 1 (192.168.0.153:9092), 2 (192.168.0.153:9093)
        partition 1
        leader: 2 (192.168.0.153:9093)
        replicas: 2 (192.168.0.153:9093), 3 (192.168.0.153:9094)
        isr: 2 (192.168.0.153:9093), 3 (192.168.0.153:9094)
        partition 2
        leader: 3 (192.168.0.153:9094)
        replicas: 3 (192.168.0.153:9094), 1 (192.168.0.153:9092)
        isr: 3 (192.168.0.153:9094), 1 (192.168.0.153:9092)
<snipp>
Note: For the Kafka 0.8.0 release you must use the command: $ bin/kafka-list-topic.sh --zookeeper localhost:2181 --topic zerg.hydra

In this example output the first broker (with broker.id = 1) happens to be the designated leader for partition 0 at the moment. Similarly, the second and third brokers are the leaders for partitions 1 and 2, respectively.
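
The assignment in the example output follows a simple round-robin pattern: partition p starts at broker p and each further replica goes to the next broker in the list. The sketch below reproduces that pattern. Note that this is a simplification for illustration and not Kafka's actual assignment code, so your own cluster may well show a different layout. The function name is made up.

```shell
#!/bin/sh
# Simplified round-robin replica placement (NOT Kafka's actual algorithm).
brokers="1 2 3"
partitions=3
replication_factor=2

# Usage: assign_replicas PARTITION
# Prints the broker IDs hosting the partition's replicas, leader first.
assign_replicas() {
    part=$1
    set -- $brokers                  # positional parameters = broker IDs
    n=$#
    r=0
    out=""
    while [ "$r" -lt "$replication_factor" ]; do
        idx=$(( (part + r) % n + 1 ))
        eval "b=\${$idx}"            # pick the idx-th broker ID
        out="$out${out:+ }$b"
        r=$((r + 1))
    done
    echo "$out"
}

p=0
while [ "$p" -lt "$partitions" ]; do
    echo "partition $p -> replicas: $(assign_replicas "$p")"
    p=$((p + 1))
done
```

With the values above this prints replicas "1 2" for partition 0, "2 3" for partition 1 and "3 1" for partition 2, which matches the replicas lines in the example output.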

The following diagram illustrates the setup (and also includes the producer and consumer that we will run shortly).

Overview of our Kafka setup including the current state of the partitions and replicas. The colored boxes represent replicas of partitions. “P0 R1” denotes the replica with ID 1 for partition 0. A bold box frame means that the corresponding broker is the leader for the given partition.

You can also inspect the local filesystem to see how the --describe output above matches actual files. By default Kafka persists topics as “log files” (Kafka terminology) in the log.dir directory.

Local files that back up the partitions of Kafka topics
$ tree /tmp/kafka-logs-{1,2,3}
/tmp/kafka-logs-1                   # first broker (broker.id = 1)
├── zerg.hydra-0                    # replica of partition 0 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-2                    # replica of partition 2 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

/tmp/kafka-logs-2                   # second broker (broker.id = 2)
├── zerg.hydra-0                    # replica of partition 0 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-1                    # replica of partition 1 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

/tmp/kafka-logs-3                   # third broker (broker.id = 3)
├── zerg.hydra-1                    # replica of partition 1 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-2                    # replica of partition 2 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

6 directories, 15 files
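
If you don't have tree installed, a few lines of shell give you the same sanity check. This is a minimal sketch with a made-up helper name; it only counts directories named TOPIC-* inside a broker's log.dir. For our setup (3 partitions x 2 replicas spread over 3 brokers) you would expect 2 per broker.

```shell
#!/bin/sh
# Count the partition directories of a topic inside one broker's log.dir.
# Usage: count_partition_dirs LOG_DIR TOPIC
# (hypothetical helper for this tutorial, not a Kafka tool)
count_partition_dirs() {
    count=0
    for d in "$1/$2"-*; do
        if [ -d "$d" ]; then
            count=$((count + 1))
        fi
    done
    echo "$count"
}

for dir in /tmp/kafka-logs-1 /tmp/kafka-logs-2 /tmp/kafka-logs-3; do
    echo "$dir: $(count_partition_dirs "$dir" zerg.hydra) replica(s) of zerg.hydra"
done
```

A broker whose log directory does not exist yet is simply reported with 0 replicas.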

Start a producer

Start a console producer in sync mode:

Start a console producer in sync mode
$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync \
    --topic zerg.hydra

Example producer output:

[...] INFO Verifying properties (kafka.utils.VerifiableProperties)
[...] INFO Property broker.list is overridden to localhost:9092,localhost:9093,localhost:9094 (...)
[...] INFO Property compression.codec is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property key.serializer.class is overridden to kafka.serializer.StringEncoder (...)
[...] INFO Property producer.type is overridden to sync (kafka.utils.VerifiableProperties)
[...] INFO Property queue.buffering.max.messages is overridden to 10000 (...)
[...] INFO Property queue.buffering.max.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[...] INFO Property queue.enqueue.timeout.ms is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property request.required.acks is overridden to 0 (kafka.utils.VerifiableProperties)
[...] INFO Property request.timeout.ms is overridden to 1500 (kafka.utils.VerifiableProperties)
[...] INFO Property send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[...] INFO Property serializer.class is overridden to kafka.serializer.StringEncoder (...)

You can now enter new messages, one per line. Here we enter two messages “Hello, world!” and “Rock: Nerf Paper. Scissors is fine.”:

Hello, world!
Rock: Nerf Paper. Scissors is fine.

After the messages are produced, you should see the data being replicated to the three log directories for each of the broker instances, i.e. /tmp/kafka-logs-{1,2,3}/zerg.hydra-*/.

Start a consumer

Start a console consumer that reads messages in zerg.hydra from the beginning (in a production setting you would usually NOT want to add the --from-beginning option):

Start a console consumer
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning

The consumer will see a new message whenever you enter a message in the producer above.

Example consumer output:

<snipp>
[...] INFO [console-consumer-28434_panama.local-1363174829799-954ed29e], Connecting to zookeeper instance at localhost:2181 ...
[...] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[...] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT ...
[...] INFO Client environment:host.name=192.168.0.153 (org.apache.zookeeper.ZooKeeper)
<snipp>
[...] INFO Fetching metadata with correlation id 0 for 1 topic(s) Set(zerg.hydra) (kafka.client.ClientUtils$)
[...] INFO Connected to 192.168.0.153:9092 for producing (kafka.producer.SyncProducer)
[...] INFO Disconnecting from 192.168.0.153:9092 (kafka.producer.SyncProducer)
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-3], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 2, initOffset -1 to broker 3 with fetcherId 0 ...
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-2], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 1, initOffset -1 to broker 2 with fetcherId 0 ...
[...] INFO [ConsumerFetcherThread-console-consumer-28434_panama.local-1363174829799-954ed29e-0-1], Starting ...
[...] INFO [ConsumerFetcherManager-1363174829916] adding fetcher on topic zerg.hydra, partion 0, initOffset -1 to broker 1 with fetcherId 0 ...

And at the end of the output you will see the following messages:

Hello, world!
Rock: Nerf Paper. Scissors is fine.

That’s it!

A note when using Kafka with Storm

The maximum parallelism you can have on a KafkaSpout is the number of partitions of the corresponding Kafka topic. The following question-answer thread (I slightly modified the original text for clarification purposes) is from the Storm user mailing list, but supposedly refers to Kafka pre-0.8 and thereby before the replication feature was added:

Question: Suppose the number of Kafka partitions per broker is configured as 1 and the number of hosts is 2. If we set the spout parallelism as 10, then how does Storm handle the difference between the number of Kafka partitions and the number of spout tasks? Since there are only 2 partitions, does every other spout task (greater than first 2) not read the data or do they read the same data?

Answer (by Nathan Marz): The remaining 8 (= 10 - 2) spout tasks wouldn’t read any data from the Kafka topic.

My current understanding is that the number of partitions (i.e. regardless of replicas) is still the limiting factor for the parallelism of a KafkaSpout. Why? To simplify concurrent access to data, Kafka only lets consumers read from the leader replica of a partition, not from the other replicas.
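
The arithmetic behind this is simple enough to put into a few lines of shell. This is only a sketch of the rule as I understand it (spout tasks beyond the number of partitions stay idle), with a made-up helper name; it does not talk to Storm or Kafka.

```shell
#!/bin/sh
# Spout tasks beyond the number of partitions have nothing to read.
# Usage: spout_usage PARTITIONS SPOUT_TASKS
# (hypothetical helper illustrating the rule described above)
spout_usage() {
    partitions=$1
    tasks=$2
    if [ "$tasks" -lt "$partitions" ]; then
        active=$tasks
    else
        active=$partitions
    fi
    idle=$((tasks - active))
    echo "active=${active} idle=${idle}"
}

spout_usage 2 10   # the mailing-list example: 2 partitions, 10 spout tasks
spout_usage 3 2    # zerg.hydra with 3 partitions but only 2 spout tasks
```

The first call prints "active=2 idle=8", matching the answer quoted above; the second prints "active=2 idle=0".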

A note when using Kafka with Hadoop

LinkedIn has published their Kafka->HDFS pipeline named Camus. It is a MapReduce job that does distributed data loads out of Kafka.

Where to go from here

Automated deployment of Kafka clusters:

  • puppet-kafka – a Puppet module I wrote to deploy Kafka 0.8+ clusters

The following documents provide plenty of information about Kafka that goes way beyond what I covered in this article:
