In this article I introduce some of the benchmarking and testing tools that are included in the Apache Hadoop distribution. Namely, we look at the benchmarks TestDFSIO, TeraSort, NNBench and MRBench. These are popular choices to benchmark and stress test an Hadoop cluster. Hence knowing how to run these tools will help you to shake out your cluster in terms of architecture, hardware and software, to measure its performance and also to share and compare your results with other people.
- Before we start
- Overview of Benchmarks and Testing Tools
- TeraSort benchmark suite
- NameNode benchmark (nnbench)
- MapReduce benchmark (mrbench)
- Related Articles
Before we start
Let me first talk about a few things that you should be aware of while reading through this article.
What we (do not) want to do
The purpose of this article is to give you a quick overview and walkthrough of some of Hadoop’s most popular benchmarking and testing tools. All the tools described in the following sections are part of the Apache Hadoop distribution, so they should already be available in your cluster and waiting to be (ab)used! I do not focus on the general concepts and best practices of benchmarking an Hadoop cluster in this article but rather want to get you up to speed with actually using these tools. My motivation comes from my own – sometimes frustrating – experience with using these tools, as they often lack clear documentation.
That said, at any time feel free to contribute back to the Hadoop project by helping to improve the status of the documentation. I know I’ll try to do my fair share.
If you want to follow along with the examples in this article, you obviously need access to a running cluster. If you are still trying to install and configure your cluster, my following tutorials might help you to build one. The tutorials are tailored to Ubuntu Linux but the information also applies to other Linux/Unix variants.
- Running Hadoop On Ubuntu Linux (Single-Node Cluster) How to set up a single-node Hadoop cluster using the Hadoop Distributed File System (HDFS) on Ubuntu Linux
- Running Hadoop On Ubuntu Linux (Multi-Node Cluster) How to set up a multi-node Hadoop cluster using the Hadoop Distributed File System (HDFS) on Ubuntu Linux
Version focus: Hadoop 0.20.2
I focus on the benchmark and testing tools shipped with Hadoop version 0.20.2. At the moment this is the latest production-ready release of Hadoop (0.21 is not!). Until Hadoop 0.22 is out, a lot of Hadoop users (including me) are therefore sticking to the tried and true Hadoop 0.20.2 release.
NotReplicatedYetException and AlreadyBeingCreatedException errors
If your cluster is running the stock version of the Hadoop 0.20.2 release, you might run into NotReplicatedYetException and AlreadyBeingCreatedException “errors” while test-driving your cluster (as you can see in the output below, they are actually logged as INFO):
2011-02-11 03:48:05,367 INFO org.apache.hadoop.ipc.Server: IPC Server handler 2 on 54310, call addBlock(/user/hduser/terasort-input/_temporary/_attempt_201102110201_0008_m_000020_0/part-00020, DFSClient_attempt_201102110201_0008_m_000020_0) from 192.168.0.2:45133: error: org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException: Not replicated yet:/user/hduser/terasort-input/_temporary/_attempt_201102110201_0008_m_000020_0/part-00020
org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException: Not replicated yet:/user/hduser/terasort-input/_temporary/_attempt_201102110201_0008_m_000020_0/part-00020
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1257)
    at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
This often happens when you delete a large amount of HDFS data on the cluster – and this is something that you might want to do in between test runs so that your cluster does not fill up. This is a known problem in 0.20.2 and fixed in 0.21.0 (see HDFS-611). Fortunately, there is a patch available:
HDFS-611.branch-20.v6.patch (link) is the file you need for Hadoop 0.20.2.
Describing how to patch Hadoop and rebuild it from source is unfortunately beyond the scope of this article. Sorry! If you actually happen to run into this problem, please have a look at the Hadoop documentation (e.g. the FAQ, How to Contribute, Working with Hadoop under Eclipse or Git and Hadoop are a very good start) and/or consult the Hadoop mailing lists. You can also check out my article Building an Hadoop 0.20.x version for HBase 0.90.2, which should jumpstart you with creating your own custom Hadoop build.
Another workaround is to simply wait some minutes after deleting a large amount of data before starting another test run. You might want to monitor the NameNode’s log file to check when the asynchronous background delete processes have finished their work.
Overview of Benchmarks and Testing Tools
The Hadoop distribution comes with a number of benchmarks, which are bundled in hadoop-*test*.jar and hadoop-*examples*.jar. The four benchmarks we will be looking at in more detail are TestDFSIO, TeraSort, NNBench and MRBench.
Here is the full list of available options in hadoop-*test*.jar:
# change to your Hadoop installation directory;
# if you followed my Hadoop tutorials, this is /usr/local/hadoop
$ cd /usr/local/hadoop
$ bin/hadoop jar hadoop-*test*.jar
An example program must be given as the first argument.
Valid program names are:
  DFSCIOTest: Distributed i/o benchmark of libhdfs.
  DistributedFSCheck: Distributed checkup of the file system consistency.
  MRReliabilityTest: A program that tests the reliability of the MR framework by injecting faults/failures
  TestDFSIO: Distributed i/o benchmark.
  dfsthroughput: measure hdfs throughput
  filebench: Benchmark SequenceFile(Input|Output)Format (block,record compressed and uncompressed), Text(Input|Output)Format (compressed and uncompressed)
  loadgen: Generic map/reduce load generator
  mapredtest: A map/reduce test check.
  mrbench: A map/reduce benchmark that can create many small jobs
  nnbench: A benchmark that stresses the namenode.
  testarrayfile: A test for flat files of binary key/value pairs.
  testbigmapoutput: A map/reduce program that works on a very big non-splittable file and does identity map/reduce
  testfilesystem: A test for FileSystem read/write.
  testipc: A test for ipc.
  testmapredsort: A map/reduce program that validates the map-reduce framework's sort.
  testrpc: A test for rpc.
  testsequencefile: A test for flat files of binary key value pairs.
  testsequencefileinputformat: A test for sequence file input format.
  testsetfile: A test for flat files of binary key/value pairs.
  testtextinputformat: A test for text input format.
  threadedmapbench: A map/reduce benchmark that compares the performance of maps with multiple spills over maps with 1 spill
And here is the full list of available options for hadoop-*examples*.jar:
$ bin/hadoop jar hadoop-*examples*.jar
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  dbcount: An example job that count the pageview counts from a database.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using monte-carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sleep: A job that sleeps at each map and reduce task.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
And before we start, here’s a nifty trick for your tests: When running the benchmarks described in the following sections, you might want to use the Unix time command to measure the elapsed time. This saves you the hassle of navigating to the Hadoop JobTracker web interface to get (almost) the same information. Simply prefix every Hadoop command with time:
$ time hadoop jar hadoop-*examples*.jar ...
[...]
real    9m15.510s
user    0m7.075s
sys     0m0.584s
The relevant metric is the real value in the first row.
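If you record many runs, it can be handy to turn the real value into plain seconds for comparison. The helper below is a small sketch; the function name to_seconds is my own, and it assumes the minutes-and-seconds format shown above (no hours component):

```shell
# Convert a `time`-style duration such as "9m15.510s" into seconds.
# Assumes the "<minutes>m<seconds>s" format printed by the shell's time keyword.
to_seconds() {
  echo "$1" | awk -F'[ms]' '{ printf "%.3f\n", $1 * 60 + $2 }'
}

to_seconds 9m15.510s
```

For the sample above this yields 555.510 seconds, which is easier to put into a spreadsheet than the mixed format.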
The TestDFSIO benchmark is a read and write test for HDFS. It is helpful for tasks such as stress testing HDFS, discovering performance bottlenecks in your network, shaking out the hardware, OS and Hadoop setup of your cluster machines (particularly the NameNode and the DataNodes), and getting a first impression of how fast your cluster is in terms of I/O.
Official documentation does not seem to exist for Hadoop 0.20.2, so the only way to understand the test in greater detail is to inspect its source code.
1. The default output directory is /benchmarks/TestDFSIO
When a write test is run via -write, the TestDFSIO benchmark writes its files to /benchmarks/TestDFSIO on HDFS. Files from older write runs are overwritten. If you want to preserve the output files of previous runs, you have to copy/move these files manually to a new HDFS location. Benchmark results are saved in a local file called TestDFSIO_results.log in the current local directory (results are appended if the file already exists) and also printed to STDOUT. If you want to use a different filename, set the -resFile parameter appropriately.
2. Run write tests before read tests
The read test of TestDFSIO does not generate its own input files. For this reason, it is a convenient practice to first run a write test via -write and then follow up with a read test via -read (while using the same parameters as during the previous -write run).
Run a write test (as input data for the subsequent read test)
The syntax for running a write test is as follows:
TestDFSIO.0.0.4
Usage: hadoop jar $HADOOP_HOME/hadoop-*test*.jar TestDFSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes]
TestDFSIO is designed in such a way that it will use 1 map task per file, i.e. it is a 1:1 mapping from files to map tasks. Splits are defined so that each map gets only one filename, which it creates (-write) or reads (-read).
The command to run a write test that generates 10 output files of size 1GB for a total of 10GB is:
$ hadoop jar hadoop-*test*.jar TestDFSIO -write -nrFiles 10 -fileSize 1000
Run a read test
The command to run the corresponding read test using 10 input files of size 1GB is:
$ hadoop jar hadoop-*test*.jar TestDFSIO -read -nrFiles 10 -fileSize 1000
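Since the read test should use the same parameters as the preceding write test, it can help to derive both argument lists from one place. The following is a minimal sketch; dfsio_args is a hypothetical helper name, and it only builds the argument string from a file count and a total size in MB (it does not call Hadoop itself):

```shell
# Build TestDFSIO arguments from a mode (write|read), a file count and a
# total data size in MB, so write and read runs share the same parameters.
# Note: uses integer division, so pick a total that divides evenly.
dfsio_args() {
  mode=$1; files=$2; total_mb=$3
  echo "-$mode -nrFiles $files -fileSize $(( total_mb / files ))"
}

dfsio_args write 10 10000
dfsio_args read 10 10000
```

You could then run, for example, hadoop jar hadoop-*test*.jar TestDFSIO $(dfsio_args write 10 10000), followed by the same command with read.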
Clean up and remove test data
The command to remove previous test data is:
$ hadoop jar hadoop-*test*.jar TestDFSIO -clean
The cleaning run will delete the output directory /benchmarks/TestDFSIO on HDFS.
Interpreting TestDFSIO results
Let’s have a look at this example result for writing and reading 1TB of data on a cluster of twenty nodes and try to deduce its meaning:
----- TestDFSIO ----- : write
           Date & time: Fri Apr 08 2011
       Number of files: 1000
Total MBytes processed: 1000000
     Throughput mb/sec: 4.989
Average IO rate mb/sec: 5.185
 IO rate std deviation: 0.960
    Test exec time sec: 1113.53

----- TestDFSIO ----- : read
           Date & time: Fri Apr 08 2011
       Number of files: 1000
Total MBytes processed: 1000000
     Throughput mb/sec: 11.349
Average IO rate mb/sec: 22.341
 IO rate std deviation: 119.231
    Test exec time sec: 544.842
Here, the most notable metrics are Throughput mb/sec and Average IO rate mb/sec. Both of them are based on the file size written (or read) by the individual map tasks and the elapsed time to do so (see this discussion thread for more information).
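Because results are appended to the local results file, you may want to pull a single metric out of it for all recorded runs. The snippet below is a sketch under the assumption that the file uses the "label: value" layout shown above; the function name dfsio_metric is made up for this example:

```shell
# Print the value of one TestDFSIO metric for every run recorded in a results
# file. Arguments: metric label (e.g. "Throughput mb/sec"), path to the file.
dfsio_metric() {
  awk -F': *' -v metric="$1" 'index($1, metric) { print $2 }' "$2"
}

# Example call (commented out because it needs an existing results file):
# dfsio_metric "Throughput mb/sec" TestDFSIO_results.log
```

The label is matched as a plain substring of the text before the colon, so "Average IO rate mb/sec" and "Throughput mb/sec" can be extracted with the same function.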
Throughput mb/sec for a TestDFSIO job using N map tasks is defined as follows. The index 1 <= i <= N denotes the individual map tasks:
Average IO rate mb/sec is defined as:
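Written out under the assumption that filesize_i is the amount of data (in MB) written or read by map task i and time_i is the time (in seconds) that task needed, the two definitions read as follows. The symbol names are my own, and this is a sketch derived from the description above and my reading of the TestDFSIO code rather than an authoritative specification:

```latex
\[
\mathrm{Throughput}(N) =
  \frac{\sum_{i=1}^{N} \mathit{filesize}_i}{\sum_{i=1}^{N} \mathit{time}_i}
\]
\[
\mathrm{AverageIORate}(N) =
  \frac{1}{N} \sum_{i=1}^{N} \mathit{rate}_i
  = \frac{1}{N} \sum_{i=1}^{N} \frac{\mathit{filesize}_i}{\mathit{time}_i}
\]
```

Throughput divides the total data by the total task time, whereas the average IO rate is the mean of the per-task rates. A few very fast or very slow tasks therefore move the average IO rate much more than the throughput, which fits the read run above with its large standard deviation.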
Two derived metrics you might be interested in are estimates of the “concurrent” throughput and average IO rate (for lack of a better term) your cluster is capable of. Imagine you let TestDFSIO create 1,000 files but your cluster has only 200 map slots. This means that it takes about five MapReduce waves (5 * 200 = 1,000) to write the full test data because the cluster can only run 200 map tasks at the same time. In this case, simply take the minimum of the number of files (here: 1,000) and the number of available map slots in your cluster (here: 200), and multiply the throughput and average IO rate by this minimum. In our example, the concurrent throughput would be estimated at 4.989 * 200 = 997.8 MB/s and the concurrent average IO rate at 5.185 * 200 = 1,037.0 MB/s.
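The estimate described above is simple enough to script. This is only a sketch of that rule of thumb; the function name concurrent_rate and its argument order are my own choices:

```shell
# Estimate the "concurrent" rate: per-task rate times min(files, map slots).
# Arguments: number of files, number of map slots, rate in MB/s.
concurrent_rate() {
  awk -v files="$1" -v slots="$2" -v rate="$3" \
    'BEGIN { n = (files < slots) ? files : slots; printf "%.1f\n", n * rate }'
}

concurrent_rate 1000 200 4.989   # throughput from the write run
concurrent_rate 1000 200 5.185   # average IO rate from the write run
```

Keep in mind that this is an estimate only: it assumes that all map slots are busy with TestDFSIO tasks for the whole run.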
Another thing to keep in mind when interpreting TestDFSIO results is that the HDFS replication factor plays an important role. If you compare two otherwise identical TestDFSIO write runs which use an HDFS replication factor of 2 and 3, respectively, you will see higher throughput and higher average IO numbers for the run with the lower replication factor. Unfortunately, you cannot simply override your cluster’s default replication value on the command line via
$ hadoop jar hadoop-*test*.jar TestDFSIO -D dfs.replication=2 -write ...
because the TestDFSIO shipped with Hadoop 0.20.2 does not parse command line parameters properly. A quick workaround is to create or update the conf/hdfs-site.xml file on the machine you are running the TestDFSIO write test from. TestDFSIO will read this configuration file during startup and use the value specified for the dfs.replication property therein.
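As a sketch of this workaround, an entry in conf/hdfs-site.xml that sets the replication factor to 2 might look like the following. The value 2 is just the example used in the text, and the fragment belongs inside the file's existing configuration element:

```xml
<!-- Fragment for conf/hdfs-site.xml on the machine that starts TestDFSIO. -->
<!-- The value 2 is an example; use whatever factor you want to test. -->
<property>
  <name>dfs.replication</name>
  <value>2</value>
</property>
```

Remember to restore the previous value after the test run if the machine is also used to write regular data to HDFS.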
More information about TestDFSIO
The comments in HDFS-1338 include some general remarks on the concept and design of TestDFSIO. And of course you can have a look at the source code.
TeraSort benchmark suite
The TeraSort benchmark is probably the most well-known Hadoop benchmark. Back in 2008, Yahoo! set a record by sorting 1 TB of data in 209 seconds on an Hadoop cluster of 910 nodes, as Owen O’Malley of the Yahoo! Grid Computing Team reports. One year later in 2009, Yahoo! set another record by sorting 1 PB (1,000 TB) of data in 16 hours on an even larger Hadoop cluster of 3800 nodes (it took the same cluster only 62 seconds to sort 1 TB of data, easily beating the previous year’s record!).
Basically, the goal of TeraSort is to sort 1TB of data (or any other amount of data you want) as fast as possible. It is a benchmark that combines testing the HDFS and MapReduce layers of an Hadoop cluster. As such it is not surprising that the TeraSort benchmark suite is often used in practice, which has the added benefit that it allows us, among other things, to compare the results of our own cluster with the clusters of other people. You can use the TeraSort benchmark, for instance, to iron out your Hadoop configuration after your cluster has passed a convincing TestDFSIO benchmark. Typical areas where TeraSort is helpful are determining whether your map and reduce slot assignments are sound (as they depend on variables such as the number of cores per TaskTracker node and the available RAM), whether other MapReduce-related parameters such as mapred.child.java.opts are set to proper values, or whether the FairScheduler configuration you came up with really behaves as expected.
A full TeraSort benchmark run consists of the following three steps:
- Generating the input data via TeraGen.
- Running the actual TeraSort on the input data.
- Validating the sorted output data via TeraValidate.
You do not need to re-generate input data before every TeraSort run (step 2). So you can skip step 1 (TeraGen) for later TeraSort runs if you are satisfied with the generated data.
Figure 1 shows the basic data flow. We use the included HDFS directory names in the later examples.
The next sections describe each of the three steps in greater detail.
TeraGen: Generate the TeraSort input data (if needed)
TeraGen (source code) generates random data that can be conveniently used as input data for a subsequent TeraSort run.
The syntax for running TeraGen is as follows:
$ hadoop jar hadoop-*examples*.jar teragen <number of 100-byte rows> <output dir>
Using the HDFS output directory /user/hduser/terasort-input as an example, the command to run TeraGen in order to generate 1TB of input data (i.e. 1,000,000,000,000 bytes) is:
$ hadoop jar hadoop-*examples*.jar teragen 10000000000 /user/hduser/terasort-input
Please note that the first parameter supplied to TeraGen is 10 billion (10,000,000,000), i.e. not 1 trillion = 1 TB (1,000,000,000,000). The reason is that the first parameter specifies the number of rows of input data to generate, each of which has a size of 100 bytes.
Here is the actual TeraGen data format per row to clear things up:
<10 bytes key><10 bytes rowid><78 bytes filler>\r\n
- The keys are random characters from the set ‘ ‘ .. ‘~’.
- The rowid is the right-justified row id as an int.
- The filler consists of 7 runs of 10 characters from ‘A’ to ‘Z’.
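Since every row is 100 bytes long, the row count for a given data size is just the size in bytes divided by 100. A tiny helper (the name teragen_rows is hypothetical) makes this explicit and avoids counting zeros by hand:

```shell
# Number of 100-byte TeraGen rows needed for a given amount of data in bytes.
teragen_rows() {
  echo $(( $1 / 100 ))
}

teragen_rows 1000000000000   # 1 TB, i.e. 10^12 bytes
```

The result for 1 TB is 10000000000, the value passed to teragen in the example above.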
If you have a very fast cluster, your map tasks might finish in a few seconds if you use a relatively small default HDFS block size such as 128MB (which in general is not a bad idea, though). This means that the time to start/stop map tasks might be larger than the time to perform the actual task. In other words, the overhead for managing the TaskTrackers might exceed the job’s “payload”. An easy way to work around this is to increase the HDFS block size for files written by TeraGen. Keep in mind that the HDFS block size is a per-file setting, and the value specified by the dfs.block.size property in your configuration (conf/hdfs-site.xml if you use a custom configuration file) is just a default value. So if, for example, you want to use an HDFS block size of 512MB (i.e. 536870912 bytes) for the TeraSort benchmark suite, override dfs.block.size when running TeraGen:
$ hadoop jar hadoop-*examples*.jar teragen -D dfs.block.size=536870912 ...
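The byte value above is simply 512 * 1024 * 1024. A small helper, hypothetical and named mb_to_bytes here, avoids typos when you want to try other block sizes:

```shell
# Convert a block size given in MB (1 MB = 1024 * 1024 bytes) into bytes,
# the unit expected by the dfs.block.size property.
mb_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

mb_to_bytes 512
```

You could then write -D dfs.block.size=$(mb_to_bytes 512) on the teragen command line instead of the literal number.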
TeraSort: Run the actual TeraSort benchmark
TeraSort (source code) is implemented as a MapReduce sort job with a custom partitioner that uses a sorted list of n-1 sampled keys that define the key range for each reduce.
The syntax to run the TeraSort benchmark is as follows:
$ hadoop jar hadoop-*examples*.jar terasort <input dir> <output dir>
Using the input directory /user/hduser/terasort-input and the output directory /user/hduser/terasort-output as an example, the command to run the TeraSort benchmark is:
$ hadoop jar hadoop-*examples*.jar terasort /user/hduser/terasort-input /user/hduser/terasort-output
TeraValidate: Validate the sorted output data of TeraSort
TeraValidate (source code) ensures that the output data of TeraSort is globally sorted.
TeraValidate creates one map task per file in TeraSort’s output directory. A map task ensures that each key is greater than or equal to the previous one. The map task also generates records with the first and last keys of the file, and the reduce task ensures that the first key of file i is greater than the last key of file i-1. The reduce tasks do not generate any output if everything is properly sorted. If they do detect any problems, they output the keys that are out of order.
The syntax to run the TeraValidate test is as follows:
$ hadoop jar hadoop-*examples*.jar teravalidate <terasort output dir (= input data)> <teravalidate output dir>
Using the output directory /user/hduser/terasort-output from the previous sections and the report (output) directory /user/hduser/terasort-validate as an example, the command to run the TeraValidate test is:
$ hadoop jar hadoop-*examples*.jar teravalidate /user/hduser/terasort-output /user/hduser/terasort-validate
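To keep the directory names of the three steps consistent, you could generate the commands from a single place. The sketch below only prints the commands for review; terasort_suite is a made-up function name, and the directory layout follows the examples in this article:

```shell
# Print the three commands of a full TeraSort run (TeraGen, TeraSort,
# TeraValidate). Arguments: number of 100-byte rows, HDFS base directory.
terasort_suite() {
  rows=$1; base=$2
  jar='hadoop-*examples*.jar'
  echo "hadoop jar $jar teragen $rows $base/terasort-input"
  echo "hadoop jar $jar terasort $base/terasort-input $base/terasort-output"
  echo "hadoop jar $jar teravalidate $base/terasort-output $base/terasort-validate"
}

terasort_suite 10000000000 /user/hduser
```

Printing instead of executing lets you check the commands first. If they look right, you can pipe the output to sh -e from your Hadoop installation directory, so that the run stops if one of the steps fails.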
Further tips and tricks
Hadoop provides a very convenient way to access statistics about a job from the command line:
$ hadoop job -history all <job output directory>
This command retrieves a job’s history files (two files that are by default stored in <job output directory>/_logs/history) and computes job statistics from them:
$ hadoop fs -ls /user/hduser/terasort-input/_logs/history
Found 2 items
-rw-r--r--   3 hadoop supergroup      17877 2011-02-11 11:58 /user/hduser/terasort-input/_logs/history/master_1297410440884_job_201102110201_0014_conf.xml
-rw-r--r--   3 hadoop supergroup      36758 2011-02-11 11:58 /user/hduser/terasort-input/_logs/history/master_1297410440884_job_201102110201_0014_hadoop_TeraGen
Here is an example snippet of such job statistics for TeraGen from a small cluster:
$ hadoop job -history all /user/hduser/terasort-input

Hadoop job: job_201102110201_0014
=====================================
Job tracker host name: master
job tracker start time: Fri Feb 11 2011
User: hadoop
JobName: TeraGen
JobConf: hdfs://master:54310/app/hadoop/tmp/mapred/system/job_201102110201_0014/job.xml
Submitted At: 11-Feb-2011
Launched At: 11-Feb-2011 13:58:14 (0sec)
Finished At: 11-Feb-2011 15:00:56 (1hrs, 2mins, 41sec)
Status: SUCCESS
=====================================

Task Summary
============================
Kind     Total  Successful  Failed  Killed  StartTime             FinishTime
Setup    1      1           0       0       11-Feb-2011 13:58:15  11-Feb-2011 13:58:16 (1sec)
Map      24     24          0       0       11-Feb-2011 13:58:18  11-Feb-2011 15:00:47 (1hrs, 2mins, 28sec)
Reduce   0      0           0       0
Cleanup  1      1           0       0       11-Feb-2011 15:00:50  11-Feb-2011 15:00:53 (2sec)
============================

Analysis
=========
Time taken by best performing map task task_201102110201_0014_m_000006: 59mins, 5sec
Average time taken by map tasks: 1hrs, 1mins, 24sec
Worse performing map tasks:
TaskId                            Timetaken
task_201102110201_0014_m_000004   1hrs, 2mins, 24sec
task_201102110201_0014_m_000020   1hrs, 2mins, 19sec
task_201102110201_0014_m_000013   1hrs, 2mins, 9sec
[...]
NameNode benchmark (nnbench)
NNBench (see src/test/org/apache/hadoop/hdfs/NNBench.java) is useful for load testing the NameNode hardware and configuration. It generates a lot of HDFS-related requests with normally very small “payloads” for the sole purpose of putting a high HDFS management stress on the NameNode. The benchmark can simulate requests for creating, reading, renaming and deleting files on HDFS.
I like to run this test simultaneously from several machines – e.g. from a set of DataNode boxes – in order to hit the NameNode from multiple locations at the same time.
The syntax of NNBench is as follows:
NameNode Benchmark 0.4
Usage: nnbench <options>
Options:
    -operation <Available operations are create_write open_read rename delete. This option is mandatory>
     * NOTE: The open_read, rename and delete operations assume that the files they operate on, are already available. The create_write operation must be run before running the other operations.
    -maps <number of maps. default is 1. This is not mandatory>
    -reduces <number of reduces. default is 1. This is not mandatory>
    -startTime <time to start, given in seconds from the epoch. Make sure this is far enough into the future, so all maps (operations) will start at the same time>. default is launch time + 2 mins. This is not mandatory
    -blockSize <Block size in bytes. default is 1. This is not mandatory>
    -bytesToWrite <Bytes to write. default is 0. This is not mandatory>
    -bytesPerChecksum <Bytes per checksum for the files. default is 1. This is not mandatory>
    -numberOfFiles <number of files to create. default is 1. This is not mandatory>
    -replicationFactorPerFile <Replication factor for the files. default is 1. This is not mandatory>
    -baseDir <base DFS path. default is /becnhmarks/NNBench. This is not mandatory>
    -readFileAfterOpen <true or false. if true, it reads the file and reports the average time to read. This is valid with the open_read operation. default is false. This is not mandatory>
    -help: Display the help statement
The following command will run a NameNode benchmark that creates 1000 files using 12 maps and 6 reducers. It uses a custom output directory based on the machine’s short hostname. This is a simple trick to ensure that one box does not accidentally write into the same output directory of another box running NNBench at the same time.
$ hadoop jar hadoop-*test*.jar nnbench -operation create_write \
    -maps 12 -reduces 6 -blockSize 1 -bytesToWrite 0 -numberOfFiles 1000 \
    -replicationFactorPerFile 3 -readFileAfterOpen true \
    -baseDir /benchmarks/NNBench-`hostname -s`
Note that by default the benchmark waits 2 minutes before it actually starts!
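According to the usage text above, -startTime takes a point in time in seconds since the epoch. If you start NNBench from several machines, one idea is to compute a common start time once and pass the same value everywhere. The helper below is only a sketch of that arithmetic (the name nnbench_start_time is mine), and I have not verified how strictly NNBench honors the value:

```shell
# Compute a value for NNBench's -startTime option: a base time in seconds
# since the epoch plus a delay in seconds.
nnbench_start_time() {
  echo $(( $1 + $2 ))
}

# A start time five minutes from now:
nnbench_start_time "$(date +%s)" 300
```

This only works as intended if the clocks of the participating machines are reasonably well synchronized.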
MapReduce benchmark (mrbench)
MRBench (see src/test/org/apache/hadoop/mapred/MRBench.java) loops a small job a number of times. As such it is a very complementary benchmark to the “large-scale” TeraSort benchmark suite because MRBench checks whether small job runs are responsive and running efficiently on your cluster. It puts its focus on the MapReduce layer as its impact on the HDFS layer is very limited.
This test should be run from a single box (see caveat below). The command syntax is as follows:
MRBenchmark.0.0.2
Usage: mrbench [-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>]
          [-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>]
          [-numRuns <number of times to run the job, default is 1>]
          [-maps <number of maps for each run, default is 2>]
          [-reduces <number of reduces for each run, default is 1>]
          [-inputLines <number of input lines to generate, default is 1>]
          [-inputType <type of input to generate, one of ascending (default), descending, random>]
          [-verbose]
In Hadoop 0.20.2, the parameters default to:
-baseDir: /benchmarks/MRBench [*** see my note above ***]
-numRuns: 1
-maps: 2
-reduces: 1
-inputLines: 1
-inputType: ascending
The command to run a loop of 50 small test jobs is:
$ hadoop jar hadoop-*test*.jar mrbench -numRuns 50
Example output of the above command:
DataLines  Maps  Reduces  AvgTime (milliseconds)
1          2     1        31414
This means that the average finish time of executed jobs was 31 seconds.
I hope you have found my quick overview of Hadoop’s benchmarking and testing tools useful! Feel free to provide your feedback, corrections and suggestions in the comments below.
From yours truly:
- Running Hadoop On Ubuntu Linux (Single-Node Cluster)
- Running Hadoop On Ubuntu Linux (Multi-Node Cluster)
- Writing An Hadoop MapReduce Program In Python
- HiBench - an Hadoop benchmark suite developed by Intel