In this article I introduce some of the benchmarking and testing tools that are included in the Apache Hadoop distribution. Namely, we look at the benchmarks TestDFSIO, TeraSort, NNBench and MRBench. These are popular choices to benchmark and stress test a Hadoop cluster. Knowing how to run these tools will help you shake out your cluster in terms of architecture, hardware and software, measure its performance, and share and compare your results with other people.

# Before we start

## What we (not) want to do

That said, at any time feel free to contribute back to the Hadoop project by helping to improve the status of the documentation. I know I’ll try to do my fair share.

## Prerequisites

If you want to follow along with the examples in this article, you obviously need access to a running cluster. If you are still trying to install and configure your cluster, my Hadoop installation tutorials might help you build one. The tutorials are tailored to Ubuntu Linux, but the information also applies to other Linux/Unix variants.

I focus on the benchmark and testing tools shipped with Hadoop version 0.20.2. At the time of writing, this is the latest production-ready release of Hadoop (0.21 is not!). Until Hadoop 0.22 is out, many Hadoop users (including me) are therefore sticking to the tried and true Hadoop 0.20.2 release.

If your cluster is running the stock Hadoop 0.20.2 release, you might run into NotReplicatedYetException and/or AlreadyBeingCreatedException “errors” while test-driving your cluster. As you can see in the output below, they are actually logged as INFO:

2011-02-11 03:48:05,367 INFO org.apache.hadoop.ipc.Server: IPC Server handler 2 on 54310, call addBlock(/user/hduser/terasort-input/_temporary/_attempt_201102110201_0008_m_000020_0/part-00020, DFSClient_attempt_201102110201_0008_m_000020_0) from 192.168.0.2:45133: error: org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException: Not replicated yet:/user/hduser/terasort-input/_temporary/_attempt_201102110201_0008_m_000020_0/part-00020
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)


This often happens when you delete a large amount of HDFS data on the cluster. You might want to do this between test runs so that your cluster does not fill up. This is a known problem in 0.20.2 that is fixed in 0.21.0 (see HDFS-611). Fortunately, a patch is available: HDFS-611.branch-20.v6.patch is the file you need for Hadoop 0.20.2.

Another workaround is to simply wait a few minutes after deleting a large amount of data before starting another test run. You might want to monitor the NameNode’s log file to check when the asynchronous background delete processes have finished.

# Overview of Benchmarks and Testing Tools

The Hadoop distribution comes with a number of benchmarks, which are bundled in hadoop-*test*.jar and hadoop-*examples*.jar. The four benchmarks we will look at in more detail are TestDFSIO, nnbench and mrbench (in hadoop-*test*.jar), and TeraGen / TeraSort / TeraValidate (in hadoop-*examples*.jar).

Here is the full list of available options in hadoop-*test*.jar:

$ cd /usr/local/hadoop    # change to your Hadoop installation directory
$ bin/hadoop jar hadoop-*test*.jar
An example program must be given as the first argument.
Valid program names are:
DFSCIOTest: Distributed i/o benchmark of libhdfs.
DistributedFSCheck: Distributed checkup of the file system consistency.
MRReliabilityTest: A program that tests the reliability of the MR framework by injecting faults/failures
TestDFSIO: Distributed i/o benchmark.
dfsthroughput: measure hdfs throughput
filebench: Benchmark SequenceFile(Input|Output)Format (block,record compressed and uncompressed), Text(Input|Output)Format (compressed and uncompressed)
mapredtest: A map/reduce test check.
mrbench: A map/reduce benchmark that can create many small jobs
nnbench: A benchmark that stresses the namenode.
testarrayfile: A test for flat files of binary key/value pairs.
testbigmapoutput: A map/reduce program that works on a very big non-splittable file and does identity map/reduce
testfilesystem: A test for FileSystem read/write.
testipc: A test for ipc.
testmapredsort: A map/reduce program that validates the map-reduce framework's sort.
testrpc: A test for rpc.
testsequencefile: A test for flat files of binary key value pairs.
testsequencefileinputformat: A test for sequence file input format.
testsetfile: A test for flat files of binary key/value pairs.
testtextinputformat: A test for text input format.
threadedmapbench: A map/reduce benchmark that compares the performance of maps with multiple spills over maps with 1 spill


And here is the full list of available options for hadoop-*examples*.jar:

$ bin/hadoop jar hadoop-*examples*.jar
An example program must be given as the first argument.
Valid program names are:
aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
dbcount: An example job that count the pageview counts from a database.
grep: A map/reduce program that counts the matches of a regex in the input.
join: A job that effects a join over sorted, equally partitioned datasets
multifilewc: A job that counts words from several files.
pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
pi: A map/reduce program that estimates Pi using monte-carlo method.
randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
randomwriter: A map/reduce program that writes 10GB of random data per node.
secondarysort: An example defining a secondary sort to the reduce.
sleep: A job that sleeps at each map and reduce task.
sort: A map/reduce program that sorts the data written by the random writer.
sudoku: A sudoku solver.
teragen: Generate data for the terasort
terasort: Run the terasort
teravalidate: Checking results of terasort
wordcount: A map/reduce program that counts the words in the input files.

The wordcount example, for instance, is a typical “Hello, World” test that you can run after you have finished installing a cluster.

And before we start, here’s a nifty trick for your tests. When running the benchmarks described in the following sections, you might want to use the Unix time command to measure the elapsed time. This saves you the hassle of navigating to the Hadoop JobTracker web interface to get (almost) the same information. Simply prefix every Hadoop command with time:

$ time hadoop jar hadoop-*examples*.jar ...
[...]
real    9m15.510s
user    0m7.075s
sys     0m0.584s


The relevant metric is the real value in the first row.
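If you collect many timed runs, it can be handy to convert the real value into seconds so that you can compare runs. Below is a minimal Python sketch. It assumes the bash-style time output shown above (for example `real    9m15.510s`). The function name `real_seconds` is hypothetical.

```python
import re


def real_seconds(time_output: str) -> float:
    """Return the 'real' (wall-clock) time in seconds from bash `time` output.

    Assumes a line such as 'real    9m15.510s' (minutes, then seconds).
    """
    match = re.search(r"^real\s+(\d+)m([\d.]+)s\s*$", time_output, re.MULTILINE)
    if match is None:
        raise ValueError("no 'real' line found")
    minutes, seconds = match.groups()
    return int(minutes) * 60 + float(seconds)


output = """real    9m15.510s
user    0m7.075s
sys     0m0.584s"""
print(real_seconds(output))
```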

# TestDFSIO

The TestDFSIO benchmark is a read and write test for HDFS. It is helpful for several tasks:

- stress testing HDFS;
- discovering performance bottlenecks in your network;
- shaking out the hardware, OS and Hadoop setup of your cluster machines (particularly the NameNode and the DataNodes);
- getting a first impression of how fast your cluster is in terms of I/O.

Note: Because this test is run as a MapReduce job, the MapReduce stack of the cluster must be correctly working. In other words, this test cannot be used to benchmark HDFS in isolation from MapReduce.

Official documentation does not seem to exist for TestDFSIO in Hadoop 0.20.2. The only way to understand the test in greater detail is to inspect the source code in $HADOOP_HOME/src/test/org/apache/hadoop/fs/TestDFSIO.java.

## Preliminaries

### 1. The default output directory is /benchmarks/TestDFSIO

When a write test is run via -write, the TestDFSIO benchmark writes its files to /benchmarks/TestDFSIO on HDFS. Files from older write runs are overwritten. If you want to preserve the output files of previous runs, you have to copy or move these files manually to a new HDFS location.

Benchmark results are saved in a local file called TestDFSIO_results.log in the current local directory. Results are appended if the file already exists, and they are also printed to STDOUT. If you want to use a different filename, set the -resFile parameter accordingly (e.g. -resFile foo.txt).

Normally, you would be able to set the property test.build.data to specify a custom output directory for TestDFSIO's output files (e.g. hadoop jar hadoop-*test*.jar TestDFSIO -D test.build.data=/benchmarks/mytestdfsio ...). Due to a known bug in TestDFSIO, however, this does not work (HADOOP-6074). TestDFSIO.java sets up the job configuration object via Configuration conf = new Configuration(); but then fails to parse and include any user-supplied parameters from the command line. This is also one of the reasons why you cannot assign TestDFSIO jobs (as of Hadoop 0.20.2) to custom pools or queues.

### 2. Run write tests before read tests

The read test of TestDFSIO does not generate its own input files. For this reason, it is good practice to first run a write test via -write and then follow up with a read test via -read. Use the same parameters for the read test as for the preceding -write run.
## Run a write test (as input data for the subsequent read test)

The syntax for running a write test is as follows:

TestDFSIO.0.0.4
Usage: hadoop jar $HADOOP_HOME/hadoop-*test*.jar TestDFSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes]


TestDFSIO is designed in such a way that it will use 1 map task per file, i.e. it is a 1:1 mapping from files to map tasks. Splits are defined so that each map gets only one filename, which it creates (-write) or reads (-read).
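Because of this 1:1 mapping, the number of files also determines how many MapReduce waves a TestDFSIO run needs. Here is a small Python sketch of that arithmetic. It assumes that every map slot in the cluster is available to the TestDFSIO job. The function name `map_waves` is hypothetical.

```python
def map_waves(nr_files: int, map_slots: int) -> int:
    """Number of map waves for a TestDFSIO run (one map task per file)."""
    if nr_files < 1 or map_slots < 1:
        raise ValueError("nr_files and map_slots must be positive")
    # Integer ceiling division: a partially filled last wave still counts.
    return (nr_files + map_slots - 1) // map_slots


print(map_waves(10, 200))    # 1: all 10 maps can run at the same time
print(map_waves(1000, 200))  # 5
```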

The command to run a write test that generates 10 output files of size 1GB for a total of 10GB is:

$ hadoop jar hadoop-*test*.jar TestDFSIO -write -nrFiles 10 -fileSize 1000

## Run a read test

The command to run the corresponding read test using 10 input files of size 1GB is:

$ hadoop jar hadoop-*test*.jar TestDFSIO -read -nrFiles 10 -fileSize 1000


## Clean up and remove test data

The command to remove previous test data is:

$ hadoop jar hadoop-*test*.jar TestDFSIO -clean

The cleaning run will delete the output directory /benchmarks/TestDFSIO on HDFS.

## Interpreting TestDFSIO results

Let’s look at this exemplary result for writing and reading 1TB of data on a cluster of twenty nodes and try to deduce its meaning:

----- TestDFSIO ----- : write
Date & time: Fri Apr 08 2011
Number of files: 1000
Total MBytes processed: 1000000
Throughput mb/sec: 4.989
Average IO rate mb/sec: 5.185
IO rate std deviation: 0.960
Test exec time sec: 1113.53

----- TestDFSIO ----- : read
Date & time: Fri Apr 08 2011
Number of files: 1000
Total MBytes processed: 1000000
Throughput mb/sec: 11.349
Average IO rate mb/sec: 22.341
IO rate std deviation: 119.231
Test exec time sec: 544.842

Here, the most notable metrics are Throughput mb/sec and Average IO rate mb/sec. Both are based on the file size written (or read) by the individual map tasks and the elapsed time to do so (see this discussion thread for more information).

Throughput mb/sec for a TestDFSIO job using N map tasks is defined as follows. The index 1 <= i <= N denotes the individual map tasks:

$$\text{Throughput}(N) = \frac{\sum_{i=1}^{N} \text{filesize}_i}{\sum_{i=1}^{N} \text{time}_i}$$

Average IO rate mb/sec is defined as:

$$\text{Average IO rate}(N) = \frac{\sum_{i=1}^{N} \text{rate}_i}{N} = \frac{\sum_{i=1}^{N} \frac{\text{filesize}_i}{\text{time}_i}}{N}$$

Two derived metrics you might be interested in are estimates of the “concurrent” throughput and average IO rate (for lack of a better term) that your cluster is capable of. Imagine you let TestDFSIO create 1,000 files, but your cluster has only 200 map slots. The cluster can then run only 200 map tasks at the same time, so it takes about five MapReduce waves (5 * 200 = 1,000) to write the full test data.

In this case, take the minimum of the number of files (here: 1,000) and the number of available map slots in your cluster (here: 200). Then multiply the throughput and the average IO rate by this minimum. In our example, the concurrent throughput would be estimated at 4.989 * 200 = 997.8 MB/s and the concurrent average IO rate at 5.185 * 200 = 1,037.0 MB/s.
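The difference between the two metrics is easier to see with numbers:

- Throughput divides the total data by the total task time.
- Average IO rate is the plain mean of the per-task rates.

Below is a small Python sketch that computes both, plus the IO rate standard deviation, from per-task (bytes, milliseconds) pairs. It also computes the “concurrent” estimate described above. It assumes, as in TestDFSIO.java, that a “MB” is 2^20 bytes. The task numbers are made up for illustration, and the function names are hypothetical.

```python
import math
from typing import List, Tuple

MEGA = 1024 * 1024  # assumption: MB/s figures use 2**20 bytes per MB


def dfsio_metrics(tasks: List[Tuple[int, int]]) -> Tuple[float, float, float]:
    """Return (throughput, average IO rate, IO rate std deviation) in MB/s.

    Each task is a (bytes_processed, elapsed_milliseconds) pair of one map task.
    """
    n = len(tasks)
    total_mb = sum(b for b, _ in tasks) / MEGA
    total_secs = sum(ms for _, ms in tasks) / 1000.0
    throughput = total_mb / total_secs
    rates = [(b / MEGA) / (ms / 1000.0) for b, ms in tasks]
    avg_rate = sum(rates) / n
    # Population standard deviation of the per-task rates.
    std_dev = math.sqrt(abs(sum(r * r for r in rates) / n - avg_rate ** 2))
    return throughput, avg_rate, std_dev


def concurrent_estimate(metric: float, nr_files: int, map_slots: int) -> float:
    """Scale a per-task metric by the number of maps that can run at once."""
    return metric * min(nr_files, map_slots)


# Hypothetical run: three 1000 MB files, one of them processed much faster.
tasks = [(1000 * MEGA, 200_000), (1000 * MEGA, 200_000), (1000 * MEGA, 20_000)]
throughput, avg_rate, std_dev = dfsio_metrics(tasks)
print(f"Throughput mb/sec: {throughput:.3f}")
print(f"Average IO rate mb/sec: {avg_rate:.3f}")
print(f"IO rate std deviation: {std_dev:.3f}")

# The concurrent throughput estimate from the write result: 4.989 * min(1000, 200)
print(f"{concurrent_estimate(4.989, 1000, 200):.1f}")
```

With one unusually fast task, the average IO rate (20 MB/s) ends up well above the throughput (about 7.1 MB/s). Throughput effectively weights each task by its duration, while the average IO rate weights all tasks equally. A large standard deviation, as in the read result above, is a hint that this kind of skew is present.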
Another thing to keep in mind when interpreting TestDFSIO results is that the HDFS replication factor plays an important role. Compare two otherwise identical TestDFSIO write runs that use an HDFS replication factor of 2 and 3, respectively. The run with the lower replication factor will show higher throughput and higher average IO numbers. Unfortunately, you cannot simply override your cluster’s default replication value on the command line via

$ hadoop jar hadoop-*test*.jar TestDFSIO -D dfs.replication=2 -write ...


because, as I said above, the TestDFSIO shipped with Hadoop 0.20.2 does not parse command line parameters properly. A quick workaround is to create or update the conf/hdfs-site.xml file on the machine you are running the TestDFSIO write test from. TestDFSIO will read this configuration file during startup and use the value specified for the dfs.replication property therein.
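If you script your benchmark runs, you can automate this workaround. The following Python sketch uses only the standard library's xml.etree.ElementTree module. It sets a property such as dfs.replication in a Hadoop-style configuration file and creates the file if it does not exist. The function name `set_hadoop_property` is hypothetical. ElementTree does not preserve XML comments, so keep a backup of an existing file.

```python
import os
import xml.etree.ElementTree as ET


def set_hadoop_property(path: str, name: str, value: str) -> None:
    """Set (or add) a <property> in a Hadoop *-site.xml file.

    Creates a minimal <configuration> document if `path` does not exist.
    Note: XML comments of an existing file are not preserved.
    """
    if os.path.exists(path):
        tree = ET.parse(path)
        root = tree.getroot()
    else:
        root = ET.Element("configuration")
        tree = ET.ElementTree(root)

    for prop in root.findall("property"):
        if (prop.findtext("name") or "").strip() == name:
            value_elem = prop.find("value")
            if value_elem is None:
                value_elem = ET.SubElement(prop, "value")
            value_elem.text = value
            break
    else:  # no existing property with this name: append a new one
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value

    tree.write(path, encoding="utf-8", xml_declaration=True)


# Example (run from your Hadoop installation directory):
# set_hadoop_property("conf/hdfs-site.xml", "dfs.replication", "2")
```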

The comments in HDFS-1338 include some general remarks on the concept and design of TestDFSIO. And of course you can have a look at the source code in TestDFSIO.java.

# TeraSort benchmark suite

The TeraSort benchmark is probably the most well-known Hadoop benchmark. Back in 2008, Yahoo! set a record by sorting 1 TB of data in 209 seconds on a Hadoop cluster of 910 nodes, as Owen O’Malley of the Yahoo! Grid Computing Team reports. One year later, in 2009, Yahoo! set another record by sorting 1 PB (1,000 TB) of data in 16 hours on an even larger Hadoop cluster of 3800 nodes. It took the same cluster only 62 seconds to sort 1 TB of data, easily beating the previous year’s record!

Basically, the goal of TeraSort is to sort 1TB of data (or any other amount of data you want) as fast as possible. It is a benchmark that combines testing the HDFS and MapReduce layers of a Hadoop cluster. As such, it is not surprising that the TeraSort benchmark suite is often used in practice. This has the added benefit that we can, among other things, compare the results of our own cluster with those of other people's clusters.

You can use the TeraSort benchmark, for instance, to iron out your Hadoop configuration after your cluster has passed a convincing TestDFSIO benchmark. TeraSort is typically helpful to determine:

- whether your map and reduce slot assignments are sound (as they depend on variables such as the number of cores per TaskTracker node and the available RAM);
- whether other MapReduce-related parameters such as io.sort.mb and mapred.child.java.opts are set to proper values;
- whether the FairScheduler configuration you came up with really behaves as expected.

A full TeraSort benchmark run consists of the following three steps:

1. Generating the input data via TeraGen.
2. Running the actual TeraSort on the input data.
3. Validating the sorted output data via TeraValidate.

You do not need to re-generate input data before every TeraSort run (step 2). So you can skip step 1 (TeraGen) for later TeraSort runs if you are satisfied with the generated data.

Figure 1 shows the basic data flow. We use the included HDFS directory names in the later examples.

Figure 1: Hadoop Benchmarking and Stress Testing: The basic data flow of the TeraSort benchmark suite

The next sections describe each of the three steps in greater detail.

## TeraGen: Generate the TeraSort input data (if needed)

TeraGen (source code) generates random data that can be conveniently used as input data for a subsequent TeraSort run.

The syntax for running TeraGen is as follows:

$ hadoop jar hadoop-*examples*.jar teragen <number of 100-byte rows> <output dir>

Using the HDFS output directory /user/hduser/terasort-input as an example, the command to run TeraGen in order to generate 1TB of input data (i.e. 1,000,000,000,000 bytes) is:

$ hadoop jar hadoop-*examples*.jar teragen 10000000000 /user/hduser/terasort-input


Please note that the first parameter supplied to TeraGen is 10 billion (10,000,000,000), not 1 trillion = 1 TB (1,000,000,000,000). The reason is that the first parameter specifies the number of rows of input data to generate, and each row is 100 bytes in size.
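The conversion is simply the target data volume divided by 100 bytes per row. Note that “1TB” here means the decimal 1,000,000,000,000 bytes. A trivial Python helper (the name `teragen_rows` is hypothetical) can avoid off-by-a-factor mistakes when you script different data sizes:

```python
ROW_BYTES = 100  # each TeraGen row is 100 bytes


def teragen_rows(total_bytes: int) -> int:
    """Number of rows to pass to teragen for a target data volume in bytes."""
    rows, remainder = divmod(total_bytes, ROW_BYTES)
    if remainder:
        raise ValueError("total_bytes must be a multiple of 100")
    return rows


print(teragen_rows(10**12))  # 10000000000 rows for 1 TB
```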

Here is the actual TeraGen data format per row to clear things up:

<10 bytes key><10 bytes rowid><78 bytes filler>\r\n


where

1. The keys are random characters from the set ‘ ‘ .. ‘~’.
2. The rowid is the right-justified row id as an int.
3. The filler consists of 7 runs of 10 characters plus 1 run of 8 characters (78 bytes in total), using characters from ‘A’ to ‘Z’.
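To make the layout tangible, here is a Python sketch that builds one row in this format. It is an illustration only, with these assumptions:

- It uses Python's random module instead of TeraGen's own random number generator, so it does not reproduce TeraGen's actual data.
- The 78-byte filler is modeled as seven 10-character runs plus one 8-character run, so that the row adds up to 100 bytes.
- The letter chosen for each run is arbitrary.

The function name is hypothetical.

```python
import random
import string


def teragen_style_row(row_id: int, rng: random.Random) -> bytes:
    """Build one 100-byte row: 10-byte key, 10-byte rowid, 78-byte filler, CRLF.

    Illustration only: random numbers and filler letters differ from TeraGen's.
    """
    if not 0 <= row_id < 10**10:
        raise ValueError("row_id must fit into 10 digits")
    # Key: 10 random printable characters from ' ' (0x20) to '~' (0x7E).
    key = "".join(chr(rng.randint(0x20, 0x7E)) for _ in range(10))
    # Rowid: the row number, right-justified in a 10-character field.
    rowid = str(row_id).rjust(10)
    # Filler: seven runs of 10 identical letters plus one run of 8 (78 bytes).
    letters = string.ascii_uppercase
    runs = [letters[(row_id + i) % 26] * 10 for i in range(7)]
    runs.append(letters[(row_id + 7) % 26] * 8)
    return (key + rowid + "".join(runs) + "\r\n").encode("ascii")


row = teragen_style_row(42, random.Random(0))
print(len(row))  # 100
print(row[10:])
```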

If you have a very fast cluster, your map tasks might finish in a few seconds if you use a relatively small default HDFS block size such as 128MB (which, in general, is not a bad choice). In that case, the time to start and stop map tasks might exceed the time to perform the actual task. In other words, the overhead of managing the map tasks might exceed the job’s “payload”. An easy way to work around this is to increase the HDFS block size for files written by TeraGen.

Keep in mind that the HDFS block size is a per-file setting. The value of the dfs.block.size property in hdfs-default.xml (or conf/hdfs-site.xml if you use a custom configuration file) is just a default. So if, for example, you want to use an HDFS block size of 512MB (i.e. 536870912 bytes) for the TeraSort benchmark suite, override dfs.block.size when running TeraGen:

$ hadoop jar hadoop-*examples*.jar teragen -D dfs.block.size=536870912 ...

## TeraSort: Run the actual TeraSort benchmark

TeraSort (source code) is implemented as a MapReduce sort job with a custom partitioner. The partitioner uses a sorted list of n-1 sampled keys that define the key range for each reduce.

The syntax to run the TeraSort benchmark is as follows:

$ hadoop jar hadoop-*examples*.jar terasort <input dir> <output dir>
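The idea behind TeraSort's sampling partitioner can be illustrated with a simplified Python sketch:

1. Sample some keys and sort them.
2. Pick n-1 evenly spaced split points from the sorted sample.
3. Send each key to the reduce whose key range contains it.

Because every reduce receives a contiguous key range, concatenating the sorted reduce outputs yields a globally sorted result. Hadoop's TeraSort does this in Java and, in 0.20, uses a trie over key prefixes for fast lookups. This sketch swaps in a plain binary search via the bisect module instead. The function names are hypothetical.

```python
import bisect
import random
from typing import List


def pick_split_points(sample: List[bytes], num_reduces: int) -> List[bytes]:
    """Choose num_reduces - 1 split points from a sample of keys."""
    sample = sorted(sample)
    step = len(sample) / num_reduces
    return [sample[int(step * i)] for i in range(1, num_reduces)]


def partition(key: bytes, split_points: List[bytes]) -> int:
    """Return the reduce index whose key range contains `key`."""
    # Reduce i gets keys k with split_points[i-1] <= k < split_points[i].
    return bisect.bisect_right(split_points, key)


rng = random.Random(1)
keys = [bytes(rng.randint(0x20, 0x7E) for _ in range(10)) for _ in range(10000)]
splits = pick_split_points(rng.sample(keys, 1000), num_reduces=4)

# Each reduce sorts its own keys; concatenating the outputs in reduce order
# gives a globally sorted result.
buckets: List[List[bytes]] = [[] for _ in range(4)]
for k in keys:
    buckets[partition(k, splits)].append(k)
result = [k for bucket in buckets for k in sorted(bucket)]
assert result == sorted(keys)
print([len(bucket) for bucket in buckets])
```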


Using the input directory /user/hduser/terasort-input and the output directory /user/hduser/terasort-output as an example, the command to run the TeraSort benchmark is:

$ hadoop jar hadoop-*examples*.jar terasort /user/hduser/terasort-input /user/hduser/terasort-output

## TeraValidate: Validate the sorted output data of TeraSort

TeraValidate (source code) ensures that the output data of TeraSort is globally sorted.

TeraValidate creates one map task per file in TeraSort’s output directory. A map task ensures that each key is greater than or equal to the previous one. The map task also generates records with the first and last keys of the file. The reduce task then ensures that the first key of file i is greater than or equal to the last key of file i-1. The reduce tasks do not generate any output if everything is properly sorted. If they do detect any problems, they output the keys that are out of order.

The syntax to run the TeraValidate test is as follows:

$ hadoop jar hadoop-*examples*.jar teravalidate <terasort output dir (= input data)> <teravalidate output dir>
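This two-level check can be sketched in a few lines of Python. The sketch is a simplified, single-process illustration of the logic described above, not TeraValidate's actual implementation:

- Each “file” is a list of keys.
- The per-file loop plays the role of the map tasks.
- The boundary check plays the role of the reduce task.

The function name `validate_sorted` is hypothetical.

```python
from typing import List, Tuple


def validate_sorted(files: List[List[bytes]]) -> List[str]:
    """Return problem reports; an empty list means the data is globally sorted."""
    problems: List[str] = []
    # (file index, first key, last key) for every non-empty file, in file order.
    boundaries: List[Tuple[int, bytes, bytes]] = []

    # "Map" phase: each key must be >= the previous key within the same file.
    for i, keys in enumerate(files):
        for prev, cur in zip(keys, keys[1:]):
            if cur < prev:
                problems.append(f"misorder in file {i}: {prev!r} > {cur!r}")
        if keys:
            boundaries.append((i, keys[0], keys[-1]))

    # "Reduce" phase: the first key of a file must be >= the last key of the
    # preceding (non-empty) file.
    for (i, _, last), (j, first, _) in zip(boundaries, boundaries[1:]):
        if first < last:
            problems.append(f"misorder between files {i} and {j}: {last!r} > {first!r}")
    return problems


print(validate_sorted([[b"a", b"b"], [b"b", b"c"]]))  # []
print(validate_sorted([[b"a", b"c"], [b"b", b"d"]]))
```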


Using the output directory /user/hduser/terasort-output from the previous sections and the report (output) directory /user/hduser/terasort-validate as an example, the command to run the TeraValidate test is:

$ hadoop jar hadoop-*examples*.jar teravalidate /user/hduser/terasort-output /user/hduser/terasort-validate

## Further tips and tricks

Hadoop provides a very convenient way to access statistics about a job from the command line:

$ hadoop job -history all <job output directory>


This command retrieves a job’s history files (two files that are by default stored in <job output directory>/_logs/history) and computes job statistics from them:

$ hadoop fs -ls /user/hduser/terasort-input/_logs/history
Found 2 items
-rw-r--r-- 3 hadoop supergroup 17877 2011-02-11 11:58 /user/hduser/terasort-input/_logs/history/master_1297410440884_job_201102110201_0014_conf.xml
-rw-r--r-- 3 hadoop supergroup 36758 2011-02-11 11:58 /user/hduser/terasort-input/_logs/history/master_1297410440884_job_201102110201_0014_hadoop_TeraGen

Note: Unfortunately, not all benchmarks/tests shipped with Hadoop write such job history log files. The TestDFSIO benchmark, for instance, does not save job history files.

Here is an exemplary snippet of such job statistics for TeraGen from a small cluster:

$ hadoop job -history all /user/hduser/terasort-input

=====================================
Job tracker host name: master
job tracker start time: Fri Feb 11 2011
JobName: TeraGen
Submitted At: 11-Feb-2011
Launched At: 11-Feb-2011 13:58:14 (0sec)
Finished At: 11-Feb-2011 15:00:56 (1hrs, 2mins, 41sec)
Status: SUCCESS
=====================================

============================
Kind    Total   Successful      Failed  Killed  StartTime       FinishTime

Setup   1       1               0       0       11-Feb-2011 13:58:15    11-Feb-2011 13:58:16 (1sec)
Map     24      24              0       0       11-Feb-2011 13:58:18    11-Feb-2011 15:00:47 (1hrs, 2mins, 28sec)
Reduce  0       0               0       0
Cleanup 1       1               0       0       11-Feb-2011 15:00:50    11-Feb-2011 15:00:53 (2sec)
============================

Analysis
=========

Average time taken by map tasks: 1hrs, 1mins, 24sec
[...]
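If you want to compare job durations across runs, you can convert the human-readable durations in this output to seconds. Below is a small Python sketch. It assumes the “1hrs, 2mins, 41sec” style shown above. The regular expression and the function name `history_duration_seconds` are hypothetical and not part of Hadoop.

```python
import re

_UNIT_SECONDS = {"hrs": 3600, "mins": 60, "sec": 1}


def history_duration_seconds(text: str) -> int:
    """Convert e.g. '(1hrs, 2mins, 41sec)' from `hadoop job -history` into seconds."""
    parts = re.findall(r"(\d+)(hrs|mins|sec)", text)
    if not parts:
        raise ValueError(f"no duration found in {text!r}")
    return sum(int(value) * _UNIT_SECONDS[unit] for value, unit in parts)


line = "Finished At: 11-Feb-2011 15:00:56 (1hrs, 2mins, 41sec)"
print(history_duration_seconds(line))  # 3761
```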


# NameNode benchmark (nnbench)

NNBench (see src/test/org/apache/hadoop/hdfs/NNBench.java) is useful for load testing the NameNode hardware and configuration. It generates a lot of HDFS-related requests with normally very small “payloads” for the sole purpose of putting a high HDFS management stress on the NameNode. The benchmark can simulate requests for creating, reading, renaming and deleting files on HDFS.

I like to run this test simultaneously from several machines – e.g. from a set of DataNode boxes – in order to hit the NameNode from multiple locations at the same time.

The syntax of NNBench is as follows:

NameNode Benchmark 0.4
Usage: nnbench <options>
Options:
-operation <Available operations are create_write open_read rename delete. This option is mandatory>
* NOTE: The open_read, rename and delete operations assume that the files they operate on, are already available. The create_write operation must be run before running the other operations.
-maps <number of maps. default is 1. This is not mandatory>
-reduces <number of reduces. default is 1. This is not mandatory>
-startTime <time to start, given in seconds from the epoch. Make sure this is far enough into the future, so all maps (operations) will start at the same time>. default is launch time + 2 mins. This is not mandatory
-blockSize <Block size in bytes. default is 1. This is not mandatory>
-bytesToWrite <Bytes to write. default is 0. This is not mandatory>
-bytesPerChecksum <Bytes per checksum for the files. default is 1. This is not mandatory>
-numberOfFiles <number of files to create. default is 1. This is not mandatory>
-replicationFactorPerFile <Replication factor for the files. default is 1. This is not mandatory>
-baseDir <base DFS path. default is /becnhmarks/NNBench. This is not mandatory>
-readFileAfterOpen <true or false. if true, it reads the file and reports the average time to read. This is valid with the open_read operation. default is false. This is not mandatory>
-help: Display the help statement
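When you start NNBench on several machines, you can pass the same -startTime (in seconds since the epoch) to all of them. All runs then begin together, instead of each run using its own default of launch time + 2 minutes.

The following Python sketch only assembles the command line. It computes a start time a few minutes in the future, which you would compute once and share with every box, and derives the base directory from the short hostname. The helper names and the jar name `hadoop-0.20.2-test.jar` are assumptions. The other parameters mirror the create_write example in this section.

```python
import socket
import time
from typing import List


def common_start_time(delay_secs: int = 300) -> int:
    """Epoch seconds `delay_secs` from now; compute once and share with all boxes."""
    return int(time.time()) + delay_secs


def nnbench_create_args(start_time: int,
                        jar: str = "hadoop-0.20.2-test.jar") -> List[str]:
    """Build an nnbench create_write command line with a per-host base directory."""
    short_host = socket.gethostname().split(".")[0]  # similar to `hostname -s`
    return ["hadoop", "jar", jar, "nnbench",
            "-operation", "create_write",
            "-maps", "12", "-reduces", "6",
            "-blockSize", "1", "-bytesToWrite", "0",
            "-numberOfFiles", "1000",
            "-replicationFactorPerFile", "3",
            "-readFileAfterOpen", "true",
            "-startTime", str(start_time),
            "-baseDir", f"/benchmarks/NNBench-{short_host}"]


start = common_start_time()
print(" ".join(nnbench_create_args(start)))
```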


The following command will run a NameNode benchmark that creates 1000 files using 12 maps and 6 reducers. It uses a custom output directory based on the machine’s short hostname. This is a simple trick to ensure that one box does not accidentally write into the same output directory of another box running NNBench at the same time.

$ hadoop jar hadoop-*test*.jar nnbench -operation create_write \
    -maps 12 -reduces 6 -blockSize 1 -bytesToWrite 0 -numberOfFiles 1000 \
    -replicationFactorPerFile 3 -readFileAfterOpen true \
    -baseDir /benchmarks/NNBench-$(hostname -s)

Note that by default the benchmark waits 2 minutes before it actually starts!

# MapReduce benchmark (mrbench)

MRBench (see src/test/org/apache/hadoop/mapred/MRBench.java) loops a small job a number of times. This makes it a good complement to the “large-scale” TeraSort benchmark suite, because MRBench checks whether small job runs are responsive and running efficiently on your cluster. It focuses on the MapReduce layer, as its impact on the HDFS layer is very limited.

This test should be run from a single box (see caveat below). The command syntax can be displayed via mrbench --help:

MRBenchmark.0.0.2
Usage: mrbench [-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>]
          [-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>]
          [-numRuns <number of times to run the job, default is 1>]
          [-maps <number of maps for each run, default is 2>]
          [-reduces <number of reduces for each run, default is 1>]
          [-inputLines <number of input lines to generate, default is 1>]
          [-inputType <type of input to generate, one of ascending (default), descending, random>]
          [-verbose]

Important note: In Hadoop 0.20.2, setting the -baseDir parameter has no effect. This means that multiple parallel MRBench runs (e.g. started from different boxes) might interfere with each other. This is a known bug (MAPREDUCE-2398). I have submitted a patch, but it has not been integrated yet.

In Hadoop 0.20.2, the parameters default to:

-baseDir: /benchmarks/MRBench [*** see my note above ***]
-numRuns: 1
-maps: 2
-reduces: 1
-inputLines: 1
-inputType: ascending

The command to run a loop of 50 small test jobs is:

$ hadoop jar hadoop-*test*.jar mrbench -numRuns 50


Exemplary output of the above command:

DataLines       Maps    Reduces AvgTime (milliseconds)
1               2       1       31414


This means that the average finish time of executed jobs was 31 seconds.
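To track MRBench results over time, you might want to extract the AvgTime column programmatically. Below is a minimal Python sketch. It assumes the whitespace-separated summary table shown above: a header line starting with DataLines, followed by one data line, with AvgTime in milliseconds. The function name `parse_mrbench` is hypothetical.

```python
from typing import Dict


def parse_mrbench(output: str) -> Dict[str, int]:
    """Parse the DataLines/Maps/Reduces/AvgTime summary table printed by MRBench."""
    lines = [ln for ln in output.splitlines() if ln.strip()]
    for i, line in enumerate(lines):
        if line.split()[0] == "DataLines" and i + 1 < len(lines):
            values = [int(v) for v in lines[i + 1].split()[:4]]
            # AvgTime is reported in milliseconds.
            return dict(zip(["DataLines", "Maps", "Reduces", "AvgTime"], values))
    raise ValueError("MRBench summary table not found")


summary = parse_mrbench("""DataLines       Maps    Reduces AvgTime (milliseconds)
1               2       1       31414""")
print(summary["AvgTime"] / 1000)  # 31.414
```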

# Summary

I hope you have found my quick overview of Hadoop’s benchmarking and testing tools useful! Feel free to provide your feedback, corrections and suggestions in the comments below.

# Related Articles

From yours truly:

From others:

• HiBench - a Hadoop benchmark suite developed by Intel