Hadoop Basics IV: Total Order Sorting in MapReduce

We saw in the previous part that when using multiple reducers, each reducer receives (key,value) pairs assigned to it by the Partitioner. A reducer receives those pairs sorted by key, so the output of a reducer is generally also sorted by key. However, the outputs of different reducers are not ordered relative to each other, so they cannot be concatenated or read sequentially in the correct order.

For example with 2 reducers, sorting on simple Text keys, you can have:
– Reducer 1 output: (a,5), (d,6), (w,5)
– Reducer 2 output: (b,2), (c,5), (e,7)
The keys are only sorted if you look at each output individually, but if you read one after the other, the ordering is broken.

The objective of Total Order Sorting is to have all outputs sorted across all reducers:
– Reducer 1 output: (a,5), (b,2), (c,5)
– Reducer 2 output: (d,6), (e,7), (w,5)
This way the outputs can be read/searched/concatenated sequentially as a single ordered output.

In this post we will first see how to create a manual total order sort with a custom partitioner. Then we will learn to use Hadoop’s TotalOrderPartitioner to automatically create partitions on simple type keys. Finally we will see a more advanced technique to use our Secondary Sort’s Composite Key (from the previous post) with this partitioner to achieve “Total Secondary Sorting”.

Manual partitioning

In the previous part, our secondary sort job (view) used the NaturalKeyPartitioner to assign map output pairs to reducers based on the natural key (“state” field) hashcode.

One way to achieve total order sorting is to implement the getPartition() method of the Partitioner to manually assign keys by intervals to each reducer. For example, if we are going to use 3 reducers, we can try to have an equal distribution by partitioning keys this way:

  • Reducer 0: state names starting with A to I (includes 9 letters)
  • Reducer 1: state names starting with J to Q (includes 8 letters)
  • Reducer 2: state names starting with R to Z (includes 9 letters)

To do this, you can simply replace the NaturalKeyPartitioner with something like the partitioner sketched below.
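Here is a minimal sketch of such a partitioner, assuming the map output types are the CompositeKey and DonationWritable classes from the previous post, and that CompositeKey exposes the natural key through a (hypothetical) getState() getter:

import org.apache.hadoop.mapreduce.Partitioner;

public class StateIntervalPartitioner extends Partitioner<CompositeKey, DonationWritable> {

    @Override
    public int getPartition(CompositeKey key, DonationWritable value, int numPartitions) {
        // Assign each state name to a fixed alphabetical interval (3 reducers assumed)
        char first = Character.toUpperCase(key.getState().charAt(0));
        if (first <= 'I') {
            return 0; // A to I
        } else if (first <= 'Q') {
            return 1; // J to Q
        }
        return 2; // R to Z
    }
}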

Let’s execute this job again, using 3 reducers and our new custom partitioner:
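In the driver, this only requires two changes, sketched here assuming the same job object as in the previous post’s driver:

// Driver changes: 3 reducers and the manual interval partitioner
job.setNumReduceTasks(3);
job.setPartitionerClass(StateIntervalPartitioner.class);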

This yields a correct, fully sorted 3-file result. If we look at the beginning and end of each output file, we can see that each state belongs to only one reducer. We can also see that the ordering of states continues across all output files. This is a successful Total Order Sorting.

However, there are a couple of inconveniences using this technique:

  • It was manually coded. If we want to use 4 reducers instead, we need to code a new partitioner.
  • The load distribution is not equal between reducers. The first output file is nearly 3 times bigger than the last output file. We tried our best to divide the 26 letters of the alphabet, but the reality is that state names are not evenly distributed across the alphabet 🙁

To overcome these problems, there is a powerful but complicated partitioner in Hadoop called the TotalOrderPartitioner.

The TotalOrderPartitioner

Basically, the TotalOrderPartitioner does the same thing as our custom class, but dynamically and with correct load balancing between reducers. To do that, it samples the input data to pre-calculate how to “separate” the input data into equal parts before the Map phase starts. It then uses those “separations” as partitioning boundaries during the mapper’s partitioning phase.

Simple Dataset for our Example

Let’s first create a simpler dataset to explain and test how the TotalOrderPartitioner works.

The GenerateListOfStateCity.java (view) MapReduce job will simply print out text pairs of (state,city) from the “donations” sequence file, in a random order.
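A sketch of what that job’s mapper could look like (the DonationWritable field names state and city are assumptions):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StateCityMapper extends Mapper<Text, DonationWritable, Text, Text> {

    private final Text state = new Text();
    private final Text city = new Text();

    @Override
    protected void map(Text donationId, DonationWritable donation, Context context)
            throws IOException, InterruptedException {
        // Drop the unique donation ID key and emit (state, city) text pairs
        state.set(donation.state);
        city.set(donation.city);
        context.write(state, city);
    }
}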

TotalOrderPartitioner Workflow

Using the simple dataset we just generated, here is an illustration of how the TotalOrderPartitioner can help us sort the input by state across multiple reducers:

[Figure: Example of Total Order Sorting using the TotalOrderPartitioner]

Here are some important details on what happens throughout the program:

  1. An InputSampler samples keys across all input splits, and sorts them using the job’s Sort Comparator. Different input sampler implementations are available in the Hadoop library:
    • RandomSampler: samples randomly based on a given frequency. This is the one we use in this example.
    • IntervalSampler: takes samples at regular intervals (for example every 5 records).
    • SplitSampler: takes the first n samples from each split.
  2. The input sampler writes a “partition file” Sequence File in HDFS, which delimits the different partition boundaries based on the sorted samples.
    • For n reducers there are n-1 boundaries written to this file. In this example there are 3 reducers, so 2 boundaries are created: “MD” and “PA”.
  3. The MapReduce job begins with the mapper tasks. For partitioning, the mappers use the TotalOrderPartitioner, which will read the partition file from HDFS to obtain partition boundaries. Each map output is then stored in a partition based on these boundaries.
  4. After the shuffle, each reducer has fetched a sorted partition of (key,value) pairs from each mapper. At this point, all keys in reducer 2 are alphabetically greater than all keys in reducer 1, which are greater than all keys in reducer 0. Each reducer merges its sorted partitions (using a sorted merge-queue) and writes its output to HDFS.

Java Code for the Example

Here is the code for this example job, which you can also view here on GitHub:
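In condensed form, the driver boils down to the following sketch (the class name and the input/output paths are placeholders; the partition file path matches the one we analyze below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalSortStateCity {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "Total Sort");
        job.setJarByClass(TotalSortStateCity.class);

        // Input: the (state,city) text pairs generated earlier
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("donors/states_cities"));
        FileOutputFormat.setOutputPath(job, new Path("donors/output_totalsort"));

        // No custom Mapper or Reducer: the defaults pass pairs through unchanged
        job.setMapOutputKeyClass(Text.class); // must be set BEFORE writePartitionFile()
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(3);

        // The partitioner will read its boundaries from the partition file
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("donors/states_partitions.seqfile"));
        job.setPartitionerClass(TotalOrderPartitioner.class);

        // Sample with freq=0.01, numSamples=1000, maxSplitsSampled=100 (blocking call)
        InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<>(0.01, 1000, 100);
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}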

This is a very simple job with a minimal use of the TotalOrderPartitioner. We don’t need to create specific Mapper and Reducer classes because we simply sort the input without changing any values or format. The default Mapper and Reducer classes will do. Some things to notice here:

  • We use a RandomSampler with the following parameters:
    • freq = 0.01: The sampler will run through all records, and each record will have a 0.01 (= 1%) probability of being chosen as a sample.
    • numSamples = 1000: There will be a maximum of 1000 samples. Once the maximum number of samples is reached, each newly chosen record replaces a random existing sample.
    • maxSplitsSampled = 100: Only 100 splits will be sampled. In our case there are only 2 splits, so this maximum won’t be applied.
  • The InputSampler.writePartitionFile() command is a blocking call. It will use the following configurations, which must be specified beforehand:
    • The job’s input paths: used as inputs for the sampling.
    • The number of reduce tasks: to know how many partition boundaries to write to the “partition file” sequence file.
    • The map output key class: to annoy us!! Well, maybe I’m exaggerating a bit. More about this major plot point later on…

Execution and Analysis

This time, when we execute the job, we have a log telling us how many samples will be taken.

The partition file which we specified, donors/states_partitions.seqfile, is not removed from HDFS after the job, so we can check out what’s inside. Well, actually it’s compressed so it doesn’t look nice… But what we can see (or guess…) from top to bottom is:

  • SEQ: start of the Sequence File header
  • The <K,V> types, which are <Text,NullWritable>
  • The compression codec class
  • 2 things which look like US states! Between some funny characters and unicode points we can identify “FL” and “NY”.

By visualizing the outputs, we can see that the second output starts with the FL state! And the third output starts with NY.

The cities are not sorted, of course, because we have no control over how the values enter the reduce function, since we did not use Secondary Sort. But we succeeded in sorting by state across all reducers, and the output files are quite balanced in size, compared to our manual partitioning in the previous section. We can also change the number of reduce tasks in the job configuration from 3 to 4 or 10 without worrying about the partitioner.

Limitations and Annoyances

As said earlier in the example code remarks, the map output key class needs to be specified, and it must correspond to the Sampler<K,V> key class. If you try to remove the setMapOutputKeyClass(Text.class) command, or call it only after InputSampler.writePartitionFile(), you will end up with a type mismatch error.

This raises a few questions:

  1. Why do we specify a V value class for the sampler? It stored <Text,NullWritable> pairs in the partition file that we viewed, so apparently it doesn’t need to read values from the input.
    • Probably because it was easier to implement, since everything is <K,V> parameterized in MapReduce, especially for reading/writing Sequence Files.
  2. Why should I specify a map key class? The Sampler’s generic types are already supposed to reflect the input types…
    • I’m not really sure. I believe that it would be better if the types were independent. This is somewhat linked to the next question.
  3. Why is it a map OUTPUT key class? The InputSampler reads input splits, which are the INPUTs to the mappers, not the OUTPUTs! Right?
    • This looks like a way to “encourage” us to use the same key types between the map input and output. This kinda makes sense, because sampling is done on the map input data, and partitioning is done on the map outputs. And the partitioner uses boundaries which are actually keys found during sampling, so they should be of the same type and same distribution.

To illustrate the logic of question 3, take the example job we just ran. The partition file contains the boundaries “FL” and “NY”. Imagine we use a custom mapper which replaces (state,city) by (state.hashcode,city). For example (‘IL’, ‘Chicago’) becomes (20918, ‘Chicago’), which is no longer of type <Text,Text>, but <IntWritable,Text>. How would you classify 20918 compared to “FL” and “NY” for partitioning? That would make no sense, and would surely end in a type or cast error. In a less dramatic scenario, imagine your mapper doesn’t transform the input pairs, but discards all pairs where the state is smaller than “J”. There would be no error, but you would lose half of your records, and your first reducer would have absolutely no output, because it is supposed to receive all states before “FL”.

From these observations we can assume that:

The InputSampler expects us to keep the same keys between the input and the output of the map function. This way the sampled data is guaranteed to reflect the map output data, by having the same data type and distribution.

Using a different input for sampling

The previous assumption is very reasonable. But sometimes we can’t use the same input data for sampling and partitioning.

Objective

The “donations” dataset we created in part I and used in part II is a Sequence File with types <Text, DonationWritable>. The keys are the unique IDs of each record. Let’s say we want to perform Total Order Sorting, to sort all records by state.

We can’t just use the InputSampler like we did in the previous section, because here the keys are IDs, and we need to sample states if we want to partition on state name. But what we can do is:

  • Use the file from the previous section, which contained all <state,city> pairs for sampling.
  • In the mapper, transform the input (id,donation) to (state,donation) so that the output can be partitioned correctly using the partition file.

MapReduce Solution

I coded the solution to this problem in a class called TotalSortByState, which you can view here on GitHub.

To make things clear, we create 2 jobs in the driver code: one for the sampling, and one for the actual MapReduce job.

  1. The samplingJob (verbose name “Sampling”) is dedicated to sampling its own input file.
    • We call FileInputFormat.setInputPaths() and pass it the sampling file’s path.
    • It uses a KeyValueTextInputFormat because its input file is a text file consisting of (state,city) pairs.
    • We set its map output key class to Text, because we have to, as explained in the previous section. The funny thing is, this job doesn’t even have a map phase, so this has become a simple formality.
  2. The mrJob (verbose name “Total Sorting MR Job”) is the real MapReduce job. It will sort data and use the TotalOrderPartitioner, whose partition file was generated during the first job.
    • We have to call FileInputFormat.setInputPaths() again and pass it the dataset’s file path.
    • This time we use the SequenceFileInputFormat because the input file is a Sequence File.

Two values must obviously be shared between both jobs for this to work: the number of reducers, and the partition file destination. Otherwise the job will crash. A condensed sketch of this two-job structure is shown below.
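Here is roughly what the driver looks like (paths are placeholders, imports are the same as in the previous sketch plus SequenceFileInputFormat, and the StateMapper that turns (id,donation) into (state,donation) is left out):

Configuration conf = new Configuration();

// Values shared by both jobs
Path partitionFile = new Path("donors/states_partitions.seqfile");
int numReducers = 3;

// Job 1: sampling its own (state,city) text file
Job samplingJob = Job.getInstance(conf, "Sampling");
samplingJob.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.setInputPaths(samplingJob, new Path("donors/states_cities"));
samplingJob.setMapOutputKeyClass(Text.class); // required, even without a map phase
samplingJob.setNumReduceTasks(numReducers);
TotalOrderPartitioner.setPartitionFile(samplingJob.getConfiguration(), partitionFile);
InputSampler.writePartitionFile(samplingJob,
        new InputSampler.RandomSampler<Text, Text>(0.01, 1000, 100));

// Job 2: the real MapReduce job on the donations sequence file
Job mrJob = Job.getInstance(conf, "Total Sorting MR Job");
mrJob.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(mrJob, new Path("donors/donations.seqfile"));
FileOutputFormat.setOutputPath(mrJob, new Path("donors/output_totalsortbystate"));
mrJob.setMapperClass(StateMapper.class); // emits (state, donation)
mrJob.setMapOutputKeyClass(Text.class);
mrJob.setMapOutputValueClass(DonationWritable.class);
mrJob.setOutputKeyClass(Text.class);
mrJob.setOutputValueClass(DonationWritable.class);
mrJob.setNumReduceTasks(numReducers);
TotalOrderPartitioner.setPartitionFile(mrJob.getConfiguration(), partitionFile);
mrJob.setPartitionerClass(TotalOrderPartitioner.class);
System.exit(mrJob.waitForCompletion(true) ? 0 : 1);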

Execution and Analysis

We successfully used samples from the unordered “state-city” text file to partition our dataset on keys which were created in the map function.

We said earlier that the InputSampler wants us to keep the same keys between the input and the output of the map function, to stay coherent between sampling and partitioning. This is what we did by using the default identity mapper in the previous section. This time we used a different file just for the InputSampler, but we still maintained coherence between the sampling and partitioning phases, because in the MR job we transformed our map input into output keys with the same type and distribution as the sampling file’s keys.

Total Secondary Sort by Composite Key

Now what if we want to do Total Order Sorting on our “donations” sequence file of <Text, DonationWritable> entries, using the same ordering as in the previous post?

A first solution would be to use the same technique as in the previous section. We could create a sequence file of <CompositeKey, NullWritable> pairs for each record of our <Text, DonationWritable> sequence file, and then use it as a separate file for sampling. But it would be boring to do the same thing again. Luckily, I’ve found a workaround to do this directly on the dataset file.

Objective

We have the following constraints on our Total Order Sorting by Composite Key:

  • We must maintain data type and distribution coherence between the sampling and the partitioning.
  • The TotalOrderPartitioner uses the map output keys to calculate partitioning.
  • Our map output keys are of type CompositeKey, because we are doing Secondary Sorting.

From these premises we can deduce that… we have no choice but to sample CompositeKeys! It looks like the only way this is going to work.

But how? Our InputSampler takes the “donors” sequence file as input, which is of type <Text, DonationWritable>, doesn’t it? Well no, it doesn’t! It takes the InputFormat as input. Muhahhaha! We can do a small hack by modifying the InputFormat to extract and return CompositeKey keys on the fly instead of Text keys.

In fact, our custom InputFormat will use a custom RecordReader, which will randomly read DonationWritable objects from the sequence file, and extract CompositeKey objects from them.

Here is a little illustration of what we will do:

[Figure: Sampling with a custom RecordReader]

Optimization

Normally, the InputSampler reads the entire file and picks out random samples. But in our case this would be very costly, because we would have to read all 5 million DonationWritable objects (which contain over 20 fields) from the sequence file and create a new CompositeKey object each time. And most of these objects would be wasted, because the InputSampler would only pick a fraction of them.

That is why I decided to push down the sampling task to a lower level, during the record reading. This way, only randomly chosen records are subject to CompositeKey extraction.

So our custom RecordReader takes care of sampling before the InputSampler. But we still need to use an InputSampler to create the partitions file for the TotalOrderPartitioner. And if we used a RandomSampler, like in the previous section, it would pick random samples from our pre-sampled input data, which is a bit useless. So this time we will use a SplitSampler instead, to take all of the data coming from the InputFormat.

Also, since our data, the “donations” dataset, is unordered and seems to have a random distribution on the “state” field, we will limit the number of entries that will be read for sampling. We can do this because we will implement our own RecordReader. Taking samples from 500k records will be about two times faster than from 1 million records, and the samples should have a very similar value distribution in both cases.

MapReduce Solution

The MapReduce job code is here : https://github.com/nicomak/[…]/TotalSortByCompositeKey.java

The code is very similar to the job from the previous section, except that we now use the same input for sampling and map-reduce. The big difference from the previous job is not in the driver code, but in the custom InputFormat and RecordReader we implement, as sketched in the next two sections.
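For the sampling part, the driver wiring looks roughly like this, continuing from the previous driver sketch (the configuration property names are assumptions, matching the InputFormat sketch below):

// The custom InputFormat pre-samples CompositeKeys from the donations file
Job samplingJob = Job.getInstance(conf, "Sampling");
samplingJob.setInputFormatClass(CompositeKeySamplingInputFormat.class);
FileInputFormat.setInputPaths(samplingJob, new Path("donors/donations.seqfile"));
samplingJob.setMapOutputKeyClass(CompositeKey.class);
samplingJob.setNumReduceTasks(3);
samplingJob.getConfiguration().setInt(CompositeKeySamplingInputFormat.NB_SAMPLES, 1000);
samplingJob.getConfiguration().setInt(CompositeKeySamplingInputFormat.READ_LIMIT, 500000);
TotalOrderPartitioner.setPartitionFile(samplingJob.getConfiguration(), partitionFile);

// A SplitSampler with a huge numSamples keeps everything the record reader emits
InputSampler.writePartitionFile(samplingJob,
        new InputSampler.SplitSampler<CompositeKey, NullWritable>(Integer.MAX_VALUE));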

The Custom InputFormat

The custom InputFormat class used here is CompositeKeySamplingInputFormat (view).

It extends the SequenceFileInputFormat with generic types <CompositeKey, NullWritable> so that the “hack” described above can work.

Its main purpose is to manage configuration properties and use these values to create the custom RecordReader. The configurable properties are:

  • nbSamples: number of samples to take.
  • readLimit: number of records (starting from the beginning) to take samples from.
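A sketch of this class, with the property names as assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class CompositeKeySamplingInputFormat
        extends SequenceFileInputFormat<CompositeKey, NullWritable> {

    // Configuration property names (assumed)
    public static final String NB_SAMPLES = "sampling.nbsamples";
    public static final String READ_LIMIT = "sampling.readlimit";

    @Override
    public RecordReader<CompositeKey, NullWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        // Read the configured values and hand them to the custom RecordReader
        Configuration conf = context.getConfiguration();
        int nbSamples = conf.getInt(NB_SAMPLES, 1000);
        int readLimit = conf.getInt(READ_LIMIT, 500000);
        return new CompositeKeySamplingRecordReader(nbSamples, readLimit);
    }
}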

The Custom RecordReader

The custom RecordReader class is CompositeKeySamplingRecordReader (view).

This is not a real implementation of a RecordReader. It is a fake record reader which encapsulates a real record reader, a SequenceFileRecordReader. It goes through all records from the sequence file reader, and does Reservoir Sampling to take a fixed number of samples nbSamples from the first readLimit records. It stores these samples as CompositeKey objects in an ArrayList and then serves them with the nextKeyValue() and getCurrentKey() methods.

Read more about Reservoir Sampling on Wikipedia.
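Here is a sketch of that reservoir sampling logic (the CompositeKey constructor and the DonationWritable field names are assumptions):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;

public class CompositeKeySamplingRecordReader extends RecordReader<CompositeKey, NullWritable> {

    private final SequenceFileRecordReader<Text, DonationWritable> reader =
            new SequenceFileRecordReader<>();
    private final int nbSamples;
    private final int readLimit;
    private final List<CompositeKey> samples = new ArrayList<>();
    private final Random random = new Random();
    private int index = -1;

    public CompositeKeySamplingRecordReader(int nbSamples, int readLimit) {
        this.nbSamples = nbSamples;
        this.readLimit = readLimit;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        reader.initialize(split, context);

        // Reservoir sampling: keep nbSamples random keys among the first readLimit records
        int seen = 0;
        while (seen < readLimit && reader.nextKeyValue()) {
            DonationWritable donation = reader.getCurrentValue();
            if (samples.size() < nbSamples) {
                // Fill the reservoir with the first nbSamples records
                samples.add(new CompositeKey(donation.state, donation.city, donation.total));
            } else {
                // Replace a random sample with decreasing probability nbSamples/(seen+1)
                int pos = random.nextInt(seen + 1);
                if (pos < nbSamples) {
                    samples.set(pos, new CompositeKey(donation.state, donation.city, donation.total));
                }
            }
            seen++;
        }
        reader.close();
    }

    @Override
    public boolean nextKeyValue() {
        // Serve the pre-computed samples one by one
        return ++index < samples.size();
    }

    @Override
    public CompositeKey getCurrentKey() {
        return samples.get(index);
    }

    @Override
    public NullWritable getCurrentValue() {
        return NullWritable.get();
    }

    @Override
    public float getProgress() {
        return samples.isEmpty() ? 1.0f : Math.min(1.0f, (index + 1f) / samples.size());
    }

    @Override
    public void close() throws IOException {
        // The wrapped reader was already closed at the end of initialize()
    }
}

Note that in this sketch a CompositeKey is only built for records that actually enter the reservoir, which is the optimization described above.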

Execution and Analysis

This time, we can distinguish the following boundaries from the partitions sequence file:

  • IA, Guthrie Center
  • NY, merrick

The “total” donation amount field is supposed to be in there as well, but it’s probably not readable without decompression.

By checking how the output files were delimited, we can see that they indeed have “IA Guthrie Center 100.00” and “NY merrick 125.00” as boundaries.

This time the files are split on a specific (state,city,total) triplet, so “states” don’t need to be exclusive to a reducer. For example the “NY” state can be found in both outputs 00001 and 00002. This also gives better load balancing, as the 3 output files have nearly the same size.

The job took on average 2min 31s, which is similar to the Secondary Sort from the previous part.

 
