Running a Multi-Node Storm Cluster
In this tutorial I will describe in detail how to set up a distributed, multi-node Storm cluster on RHEL 6. We will install and configure both Storm and ZooKeeper and run their respective daemons under process supervision, similarly to how you would operate them in a production environment. I will show how to run an example topology in the newly built cluster, and conclude with an operational FAQ that answers the most common questions of managing a Storm cluster.
- What we want to do
- Background: Storm architecture
- Networking setup
- Installing ZooKeeper
- Installing Storm
- Running Storm daemons under supervision
- Running an example Storm topology
- Operational FAQ
- Start the Storm cluster?
- Stop the Storm cluster?
- Add another worker node?
- Restart Supervisor daemon and worker processes?
- Prevent broken nodes to impact cluster performance?
- Configure the CLI storm command to find Nimbus etc.?
- Submit and run a Storm topology?
- Stop a running Storm topology?
- List any running Storm topologies?
- Access the Storm UI?
- Monitor the Storm cluster?
- Use third-party libraries in my Storm code?
- Build a correct standalone / fat jar file of my Storm code?
- Where to go from here
- Further readings
- Change Log
What we want to do
Here is an outline of what we want to do:
- Install and configure a distributed, multi-node Storm cluster.
- Use a single-node ZooKeeper cluster for the Storm cluster.
- Use one master node for running Storm’s Nimbus daemon as well as the web-based Storm UI.
- Use three slave nodes for running Storm’s Supervisor daemons and the actual worker processes that execute your Storm application code. As you will see the actual number of slave nodes does not really matter (you can use this tutorial to set up a Storm cluster of fifty nodes if you want to).
- Run an example Storm topology in our new Storm cluster.
Software versions
The instructions described in this article have been tested with:
- Red Hat Enterprise Linux (RHEL) 6.3 (Santiago)
- Storm 0.8.2 release
The same instructions should apply to CentOS 6, too.
Overview of cluster setup
- zkserver1: The only ZooKeeper server in our single-node ZooKeeper cluster.
- nimbus1: The master node in the Storm cluster, which will run the Nimbus daemon and, optionally, the Storm UI.
- slave{1,2,3,...,n}: The slave nodes in the Storm cluster, which will run the Supervisor daemons including the actual worker processes that execute your Storm application code.
Thanks to Storm’s design – and notably its use of ZooKeeper for coordination purposes – it actually does not matter whether you plan to run just 1 slave node or 100 nodes. The instructions in this tutorial will work regardless of the number of slave nodes. I happen to use 3 slaves for illustration purposes but that’s an arbitrary number. The only thing you need to do is to install, configure and start each of your slave nodes according to the instructions, and Storm will automatically take care of integrating those into the cluster and making good use of them. That’s pretty cool.
Before we start with setting up our Storm cluster let’s use the next section to briefly introduce the various components of Storm.
Background: Storm architecture
Storm distinguishes between two kinds of nodes in a Storm cluster:
- The master node. This node runs the so-called Nimbus daemon that is responsible for assigning tasks to the worker nodes, monitoring the cluster for failures, and distributing (your) code around the cluster. If you are familiar with Hadoop, you can think of the Nimbus as the JobTracker of Storm.
- The worker nodes. Each worker node runs an instance of the so-called Supervisor daemon. This daemon listens for work assigned (by Nimbus) to the node it runs on and starts/stops the worker processes as necessary. Each worker process executes a subset of a topology. If you are familiar with Hadoop, you can think of the Supervisors as the TaskTrackers of Storm.
In addition to its own components Storm relies on a ZooKeeper cluster (consisting of one or more ZooKeeper servers) to perform the coordination between Nimbus and the Supervisors. Apart from using ZooKeeper for coordination purposes the Nimbus and Supervisors also store all their state in ZooKeeper or on local disk (with regard to the latter see the section on storm.local.dir below).
Please refer to the official Storm tutorial and my blog post Understanding the Parallelism of a Storm Topology for further details.
Networking setup
Make sure all the nodes can reach each other over the network and that DNS is properly configured. For the sake of completeness here is a basic /etc/hosts file that you can use. However there is nothing special here – in the remainder of this article we will refer to the various machines only by their hostname (e.g. nimbus1).
# A basic hosts file (for ALL machines)
192.168.10.1 slave1.foo.com slave1
192.168.10.2 slave2.foo.com slave2
192.168.10.3 slave3.foo.com slave3
192.168.10.200 nimbus1.foo.com nimbus1
192.168.10.250 zkserver1.foo.com zkserver1
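Before moving on it can help to confirm that every machine's hosts file actually lists all cluster nodes. The following is a minimal sketch under stated assumptions: missing_hosts is a hypothetical helper name (not part of Storm or ZooKeeper), and it only inspects the file contents; it does not test DNS or network reachability.

```shell
# missing_hosts HOSTS_FILE NAME...
# Prints every NAME that does not appear as a hostname or alias in HOSTS_FILE.
# Hypothetical helper for illustration only.
missing_hosts() {
  hosts_file="$1"
  shift
  for name in "$@"; do
    awk -v n="$name" '
      { sub(/#.*/, "") }                                   # strip comments
      { for (i = 2; i <= NF; i++) if ($i == n) found = 1 } # skip the IP column
      END { exit !found }
    ' "$hosts_file" || echo "$name"
  done
}

# Example (prints nothing when all names are present):
#   missing_hosts /etc/hosts slave1 slave2 slave3 nimbus1 zkserver1
```

Any name printed by the function is missing from the file and should be added before you continue.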
Installing ZooKeeper
Storm and ZooKeeper
Storm uses ZooKeeper for coordinating the various “actors” (e.g. Nimbus, Supervisor daemons) within a Storm cluster. For small installations or installations that are not mission critical it is sufficient to run a single-node ZooKeeper cluster.
Please take a look at section Set up a Zookeeper cluster in the Storm documentation for further details.
Install ZooKeeper
If you happen to use a Hadoop distribution such as Cloudera CDH4 you can use the respective management tool (Cloudera Manager in the case of CDH4) to easily install a single-node or a multi-node ZooKeeper cluster. If you don’t want to go down the management tool route you can install ZooKeeper manually. You can either follow the official ZooKeeper instructions for deploying a ZK cluster, which will require you to install ZK from a release tarball, or you can fall back to pre-built RPMs for RHEL6/CentOS6.
For the context of this article we will pick the latter approach (RPMs). We will rely on Cloudera’s yum repositories to provide us with ZooKeeper RPMs. The first step is to add Cloudera’s CDH4 yum repositories to your machines’ yum configuration (make sure you pick the correct OS version). Please follow the instructions in the previous link to add the Cloudera yum repository. Once you are done you can verify the previous step with the following command:
# Does your machine have the Cloudera CDH4 repository configured?
$ yum repolist | grep cloudera-cdh4
cloudera-cdh4 Cloudera's Distribution for Hadoop, Versio 77+18
Now you can install ZooKeeper as follows:
$ sudo yum install zookeeper zookeeper-server
Configure ZooKeeper
For the sake of brevity I will not describe the ZooKeeper configuration in full detail as the official documentation already provides all the required information in the ZooKeeper Getting Started Guide.
A basic ZooKeeper configuration zoo.cfg looks as follows:
# /etc/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
# Enable regular purging of old data and transaction logs every 24 hours
autopurge.purgeInterval=24
autopurge.snapRetainCount=5
The last two autopurge.* settings are very important for production systems. They instruct ZooKeeper to regularly remove (old) data and transaction logs. The default ZooKeeper configuration does not do this on its own, and if you do not set up regular purging ZooKeeper will quickly run out of disk space.
Start ZooKeeper
Start the ZooKeeper daemon as follows:
$ zkServer.sh start
In production environments it is strongly recommended to run the ZooKeeper daemon under process supervision. See section Running ZooKeeper in production mode below for details.
Verifying ZooKeeper operation
You can use a tool such as telnet or netcat to connect to ZK and interactively run so-called four letter commands such as conf (print details of ZK configuration) and stat (list brief details of server and connected clients). This is a simple way to perform basic diagnostics of a ZooKeeper cluster/server.
# Is the ZK running in a non-error state ("aRe yoU OK")?
$ echo ruok | nc zkserver1 2181
imok # I'm ok.
# Show summary stats
$ echo stat | nc zkserver1 2181
Zookeeper version: 3.4.5-cdh4.2.1--1, built on 04/22/2013 16:45 GMT
Clients:
/192.168.10.2:40623[1](queued=0,recved=9422,sent=9422)
/192.168.10.2:40666[1](queued=0,recved=9415,sent=9415)
/192.168.10.3:40639[1](queued=0,recved=9416,sent=9416)
[...snipp...]
Latency min/avg/max: 0/0/17
Received: 424358
Sent: 424357
Connections: 10
Outstanding: 0
Zxid: 0x222
Mode: standalone
Node count: 5
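The four letter commands also lend themselves to scripted checks. Here is a minimal sketch, assuming the "Key: value" layout of the stat output shown above; zk_stat_field is a hypothetical helper name and not something ZooKeeper ships.

```shell
# zk_stat_field FIELD
# Reads the output of ZooKeeper's "stat" command on stdin and prints the value
# of the first "FIELD: value" line (e.g. Mode, Connections, Node count).
# Hypothetical helper for illustration only.
zk_stat_field() {
  awk -F': ' -v key="$1" '$1 == key { print $2; exit }'
}

# Intended use against a live server (not executed here):
#   echo stat | nc zkserver1 2181 | zk_stat_field Mode
```

For the single-node setup in this tutorial the Mode field should read standalone, as in the sample output above.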
Running ZooKeeper in production mode
ZooKeeper and process supervision
It is strongly recommended to run ZooKeeper under process supervision because ZooKeeper is fail-fast and will exit the process if it encounters any error case. On the plus side a ZooKeeper cluster is “self healing”: once restarted the failed server will automatically rejoin the cluster without any manual intervention.
The ZooKeeper documentation covers process supervision in further detail in the section Supervision.
One option of course is to put ZooKeeper under process supervision via supervisord (not to be confused with Storm’s Supervisor daemon) just as we will do for Storm’s own daemons further down below.
Note that when running ZooKeeper under supervision you may want to use zkServer.sh start-foreground instead of zkServer.sh start:
When running zookeeper under supervision, use "zkServer.sh start-foreground" instead of "zkServer.sh start". The regular start command forks and exits too many times for supervisord to track the application state successfully.
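As a rough illustration, the following sketch prints a supervisord 2.x program section that runs ZooKeeper in the foreground. It mirrors the parameters used for the Storm daemons later in this tutorial. The function name print_zk_program_section is hypothetical, and the default zkServer.sh path, the zookeeper user and the log file location are assumptions that you will need to adapt to your installation.

```shell
# print_zk_program_section [ZKSERVER_PATH] [RUN_AS_USER]
# Prints a supervisord 2.x [program:...] section for ZooKeeper.
# ASSUMPTIONS (adjust to your install): the default script path, the default
# user name and the log file path are illustrative values only.
print_zk_program_section() {
  cat <<EOF
[program:zookeeper]
command=${1:-/usr/lib/zookeeper/bin/zkServer.sh} start-foreground
user=${2:-zookeeper}
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/zookeeper/zookeeper.out
logfile_maxbytes=20MB
logfile_backups=10
EOF
}

# Print the section for review; append it to /etc/supervisord.conf by hand.
print_zk_program_section
```

Printing the section rather than writing it directly keeps the step reviewable before anything in /etc/supervisord.conf changes.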
Managing ZooKeeper storage
You should double-check the ZooKeeper documentation for making the previously described basic ZooKeeper setup ready for long-running production systems. Notably, take a look at ZooKeeper Maintenance and ZooKeeper Configuration.
Installing Storm
Install dependencies
The following software dependencies must be installed on all Storm nodes, i.e. master and slave nodes.
Java 6
To make things simple we will install OpenJDK 6 in this tutorial.
$ sudo yum install java-1.6.0-openjdk
If you prefer Oracle/Sun JDK 6 (which I personally do) you will need to download and install the Oracle JDK 6 RPM files manually. Depending on your work environment you might also have an internal yum repository configured through which you can install Oracle JDK 6 directly via yum as shown above for OpenJDK 6.
ZeroMQ 2.1.7
ZeroMQ is “a library which extends the standard socket interfaces with features traditionally provided by specialised messaging middleware products”. Storm relies on ZeroMQ primarily for task-to-task communication in running Storm topologies.
It is important to install exactly the version 2.1.7 of ZeroMQ. Unfortunately an official RPM of ZeroMQ 2.1.7 is not available for RHEL 6. This leaves you with two options:
- Use an RPM created by a third party.
- Build and install ZeroMQ from source
The third (and preferred) option is of course to create your own RPM, but doing so is beyond the scope of this tutorial.
If you are running into any problems during the installation of ZeroMQ, please refer to Installing native dependencies in the Storm documentation and to bin/install_zmq.sh in the Storm release package.
Option 1: Install ZeroMQ from third-party RPM
A pre-built RPM of zeromq for RHEL 6 is available from saltstack’s GitHub repository:
Download the file above and install via:
$ cd /tmp
$ wget https://github.com/downloads/saltstack/salt/zeromq-2.1.7-1.el6.x86_64.rpm
$ sudo yum install zeromq-2.1.7-1.el6.x86_64.rpm
Option 2: Install ZeroMQ from source
Prior to the actual installation steps for ZeroMQ as shown below I also needed to install the following packages on my RHEL 6 machines in order to be able to compile ZeroMQ from source:
$ sudo yum install gcc gcc-c++ libuuid-devel
The following instructions build ZeroMQ from source and install it under /usr/local/:
$ cd /tmp
$ wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
$ tar -xzf zeromq-2.1.7.tar.gz
$ cd zeromq-2.1.7
$ ./configure
$ make
$ sudo make install
JZMQ
JZMQ are the Java bindings for ZeroMQ.
Unfortunately an official RPM of JZMQ is not available for RHEL 6, and I could not find any pre-built RPM created by a third party. For your and my convenience I decided to build my own RPM of JZMQ for RHEL 6 (64bit) via Jordan Sissel’s awesome packaging tool fpm.
Feel free to try installing jzmq-2.1.0.el6.x86_64.rpm instead of installing JZMQ from source as you would usually do, and report back whether the RPM worked for you.
Install JZMQ via RPM as follows:
$ wget https://s3.amazonaws.com/cdn.michael-noll.com/rpms/jzmq-2.1.0.el6.x86_64.rpm
$ sudo yum install jzmq-2.1.0.el6.x86_64.rpm
See also Installing native dependencies in the Storm documentation and bin/install_zmq.sh in the Storm release package.
Off-topic: How I created the JZMQ RPM
Prior to the actual build steps for JZMQ as shown below I first needed to install the following packages on my RHEL 6 machine in order to be able to compile JZMQ from source and then subsequently package it as RPM via fpm:
$ sudo yum install libtool gcc-c++ java-1.6.0-openjdk-devel
$ wget https://github.com/downloads/saltstack/salt/zeromq-devel-2.1.7-1.el6.x86_64.rpm
$ sudo yum install zeromq-devel-2.1.7-1.el6.x86_64.rpm
Here are the build steps I used to create the JZMQ RPM for Storm:
$ git clone https://github.com/nathanmarz/jzmq.git
$ cd jzmq
$ ./autogen.sh
$ env JAVA_HOME=/usr/java/default ./configure --prefix=/usr
$ env JAVA_HOME=/usr/java/default make install DESTDIR=/tmp/installdir
$ fpm -s dir -t rpm -a all -n jzmq -v 2.1.0 -C /tmp/installdir/ \
--maintainer "<michael@michael-noll.com>" \
--vendor github.com/nathanmarz/jzmq \
--url https://github.com/nathanmarz/jzmq.git \
--description "Frozen version of JZMQ that is tested to work with Storm.\nJZMQ is [...snip...]" \
-p jzmq-VERSION.el6.ARCH.rpm \
-d "zeromq == 2.1.7" \
-a "x86_64" \
usr
# => jzmq-2.1.0.el6.x86_64.rpm
Some information about the environment I used to build JZMQ:
$ uname -a
Linux buildhost 2.6.32-279.19.1.el6.x86_64 #1 SMP Sat Nov 24 14:35:28 EST 2012 x86_64 x86_64 x86_64 GNU/Linux
$ cat /etc/redhat-release
Red Hat Enterprise Linux Server release 6.3 (Santiago)
$ set | grep JAVA_HOME
JAVA_HOME=/usr/java/default
$ autoconf --version | head -1
autoconf (GNU Autoconf) 2.63
Python 2.6.6
Python 2.6.6 should be installed by default on RHEL 6.3. If it isn’t for some reason, you have a seriously broken machine – do not continue until you have reinstalled your machine from scratch. :-)
Unzip
The decompression tool unzip should be installed by default on RHEL 6.3. If it isn’t for some reason, run the following command:
$ sudo yum install unzip
Unzip is only required to decompress a Storm release package, which comes as a .zip file.
Create a dedicated Storm system user
The following commands will create a user account storm with group storm and uid/gid of 53001. The home directory of the user is set to /app/home/storm/.
$ sudo groupadd -g 53001 storm
$ sudo mkdir -p /app/home
$ sudo useradd -u 53001 -g 53001 -d /app/home/storm -s /bin/bash storm -c "Storm service account"
$ sudo chmod 700 /app/home/storm
$ sudo chage -I -1 -E -1 -m -1 -M -1 -W -1 storm
Download and install a Storm release
You can download a release of Storm from the Storm Downloads section.
Here is a direct link to Storm version 0.8.2:
Yes, the direct link takes you to dl.dropbox.com. I assume this is because GitHub disabled the feature to upload files and make them available for download in the Downloads Tab (see their GoodBye, Uploads blog post for details). So a new place to host the release files had to be found.
$ cd /tmp
$ wget https://dl.dropbox.com/u/133901206/storm-0.8.2.zip
$ cd /usr/local
$ sudo unzip /tmp/storm-0.8.2.zip
$ sudo chown -R storm:storm storm-0.8.2
$ sudo ln -s storm-0.8.2 storm
Now the Storm installation is available at /usr/local/storm/. I will refer to this directory as $STORM_HOME.
Create local working directory for Storm
Create a local directory in which the Nimbus and Supervisor daemons can store (small) amounts of state such as jar files and configurations. We will use /app/storm/. This directory will be configured to be used as storm.local.dir later on.
$ sudo mkdir -p /app/storm
$ sudo chown -R storm:storm /app/storm
$ sudo chmod 750 /app/storm
Once we have started the various Storm daemons later on you will see files being created under storm.local.dir by the Nimbus and Supervisor daemons. Here is an example:
# on master node (nimbus1)
$ sudo tree /app/storm
/app/storm
+--- nimbus
+--- inbox
# on slave nodes
$ sudo tree /app/storm
/app/storm
+--- supervisor
+--- isupervisor
| +--- 1368698566625
| +--- 1368698566625.version
+--- localstate
| +--- 1368698594679
| +--- 1368698594679.version
| +--- 1368698597685
| +--- 1368698597685.version
| +--- 1368698597689
| +--- 1368698597689.version
| +--- 1368698597696
| +--- 1368698597696.version
+--- tmp
This local directory has one particularly notable role: If a Supervisor daemon crashes or is deliberately killed, then a restarted Supervisor daemon will pick up where the old process stopped as long as the new process reads from the same local directory.
Configure Storm
Storm is configured through the file conf/storm.yaml, which overrides the default configuration values in defaults.yaml.
Open conf/storm.yaml in a text editor and change/set the following configuration parameters:
# conf/storm.yaml (on each node, master and slaves)
storm.zookeeper.servers:
- "zkserver1"
nimbus.host: "nimbus1"
nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
storm.local.dir: "/app/storm"
See section Fill in mandatory configurations into storm.yaml in the document Setting up a Storm cluster for more information, notably about the configuration parameters:
- java.library.path – to tell Storm where to find the native libraries it uses (ZeroMQ and JZMQ)
- supervisor.slots.ports – to configure how many worker processes run on the machine with this configuration. For instance, you could use only 1 port = 1 worker process for a small Amazon EC2 instance, and more ports = more workers for large EC2 instances. Here you would need to give each EC2 instance type its appropriate conf/storm.yaml with the proper value for supervisor.slots.ports. By default Storm uses 4 ports (6700-6703/tcp) = 4 worker processes per machine. The Storm UI (see below) also calls those workers “slots”. Note that these worker ports will only be in use when the workers are actually running; for instance, if the cluster is idle then no worker processes will be running at all.
Test-drive the Storm cluster
Before we configure process supervision for our cluster’s Storm daemons to make our setup production-ready we will first take our Storm cluster on a manual test drive. This allows us to verify that every node is configured correctly. If we run into any issues we can inspect the Storm log files to identify and fix the problem (see section Troubleshooting below).
Start the Nimbus daemon manually
Start the Nimbus daemon by running the following command on the master node nimbus1:
# On machine nimbus1
$ sudo su - storm
$ cd /usr/local/storm
$ bin/storm nimbus
Example output:
$ bin/storm nimbus
Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/storm-0.8.2 [...snip...] backtype.storm.daemon.nimbus
By default the Nimbus daemon will listen on port 6627, which can be configured via the parameter nimbus.thrift.port.
If you do not see any error messages and everything looks ok you can stop the Nimbus daemon again by either:
- Entering Ctrl-C in the terminal window where you started the daemon, or
- Sending a SIGKILL to the Nimbus process (kill -9 <processid>)
Start the Storm UI daemon manually
Storm ships with a web-based user interface that you can optionally launch on the master node that runs the Nimbus daemon. The Storm UI provides a basic overview of the cluster state by showing cluster-level and topology-level diagnostics.
Start the UI daemon by running the following command on the master node nimbus1:
# On machine nimbus1
$ sudo su - storm
$ cd /usr/local/storm
$ bin/storm ui
Again, you must run this daemon under supervision in production environments as we will describe later.
By default the UI listens on port 8080/tcp. You can access it via the URL:
http://nimbus1:8080/
Here is a screenshot of the Storm UI as of version 0.8.2:
You can change the UI port by setting the ui.port configuration parameter in conf/storm.yaml:
# conf/storm.yaml (on master)
### ui.* configs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"
If you do not see any error messages and everything looks ok you can stop the Storm UI daemon again by either:
- Entering Ctrl-C in the terminal window where you started the daemon, or
- Sending a SIGKILL to the UI process (kill -9 <processid>)
Start the Supervisor daemons manually
Run the following command on each slave node in the Storm cluster to start their respective Supervisor daemons. Upon startup the Supervisors will register themselves with ZooKeeper, where Nimbus can find them.
# On each slave
$ sudo su - storm
$ cd /usr/local/storm
$ bin/storm supervisor
Example output:
$ bin/storm supervisor
Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/storm-0.8.2 [...snip...] backtype.storm.daemon.supervisor
If you do not see any error messages and everything looks ok you can stop the Supervisor daemons again by either:
- Entering Ctrl-C in the respective terminal window where you started the daemon, or
- Sending a SIGKILL to the respective Supervisor process (kill -9 <processid>)
Storm log files and troubleshooting
By default Storm will write log messages of its daemons to:
- Nimbus: $STORM_HOME/logs/nimbus.log
- Supervisor: $STORM_HOME/logs/supervisor.log
- UI: $STORM_HOME/logs/ui.log
The relevant configuration parameter is -Dlogfile.name=... in $STORM_HOME/bin/storm for each daemon.
These log files are the best source of information when performing troubleshooting. Even if everything is working correctly I’d still recommend taking a look at them to familiarize yourself with the kind of information available there.
Running Storm daemons under supervision
For a production environment we do not want to start the Storm daemons manually – doing so is even strongly discouraged:
It is critical that you run each of these daemons under supervision. Storm is a fail-fast system which means the processes will halt whenever an unexpected error is encountered. Storm is designed so that it can safely halt at any point and recover correctly when the process is restarted. This is why Storm keeps no state in-process -- if Nimbus or the Supervisors restart, the running topologies are unaffected.
There are various tools available to supervise processes, for instance supervisord, monit, daemontools and runit. In this tutorial we will be using supervisord because a) an RPM for RHEL 6 is readily available via EPEL (whereas daemontools and runit are not) and b) supervisord is very simple to set up and run (whereas we would need to write init scripts that create PID files – which Storm does not create itself – when using monit because that tool requires PID files in order to work).
What is supervisord?
Supervisor is a client/server system that allows its users to monitor and control a number of processes.
The following sections explain how to install Supervisor 2.x for RHEL 6 from EPEL. Unfortunately Supervisor 3.x is not yet available out of the box, so we have to stick to the older version (which will require us to perform some workarounds as you will see later).
Install RHEL EPEL repository
First we must add the EPEL repository for RHEL 6 so that we can subsequently install supervisord via yum.
$ cd /tmp
$ wget http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
$ sudo rpm -Uhv epel-release-6-8.noarch.rpm
Please refer to Installing RHEL EPEL Repo on Centos 5.x or 6.x for more information.
Install supervisord
$ sudo yum install supervisor
$ sudo chkconfig supervisord on
# Recommended: secure supervisord configuration file (may contain user credentials)
$ sudo chmod 600 /etc/supervisord.conf
Create log directories for Storm
We create a directory under /var/log to which the output of Storm daemons captured by supervisord can be written. That said, most (all of?) Storm log messages will be stored in Storm’s own log files under $STORM_HOME/logs/.
$ sudo mkdir -p /var/log/storm
$ sudo chown -R storm:storm /var/log/storm
Configure supervisord for Storm
Two notes before we start:
- This section uses supervisord 2.x configuration parameters. For instance, supervisord 2.x uses logfile whereas this parameter was superseded in supervisord 3.x by stdout_logfile.
- For Cloudera CDH4 users: Apparently Cloudera Manager runs its own supervisord at port 9001/tcp. Make sure to configure “your” supervisord instance appropriately if you decide to use the HTTP interface (whose port defaults to 9001/tcp).
Master node: Nimbus and UI daemons
Add the following lines to /etc/supervisord.conf on the master node nimbus1:
[program:storm-nimbus]
command=/usr/local/storm/bin/storm nimbus
user=storm
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/storm/nimbus.out
logfile_maxbytes=20MB
logfile_backups=10
[program:storm-ui]
command=/usr/local/storm/bin/storm ui
user=storm
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/storm/ui.out
logfile_maxbytes=20MB
logfile_backups=10
Slave nodes: Supervisor daemons
Add the following lines to /etc/supervisord.conf on each slave node:
[program:storm-supervisor]
command=/usr/local/storm/bin/storm supervisor
user=storm
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/storm/supervisor.out
logfile_maxbytes=20MB
logfile_backups=10
Start supervisord including Storm daemons
Starting supervisord will read the configuration /etc/supervisord.conf and also start any commands defined therein that have autostart set to true. In our case, starting supervisord will start the Nimbus daemon and the Storm UI daemon on the master node nimbus1, and the (Storm) Supervisor daemons on the slave nodes.
Run the following command on each node, master and slaves:
$ sudo service supervisord start
You can verify whether the Storm daemons have been started successfully with supervisorctl:
# On master node (nimbus1)
$ sudo supervisorctl status
storm-nimbus RUNNING pid 17563, uptime 0:03:12
storm-ui RUNNING pid 17558, uptime 0:03:12
# On slave nodes
$ sudo supervisorctl status
storm-supervisor RUNNING pid 17561, uptime 0:03:12
You can also inspect the supervisord log file at /var/log/supervisor/supervisord.log for any issues you might encounter. For example, here is a snippet from the master node nimbus1:
INFO supervisord started with pid 20490
INFO spawned: 'storm-ui' with pid 20492
INFO spawned: 'storm-nimbus' with pid 20497
INFO success: storm-ui entered RUNNING state, process has stayed up for > than 10 seconds (startsecs)
INFO success: storm-nimbus entered RUNNING state, process has stayed up for > than 10 seconds (startsecs)
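If you want to check the daemon states from a script rather than by eye, you can filter the supervisorctl status output. The following is a minimal sketch, assuming the column layout shown above (program name first, state second); check_all_running is a hypothetical helper name and not part of supervisord.

```shell
# check_all_running
# Reads "supervisorctl status" output on stdin. Prints the programs whose
# state (second column) is not RUNNING and returns 1 if there are any.
# Hypothetical helper for illustration only.
check_all_running() {
  bad="$(awk 'NF >= 2 && $2 != "RUNNING" { print $1 }')"
  if [ -n "$bad" ]; then
    echo "Not running: $(echo $bad)"
    return 1
  fi
  echo "All programs running"
}

# Intended use on a cluster node (not executed here):
#   sudo supervisorctl status | check_all_running
```

The non-zero return value makes the function usable in cron jobs or monitoring checks.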
Running an example Storm topology
Prerequisites
In the following sections we will execute the storm command quite a bit. This command requires access to a working Storm configuration. For this reason you must provide each client machine that you intend to use for controlling your Storm cluster – including submitting topologies – with a proper Storm configuration. Some people also call those client machines “gateway machines” as they act like a bridge between your Storm cluster and other infrastructure services.
The easiest way to set up the shell environment of user accounts on client machines is to copy the cluster’s conf/storm.yaml file to ~/.storm/storm.yaml.
# Set up shell environment of user accounts on client/gateway machines
# Create the per-user config directory first (cp/scp will not create it)
$ mkdir -p ~/.storm
# When storm.yaml is available locally
$ cp /usr/local/storm/conf/storm.yaml ~/.storm/storm.yaml
# Otherwise retrieve the master node's config file
$ scp nimbus1:/usr/local/storm/conf/storm.yaml ~/.storm/storm.yaml
Downloading and building storm-starter
We will use the storm-starter example topologies to run a first Storm topology in our cluster.
$ cd /tmp
$ git clone https://github.com/nathanmarz/storm-starter.git
$ cd storm-starter
$ lein deps
$ lein compile
$ lein uberjar
The last command lein uberjar will create a jar file of the storm-starter code at the following location:
target/storm-starter-0.0.1-SNAPSHOT-standalone.jar
We can now use this jar file to submit and run the ExclamationTopology in our Storm cluster.
Submitting the topology to the cluster
The storm jar command submits a topology to the cluster.
$ /usr/local/storm/bin/storm help jar
Syntax: [storm jar topology-jar-path class ...]
Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
The following command instructs Storm to launch the ExclamationTopology in distributed mode in the cluster. Behind the scenes the storm-starter jar file is uploaded to the master node running Nimbus (nimbus1) and stored in its storm.local.dir.
$ /usr/local/storm/bin/storm jar target/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamation-topology
Example output:
0 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
12 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar storm-starter-0.0.1-SNAPSHOT-standalone.jar to assigned location: /app/storm/nimbus/inbox/stormjar-8a208db8-78d5-45e5-9a99-924f03900759.jar
34 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-8a208db8-78d5-45e5-9a99-924f03900759.jar
34 [main] INFO backtype.storm.StormSubmitter - Submitting topology exclamation-topology in distributed mode with conf {"topology.workers":3,"topology.debug":true}
225 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: exclamation-topology
If the submission instead fails with an error such as the following:
Exception in thread "main" java.lang.UnsupportedClassVersionError: storm/starter/ExclamationTopology : Unsupported major.minor version 51.0
then the storm-starter jar was compiled for a newer Java version than the one used to run Storm. For instance, you compiled storm-starter with Java 7 (class file version 51.0) but Storm itself runs on Java 6. Make sure the Java versions match to resolve this error.
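To find out which Java version a class was compiled for, you can read the major version directly from the class file: it is stored big-endian in the two bytes after the 4-byte magic number and the 2-byte minor version. The following is a minimal sketch; both function names are hypothetical helpers, and the mapping relies on major version 50 corresponding to Java 6 and 51 to Java 7, with each release adding one.

```shell
# class_major_version FILE
# Prints the class-file major version stored in bytes 6-7 (zero-based) of FILE.
# Hypothetical helper for illustration only.
class_major_version() {
  set -- $(od -An -tu1 -j6 -N2 "$1")   # two unsigned bytes: high, low
  echo $(( $1 * 256 + $2 ))
}

# java_release_for_major MAJOR
# Maps a class-file major version to the Java release (50 -> 6, 51 -> 7).
java_release_for_major() {
  echo $(( $1 - 44 ))
}

# Intended use (not executed here): extract one class from the jar, then inspect it.
#   unzip -p target/storm-starter-0.0.1-SNAPSHOT-standalone.jar \
#     storm/starter/ExclamationTopology.class > /tmp/ExclamationTopology.class
#   java_release_for_major "$(class_major_version /tmp/ExclamationTopology.class)"
```

Compare the result with the output of java -version on the Storm nodes; the class file must not target a newer release than the runtime.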
The last argument, exclamation-topology, defines the name of the submitted topology instance within the Storm cluster. This name must be unique within the cluster. If a topology with the same name already exists in the cluster, the submission fails with:
Exception in thread "main" java.lang.RuntimeException:
Topology with name `exclamation-topology` already exists on cluster
You can verify whether the submitted topology is running via the storm CLI tool:
$ /usr/local/storm/bin/storm list
Topology_name Status Num_tasks Num_workers Uptime_secs
-------------------------------------------------------------------
exclamation-topology ACTIVE 16 3 798
Alternatively you can also open the Storm UI. Here you should see a corresponding entry “exclamation-topology” in the UI section Topology Summary.
Stopping the topology
If you want to stop the topology you can kill it by using either the storm CLI tool or the Storm UI.
$ /usr/local/storm/bin/storm kill exclamation-topology
0 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627
33 [main] INFO backtype.storm.command.kill-topology - Killed topology: exclamation-topology
You can verify whether the topology was stopped with storm list:
$ /usr/local/storm/bin/storm list
Topology_name Status Num_tasks Num_workers Uptime_secs
-------------------------------------------------------------------
exclamation-topology KILLED 16 3 161
The KILLED entry will only stay in the list for a short while, after which it will be removed. At that point Storm will report no running topologies (the KILLED entry will not be listed anymore):
$ /usr/local/storm/bin/storm list
No topologies running.
Operational FAQ
This section summarizes the most important aspects of cluster management.
Start the Storm cluster?
This section depends on using supervisord for controlling Storm daemons as described in this tutorial.
Run the following command on each node:
# On each node, i.e. master and slaves
$ sudo service supervisord start
Give the processes a few seconds to start up. After that you can verify the success of the command via supervisorctl:
# On master (nimbus1)
$ sudo supervisorctl status
storm-nimbus RUNNING pid 17930, uptime 2:24:52
storm-ui RUNNING pid 17929, uptime 2:24:52
# On slaves
$ sudo supervisorctl status
storm-supervisor RUNNING pid 11441, uptime 2:12:22
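When you manage more than a handful of nodes, you may prefer to check the status output automatically. The following sketch assumes the supervisorctl status format shown above (program name first, state second). The helper name not_running is made up for this example.

```shell
# Hypothetical helper: read `supervisorctl status` output on stdin and print
# the name of every program whose state is not RUNNING.
# Exits non-zero if at least one such program was found.
not_running() {
  awk 'NF >= 2 && $2 != "RUNNING" { print $1; bad = 1 } END { exit bad }'
}

# Example usage on any node of the cluster:
#   sudo supervisorctl status | not_running || echo "some Storm daemons are down"
```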
Stop the Storm cluster?
This section depends on using supervisord for controlling Storm daemons as described in this tutorial.
Run the following command on each node:
$ sudo service supervisord stop
Give supervisord a few seconds to shut down all processes. Note that trying to run supervisorctl when supervisord itself is stopped will result in an error message (this appears to be a known user interface issue in supervisord 2.x).
# If you want to kill ALL processes follow this procedure on the slave nodes:
$ sudo supervisorctl stop storm-supervisor
$ sudo pkill -TERM -u storm -f 'backtype.storm.daemon.worker'
Add another worker node?
The Nimbus daemon finds available Supervisors via ZooKeeper, to which the Supervisor daemons register themselves.
If you want to add a new Supervisor to your Storm cluster, just configure storm.yaml on the new worker node (similar to the existing ones), and let the Supervisor daemon register itself with ZooKeeper. You can then use the storm rebalance command (see Command line client) to assign workload to the new Supervisor.
See the discussion thread Adding a new server/node/zookeeper to Storm for more information.
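As a rough sketch of what configuring the new worker node could look like, the helper below writes a minimal storm.yaml. Several things here are assumptions: the helper name write_worker_config, the ZooKeeper host name in the usage example, and the slot ports (6700 to 6703 are Storm's defaults). In practice you would simply copy the file from an existing worker node.

```shell
# Hypothetical helper: write a minimal storm.yaml for a new worker node.
# Arguments: <target-file> <zookeeper-host> <nimbus-host>
write_worker_config() {
  cat > "$1" <<EOF
storm.zookeeper.servers:
    - "$2"
nimbus.host: "$3"
storm.local.dir: "/app/storm"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
EOF
}

# Example usage on the new node (host names are assumptions):
#   write_worker_config /usr/local/storm/conf/storm.yaml zkserver1 nimbus1
#   sudo supervisorctl start storm-supervisor
# Then, from a machine that can reach Nimbus:
#   storm rebalance exclamation-topology
```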
Restart Supervisor daemon and worker processes?
It is important to understand that Storm intentionally does not link the processes of Supervisors and the worker processes they spawned:
Storm is designed so that supervisors can die/restart without affecting the workers. Having the life of the workers be independent on the life of Nimbus or Supervisors makes things much easier to manage. It's a pretty minor tradeoff given how easy it is to just kill all the worker processes if that's what you want to do.
Here is the basic idea of how you would perform a full “restart” of a Supervisor daemon including the worker processes:
# Must be run on the node(s) in question
# stop Supervisor daemon
$ sudo supervisorctl stop storm-supervisor
# kill any running Storm worker processes on the same machine
$ sudo pkill -TERM -u storm -f 'backtype.storm.daemon.worker'
# (re)start Supervisor daemon, which will eventually also spawn new worker processes to resume work
$ sudo supervisorctl start storm-supervisor
See the discussion thread Stopping supervisor and workers for more information.
Prevent broken nodes to impact cluster performance?
Arguably the best approach is to have enough capacity available to cope with one or more machines going down in your Storm cluster, otherwise the performance and stability of any running topologies might suffer.
If Storm doesn't have enough worker slots to run a topology, it will "pack" all the tasks for that topology into whatever slots there are on the cluster. Then, when there are more worker slots available (like you add another machine), it will redistribute the tasks to the proper number of workers. Of course, running a topology with less workers than intended will probably lead to performance problems. So it's best practice to have extra capacity available to handle any failures.
Configure the CLI storm command to find Nimbus etc.?
You configure the Nimbus host that the storm CLI tool connects to – and other Storm configuration settings – via your ~/.storm/storm.yaml file.
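For illustration, here is a small sketch that creates such a file with a single nimbus.host entry. The helper name write_client_config is hypothetical, and note that it overwrites any existing storm.yaml in the target directory.

```shell
# Hypothetical helper: point the storm CLI tool at a remote Nimbus host.
# Arguments: <nimbus-host> [config-dir]   (config-dir defaults to ~/.storm)
# Caution: this overwrites an existing storm.yaml in config-dir.
write_client_config() {
  dir=${2:-"$HOME/.storm"}
  mkdir -p "$dir" && printf 'nimbus.host: "%s"\n' "$1" > "$dir/storm.yaml"
}

# Example usage for the cluster in this tutorial:
#   write_client_config nimbus1
#   storm list
```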
Submit and run a Storm topology?
Use the storm jar command:
$ storm help jar
Syntax: [storm jar topology-jar-path class ...]
Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
Example: The following command line submits the ExclamationTopology in storm-starter under the name “exclamation-topology” to the cluster. Note that some topology implementations may set the topology name via different means (e.g. directly in the code or in a configuration file), and in such cases the storm jar command line would need to be changed accordingly.
$ storm jar /path/to/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamation-topology
Stop a running Storm topology?
The storm kill command stops a running topology.
$ storm kill <name-of-topology>
For instance, if you have submitted the ExclamationTopology from storm-starter as shown in the previous section you would run:
$ storm kill exclamation-topology
List any running Storm topologies?
The storm list command shows the list of running (or recently stopped/killed) topologies:
$ bin/storm list
[...snip...]
0 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627
No topologies running.
In order to run this command successfully one of the two following conditions must be met:
- You are running the command on the same machine where the Nimbus daemon runs (because by default the storm command expects the Nimbus daemon to be listening on localhost:6627).
- You have configured nimbus.host in your $HOME/.storm/storm.yaml file to point to the correct machine (in this tutorial, nimbus1).
Access the Storm UI?
The Storm UI runs on port 8080/tcp by default on the machine where storm ui was executed. In this tutorial the full URL would be:
http://nimbus1:8080/
Monitor the Storm cluster?
Storm includes a few basic utilities to monitor and administer a running Storm cluster:
- The Storm UI
- The “storm” command line client
For instance, you can spawn a Clojure REPL with the Storm jars and Storm’s configuration on CLASSPATH. This can be very useful for debugging, similarly to the approach described in the section Debugging, Monitoring and Patching Production in the REPL in Clojure Programming.
$ storm repl
[...snip...]
Clojure 1.4.0
user=>
In a production environment you would pair the above utilities with your operations tools of choice, for instance by reading metrics from Storm via Graphite and/or Ganglia, which are then fed into Nagios. You can also ingest the Storm log files into a log analysis tool such as logstash or Splunk.
Use third-party libraries in my Storm code?
You have two options:
- (Preferred) Add the third-party library as a dependency to your Storm code (which is the common way to include third-party libs anyway). Then create a “fat jar” of your code, which is a jar file that contains your own code plus all of its transitive dependencies (which, again, is the recommended way to build your jar file for Storm anyway). If you use Leiningen, for instance, you can run lein uberjar. Gradle aficionados may want to use the plugin gradle-fatjar-plugin.
- Place all relevant third-party jar files in the $STORM_HOME/lib/ directory of the Storm distribution on your cluster. Storm will include all jar files in this directory in the CLASSPATH when a topology is being run.
Build a correct standalone / fat jar file of my Storm code?
When working with your own code projects be aware that you should not bundle Storm’s own jars with your jar file. If you do, Storm will complain, e.g. about “multiple defaults.yaml found” (its “own” in the cluster installation directory plus “your” copy from the included Storm dependency in the jar file). So make sure to exclude the Storm jars in your build process. Here are two examples of how to achieve this with Leiningen and with Gradle.
Excluding Storm jars when using Leiningen, in project.clj:
:dependencies [
[your-third-party/dependencies "1.2.3"]]
;; :dev-dependencies is similar to :dependencies, but the dependencies are not considered
;; to be run-time dependencies -- they are only for use by Leiningen during the build.
;; As such 'lein uberjar' will not include them.
:dev-dependencies [[storm "0.8.2"]
[org.clojure/clojure "1.4.0"]])
Excluding Storm jars when using Gradle, in build.gradle:
buildscript {
    repositories {
        mavenCentral()
        mavenRepo url: "http://clojars.org/repo"
    }
    dependencies {
        // see https://github.com/musketyr/gradle-fatjar-plugin
        classpath 'eu.appsatori:gradle-fatjar-plugin:0.2-rc1'
    }
}
// When using this plugin you can build a fat jar / uberjar with 'gradle fatJar'
apply plugin: 'fatjar'
dependencies {
    compile 'storm:storm:0.8.2', {
        ext {
            fatJarExclude = true
        }
    }
}
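Whichever build tool you use, it can be worth checking the resulting jar before you submit it. The sketch below is a heuristic of my own, not an official Storm tool: it scans a jar listing for a top-level defaults.yaml or for classes in the backtype/storm/ package, either of which suggests that Storm itself was bundled. The helper name jar_is_storm_free is hypothetical.

```shell
# Hypothetical helper: read a jar listing (one entry per line) on stdin and
# succeed only if no Storm resources appear to be bundled in the jar.
jar_is_storm_free() {
  ! grep -E -q '^(defaults\.yaml|backtype/storm/)'
}

# Example usage (the jar name is an assumption):
#   jar tf my-topology-standalone.jar | jar_is_storm_free \
#       || echo "Storm jars are bundled, fix your build"
```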
Where to go from here
Automated deployment of Storm clusters:
- puppet-storm – a Puppet module I wrote to deploy Storm 0.9+ clusters
Now that you have your Storm cluster up and running you might want to write your first own Storm topology:
- Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm – In this article I describe in detail how to write a Storm topology that can be used for performing analyses such as trending topics on Twitter.
- The storm-starter project contains a few example topologies that are a very good starting point for beginners. In fact, the Rolling Top Words topology I describe in the previously mentioned article is part of storm-starter.
Further readings
- Storm documentation:
- Setting up a Storm cluster
- conf/defaults.yaml – default configuration settings of Storm (well worth a read!)
- By yours truly:
- By others:
- storm-rhel-packaging – Packaging for RedHat and Fedora style RPM installations, including init.d scripts, default configurations, and a .spec file for building an RPM.
- Installing a Storm cluster on CentOS hosts – blog post by Jan Sipke, Jan 2013
- Storm installation on single machine – blog post by 10jumps, Mar 2013
- FPM – Build packages for multiple platforms (deb, rpm, etc) with great ease and sanity
Change Log
- 2013-06-06: Created an RPM file for Storm’s JZMQ version. Replaced old install from source instructions with the new RPM install instructions.