Mini-Cluster Part IV : Word Count Benchmark

In this part, we will run a simple Word Count application on the cluster, using Hadoop MapReduce and Spark, on various platforms and cluster sizes.

We will run and benchmark the same program on 5 datasets of different sizes, on the following setups :

  • A single MinnowBoard MAX, using a simple multi-threaded Java application
  • A real home computer (my laptop), using the same Java application
  • MapReduce, using a cluster of 2 to 4 slaves
  • Spark, using a cluster of 2 to 4 slaves

Using these results, we will hopefully be able to answer the original questions of this section : is a home cluster with such small computers worth it ? How many nodes does it take to be faster than a single node, or faster than a real computer ?

Word Count : Definition, Data Files and Choices

Definition

Word Count is a simple program which, as its name suggests, counts the number of times each word appears in a text.

In the case of Hadoop and Spark, the input text is naturally split into pieces, called blocks, in HDFS. Each block is then processed line by line to count word occurrences.

I decided to use this program for the benchmark because it is a reference, a kind of “Hello World” program for both MapReduce and Spark. Apache uses it on both frameworks’ websites, in their tutorials and examples, to illustrate how each framework works.

Datasets

To get large quantities of text without repetition, I downloaded text files from Project Gutenberg. They provide a URL to crawl files of a given language and file type.

To download English .txt files from Project Gutenberg as a background task, I used the following commands:

## Create a directory to store all downloaded files
$ mkdir /data/gutenberg
$ cd /data/gutenberg

## Crawl using wget in background task
$ wget -bqc -w 2 -m -H 'http://www.gutenberg.org/robot/harvest?filetypes[]=txt&langs[]=en'

## Create folder and extract all downloaded zips in it
$ mkdir extracted
$ find . -name '*.zip' -exec unzip -d extracted {} ';'

## Create a data file for wordcount (Ctrl+C to stop when big enough)
$ cat extracted/*.txt > bigfile.txt

I created 5 different text files for the benchmarks:

Name         File Size   HDFS Blocks
tiny.txt     200 MB      2
small.txt    760 MB      6
medium.txt   2.6 GB      21
large.txt    9.6 GB      77
huge.txt     26.2 GB     210

The HDFS block size is 128 MB. The number of blocks determines the number of mappers in MapReduce, and the number of input tasks in Spark. For example, medium.txt is about 2.6 GB, i.e. roughly 2660 MB, which spans 2660 / 128 ≈ 21 blocks.

Choices

Word Count improvements

I decided to make a small modification to Word Count. In all the Word Count programs used here, these extra steps are applied to each line (a minimal Java sketch of these steps follows the list) :

  1. Replace all characters which are not ASCII letters by a white space.
  2. Transform the line to lower case.
  3. After splitting the line into words, ignore all words which are 50 or more characters long.
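
As a minimal Java sketch (the class and method names here are invented for illustration; the real code lives in the benchmark classes linked later in this article), the three steps could look like this :

import java.util.ArrayList;
import java.util.List;

public class LinePreparation {

    // Applies the 3 extra steps to a single line of input text
    static List<String> prepareLine(String line) {
        // 1. Replace all non ASCII-letter characters by a white space,
        // 2. then transform to lower case
        String cleaned = line.replaceAll("[^a-zA-Z]", " ").toLowerCase();

        // 3. Split into words and ignore words of 50 or more characters
        List<String> words = new ArrayList<>();
        for (String word : cleaned.split("\\s+")) {
            if (!word.isEmpty() && word.length() < 50) {
                words.add(word);
            }
        }
        return words;
    }
}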

These are ethically questionable choices (for Word Count purists), but here are the reasons why I made them :

  • They add extra computing steps, which makes the job look more like a real task. Otherwise there is almost nothing to process and the performance depends only on disk read speed.
  • They make the results more readable by getting rid of all punctuation (e.g. “Hello!”, “,hello” and “hello;” would otherwise count as three different words).
  • Transforming to lower case cuts down the number of distinct words.
  • Getting rid of the 50+ character words removes amino-acid sequences and other oddities which are not real words anyway.
  • The 3 previous points, together, enable all results to fit into the 2 GB of memory of a single MinnowBoard.
    • The single MinnowBoard will naturally lose and won’t even be able to compete on the big datasets.
    • But right now we don’t want that, we want a challenging competition.

MapReduce and Spark configurations

The configuration and execution parameters from Part III are used for running the Word Count program in both MapReduce and Spark.

Multithreaded Program for Single PC

Java Code

Here is a GitHub link to the Java class I wrote to benchmark the Word Count program on a single SBC or PC :

https://github.com/nicomak/[…]/MultithreadedWordCount.java

It uses a Producer-Consumer(s) design pattern with a configurable number of consumers (a simplified sketch follows the list below). This pattern works well for this kind of program because :

  • The Producer is a thread which is responsible for reading the input file and storing its lines in a buffer.
    • Having only one thread read the input is better for an HDD. As discussed in Part I, parallel access is a weakness of HDDs because of their slow seek times.
  • The Consumers are threads which take lines from the buffer and do the word counting.
    • The best number of consumers varies and depends on the CPU. This is why the number of consumers is configurable.
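
To make the pattern concrete, here is a stripped-down sketch of how such a producer-consumer word count can be structured. Class and variable names are illustrative; the real (and more complete) code is in the MultithreadedWordCount.java class linked above.

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class ProducerConsumerSketch {

    // Sentinel used to tell each consumer that the producer has finished.
    // A dedicated instance is used so it cannot collide with a real line of text.
    private static final String EOF = new String("EOF");

    public static void main(String[] args) throws Exception {
        String inputFile = args[0];
        int nbConsumers = Integer.parseInt(args[1]);

        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
        Map<String, Integer> counts = new ConcurrentHashMap<>();

        // Consumers : take lines from the buffer and count the words
        Thread[] consumers = new Thread[nbConsumers];
        for (int i = 0; i < nbConsumers; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    for (String line = queue.take(); line != EOF; line = queue.take()) {
                        String cleaned = line.replaceAll("[^a-zA-Z]", " ").toLowerCase();
                        for (String word : cleaned.split("\\s+")) {
                            if (!word.isEmpty() && word.length() < 50) {
                                counts.merge(word, 1, Integer::sum);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        // Producer : the main thread reads the input file and fills the buffer
        try (BufferedReader reader = new BufferedReader(new FileReader(inputFile))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line);
            }
        }

        // One sentinel per consumer, then wait for them to finish
        for (int i = 0; i < nbConsumers; i++) {
            queue.put(EOF);
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
        System.out.println(counts.size() + " distinct words counted");
    }
}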

After the word counting, towards the end of the main function, the results are sorted alphabetically. I did this for debugging purposes, so that the results look like those of MapReduce, which naturally sorts the words as part of its process. The program reports the elapsed time both before and after this sorting step. We will ignore the sorting time and use the “before” time.

Compilation and Execution

Example of compilation and execution on the “tiny” text file, on the MinnowBoard, using 4 consumer threads :

$ javac MultithreadedWordCount.java
$ java -Xmx2048m MultithreadedWordCount /data/gutenberg/tiny.txt results_tiny.txt 4

Execution starting with 4 consumer thread(s) ...
1M lines read from input. Current Queue size : 997
2M lines read from input. Current Queue size : 355
3M lines read from input. Current Queue size : 560
4M lines read from input. Current Queue size : 763
Word Counting took 69453 ms.
Now ordering results ...
Total Execution took 75707 ms.

Benchmark Results

Here are the results for a single MinnowBoard MAX (Intel Atom E3825 Dual Core with 2 GB of RAM) and my Alienware laptop (Intel Core i7-3610QM with 8 GB of RAM). After several tries, I used the optimal settings for each machine, which were :

  • On the MinnowBoard : 2 GB max heap size, running with 4 consumer threads.
  • For the Alienware : 4 GB max heap size, running with 6 consumer threads.

Using MapReduce

Java Code

Here is the modified Word Count code, inspired by the official MapReduce tutorial :

https://github.com/nicomak/[…]/MapReduceWordCount.java
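
For readers who do not want to open the link, here is a rough sketch of what the mapper and reducer look like : essentially the classes from the official tutorial, with the extra line-processing steps applied in the mapper. The linked file is the version actually benchmarked and may differ in details (job setup, class names).

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountSketch {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Extra steps : keep only ASCII letters, lower case, drop 50+ character words
            String cleaned = value.toString().replaceAll("[^a-zA-Z]", " ").toLowerCase();
            for (String token : cleaned.split("\\s+")) {
                if (!token.isEmpty() && token.length() < 50) {
                    word.set(token);
                    context.write(word, ONE);   // emit (word, 1)
                }
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all the 1's emitted for this word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}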

Compilation and Execution

First compile the class using Hadoop’s compilation tool :

## Compile java class
$ hadoop com.sun.tools.javac.Main MapReduceWordCount.java

## Create a jar
$ jar cf wc.jar MapReduceWordCount*.class

Then, assuming that the small.txt input file is located in the hdfs:///user/hduser/small/input folder, and that you want YARN to create the output folder called “output” right next to it :

$ yarn jar wc.jar MapReduceWordCount small/input small/output

Benchmark Results

For a given dataset size, lower values are better. Hover over the bars to get the exact values.

Using Spark

Java Code

Spark also supports Python and Scala, which are a lot more readable and faster to code than Java. But for the sake of fair benchmarking, i.e. to avoid language-related performance differences and to make sure that the exact same logic is applied (especially for the regex), I will stick to Java.

Here is the modified Spark class, inspired by the official Word Count example by Apache :

https://github.com/nicomak/[…]/SparkWordCount.java
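
The whole logic fits in a handful of transformations. Here is a sketch of what the driver can look like with the Spark 1.5 Java API (class name and argument handling are illustrative; the linked SparkWordCount.java is the version actually benchmarked) :

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkWordCountSketch {

    public static void main(String[] args) {
        String hdfsHost = args[0];    // e.g. ubuntu0:9000
        String inputDir = args[1];    // e.g. user/hduser/large/input
        String outputDir = args[2];   // e.g. user/hduser/large/spark_output

        SparkConf conf = new SparkConf().setAppName("SparkWordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("hdfs://" + hdfsHost + "/" + inputDir);

        JavaPairRDD<String, Integer> counts = lines
                // Same line-processing steps as before, emitting the words of each line
                .flatMap(line -> {
                    List<String> words = new ArrayList<>();
                    String cleaned = line.replaceAll("[^a-zA-Z]", " ").toLowerCase();
                    for (String w : cleaned.split("\\s+")) {
                        if (!w.isEmpty() && w.length() < 50) {
                            words.add(w);
                        }
                    }
                    return words;   // flatMap expects an Iterable in Spark 1.x
                })
                // Emit (word, 1) pairs and sum the counts per word
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        counts.saveAsTextFile("hdfs://" + hdfsHost + "/" + outputDir);
        sc.stop();
    }
}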

Packaging

The easiest way to package a jar for Spark is to use the Maven build tool.

If you are not familiar with Maven, here is a tiny tutorial on how to do it. First, install Maven by running :

sudo apt-get install maven

Then create the following folder with its contents:

  • spark_java_wordcount
    • pom.xml (maven project config file)
    • src/main/java (maven source code folder)
      • spark/wordcount (java package folder)
        • SparkWordCount.java

The pom.xml file should have the following contents :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>fr.ditullio.blog</groupId>
    <artifactId>spark-wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
            </plugin>
        </plugins>
    </build>
</project>

Then, while being in the project folder, run the maven package command :

hduser@ubuntu0:~/spark_java_wordcount$ mvn package

This will generate a folder called “target” containing many generated files, among which is your packaged jar : spark-wordcount-1.0-SNAPSHOT.jar.

 

Running on YARN

To run the Spark application on YARN, on the “large” text file for example, using the created jar file :

spark-submit \
--master yarn-client \
--num-executors 4 \
--executor-cores 2 \
--executor-memory 640M \
--class spark.wordcount.SparkWordCount \
~/spark_java_wordcount/target/spark-wordcount-1.0-SNAPSHOT.jar \
ubuntu0:9000 \
user/hduser/large/input \
user/hduser/large/spark_output

The options and values that I use are explained in Part III.

Running on Standalone

To run the Spark application on Standalone on the “huge” data file for example, using the same jar file :

spark-submit \
--master spark://ubuntu0:7077 \
--class spark.wordcount.SparkWordCount \
~/spark_java_wordcount/target/spark-wordcount-1.0-SNAPSHOT.jar \
ubuntu0:9000 \
user/hduser/huge/input \
user/hduser/huge/spark_output

The options and values that I use are explained in Part III.

Benchmark Results

Spark did not turn out to be a lot faster on Standalone than on YARN : it was only a few seconds faster on the small datasets, and about 1 minute faster on the “huge” dataset. I have included in the benchmark the YARN results for 2 to 4 slaves, and the Standalone results for 4 slaves only, since the difference is small.

For a given dataset size, lower values are better. Hover over the bars to get the exact values.

Spark on YARN vs Spark on Standalone

In this environment and for this type of task, Spark Standalone turned out to be slightly faster than Spark on YARN. But running Spark on top of YARN has the following advantages :

  • It manages the cluster resources together with other YARN applications.
  • Spark can only scale its number of executors dynamically when running on YARN, using the spark.dynamicAllocation.* properties (illustrated after this list). This is not yet supported on Spark Standalone.
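
For illustration, here is roughly what enabling dynamic allocation looks like when building the configuration in Java. These properties are usually passed with --conf on spark-submit instead, dynamic allocation also requires the external shuffle service on the NodeManagers, and the class below is just a made-up example.

import org.apache.spark.SparkConf;

public class DynamicAllocationExample {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkWordCount")
                // Let YARN grow and shrink the number of executors with the workload
                .set("spark.dynamicAllocation.enabled", "true")
                .set("spark.dynamicAllocation.minExecutors", "1")
                .set("spark.dynamicAllocation.maxExecutors", "4")
                // Required so that executors can be removed without losing shuffle data
                .set("spark.shuffle.service.enabled", "true");
        // ... then create the JavaSparkContext with this configuration as usual
    }
}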

Summary and Conclusion

Here is a summary of all the results :

Efficiency of the Cluster

Using MapReduce :

  • We need at least 3 slaves to beat the single MinnowBoard (starting with datasets ≥ 10 GB), and 4 slaves to see a clear improvement.
  • On the “huge” text file, the full MapReduce cluster is 44% faster than the single node.

Using Spark :

  • 2 slaves are enough to beat the single MinnowBoard.
  • 2 slaves with Spark are faster than 4 slaves with MapReduce.
  • On the “huge” text file, the full cluster is 380% faster than the single node and 266% faster than MapReduce.

I couldn’t beat the Alienware laptop. Shame on me ! But at least I got quite close to it, and my cluster is 380% faster than the original single MinnowBoard I had. 🙂

Bonus : Word Count without the improvements

These results were based on my version of the WordCount application, which was adapted for devices with little RAM.

But if we remove the transformations that I added (the improvements described earlier) and use the basic Word Count provided by Apache, the word dictionary no longer fits into the memory of the single PCs, which then have to use swap. Without these improvements, the result output for the “huge” dataset grows from 30 MB to 500 MB, which seems to take almost 4 GB of memory in the JVM.

The MinnowBoard was so slow that I didn’t finish the benchmark (it would have taken days). The Alienware also became a lot slower towards the end, when its heap started to fill up. With this version of Word Count, the cluster running Spark won.

The Alienware and MinnowBoard single PC programs had the advantage of keeping everything in memory, but we can see that their limit can be easily reached.

MapReduce and Spark are built to handle these problems, by processing fragments (blocks) of the input and by using shuffle operations instead of keeping everything in the same machine’s memory.

MapReduce on 4 slaves is still the slowest, but it certainly looks stable. And Spark rocks.

5 Comments

  1. Xavier said:

    Hey Nicolas,
    Just wanted to drop a message as you probably put lot of time in this tutorial which is really concise & well explained. Thank you for sharing all this valuable infos. Even if I am not looking to setup a hadoop/sparks cluster, I did setup something quite similar for elasticsearch but without going this far to benchmark the performance gain compared to a single machine.
    Anyways, I’ll check sometimes for new awesome articles 🙂
    Cheers,
    Xavier

    December 12, 2015
    • Nicolas Di Tullio said:

      Hi Xavier ! Thanks a lot for the support. If you have a blog with your own distributed experiments, don’t hesitate to share your link, I’d be interested to read.

      December 12, 2015
  2. Bram said:

    Just a quick question: what was your HDFS block size for the Hadoop benchmarks?

    March 3, 2016
    • Nicolas said:

      Hi Bram. I used the default block size, 128 MB.

      March 3, 2016
  3. Roshan said:

    Hmm… The best number for the 26GB benchmark is with 4 spark slaves (ignore alienware). It appears to clock about 2200sec. So thats about 3 MB/sec per host.
    Need we say more ?
    🙂

    May 27, 2018
