Michael G. Noll

Applied Research. Big Data. Distributed Systems. Open Source.

Running a Multi-Node Storm Cluster

In this tutorial I will describe in detail how to set up a distributed, multi-node Storm cluster on RHEL 6. We will install and configure both Storm and ZooKeeper and run their respective daemons under process supervision, similarly to how you would operate them in a production environment. I will show how to run an example topology in the newly built cluster, and conclude with an operational FAQ that answers the most common questions of managing a Storm cluster.

Update Mar 2014: I have released Wirbelsturm, a Vagrant- and Puppet-based tool to perform 1-click local and remote deployments, with a focus on big data related infrastructure such as Apache Kafka and Apache Storm. Thanks to Wirbelsturm you don’t need to follow this tutorial to manually install and configure a Storm cluster. Use Wirbelsturm to fire up a Storm cluster, then come back to this tutorial and follow the usage examples. You can find Wirbelsturm on GitHub.
Update Feb 2014: I have released a Puppet module to easily deploy Storm clusters: puppet-storm. This module automates most of the manual tasks listed in this article. You can find puppet-storm on GitHub.

What we want to do

Here is an outline of what we want to do:

  • Install and configure a distributed, multi-node Storm cluster.
  • Use a single-node ZooKeeper cluster for the Storm cluster.
  • Use one master node for running Storm’s Nimbus daemon as well as the web-based Storm UI.
  • Use three slave nodes for running Storm’s Supervisor daemons and the actual worker processes that execute your Storm application code. As you will see the actual number of slave nodes does not really matter (you can use this tutorial to set up a Storm cluster of fifty nodes if you want to).
  • Run an example Storm topology in our new Storm cluster.

Software versions

The instructions described in this article have been tested with:

  • Red Hat Enterprise Linux (RHEL) 6.3 (Santiago)
  • Storm 0.8.2 release

The same instructions should apply to CentOS 6, too.

Overview of cluster setup

  • zkserver1: The only ZooKeeper server in our single-node ZooKeeper cluster.
  • nimbus1: The master node in the Storm cluster, which will run the Nimbus daemon and, optionally, the Storm UI.
  • slave{1,2,3,...,n}: The slave nodes in the Storm cluster, which will run the Supervisor daemons including the actual worker processes that execute your Storm application code.

Thanks to Storm’s design – and notably its use of ZooKeeper for coordination purposes – it actually does not matter whether you plan to run just 1 slave node or 100 nodes. The instructions in this tutorial will work regardless of the number of slave nodes. I happen to use 3 slaves for illustration purposes but that’s an arbitrary number. The only thing you need to do is to install, configure and start each of your slave nodes according to the instructions, and Storm will automatically take care of integrating those into the cluster and making good use of them. That’s pretty cool.

Figure 1: Storm cluster setup

Before we start with setting up our Storm cluster let’s use the next section to briefly introduce the various components of Storm.

Background: Storm architecture

Storm distinguishes between two kinds of nodes in a Storm cluster:

  1. The master node. This node runs the so-called Nimbus daemon that is responsible for assigning tasks to the worker nodes, monitoring the cluster for failures, and distributing (your) code around the cluster. If you are familiar with Hadoop, you can think of the Nimbus as the JobTracker of Storm.
  2. The worker nodes. Each worker node runs an instance of the so-called Supervisor daemon. This daemon listens for work assigned (by Nimbus) to the node it runs on and starts/stops the worker processes as necessary. Each worker process executes a subset of a topology. If you are familiar with Hadoop, you can think of the Supervisors as the TaskTrackers of Storm.

In addition to its own components Storm relies on a ZooKeeper cluster (consisting of one or more ZooKeeper servers) to perform the coordination between Nimbus and the Supervisors. Apart from using ZooKeeper for coordination purposes the Nimbus and Supervisors also store all their state in ZooKeeper or on local disk (with regard to the latter see the section on storm.local.dir below).

Please refer to the official Storm tutorial and my blog post Understanding the Parallelism of a Storm Topology for further details.

Networking setup

Make sure all the nodes can reach each other over the network and that DNS is properly configured. For the sake of completeness here is a basic /etc/hosts file that you can use. However there is nothing special here – in the remainder of this article we will refer to the various machines only by their hostname (e.g. nimbus1).

A basic hosts file (for ALL machines)
192.168.10.1    slave1.foo.com     slave1
192.168.10.2    slave2.foo.com     slave2
192.168.10.3    slave3.foo.com     slave3
192.168.10.200  nimbus1.foo.com    nimbus1
192.168.10.250  zkserver1.foo.com  zkserver1
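
Before moving on it can help to confirm that every machine's hosts file really lists all cluster members. The following is a minimal sketch of such a check; the function name missing_hosts is a hypothetical helper, and it only inspects a hosts-format file, so it does not prove that DNS or the network itself works.

```shell
# missing_hosts HOSTS_FILE NAME...
# Print each NAME that does not appear as a hostname or alias in
# HOSTS_FILE (same format as /etc/hosts). Returns 0 if none are missing.
missing_hosts() {
    file=$1
    shift
    status=0
    for name in "$@"; do
        if ! awk -v n="$name" '
            { sub(/#.*/, "") }    # ignore comments
            { for (i = 2; i <= NF; i++) if ($i == n) found = 1 }
            END { exit !found }' "$file"; then
            echo "$name"
            status=1
        fi
    done
    return "$status"
}

# Usage with the hostnames of this tutorial:
#   missing_hosts /etc/hosts slave1 slave2 slave3 nimbus1 zkserver1
```

Any name printed by the function is missing from the file and should be added before you continue.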

Installing ZooKeeper

Storm and ZooKeeper

Storm uses ZooKeeper for coordinating the various “actors” (e.g. Nimbus, Supervisor daemons) within a Storm cluster. For small installations or installations that are not mission critical it is sufficient to run a single-node ZooKeeper cluster.

Please take a look at section Set up a Zookeeper cluster in the Storm documentation for further details.

Install ZooKeeper

If you happen to use a Hadoop distribution such as Cloudera CDH4 you can use the respective management tool (Cloudera Manager in the case of CDH4) to easily install a single-node or a multi-node ZooKeeper cluster. If you don’t want to go down the management tool route you can install ZooKeeper manually. You can either follow the official ZooKeeper instructions for deploying a ZK cluster, which will require you to install ZK from a release tarball, or you can fall back to pre-built RPMs for RHEL6/CentOS6.

For the context of this article we will pick the latter approach (RPMs). We will rely on Cloudera’s yum repositories to provide us with ZooKeeper RPMs. The first step is to add Cloudera’s CDH4 yum repositories to your machines’ yum configuration (make sure you pick the correct OS version). Please follow the instructions in the previous link to add the Cloudera yum repository. Once you are done you can verify the previous step with the following command:

# Does your machine have the Cloudera CDH4 repository configured?
$ yum repolist | grep cloudera-cdh4
cloudera-cdh4                Cloudera's Distribution for Hadoop, Versio    77+18

Now you can install ZooKeeper as follows:

$ sudo yum install zookeeper zookeeper-server

Configure ZooKeeper

For the sake of brevity I will not describe the ZooKeeper configuration in full detail as the official documentation already provides all the required information in the ZooKeeper Getting Started Guide.

A basic ZooKeeper configuration zoo.cfg looks as follows:

/etc/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181

# Enable regular purging of old data and transaction logs every 24 hours
autopurge.purgeInterval=24
autopurge.snapRetainCount=5

The last two autopurge.* settings are very important for production systems. They instruct ZooKeeper to regularly remove (old) data and transaction logs. The default ZooKeeper configuration does not do this on its own, and if you do not set up regular purging ZooKeeper will quickly run out of disk space.

When using older ZooKeeper versions: ZooKeeper versions < 3.4.x do not support auto-purging. But you can achieve the same effect by manually triggering purging via e.g. a cron job. See ZooKeeper Maintenance for details.

When using the same ZooKeeper cluster to power multiple Storm clusters: Storm uses a znode tree rooted at /storm in ZooKeeper by default. Hence you may want to use different roots when running multiple Storm clusters against the same ZooKeeper cluster. See storm.zookeeper.root, transactional.zookeeper.root and dev.zookeeper.path in conf/defaults.yaml.

Start ZooKeeper

Start the ZooKeeper daemon as follows:

$ zkServer.sh start

In production environments it is strongly recommended to run the ZooKeeper daemon under process supervision. See section Running ZooKeeper in production mode below for details.

Verifying ZooKeeper operation

You can use a tool such as telnet or netcat to connect to ZK and interactively run so-called four letter commands such as conf (print details of ZK configuration) and stat (list brief details of server and connected clients). This is a simple way to perform basic diagnostics of a ZooKeeper cluster/server.

# Is the ZK running in a non-error state ("aRe yoU OK")?
$ echo ruok | nc zkserver1 2181
imok   # I'm ok.

# Show summary stats
$ echo stat | nc zkserver1 2181
Zookeeper version: 3.4.5-cdh4.2.1--1, built on 04/22/2013 16:45 GMT
Clients:
/192.168.10.2:40623[1](queued=0,recved=9422,sent=9422)
/192.168.10.2:40666[1](queued=0,recved=9415,sent=9415)
/192.168.10.3:40639[1](queued=0,recved=9416,sent=9416)
[...snipp...]

Latency min/avg/max: 0/0/17
Received: 424358
Sent: 424357
Connections: 10
Outstanding: 0
Zxid: 0x222
Mode: standalone
Node count: 5
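
The interactive checks above can also be scripted, for instance for a monitoring job. The following is a minimal sketch, assuming nc is installed and ZooKeeper listens on the client port 2181 used in this tutorial; zk_is_ok and zk_mode are hypothetical helper names, not part of ZooKeeper.

```shell
# zk_is_ok: read the reply to the "ruok" command on stdin and
# succeed only if the server answered "imok".
zk_is_ok() {
    reply=$(cat)
    [ "$reply" = "imok" ]
}

# zk_mode: read the output of the "stat" command on stdin and print
# the value of its "Mode:" line (e.g. "standalone").
zk_mode() {
    awk -F': ' '$1 == "Mode" { print $2 }'
}

# Usage against the ZooKeeper server of this tutorial:
#   echo ruok | nc zkserver1 2181 | zk_is_ok && echo "ZooKeeper is up"
#   echo stat | nc zkserver1 2181 | zk_mode
```

Both helpers only parse text, so you can try them on saved command output before pointing them at a live server.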

Running ZooKeeper in production mode

ZooKeeper and process supervision

It is strongly recommended to run ZooKeeper under process supervision because ZooKeeper is fail-fast and will exit the process if it encounters any error case. On the plus side a ZooKeeper cluster is “self healing”: once restarted the failed server will automatically rejoin the cluster without any manual intervention.

The ZooKeeper documentation covers process supervision in further detail in the section Supervision. One option of course is to put ZooKeeper under process supervision via supervisord (not to be confused with Storm’s Supervisor daemon) just as we will do for Storm’s own daemons further down below.

Note that when running ZooKeeper under supervision you may want to use zkServer.sh start-foreground instead of zkServer.sh start:

When running zookeeper under supervision, use “zkServer.sh start-foreground” instead of “zkServer.sh start”. The regular start command forks and exits too many times for supervisord to track the application state successfully.

Managing ZooKeeper storage

You should double-check the ZooKeeper documentation for making the previously described basic ZooKeeper setup ready for long-running production systems. Notably, take a look at ZooKeeper Maintenance and ZooKeeper Configuration.

Installing Storm

Install dependencies

The following software dependencies must be installed on all Storm nodes, i.e. master and slave nodes.

Java 6

To make things simple we will install OpenJDK 6 in this tutorial.

$ sudo yum install java-1.6.0-openjdk

If you prefer Oracle/Sun JDK 6 (which I personally do) you will need to download and install the Oracle JDK 6 RPM files manually. Depending on your work environment you might also have an internal yum repository configured through which you can install Oracle JDK 6 directly via yum as shown above for OpenJDK 6.

ZeroMQ 2.1.7

ZeroMQ is “a library which extends the standard socket interfaces with features traditionally provided by specialised messaging middleware products”. Storm relies on ZeroMQ primarily for task-to-task communication in running Storm topologies.

It is important to install exactly the version 2.1.7 of ZeroMQ. Unfortunately an official RPM of ZeroMQ 2.1.7 is not available for RHEL 6. This leaves you with two options:

  1. Use an RPM created by a third party.
  2. Build and install ZeroMQ from source

The third (and preferred) option is of course to create your own RPM, but doing so is beyond the scope of this tutorial.

If you are running into any problems during the installation of ZeroMQ, please refer to Installing native dependencies in the Storm documentation and to bin/install_zmq.sh in the Storm release package.

Option 1: Install ZeroMQ from third-party RPM

A pre-built RPM of zeromq for RHEL 6 is available from saltstack’s GitHub repository:

Download the file above and install via:

$ cd /tmp
$ wget https://github.com/downloads/saltstack/salt/zeromq-2.1.7-1.el6.x86_64.rpm
$ sudo yum install zeromq-2.1.7-1.el6.x86_64.rpm

Option 2: Install ZeroMQ from source

Prior to the actual installation steps for ZeroMQ as shown below I also needed to install the following packages on my RHEL 6 machines in order to be able to compile ZeroMQ from source:

$ sudo yum install gcc gcc-c++ libuuid-devel

The following instructions build ZeroMQ from source and install it under /usr/local/:

$ cd /tmp
$ wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
$ tar -xzf zeromq-2.1.7.tar.gz
$ cd zeromq-2.1.7
$ ./configure
$ make
$ sudo make install

JZMQ

JZMQ are the Java bindings for ZeroMQ.

Unfortunately an official RPM of JZMQ is not available for RHEL 6, and I could not find any pre-built RPM created by a third party. For your and my convenience I decided to build my own RPM of JZMQ for RHEL 6 (64bit) via Jordan Sissel’s awesome packaging tool fpm.

Feel free to try installing jzmq-2.1.0.el6.x86_64.rpm instead of installing JZMQ from source as you would usually do, and report back whether the RPM worked for you.

Install JZMQ via RPM as follows:

$ wget https://s3.amazonaws.com/cdn.michael-noll.com/rpms/jzmq-2.1.0.el6.x86_64.rpm
$ sudo yum install jzmq-2.1.0.el6.x86_64.rpm

See also Installing native dependencies in the Storm documentation and bin/install_zmq.sh in the Storm release package.

Off-topic: How I created the JZMQ RPM

This section is purely informational. You do not need to follow these steps if you are happy with the RPM I already created.

Prior to the actual build steps for JZMQ as shown below I first needed to install the following packages on my RHEL 6 machine in order to be able to compile JZMQ from source and then subsequently package it as RPM via fpm:

$ sudo yum install libtool gcc-c++ java-1.6.0-openjdk-devel
$ wget https://github.com/downloads/saltstack/salt/zeromq-devel-2.1.7-1.el6.x86_64.rpm
$ sudo yum install zeromq-devel-2.1.7-1.el6.x86_64.rpm

Here are the build steps I used to create the JZMQ RPM for Storm:

$ git clone https://github.com/nathanmarz/jzmq.git
$ cd jzmq
$ ./autogen.sh
$ env JAVA_HOME=/usr/java/default ./configure --prefix=/usr
$ env JAVA_HOME=/usr/java/default make install DESTDIR=/tmp/installdir
$ fpm -s dir -t rpm -a all -n jzmq -v 2.1.0 -C /tmp/installdir/ \
    --maintainer "<michael@michael-noll.com>" \
    --vendor github.com/nathanmarz/jzmq \
    --url https://github.com/nathanmarz/jzmq.git \
    --description "Frozen version of JZMQ that is tested to work with Storm.\nJZMQ is [...snip...]" \
    -p jzmq-VERSION.el6.ARCH.rpm \
    -d "zeromq == 2.1.7" \
    -a "x86_64" \
    usr

# => jzmq-2.1.0.el6.x86_64.rpm

Some information about the environment I used to build JZMQ:

$ uname -a
Linux buildhost 2.6.32-279.19.1.el6.x86_64 #1 SMP Sat Nov 24 14:35:28 EST 2012 x86_64 x86_64 x86_64 GNU/Linux

$ cat /etc/redhat-release
Red Hat Enterprise Linux Server release 6.3 (Santiago)

$ set | grep JAVA_HOME
JAVA_HOME=/usr/java/default

$ autoconf --version | head -1
autoconf (GNU Autoconf) 2.63

Python 2.6.6

Python 2.6.6 should be installed by default on RHEL 6.3. If it isn’t for some reason, you have a seriously broken machine – do not continue until you have reinstalled your machine from scratch. :-)

Unzip

The decompression tool unzip should be installed by default on RHEL 6.3. If it isn’t for some reason, run the following command:

$ sudo yum install unzip

Unzip is only required to decompress a Storm release package, which comes as a .zip file.

Create a dedicated Storm system user

The following command will create a user account storm with group storm and uid/gid of 53001. The home directory of the user is set to /app/home/storm/.

$ sudo groupadd -g 53001 storm
$ sudo mkdir -p /app/home
$ sudo useradd -u 53001 -g 53001 -d /app/home/storm -s /bin/bash storm -c "Storm service account"
$ sudo chmod 700 /app/home/storm
$ sudo chage -I -1 -E -1 -m -1 -M -1 -W -1 storm

Download and install a Storm release

You can download a release of Storm from the Storm Downloads section.

Here is a direct link to Storm version 0.8.2:

Yes, the direct link takes you to dl.dropbox.com. I assume this is because GitHub disabled the feature to upload files and make them available for download in the Downloads Tab (see their GoodBye, Uploads blog post for details). So a new place to host the release files had to be found.

$ cd /tmp
$ wget https://dl.dropbox.com/u/133901206/storm-0.8.2.zip
$ cd /usr/local
$ sudo unzip /tmp/storm-0.8.2.zip
$ sudo chown -R storm:storm storm-0.8.2
$ sudo ln -s storm-0.8.2 storm

Now the Storm installation is available at /usr/local/storm/. I will refer to this directory as $STORM_HOME.

Create local working directory for Storm

Create a local directory in which the Nimbus and Supervisor daemons can store (small) amounts of state such as jar files and configurations. We will use /app/storm/. This directory will be configured to be used as storm.local.dir later on.

$ sudo mkdir -p /app/storm
$ sudo chown -R storm:storm /app/storm
$ sudo chmod 750 /app/storm

Once we have started the various Storm daemons later on you will see files being created under storm.local.dir by the Nimbus and Supervisor daemons. Here is an example:

# on master node (nimbus1)
$ sudo tree /app/storm
/app/storm
+--- nimbus
     +--- inbox

# on slave nodes
$ sudo tree /app/storm
/app/storm
+--- supervisor
     +--- isupervisor
     |    +--- 1368698566625
     |    +--- 1368698566625.version
     +--- localstate
     |    +--- 1368698594679
     |    +--- 1368698594679.version
     |    +--- 1368698597685
     |    +--- 1368698597685.version
     |    +--- 1368698597689
     |    +--- 1368698597689.version
     |    +--- 1368698597696
     |    +--- 1368698597696.version
     +--- tmp

This local directory has one particularly notable role: If a Supervisor daemon crashes or is deliberately killed, then a restarted Supervisor daemon will pick up where the old process stopped as long as the new process reads from the same local directory.

Configure Storm

Storm is configured through the file conf/storm.yaml, which overrides the default configuration values in defaults.yaml.

Where is defaults.yaml? In the GitHub repository of Storm the default configuration file is available at conf/defaults.yaml. However defaults.yaml is not included in a release package of Storm (as of v0.8.2).

Open conf/storm.yaml in a text editor and change/set the following configuration parameters:

conf/storm.yaml (on each node, master and slaves)
storm.zookeeper.servers:
    - "zkserver1"

nimbus.host: "nimbus1"
nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"

ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

storm.local.dir: "/app/storm"

What about my slave nodes? As long as you are happy with Storm’s default values (most likely you are at the beginning) you do not need to configure or “add” your slave nodes in any special way. The Supervisor daemons running on the slave nodes will register themselves with ZooKeeper during startup, and Nimbus discovers those automatically via ZooKeeper.

See section Fill in mandatory configurations into storm.yaml in the document Setting up a Storm cluster for more information, notably about the configuration parameters:

  • java.library.path – to tell Storm where to find the native libraries it uses (ZeroMQ and JZMQ)
  • supervisor.slots.ports – to configure how many worker processes run on the machine with this configuration. For instance, you could use only 1 port = 1 worker process for a small Amazon EC2 instance, and more ports = more workers for large EC2 instances. Here you would need to give each EC2 instance type its appropriate conf/storm.yaml with the proper value for supervisor.slots.ports. By default Storm uses 4 ports (6700-6703/tcp) = 4 worker processes per machine. The Storm UI (see below) also calls those workers “slots”. Note that these worker ports will only be in use when the workers are actually running; for instance, if the cluster is idle then no worker processes will be running at all.
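
If you maintain different conf/storm.yaml files per machine size, the port list can be generated instead of typed by hand. The following is a minimal sketch; slots_yaml is a hypothetical helper name, and the output is meant to be pasted or appended to conf/storm.yaml after you have reviewed it.

```shell
# slots_yaml FIRST_PORT COUNT
# Print a supervisor.slots.ports fragment with COUNT consecutive ports
# starting at FIRST_PORT. One port corresponds to one worker slot.
slots_yaml() {
    first=$1
    count=$2
    echo "supervisor.slots.ports:"
    i=0
    while [ "$i" -lt "$count" ]; do
        echo "    - $((first + i))"
        i=$((i + 1))
    done
}

# Usage:
#   slots_yaml 6700 4    # the four default ports 6700-6703
#   slots_yaml 6700 1    # a single slot, e.g. for a small machine
```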

Test-drive the Storm cluster

Before we configure process supervision for our cluster’s Storm daemons to make our setup production-ready we will first take our Storm cluster on a manual test drive. This allows us to verify that every node is configured correctly. If we run into any issues we can inspect the Storm log files to identify and fix the problem (see section Troubleshooting below).

Start the Nimbus daemon manually

Start the Nimbus daemon by running the following command on the master node nimbus1:

# On machine nimbus1
$ sudo su - storm
$ cd /usr/local/storm
$ bin/storm nimbus

Example output:

$ bin/storm nimbus
Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/storm-0.8.2 [...snip...] backtype.storm.daemon.nimbus

By default the Nimbus daemon will listen on port 6627, which can be configured via the parameter nimbus.thrift.port.

If you do not see any error messages and everything looks ok you can stop the Nimbus daemon again by either:

  • Entering Ctrl-C in the terminal window where you started the daemon, or
  • Sending a SIGKILL to the Nimbus process (kill -9 <processid>)

Start the Storm UI daemon manually

Storm ships with a web-based user interface that you can optionally launch on the master node that runs the Nimbus daemon. The Storm UI provides a basic overview of the cluster state by showing cluster-level and topology-level diagnostics.

Start the UI daemon by running the following command on the master node nimbus1:

# On machine nimbus1
$ sudo su - storm
$ cd /usr/local/storm
$ bin/storm ui

Again, you must run this daemon under supervision in production environments as we will describe later.

By default the UI listens on port 8080/tcp. You can access it via the URL:

http://nimbus1:8080/

Here is a screenshot of the Storm UI as of version 0.8.2:

Figure 2: Screenshot of Storm web UI

You can change the UI port by setting the ui.port configuration parameter in conf/storm.yaml:

conf/storm.yaml (on master)
### ui.* configs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"

If you do not see any error messages and everything looks ok you can stop the Storm UI daemon again by either:

  • Entering Ctrl-C in the terminal window where you started the daemon, or
  • Sending a SIGKILL to the UI process (kill -9 <processid>)

Start the Supervisor daemons manually

Run the following command on each slave node in the Storm cluster to start their respective Supervisor daemons. Upon startup the Supervisors will register themselves with ZooKeeper, where Nimbus can find them.

# On each slave
$ sudo su - storm
$ cd /usr/local/storm
$ bin/storm supervisor

Example output:

$ bin/storm supervisor
Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/storm-0.8.2 [...snip...] backtype.storm.daemon.supervisor

If you do not see any error messages and everything looks ok you can stop the Supervisor daemons again by either:

  • Entering Ctrl-C in the respective terminal window where you started the daemon, or
  • Sending a SIGKILL to the respective Supervisor process (kill -9 <processid>)

Storm log files and troubleshooting

By default Storm will write log messages of its daemons to:

  • Nimbus: $STORM_HOME/logs/nimbus.log
  • Supervisor: $STORM_HOME/logs/supervisor.log
  • UI: $STORM_HOME/logs/ui.log

The relevant configuration parameter is -Dlogfile.name=... in $STORM_HOME/bin/storm for each daemon.

Note: Even if you set logfile.name to a seemingly absolute path such as /tmp/nimbus.log Storm will still place the file under $STORM_HOME/logs/ (here: $STORM_HOME/logs/tmp/nimbus.log).

These log files are the best source of information when performing troubleshooting. Even if everything is working correctly I’d still recommend taking a look at them to familiarize yourself with the kind of information available there.

Running Storm daemons under supervision

For a production environment we do not want to start the Storm daemons manually – doing so is even strongly discouraged:

It is critical that you run each of these daemons under supervision. Storm is a fail-fast system which means the processes will halt whenever an unexpected error is encountered. Storm is designed so that it can safely halt at any point and recover correctly when the process is restarted. This is why Storm keeps no state in-process – if Nimbus or the Supervisors restart, the running topologies are unaffected.

There are various tools available to supervise processes, for instance supervisord, monit, daemontools and runit. In this tutorial we will be using supervisord because a) an RPM for RHEL 6 is readily available via EPEL (whereas daemontools and runit are not) and b) supervisord is very simple to set up and run (whereas we would need to write init scripts that create PID files – which Storm does not create itself – when using monit because that tool requires PID files in order to work).

What is supervisord?

This section is not about Storm’s Supervisor daemon but about the process supervisor of the same name.

Supervisor is a client/server system that allows its users to monitor and control a number of processes.

The following sections explain how to install Supervisor 2.x for RHEL 6 from EPEL. Unfortunately Supervisor 3.x is not yet available out of the box, so we have to stick to the older version (which will require us to perform some workarounds as you will see later).

Install RHEL EPEL repository

First we must add the EPEL repository for RHEL 6 so that we can subsequently install supervisord via yum.

$ cd /tmp
$ wget http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
$ sudo rpm -Uhv epel-release-6-8.noarch.rpm

Please refer to Installing RHEL EPEL Repo on Centos 5.x or 6.x for more information.

Important note: DO NOT use the ZeroMQ package in EPEL (version 2.2.0 as of this writing) to install ZeroMQ. Storm requires ZeroMQ 2.1.7, which is not available in EPEL.

Install supervisord

$ sudo yum install supervisor
$ sudo chkconfig supervisord on
# Recommended: secure supervisord configuration file (may contain user credentials)
$ sudo chmod 600 /etc/supervisord.conf

Create log directories for Storm

We create a directory under /var/log to which the output of Storm daemons captured by supervisord can be written. That said, most (all of?) Storm log messages will be stored in Storm’s own log files under $STORM_HOME/logs/.

$ sudo mkdir -p /var/log/storm
$ sudo chown -R storm:storm /var/log/storm

Configure supervisord for Storm

Two notes before we start:

  • This section uses supervisord 2.x configuration parameters. For instance, supervisord 2.x uses logfile whereas this parameter was superseded in supervisord 3.x by stdout_logfile.
  • For Cloudera CDH4 users: Apparently Cloudera Manager runs its own supervisord at port 9001/tcp. Make sure to configure “your” supervisord instance appropriately if you decide to use the HTTP interface (whose port defaults to 9001/tcp).

Master node: Nimbus and UI daemons

Add the following lines to /etc/supervisord.conf on the master node nimbus1:

[program:storm-nimbus]
command=/usr/local/storm/bin/storm nimbus
user=storm
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/storm/nimbus.out
logfile_maxbytes=20MB
logfile_backups=10

[program:storm-ui]
command=/usr/local/storm/bin/storm ui
user=storm
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/storm/ui.out
logfile_maxbytes=20MB
logfile_backups=10

Slave nodes: Supervisor daemons

Add the following lines to /etc/supervisord.conf on each slave node:

[program:storm-supervisor]
command=/usr/local/storm/bin/storm supervisor
user=storm
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/storm/supervisor.out
logfile_maxbytes=20MB
logfile_backups=10

Start supervisord including Storm daemons

Before proceeding please make sure that any previously (manually) started Storm daemons have been stopped.

Starting supervisord will read the configuration /etc/supervisord.conf and also start any commands defined therein that have autostart set to true. In our case, starting supervisord will start the Nimbus daemon and the Storm UI daemon on the master node nimbus1, and the (Storm) Supervisor daemons on the slave nodes.

Run the following command on each node, master and slaves:

$ sudo service supervisord start

You can verify whether the Storm daemons have been started successfully with supervisorctl:

# On master node (nimbus1)
$ sudo supervisorctl status
storm-nimbus     RUNNING    pid 17563, uptime 0:03:12
storm-ui         RUNNING    pid 17558, uptime 0:03:12

# On slave nodes
$ sudo supervisorctl status
storm-supervisor RUNNING    pid 17561, uptime 0:03:12

If you happen to run into error: <class 'socket.error'>, [Errno 2] No such file or directory when executing supervisorctl, you must ensure that supervisord is actually running.
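
If you want to check many nodes at once, the status output can be filtered for programs that are not in the RUNNING state. The following is a minimal sketch that assumes the column layout shown above (program name first, state second); not_running is a hypothetical helper name, not a supervisord command.

```shell
# not_running: read "supervisorctl status" output on stdin and print the
# name of every program whose state (second column) is not RUNNING.
# The exit status is 0 if all listed programs are RUNNING, 1 otherwise.
not_running() {
    awk 'NF >= 2 && $2 != "RUNNING" { print $1; bad = 1 } END { exit bad }'
}

# Usage:
#   sudo supervisorctl status | not_running || echo "some daemons are down"
```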

You can also inspect the supervisord log file at /var/log/supervisor/supervisord.log for any issues you might encounter. For example, here is a snippet from the master node nimbus1:

INFO supervisord started with pid 20490
INFO spawned: 'storm-ui' with pid 20492
INFO spawned: 'storm-nimbus' with pid 20497
INFO success: storm-ui entered RUNNING state, process has stayed up for > than 10 seconds (startsecs)
INFO success: storm-nimbus entered RUNNING state, process has stayed up for > than 10 seconds (startsecs)

Running an example Storm topology

Prerequisites

In the following sections we will execute the storm command quite a bit. This command requires access to a working Storm configuration. For this reason you must provide each client machine that you intend to use for controlling your Storm cluster – including submitting topologies – with a proper Storm configuration. Some people also call those client machines “gateway machines” as they act like a bridge between your Storm cluster and other infrastructure services.

The easiest way to set up the shell environment of user accounts on client machines is to copy the cluster’s conf/storm.yaml file to ~/.storm/storm.yaml.

Set up shell environment of user accounts on client/gateway machines
# Create the per-user Storm configuration directory if it does not exist yet
$ mkdir -p ~/.storm

# When storm.yaml is available locally
$ cp /usr/local/storm/conf/storm.yaml ~/.storm/storm.yaml

# Otherwise retrieve the master node's config file
$ scp nimbus1:/usr/local/storm/conf/storm.yaml ~/.storm/storm.yaml

Downloading and building storm-starter

We will use the storm-starter example topologies to run a first Storm topology in our cluster.

$ cd /tmp
$ git clone https://github.com/nathanmarz/storm-starter.git
$ cd storm-starter
$ lein deps
$ lein compile
$ lein uberjar

lein jar vs. lein uberjar: The command lein uberjar creates a jar file that contains the storm-starter code as well as all of its transitive dependencies, whereas lein jar packages only the project’s own code.

The last command lein uberjar will create a jar file of the storm-starter code at the following location:

target/storm-starter-0.0.1-SNAPSHOT-standalone.jar

We can now use this jar file to submit and run the ExclamationTopology in our Storm cluster.

Submitting the topology to the cluster

The storm jar command submits a topology to the cluster.

$ /usr/local/storm/bin/storm help jar
Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.

The following command instructs Storm to launch the ExclamationTopology in distributed mode in the cluster. Behind the scenes the storm-starter jar file is uploaded to the master node running Nimbus (nimbus1) and stored in its storm.local.dir.

$ /usr/local/storm/bin/storm jar target/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamation-topology

Example output:

0    [main] INFO  backtype.storm.StormSubmitter  - Jar not uploaded to master yet. Submitting jar...
12   [main] INFO  backtype.storm.StormSubmitter  - Uploading topology jar storm-starter-0.0.1-SNAPSHOT-standalone.jar to assigned location: /app/storm/nimbus/inbox/stormjar-8a208db8-78d5-45e5-9a99-924f03900759.jar
34   [main] INFO  backtype.storm.StormSubmitter  - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-8a208db8-78d5-45e5-9a99-924f03900759.jar
34   [main] INFO  backtype.storm.StormSubmitter  - Submitting topology exclamation-topology in distributed mode with conf {"topology.workers":3,"topology.debug":true}
225  [main] INFO  backtype.storm.StormSubmitter  - Finished submitting topology: exclamation-topology

UnsupportedClassVersionError: You might run into the following error when submitting the topology:

Exception in thread "main" java.lang.UnsupportedClassVersionError: storm/starter/ExclamationTopology : Unsupported major.minor version 51.0

This error indicates that the storm-starter jar was compiled with a newer Java version than the one used to run it. For instance, you compiled storm-starter with Java 7 (class file version 51.0) but Storm itself runs on Java 6. Make sure the Java versions match to resolve this error.
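
To find out which Java version a jar was compiled for, you can read the version number straight from one of its class files. The sketch below relies on the class file layout (a 4-byte magic number, then a 2-byte minor and a 2-byte major version); major version 50 means Java 6 and 51 means Java 7. The function name class_major_version is made up for illustration, and the usage comment assumes that unzip is installed.

```shell
# Print the class file major version of a compiled .class file.
# Bytes 7 and 8 of a class file hold the major version as a big-endian
# 16-bit number (50 = Java 6, 51 = Java 7).
class_major_version() {
    # od prints the two bytes as unsigned decimals, e.g. "   0  51"
    set -- $(od -An -j6 -N2 -tu1 "$1")
    echo $(( $1 * 256 + $2 ))
}

# Usage (hypothetical temp file name):
#   unzip -p target/storm-starter-0.0.1-SNAPSHOT-standalone.jar \
#       storm/starter/ExclamationTopology.class > /tmp/ExclamationTopology.class
#   class_major_version /tmp/ExclamationTopology.class
```

Compare the result with the output of java -version on the machine where you run the storm command.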

The last argument, exclamation-topology, defines the name of the submitted topology instance within the Storm cluster. This name must be unique within the cluster.

Topology names must be unique: If you try to submit another topology under the same name Storm will complain with the error:

Exception in thread "main" java.lang.RuntimeException:
Topology with name `exclamation-topology` already exists on cluster

You can verify whether the submitted topology is running via the storm CLI tool:

$ /usr/local/storm/bin/storm list

Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
exclamation-topology ACTIVE     16         3            798

Alternatively, you can open the Storm UI. Here you should see a corresponding entry “exclamation-topology” in the UI section Topology Summary.

Stopping the topology

If you want to stop the topology you can kill it by using either the storm CLI tool or the Storm UI.

$ /usr/local/storm/bin/storm kill exclamation-topology

0    [main] INFO  backtype.storm.thrift  - Connecting to Nimbus at localhost:6627
33   [main] INFO  backtype.storm.command.kill-topology  - Killed topology: exclamation-topology

You can verify whether the topology was stopped with storm list:

$ /usr/local/storm/bin/storm list

Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
exclamation-topology KILLED     16         3            161

The KILLED entry stays in the list only for a short while before it is removed. At that point Storm will report no running topologies:

$ /usr/local/storm/bin/storm list

No topologies running.
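
Because the KILLED entry lingers for a while, scripts that kill a topology and then resubmit it under the same name may want to wait until the entry is gone. The following is a minimal sketch of such a wait loop, not something that ships with Storm. The function names, the STORM_BIN variable, the default of 24 tries and the 5-second pause are all assumptions made for illustration. The parsing relies on the column layout of storm list shown above.

```shell
# Print the Status column for one topology, given 'storm list' output on stdin.
# Prints nothing if the topology is not listed.
topology_status() {
    awk -v name="$1" '$1 == name { print $2 }'
}

# Poll 'storm list' until the topology is gone. Returns 0 once it is no longer
# listed, 1 after max_tries polls, 2 if 'storm list' itself fails.
# STORM_BIN, the default of 24 tries and the 5-second pause are assumptions.
wait_until_removed() {
    name="$1"
    max_tries="${2:-24}"
    storm_bin="${STORM_BIN:-/usr/local/storm/bin/storm}"
    tries=0
    while [ "$tries" -lt "$max_tries" ]; do
        listing=$("$storm_bin" list) || return 2
        status=$(printf '%s\n' "$listing" | topology_status "$name")
        [ -z "$status" ] && return 0
        echo "$name is still listed as $status" >&2
        sleep 5
        tries=$((tries + 1))
    done
    return 1
}
```

For example, wait_until_removed exclamation-topology blocks until the entry has disappeared or the retries are exhausted.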

Operational FAQ

This section summarizes the most important aspects of cluster management.

Start the Storm cluster?

This section depends on using supervisord for controlling Storm daemons as described in this tutorial.

Run the following command on each node:

On each node, i.e. master and slaves
$ sudo service supervisord start

Give the processes a few seconds to start up. After that you can verify the success of the command via supervisorctl:

On master (nimbus1)
$ sudo supervisorctl status
storm-nimbus   RUNNING    pid 17930, uptime 2:24:52
storm-ui       RUNNING    pid 17929, uptime 2:24:52
On slaves
$ sudo supervisorctl status
storm-supervisor RUNNING    pid 11441, uptime 2:12:22
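
If you manage more than a handful of nodes you will probably not want to eyeball this output on every machine. As a rough sketch, the status output can be filtered for anything that is not in the RUNNING state. The function name programs_not_running is made up for illustration, and the parsing assumes the two-column "name state" layout shown above.

```shell
# Read 'supervisorctl status' output on stdin and print the name of every
# program whose state is not RUNNING. The exit status is non-zero if any
# such program is found.
programs_not_running() {
    awk 'NF && $2 != "RUNNING" { print $1; bad = 1 } END { exit bad }'
}

# Usage:
#   sudo supervisorctl status | programs_not_running
```

Empty output together with exit status 0 means that all programs known to supervisord on that node are running.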

Stop the Storm cluster?

This section depends on using supervisord for controlling Storm daemons as described in this tutorial.

Run the following command on each node:

$ sudo service supervisord stop

Give supervisord a few seconds to shut down all processes. Note that trying to run supervisorctl when supervisord itself is stopped will result in an error message (this appears to be a known user interface issue in supervisord 2.x).

How to kill ALL Storm processes including worker processes? Any worker processes (launched by the Supervisor daemons on the slave nodes) that happen to be running when you stop the cluster will continue to run. This is a deliberate design decision of Storm because it means that crashing/restarting Nimbus and Supervisor daemons will not affect any running topologies in Storm. The downside is that you have to put some extra effort into fully stopping all Storm-related processes in a cluster.

# If you want to kill ALL processes follow this procedure on the slave nodes:
$ sudo supervisorctl stop storm-supervisor
$ sudo pkill -TERM -u storm -f 'backtype.storm.daemon.worker'

Add another worker node?

The Nimbus daemon finds available Supervisors via ZooKeeper, with which the Supervisor daemons register themselves.

If you want to add a new Supervisor to your Storm cluster, just configure storm.yaml on the new worker node (similar to the existing ones), and let the Supervisor daemon register itself with ZooKeeper. You can then use the storm rebalance command (see Command line client) to assign workload to the new Supervisor.

See the discussion thread Adding a new server/node/zookeeper to Storm for more information.

Restart Supervisor daemon and worker processes?

It is important to understand that Storm intentionally does not link the processes of Supervisors and the worker processes they spawned:

Storm is designed so that supervisors can die/restart without affecting the workers. Having the life of the workers be independent on the life of Nimbus or Supervisors makes things much easier to manage. It’s a pretty minor tradeoff given how easy it is to just kill all the worker processes if that’s what you want to do.

Here is the basic idea of how you would perform a full “restart” of a Supervisor daemon including the worker processes:

Must be run on the node(s) in question
# stop Supervisor daemon
$ sudo supervisorctl stop storm-supervisor

# kill any running Storm worker processes on the same machine
$ sudo pkill -TERM -u storm -f 'backtype.storm.daemon.worker'

# (re)start Supervisor daemon, which will eventually also spawn new worker processes to resume work
$ sudo supervisorctl start storm-supervisor

See the discussion thread Stopping supervisor and workers for more information.

Prevent broken nodes from impacting cluster performance?

Arguably the best approach is to have enough capacity available to cope with one or more machines going down in your Storm cluster, otherwise the performance and stability of any running topologies might suffer.

If Storm doesn’t have enough worker slots to run a topology, it will “pack” all the tasks for that topology into whatever slots there are on the cluster. Then, when there are more worker slots available (like you add another machine), it will redistribute the tasks to the proper number of workers. Of course, running a topology with less workers than intended will probably lead to performance problems. So it’s best practice to have extra capacity available to handle any failures.
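
A quick back-of-the-envelope calculation shows how much headroom a cluster has. The sketch below uses the three slave nodes of this tutorial and the three workers requested by the example topology (see topology.workers in the submit output). The figure of four worker slots per node is an assumption; use the number of ports you listed under supervisor.slots.ports in your storm.yaml. The function names are made up for illustration.

```shell
# Number of worker slots left when some nodes are down.
# Arguments: total Supervisor nodes, slots per node, failed nodes.
remaining_slots() {
    echo $(( ($1 - $3) * $2 ))
}

# Succeed if the remaining slots still cover the workers a topology asks for.
# Arguments: total nodes, slots per node, failed nodes, required workers.
has_capacity() {
    [ "$(remaining_slots "$1" "$2" "$3")" -ge "$4" ]
}

# Usage: 3 nodes, 4 slots each (assumed), 1 node down, 3 workers needed
#   remaining_slots 3 4 1
#   has_capacity 3 4 1 3 && echo "enough slots"
```

With these numbers one failed node leaves (3 - 1) * 4 = 8 slots, which is more than enough for a topology that asks for 3 workers. Keep in mind that all running topologies compete for the same slots.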

Configure the CLI storm command to find Nimbus etc.?

You configure the Nimbus host that the storm CLI tool connects to – and other Storm configuration settings – via your ~/.storm/storm.yaml file.
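
If a client machine does not need the full cluster configuration, a file that only sets nimbus.host may be all the storm command requires to locate Nimbus. Here is a small sketch that creates such a file. The function name write_client_config is made up for illustration, and whether a one-line file is sufficient for your setup is an assumption you should verify (e.g. with storm list).

```shell
# Write a minimal client-side storm.yaml that only sets nimbus.host.
# Arguments: Nimbus hostname, target directory (defaults to ~/.storm).
# Refuses to overwrite an existing file.
write_client_config() {
    nimbus_host="$1"
    config_dir="${2:-$HOME/.storm}"
    config_file="$config_dir/storm.yaml"
    if [ -e "$config_file" ]; then
        echo "$config_file already exists, leaving it untouched" >&2
        return 1
    fi
    mkdir -p "$config_dir" || return 1
    printf 'nimbus.host: "%s"\n' "$nimbus_host" > "$config_file"
}

# Usage for the cluster in this tutorial:
#   write_client_config nimbus1
```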

Submit and run a Storm topology?

Use the storm jar command:

$ storm help jar
Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.

Example: The following command line submits the ExclamationTopology in storm-starter under the name “exclamation-topology” to the cluster. Note that some topology implementations may set the topology name via different means (e.g. directly in the code or in a configuration file), and in such cases the storm jar command line would need to be changed accordingly.

$ storm jar /path/to/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamation-topology

Stop a running Storm topology?

The storm kill command stops a running topology.

$ storm kill <name-of-topology>

For instance, if you have submitted the ExclamationTopology from storm-starter as shown in the previous section you would run:

$ storm kill exclamation-topology

List any running Storm topologies?

The storm list command shows the list of running (or recently stopped/killed) topologies:

$ bin/storm list
[...snip...]
0    [main] INFO  backtype.storm.thrift  - Connecting to Nimbus at localhost:6627
No topologies running.

To run this command successfully, one of the following two conditions must be met:

  1. You are running the command on the same machine where the Nimbus daemon runs (because by default the storm command expects the Nimbus daemon to be listening on localhost:6627).
  2. You have configured nimbus.host in your $HOME/.storm/storm.yaml file to point to the correct machine (in this tutorial, nimbus1).

Access the Storm UI?

The Storm UI runs on port 8080/tcp by default on the machine where storm ui was executed. In this tutorial the full URL would be:

http://nimbus1:8080/

Monitor the Storm cluster?

Storm includes a few basic utilities to monitor and administer a running Storm cluster:

  1. The Storm UI
  2. The “storm” command line client

For instance, you can spawn a Clojure REPL with the Storm jars and Storm’s configuration on CLASSPATH. This can be very useful for debugging, similarly to the approach described in the section Debugging, Monitoring and Patching Production in the REPL in Clojure Programming.

$ storm repl
[...snip...]
Clojure 1.4.0
user=>

In a production environment you would pair the above utilities with your operations tools of choice, for instance by reading metrics from Storm via Graphite and/or Ganglia, which are then fed into Nagios. You can also ingest the Storm log files into a log analysis tool such as logstash or Splunk.

Use third-party libraries in my Storm code?

You have two options:

  1. (Preferred) Add the third-party library as a dependency to your Storm code (which is the common way to include third-party libs anyways). Then create a “fat jar” of your code, which is a jar file that contains your own code plus all of its transitive dependencies (which, again, is the recommended way to build your jar file for Storm anyways). If you use Leiningen, for instance, you can run lein uberjar. Gradle aficionados may want to use the plugin gradle-fatjar-plugin.
  2. Place all relevant third-party jar files in the $STORM_HOME/lib/ directory of the Storm distribution on your cluster. Storm will include all jar files in this directory in the CLASSPATH when a topology is being run.

When adding non-jar files such as XML or .so: Any other files should be included in your jar file. You can then use the Thread.currentThread().getContextClassLoader().getResources(…) function to find resources on your classpath. With regard to .so files, it is recommended to install those on each Storm node and update java.library.path in your storm.yaml accordingly. (source)

Build a correct standalone / fat jar file of my Storm code?

When working with your own code projects be aware that you should not bundle Storm’s own jars with your jar file. If you do, Storm will complain, e.g. about “multiple defaults.yaml found” (its “own” in the cluster installation directory plus “your” copy from the included Storm dependency in the jar file). So make sure to exclude the Storm jars in your build process. Here are two examples of how to achieve this with Leiningen and with Gradle.

Excluding Storm jars when using Leiningen:

project.clj (Leiningen)
:dependencies [
               [your-third-party/dependencies "1.2.3"]]

;; :dev-dependencies is similar to :dependencies, but the dependencies are not considered
;; to be run-time dependencies -- they are only for use by Leiningen during the build.
;; As such 'lein uberjar' will not include them.
:dev-dependencies [[storm "0.8.2"]
                   [org.clojure/clojure "1.4.0"]])

Excluding Storm jars when using Gradle:

build.gradle (Gradle)
buildscript {
    repositories {
        mavenCentral()
        mavenRepo url: "http://clojars.org/repo"
    }
    dependencies {
        // see https://github.com/musketyr/gradle-fatjar-plugin
        classpath 'eu.appsatori:gradle-fatjar-plugin:0.2-rc1'
    }
}

// When using this plugin you can build a fat jar / uberjar with 'gradle fatJar'
apply plugin: 'fatjar'

dependencies {
    compile 'storm:storm:0.8.2', {
        ext {
            fatJarExclude = true
        }
    }
}

Where to go from here

Automated deployment of Storm clusters:

  • puppet-storm – a Puppet module I wrote to deploy Storm 0.9+ clusters

Now that you have your Storm cluster up and running you might want to write your first own Storm topology:

Further readings

Change Log

  • 2013-06-06: Created an RPM file for Storm’s JZMQ version. Replaced old install from source instructions with the new RPM install instructions.
