So you got your first distributed Storm cluster installed and have your first topologies up and running. Great! Now you want to integrate your Storm applications with your monitoring systems and begin tracking application-level metrics from your topologies. In this article I show you how to integrate Storm with the popular Graphite monitoring system. This, combined with the Storm UI, will provide you with actionable information to tune the performance of your topologies and also help you to track key business as well as technical metrics.
Background: What is Graphite?
Quoting from Graphite’s documentation, Graphite does two things:
- Store numeric time-series data
- Generate and render graphs of this data on demand
What Graphite does not do is collect the actual input data for you, i.e. your system or application metrics. The purpose of this blog post is to show how you can do this for your Storm applications.
What we want to do
Spatial granularity of metrics
For the context of this post we want to use Graphite to track the number of received tuples of an example bolt per node in the Storm cluster. This allows us, say, to pinpoint a potential topology bottleneck to specific machines in the Storm cluster. This is particularly powerful if we already track system metrics (CPU load, memory usage, network traffic and such) in Graphite, because then we can correlate system and application-level metrics.
Keep in mind that in Storm multiple instances of a bolt may run on a given node, and its instances may also run on many different nodes. Our challenge will be to configure Storm and Graphite so that we can correctly collect and aggregate all individual values reported by those many instances of the bolt. Also, the total of these per-host tuple counts should ideally match the bolt's Executed value in the Storm UI, i.e. the number of tuples executed by the bolt across all of its instances in the topology.
We will track the number of received tuples of our example bolt through the following metrics, where HOSTNAME is a placeholder for the hostname of a particular Storm node (e.g. storm-node01):
- production.apps.graphitedemo.HOSTNAME.tuples.received.count – number of received tuples
- production.apps.graphitedemo.HOSTNAME.tuples.received.m1_rate – 1-minute rate
- production.apps.graphitedemo.HOSTNAME.tuples.received.m5_rate – 5-minute rate
- production.apps.graphitedemo.HOSTNAME.tuples.received.m15_rate – 15-minute rate
- production.apps.graphitedemo.HOSTNAME.tuples.received.mean_rate – average rate/sec
Here, the prefix of the metric namespace, production.apps.graphitedemo.HOSTNAME.tuples.received, is defined by us. Splitting this “high-level” metric into a count metric and four rate metrics is done automatically by the Metrics Java library.
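The following JDK-only sketch makes the naming scheme concrete by building the five metric paths for one Storm node. MetricNames is a hypothetical helper and is not part of Storm or Metrics. Deriving HOSTNAME as the part of the fully qualified host name before the first dot is an assumption, chosen to match the storm-node01 example used in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Storm or Metrics) that builds the
// per-host metric paths used in this post.
final class MetricNames {
    // Namespace prefix chosen by us.
    static final String PREFIX = "production.apps.graphitedemo.";
    // The "high-level" metric whose sub-metrics the Metrics library reports.
    static final String METRIC = ".tuples.received.";
    // One count metric plus four rate metrics, as listed above.
    static final String[] SUFFIXES = {"count", "m1_rate", "m5_rate", "m15_rate", "mean_rate"};

    private MetricNames() {}

    // Assumption: HOSTNAME is the short host name, i.e. everything before the first dot.
    static String shortHostname(String fqdn) {
        int dot = fqdn.indexOf('.');
        return dot < 0 ? fqdn : fqdn.substring(0, dot);
    }

    // Returns the full metric paths that bolt instances on the given node report under.
    static List<String> forHost(String fqdn) {
        String base = PREFIX + shortHostname(fqdn) + METRIC;
        List<String> names = new ArrayList<String>();
        for (String suffix : SUFFIXES) {
            names.add(base + suffix);
        }
        return names;
    }
}
```

For instance, MetricNames.forHost("storm-node01.example.com") first returns production.apps.graphitedemo.storm-node01.tuples.received.count, followed by the m1_rate, m5_rate, m15_rate and mean_rate paths.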
Temporal granularity of metrics
Because Storm is a real-time analytics platform we want to use a shorter time window for metrics updates than Graphite’s default, which is one minute. In our case we will report metrics data every 10 seconds (the finest granularity that Graphite supports is one second).
- We are using a single Graphite server. The carbon-cache and carbon-aggregator daemons of Graphite both run on the Graphite server machine, i.e. carbon-aggregator will send its updates to the carbon-cache daemon running at 127.0.0.1. Also, our Storm topology will send all its metrics to this Graphite server.
Thankfully the specifics of the Storm cluster such as hostnames of nodes do not matter. So the approach described here should work nicely with your existing Storm cluster.
Desired outcome: graphs and dashboards
The desired end results are graphs and dashboards similar to the following Graphite screenshot:
The instructions in this article have been tested on RHEL/CentOS 6 with the following software versions:
- Storm 0.9.0-rc2
- Graphite 0.9.12 (stock version available in EPEL for RHEL6)
- Metrics 3.0.1
- Oracle JDK 6
Note that I will not cover the installation of Storm or Graphite in this post.
A Graphite primer
Understanding how Graphite handles incoming data
One pitfall for Graphite beginners is Graphite's default behavior of discarding all but the last update message received during a given time slot (the default size of a time slot for metrics in Graphite is 60 seconds). For example, if we send two metric values during the same time slot and the second one is 4, then Graphite will first store the first value. As soon as the value 4 arrives, it will overwrite the stored value with 4 instead of adding the two values together.
The following diagram shows what happens if Graphite receives multiple updates during the same time slot when we are NOT
using an aggregator such as carbon-aggregator or
statsd in between. In this example we use a time slot of 10 seconds for the metric.
Note again that in this scenario you might see, for instance, “flapping” values for the second time slot (the window of seconds 10 to 20) depending on when you query Graphite. If you queried Graphite at second 15 for the 10-20 time slot, you would receive a return value of 3. If you queried only a few seconds later, you would start receiving the final value of 7, which would then no longer change.
In most situations losing all but the last update of a given time slot is not what you want. The next diagram shows how aggregators solve the “only the last update counts” problem. A nice property of aggregators is that they are transparent to the client, which can continue to send updates whenever it sees fit. The aggregators ensure that Graphite only sees a single, merged update message for the time slot.
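The difference between the two behaviors can be modeled in a few lines of Java. This is a toy model, not Graphite's actual code, and it rests on three simplifying assumptions: updates arrive in timestamp order, slots are aligned to multiples of the slot size, and the aggregator sums. SlotModel and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model contrasting Graphite's default "last update wins" storage
// with a summing aggregator placed in front of it.
final class SlotModel {
    private SlotModel() {}

    // Start of the time slot that a timestamp (in seconds) falls into.
    static long slotOf(long timestampSecs, long slotSecs) {
        return timestampSecs - (timestampSecs % slotSecs);
    }

    // Without an aggregator: each update overwrites whatever is stored for its slot.
    static Map<Long, Long> lastWriteWins(long[][] updates, long slotSecs) {
        Map<Long, Long> stored = new TreeMap<Long, Long>();
        for (long[] u : updates) {  // u[0] = timestamp, u[1] = value
            stored.put(slotOf(u[0], slotSecs), u[1]);
        }
        return stored;
    }

    // With a summing aggregator: all updates of a slot are merged into one value.
    static Map<Long, Long> summed(long[][] updates, long slotSecs) {
        Map<Long, Long> merged = new TreeMap<Long, Long>();
        for (long[] u : updates) {
            long slot = slotOf(u[0], slotSecs);
            Long previous = merged.get(slot);
            merged.put(slot, (previous == null ? 0L : previous) + u[1]);
        }
        return merged;
    }
}
```

Consider hypothetical updates of 2 at second 3, 5 at second 12 and 6 at second 17, with 10-second slots. lastWriteWins stores 6 for the 10-20 slot, whereas summed stores 11.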
Implications of Storm’s execution model
In the case of Storm you implement a bolt (or spout) as a single class, e.g. by extending BaseBasicBolt. So following the User Manual of the Metrics library seems to be a straightforward way to add Graphite support to your Storm bolts. However, you must be aware of how Storm will actually execute your topology behind the scenes (see my earlier post on Understanding the Parallelism of a Storm Topology):
- In Storm each bolt typically runs in the form of many bolt instances in a single worker process, and thus you have many bolt instances in a single JVM.
- In Storm there are typically many such workers (and thus JVMs) per machine, so you end up with many instances of the same bolt running across many workers/JVMs on a particular machine.
- On top of that a bolt’s instances will also be spread across many different machines in the Storm cluster, so in total you will typically have many bolt instances running in many JVMs across many Storm nodes.
Our challenge to integrate Storm with Graphite can thus be stated as follows. How can we report metrics from our Storm topology to Graphite in such a way that a) we count tuples correctly across all bolt instances, and b) the many metric update messages do not cancel each other out? In other words, how can we keep Storm's highly distributed nature in check and make it play nice with Graphite?
Overview of the approach described in this post
Here is an overview of the approach we will be using:
- Each instance of our example Storm bolt gets its own (Java) instance of Meter. This ensures that each bolt instance tracks its count of received tuples separately from any other bolt instance.
- Also, each bolt instance gets its own instance of GraphiteReporter. This ensures that each bolt instance sends only a single metrics update every 10 seconds, which is the desired temporal granularity for our monitoring setup.
- All bolt instances on a given Storm node report their metrics under the node's hostname. For instance, bolt instances on the machine storm-node01.example.com will report their metrics under the namespace production.apps.graphitedemo.storm-node01.tuples.received.*.
- Metrics are sent to a carbon-aggregator instance running on the Graphite server (port 2023/tcp). carbon-aggregator ensures that all the individual metric updates (from bolt instances) of a particular Storm node are aggregated into a single, per-host metric update. These per-host metric updates are then forwarded to the carbon-cache instance, which stores the metric data in the corresponding Whisper database files.
Other approaches (not used)
Another strategy is to install an aggregator intermediary (such as statsd) on each machine in the Storm cluster. Instances of a bolt on the same machine would send their individual updates to this per-host aggregator daemon, which in turn would send a single, per-host update message to Graphite. I am sure this approach would have worked, but I decided not to go down this path. It would have increased the deployment complexity, because we would have one more software package to understand, support and manage on each machine.
The final setup described in this post achieves what we want by using GraphiteReporter in our Storm code in a way that is compatible with Graphite's built-in daemons, without needing any additional software such as statsd.
On a completely different note, Storm 0.9 now also comes with its own metrics system, which I do not cover here. This new metrics feature of Storm allows you to collect arbitrary custom metrics over fixed time windows. Those metrics are exported to a metrics stream. You consume that stream by implementing IMetricsConsumer and configuring it via Config (see the *_METRICS_* settings). Then you need to use TopologyContext#registerMetric() to register new metrics.
Integrating Storm with Graphite
I will only cover the key settings of Graphite for the context of this article, which are the settings related to carbon-cache and carbon-aggregator. Those settings must match the settings in your Storm code. Matching settings between Storm and Graphite is critical: if they don't match, you will end up with junk metric data.
First we must add a [production_apps] section to /etc/carbon/storage-schemas.conf. The name itself is not relevant; it only needs to be descriptive. This section controls the granularity at which Graphite stores the incoming metrics that we send from our Storm topology. Notably, these storage schema settings control:
- The minimum temporal granularity for the “raw” incoming metric updates of a given metric namespace: In our case, for instance, we want Graphite to track metrics at a raw granularity of 10 seconds for the first two days. We configure this as 10s:2d. This minimum granularity (10 seconds) must match the report interval we use in our Storm code.
- How Graphite aggregates older metric values that have already been stored in its Whisper database files: In our case we tell Graphite to aggregate any values older than two days into 5-minute buckets that we want to keep for one year, hence 5m:1y. This setting (5 minutes) is independent of our Storm code.
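In storage-schemas.conf, the two retention settings above would typically be written as a single line, retentions = 10s:2d,5m:1y, in the [production_apps] section. That line would sit next to a pattern line such as pattern = ^production\.apps\. (this pattern is an assumption). The JDK-only sketch below parses such a retention string, computes how many data points each archive holds, and checks that the finest precision matches the 10-second report interval. It is a simplified, hypothetical parser that only understands the s, m, h, d, w and y units, and it assumes Whisper's convention of a 365-day year.

```java
// Hypothetical, simplified parser for Graphite-style retention definitions.
final class Retentions {
    private Retentions() {}

    // Converts a value like "10s", "5m", "2d" or "1y" into seconds.
    static long toSeconds(String spec) {
        char unit = spec.charAt(spec.length() - 1);
        long n = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (unit) {
            case 's': return n;
            case 'm': return n * 60L;
            case 'h': return n * 3600L;
            case 'd': return n * 86400L;
            case 'w': return n * 7L * 86400L;
            case 'y': return n * 365L * 86400L;  // assumes a 365-day year
            default: throw new IllegalArgumentException("unknown unit in " + spec);
        }
    }

    // Returns {precisionSecs, numberOfPoints} per archive, finest archive first.
    static long[][] archives(String retentions) {
        String[] parts = retentions.split(",");
        long[][] result = new long[parts.length][];
        for (int i = 0; i < parts.length; i++) {
            String[] pr = parts[i].trim().split(":");
            long precision = toSeconds(pr[0]);
            long retention = toSeconds(pr[1]);
            result[i] = new long[] {precision, retention / precision};
        }
        return result;
    }

    // True if the reporter interval equals the precision of the finest archive.
    static boolean matchesReportInterval(String retentions, long reportIntervalSecs) {
        return archives(retentions)[0][0] == reportIntervalSecs;
    }
}
```

Retentions.archives("10s:2d,5m:1y") yields {10, 17280} and {300, 105120}. That is 17,280 raw 10-second points covering two days and 105,120 five-minute points covering one year. A reporter running every 60 seconds would fail the matchesReportInterval check.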
Next we must tell Graphite which aggregation method (e.g. sum or average) it should use to perform storage aggregation of our metrics. For count-type metrics, for instance, we want to use sum, and for rate-type metrics we want to use average. Adding corresponding entries to /etc/carbon/storage-aggregation.conf ensures that Graphite correctly aggregates, once two days have passed, the default metrics sent by Metrics' GraphiteReporter: count, m1_rate, m5_rate, m15_rate and mean_rate.
Lastly, make sure that the carbon-cache daemon is actually enabled in your /etc/carbon/carbon.conf. It must be configured to receive incoming data on its line receiver port 2003/tcp and also (!) on its pickle receiver port 2004/tcp. The latter port is used by carbon-aggregator, which we will configure next.
In the [cache] section of /etc/carbon/carbon.conf, the relevant settings are LINE_RECEIVER_PORT = 2003 and PICKLE_RECEIVER_PORT = 2004.
Don’t forget to restart
carbon-cache after changing its configuration:
$ sudo service carbon-cache restart
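To check the carbon-cache setup by hand, it can help to send a metric update yourself. Carbon's line receiver accepts Graphite's plaintext protocol, where each update is one line of the form "path value timestamp". The pickle receiver on 2004/tcp expects a different, binary format. The following JDK-only sketch formats and sends plaintext lines. PlaintextSender and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Minimal sketch of a client for Graphite's plaintext protocol, where each
// update is one line of the form "<metric path> <value> <unix timestamp>\n".
final class PlaintextSender {
    private PlaintextSender() {}

    // Formats a single metric update as one plaintext protocol line.
    static String line(String path, double value, long epochSecs) {
        if (path.isEmpty() || path.contains(" ") || path.contains("\n")) {
            throw new IllegalArgumentException("invalid metric path: " + path);
        }
        return path + " " + value + " " + epochSecs + "\n";
    }

    // Opens a TCP connection, writes all lines and closes the connection again.
    // Use a line receiver port (e.g. 2003 for carbon-cache); port 2004 expects pickle data.
    static void send(String host, int port, List<String> lines) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String l : lines) {
                out.write(l);
            }
            out.flush();
        }
    }
}
```

Run on the Graphite server, a call such as PlaintextSender.send("127.0.0.1", 2003, Collections.singletonList(PlaintextSender.line("test.metric", 1.0, System.currentTimeMillis() / 1000L))) should make a test.metric series appear. This assumes carbon-cache listens on 2003/tcp as described above.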
The last Graphite configuration step is to ensure that we can pre-aggregate the reported tuples.received values across all bolt instances that run on a particular Storm node. To perform this per-host aggregation on the fly, we must add corresponding aggregation rules to /etc/carbon/aggregation-rules.conf. Suppose a bolt instance running on storm-node01 sends a metric such as production.apps.graphitedemo.storm-node01.tuples.received.count. With these rules in place, Graphite (more precisely, its carbon-aggregator daemon) will aggregate (here: sum) all such update messages for storm-node01 into a single, aggregated update message every 10 seconds for that server.
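Carbon aggregation rules have the general form output_template (frequency) = method input_pattern. In these rules, fields in angle brackets such as <host> capture one path segment. A line along the lines of production.apps.graphitedemo.<host>.tuples.received.count (10) = sum production.apps.graphitedemo.<host>.tuples.received.count, with one such line per metric, would express the per-host summing described above. Treat that exact rule as an assumption. The following Java sketch is a simplified model of what such a rule does, not carbon-aggregator's implementation. HostSumRule is a hypothetical name. The model maps each update to its per-host output name and sums all updates that fall into the same 10-second interval.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model (not carbon's implementation) of an aggregation rule such as
//   <output template> (10) = sum <input pattern>
// where both sides contain a single <host> placeholder.
final class HostSumRule {
    // One incoming metric update as sent by a bolt instance.
    static final class Update {
        final String path;
        final double value;
        final long timestampSecs;

        Update(String path, double value, long timestampSecs) {
            this.path = path;
            this.value = value;
            this.timestampSecs = timestampSecs;
        }
    }

    private final Pattern input;
    private final String outputTemplate;
    private final long frequencySecs;

    HostSumRule(String inputPattern, String outputTemplate, long frequencySecs) {
        // Escape dots, let "*" match one path segment and capture <host> as one segment.
        String regex = inputPattern
                .replace(".", "\\.")
                .replace("*", "[^.]+")
                .replace("<host>", "(?<host>[^.]+)");
        this.input = Pattern.compile("^" + regex + "$");
        this.outputTemplate = outputTemplate;
        this.frequencySecs = frequencySecs;
    }

    // Returns the aggregate metric name for a matching path, or null if the rule does not apply.
    String outputFor(String path) {
        Matcher m = input.matcher(path);
        return m.matches() ? outputTemplate.replace("<host>", m.group("host")) : null;
    }

    // Sums matching updates per aggregate metric and interval, keyed as "name@intervalStart".
    Map<String, Double> aggregate(List<Update> updates) {
        Map<String, Double> sums = new TreeMap<String, Double>();
        for (Update u : updates) {
            String out = outputFor(u.path);
            if (out == null) {
                continue;  // not covered by this rule
            }
            long intervalStart = u.timestampSecs - (u.timestampSecs % frequencySecs);
            String key = out + "@" + intervalStart;
            Double previous = sums.get(key);
            sums.put(key, (previous == null ? 0.0 : previous) + u.value);
        }
        return sums;
    }
}
```

Say two bolt instances on storm-node01 send 100 and 50 within the same interval. In this model, they end up as a single update of 150 for storm-node01. Updates that do not match the rule are ignored by the model.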
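Lastly, make sure that the carbon-aggregator daemon is actually enabled in your /etc/carbon/carbon.conf and configured to receive incoming data on its line receiver port 2023/tcp. Also, make sure it sends its aggregates to the pickle receiver port of carbon-cache (127.0.0.1:2004/tcp).
In the [aggregator] section of /etc/carbon/carbon.conf, the relevant settings are LINE_RECEIVER_PORT = 2023 and DESTINATIONS = 127.0.0.1:2004.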
Don’t forget to restart
carbon-aggregator after changing its configuration:
$ sudo service carbon-aggregator restart
Other important Graphite settings
You may also want to check the values of the relevant Carbon settings in /etc/carbon/carbon.conf, for instance MAX_CACHE_SIZE, MAX_UPDATES_PER_SECOND and MAX_CREATES_PER_MINUTE. This applies particularly if you are sending a lot of different metrics (i.e. many distinct metric names such as my.bar). It also applies if you are sending a lot of metric update messages per second (i.e. a high number of incoming updates for the same metric).
Whether or not you need to tune those settings depends on your specific use case. As a rule of thumb: The more Storm nodes you have, the higher the topology’s parallelism and the higher your data volume, the more likely you will need to optimize those settings. If you are not sure, leave them at their defaults and revisit later.
Configuring your Storm code
Add the Metrics library to your Storm code project
The instructions below are for Gradle, but it is straightforward to adapt them to Maven if that's your tool of choice.
Now that we have finished the Graphite setup, we can turn our attention to augmenting our Storm code to work with Graphite. Make sure that build.gradle in your Storm code project declares the Metrics library as a dependency, including its metrics-graphite module, which provides GraphiteReporter.
You can then run the usual gradle commands to compile, test and package your code. In particular, you can now run:
$ gradle clean fatJar
This command will create a fat jar (also called uber jar) of your Storm topology code, which will be stored under
build/libs/*.jar by default. You can use this jar file to submit your topology to Storm via the
storm jar command.
See the section on how to
build a correct standalone jar file of your Storm code
in my Storm multi-node cluster tutorial for details.
Sending metrics from a Storm bolt to Graphite
In this section we will augment a Storm bolt (spouts work just the same) to report our tuples.received metrics to Graphite. Our bolt, i.e. its instances, will send this metric under the Graphite namespace production.apps.graphitedemo.HOSTNAME.tuples.received.* every 10 seconds to the carbon-aggregator daemon running on the Graphite server (port 2023/tcp).
The key points of the bolt code are, firstly, the use of a transient private field for the Meter instance. If you do not make the field transient, Storm will throw a NotSerializableException. This is because Storm serializes your bolt object, including its non-transient fields, and ships it over the network to the workers that execute it. A Meter is not serializable. For this reason our bolt initializes the Meter instance during the prepare() phase of a bolt instance. prepare() runs on the worker after deserialization, so the Meter instance is set up before the first tuples arrive at the bolt instance. This part achieves proper counting of the tuples.
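The following JDK-only sketch illustrates the serialization issue in isolation, without Storm. TupleCounter, CountingComponent, BrokenComponent and SerializationDemo are hypothetical stand-ins for the Meter and the bolt. A plain Java serialization round trip plays the role of Storm shipping the bolt to a worker. In this model, the transient field arrives as null and is re-created in prepare(). The variant without transient fails with a NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for a metrics object that, like the Meter in this post, is not Serializable.
final class TupleCounter {
    private final AtomicLong count = new AtomicLong();

    void mark() { count.incrementAndGet(); }

    long getCount() { return count.get(); }
}

// Hypothetical bolt-like component: the counter is transient and created in prepare().
class CountingComponent implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient TupleCounter counter;  // skipped by Java serialization

    void prepare() { counter = new TupleCounter(); }  // runs after deserialization

    void execute() { counter.mark(); }

    TupleCounter counter() { return counter; }
}

// The same component without "transient" and with an eagerly created counter.
class BrokenComponent implements Serializable {
    private static final long serialVersionUID = 1L;
    private final TupleCounter counter = new TupleCounter();  // not Serializable

    long count() { return counter.getCount(); }
}

final class SerializationDemo {
    private SerializationDemo() {}

    // Serializes and deserializes an object, roughly like shipping a bolt to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```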
Secondly, the prepare() method also creates a new, dedicated GraphiteReporter instance for each bolt instance. This achieves proper reporting of metric updates to Graphite.
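The following JDK-only sketch models the per-instance pattern described above. Every bolt instance owns its own counter and its own scheduled reporter, so each instance emits exactly one update per reporting period. InstanceReporter and its methods are hypothetical and only stand in for Metrics' Meter and GraphiteReporter; they are not the Metrics API. The line format follows Graphite's plaintext protocol.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for the Meter + GraphiteReporter pair of one bolt instance:
// each instance owns one counter and one scheduler that emits one update per period.
final class InstanceReporter {
    private final AtomicLong received = new AtomicLong();  // running total for this instance
    private final String metricPath;
    private final Consumer<String> sink;  // e.g. a socket writer; a list in tests
    private ScheduledExecutorService scheduler;

    InstanceReporter(String metricPath, Consumer<String> sink) {
        this.metricPath = metricPath;
        this.sink = sink;
    }

    // Called once per received tuple, e.g. from the bolt's execute().
    void mark() { received.incrementAndGet(); }

    // Emits exactly one plaintext-style update line for this instance.
    void reportOnce(long epochSecs) {
        sink.accept(metricPath + " " + received.get() + " " + epochSecs + "\n");
    }

    // Starts periodic reporting, e.g. start(10, TimeUnit.SECONDS) from prepare().
    void start(long period, TimeUnit unit) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> reportOnce(System.currentTimeMillis() / 1000L), period, period, unit);
    }

    // Stops the reporting thread, e.g. from the bolt's cleanup().
    void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

In this model the reporter is created per instance, just like the dedicated GraphiteReporter. Many instances on one node therefore produce many updates per period, which is exactly what carbon-aggregator then merges into one per-host update.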
That’s it! Your Storm bolt instances will report their respective counts of received tuples to Graphite every 10 seconds.
At this point you should have successfully married Storm with Graphite, and also learned a few basics about how Graphite and Storm work along the way. Now you can begin creating graphs and dashboards for your Storm applications, which was the reason to do all this in the first place, right?
Where to go from here
- Want to install and configure Graphite automatically? Take a look at my puppet-graphite module for Puppet. See also my previous post on Installing and Running Graphite via RPM and Supervisord for an alternative, manual installation approach.
- Storm exposes a plethora of built-in metrics that greatly augment the application-level metrics we described in this article. In 2015 we open sourced storm-graphite, which automatically forwards these built-in metrics from Storm to Graphite. You can enable storm-graphite globally in your Storm cluster or selectively for only a subset of your topologies.
- You should start sending system metrics (CPU, memory and such) to Graphite, too. This allows you to correlate the performance of your Storm topologies with the health of the machines in the cluster. Very helpful for detecting and fixing bottlenecks! There are a couple of tools that can collect these system metrics for you and forward them to Graphite. One of those tools is Diamond. Take a look at my puppet-diamond Puppet module to automatically install and configure Diamond on your Storm cluster nodes.
- Want to install and configure Storm automatically? I am about to release an automated deployment tool called Wirbelsturm, which will allow you to deploy software such as Storm and Kafka. Wirbelsturm is essentially a curated collection of Puppet modules (that can be used standalone, too) plus a ready-to-use Vagrant setup to deploy machines locally and to, say, Amazon AWS. The puppet-graphite and puppet-diamond modules above are part of the package, by the way. Please stay tuned! In the meantime my tutorial Running a Multi-Node Storm Cluster should get you started.
Caveat: Storm samples metrics for the Storm UI
If you do want to compare values 1:1 between the Storm UI and Graphite, please be aware that by default Storm samples incoming tuples for computing stats. By default it uses a sampling rate of 0.05 (5%), which is configurable via Config.TOPOLOGY_STATS_SAMPLE_RATE.
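The way it works is that if you choose a sampling rate of 0.05, it will pick a random element of the next 20 events in which to increase the count by 20. Each task's count can therefore be off by up to 19, namely the events of its last, incomplete 20-event window. So if you have 20 tasks for that bolt, your stats could be off by ±380 (20 × 19).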
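To force Storm to count everything exactly, you can set the sampling rate to 100% by setting Config.TOPOLOGY_STATS_SAMPLE_RATE (topology.stats.sample.rate) to 1.0 in your topology configuration. This gives you accurate numbers at the cost of a big performance hit to your topology.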
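The ±380 arithmetic above can be checked with a small simulation. The sketch below is a simplified model of the sampling scheme as described in this caveat, not Storm's actual implementation, and SampledCounter is a hypothetical name. For each window of 1/rate events it counts one randomly chosen event with a weight of 1/rate. A task's estimate can therefore only deviate while its last window is incomplete, and for a rate of 0.05 by at most 19 events.

```java
import java.util.Random;

// Simplified model of the sampling scheme described above (not Storm's code):
// per window of 1/rate events, one randomly chosen event adds 1/rate to the count.
final class SampledCounter {
    private final int windowSize;  // 20 for a sampling rate of 0.05
    private final Random random;
    private long seen;             // events actually observed
    private long estimate;         // what the sampled statistics would show
    private int chosenIndex;       // position within the current window that gets counted

    SampledCounter(double sampleRate, Random random) {
        this.windowSize = (int) Math.round(1.0 / sampleRate);
        this.random = random;
        this.chosenIndex = random.nextInt(windowSize);
    }

    void increment() {
        int position = (int) (seen % windowSize);
        if (position == chosenIndex) {
            estimate += windowSize;  // one sampled event stands for the whole window
        }
        seen++;
        if (seen % windowSize == 0) {
            chosenIndex = random.nextInt(windowSize);  // pick the element for the next window
        }
    }

    long estimate() { return estimate; }

    long actual() { return seen; }
}
```

With a sampling rate of 1.0 the window size is 1, every event is counted, and the estimate equals the actual count. This is the exact counting described above.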