Hadoop Basics I: Working with Sequence Files

In this new series of posts, we will explore basic techniques for querying structured data. Querying means filtering, projecting, aggregating, sorting and joining data. We will look at different ways of querying on different Hadoop frameworks (MapReduce, Hive, Spark, etc.).

This first part will briefly introduce the dataset which will be used throughout this series, and then present the Sequence File data format. We will see how to write and read Sequence Files with a few code snippets, and benchmark the different compression types to choose the best one.

We will format our dataset in a Sequence File, for later use in various frameworks in my following posts.

The Cluster

I will use my own mini cluster for running jobs and benchmarking stuff. You may find details on my 5-node home-made cluster in another series of posts, starting here.

However, I have decided to use 1 GB of memory for MapReduce containers (mappers and reducers) instead of 512 MB during this series of tutorials. Since my micro-computers only have 2 GB of RAM, this results in a maximum of 2 containers running at the same time on each of my 4 slave nodes.

The Dataset

The dataset we will use in this series of tutorials is from the “DonorsChoose” organization, which provides a platform for donating money to school projects.

Their school projects and donations database is open for downloads as separate CSV files for each table. The files and detailed explanations can be found here on their website : http://data.donorschoose.org/open-data/overview/.

In this post we will simply use the “donations” table data, which contains 5 million donation entries (rows). The original CSV’s uncompressed file size is about 1.6 GB.

You may view a sample that I made of the first 10 lines of the “donations” data here.

Sequence Files

Instead of working directly with the CSV dataset files for our MapReduce operations, we will create a Sequence File containing all of the data that we want.

Introduction

A Sequence File is a file containing a sequence of binary Key/Value records, where both Key and Value are serialized objects. These objects can be anything extending Hadoop’s Writable interface. So they can be simple objects like strings or integers, but they can also be a custom-made object with many attributes.

Using Sequence Files instead of CSV or other text files has the following advantages :

  • Read structured objects directly from the file, instead of splitting and mapping each value of a CSV line each time.
  • Transparent and splittable compression natively supported in Hadoop. No extra code or steps are needed to work with compressed Sequence Files.

On the other hand, there is an overhead for coding the necessary Java classes and generating the Sequence File itself.

Sequence File vs. Other Data Formats

Sequence File is in fact a basic low-level data storage format.

Other more advanced data formats can be used in Hadoop, such as Avro, Parquet, ORC, Protocol Buffers, Thrift, etc…

The advantages that Sequence Files have over them are :

  • Being natively supported by Hadoop. No need to install anything. In fact it is used internally by Hadoop behind the scenes.
  • Write performance : writing data is generally faster than with the more advanced data formats.
  • It also has good reading performance when it comes to reading full rows of data.

However, it has some disadvantages as well, mainly because it is low-level and designed for Hadoop :

  • No multi-language support : only Java
  • No code generation : you have to write your own objects in Java. And Java is not the fastest language to write 😉
  • No schema evolution : if you want to add or remove attributes, you have to manually recreate another file with the new modified objects.

When should we use Sequence Files ?

In a corporate setting you probably wouldn’t use Sequence Files to store your datasets, because the disadvantages listed above are quite crucial most of the time in the industry. Unless you need to create datasets which will be written once, never change, and always read full rows using Java code, you’d better use more advanced data formats :).

However, for learning Hadoop it’s great because it is a native Hadoop tool, and it forces you to code lower level classes for the Hadoop serialization framework. Understanding Sequence Files will help you understand Hadoop internals as well.

Writing and Reading Sequence Files

Creating a custom Writable

To use a custom object in Sequence Files, we must first create the object’s Java class. Our class (let’s call it ExampleWritable) will implement the WritableComparable interface. Strictly speaking, Writable alone is enough for an object used only as a value, and WritableComparable is needed for objects used as MapReduce keys, but implementing it lets us use the class for both. WritableComparable is an extension of these 2 interfaces, listed with their methods :

  • Writable : necessary for Hadoop’s serializer
    • write(DataOutput out) : how to serialize the object from its attributes
    • readFields(DataInput in) : how to read the input and map the values to the object’s attributes
  • Comparable : from the standard Java library, but necessary for sorting keys in MapReduce
    • compareTo(ExampleWritable o) : defines how to sort the instances (keys in MR)

The write and readFields methods need to write and read the attributes in the same order. For example if you needed an object with 3 attributes of types String, float and long, your Writable class would look like this :

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class ExampleWritable implements WritableComparable<ExampleWritable> {
    
    public String stringAttr;
    public float floatAttr;
    public long longAttr;
    
    @Override
    public void readFields(DataInput in) throws IOException {
        stringAttr = in.readUTF();
        floatAttr = in.readFloat();
        longAttr = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(stringAttr);
        out.writeFloat(floatAttr);
        out.writeLong(longAttr);
    }
    
    @Override
    public int compareTo(ExampleWritable o) {
        return Long.compare(this.longAttr, o.longAttr);
    }
}
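To check that write and readFields really mirror each other, a quick round trip through an in-memory byte buffer can help. The sketch below is only an illustration: ExampleRecord and RoundTripDemo are hypothetical names, and ExampleRecord is a stand-in that copies the fields and method bodies of ExampleWritable without implementing the Hadoop interface, so that it compiles with the JDK alone. With Hadoop on the classpath, the same test should work directly on ExampleWritable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for ExampleWritable (same fields, same method bodies),
// without the Hadoop interface so that no Hadoop jar is needed to run it.
class ExampleRecord {
    String stringAttr;
    float floatAttr;
    long longAttr;

    void write(DataOutput out) throws IOException {
        out.writeUTF(stringAttr);
        out.writeFloat(floatAttr);
        out.writeLong(longAttr);
    }

    // Must read the attributes in exactly the order write() wrote them
    void readFields(DataInput in) throws IOException {
        stringAttr = in.readUTF();
        floatAttr = in.readFloat();
        longAttr = in.readLong();
    }
}

class RoundTripDemo {

    // Serialize a record into a byte array using its write() method
    static byte[] serialize(ExampleRecord record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            record.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuild a record from a byte array using its readFields() method
    static ExampleRecord deserialize(byte[] bytes) {
        ExampleRecord record = new ExampleRecord();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            record.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return record;
    }

    public static void main(String[] args) {
        ExampleRecord original = new ExampleRecord();
        original.stringAttr = "San Francisco";
        original.floatAttr = 25.0f;
        original.longAttr = 42L;

        ExampleRecord copy = deserialize(serialize(original));
        System.out.println(copy.stringAttr + " " + copy.floatAttr + " " + copy.longAttr);
    }
}
```

If you swap two of the read calls in readFields, the round trip either throws an exception or fills the attributes with garbage, which is a quick way to see why the order matters.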

To read and write these Writables from or to Sequence Files, there are Writer and Reader classes provided by the org.apache.hadoop.io.SequenceFile class. The Javadoc of this class is quite clear and complete, with explanations on the file structure and compression of Sequence Files. It can be read here.

Writing a Sequence File

Here is a basic snippet to write all the entries from input.csv to a Sequence File called example.seqfile using the previous ExampleWritable  as values :

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.DefaultCodec;

// The code below goes inside a method that declares "throws IOException"
File inputFile = new File("input.csv");
Path outputPath = new Path("example.seqfile");

try (
    BufferedReader br = new BufferedReader(new FileReader(inputFile));

    // Create a SequenceFile Writer with block compression
    SequenceFile.Writer writer = SequenceFile.createWriter(new Configuration(),
        SequenceFile.Writer.file(outputPath),
        SequenceFile.Writer.keyClass(LongWritable.class),
        SequenceFile.Writer.valueClass(ExampleWritable.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()))
) {
    for (String line = br.readLine(); line != null; line = br.readLine()) {

        // Parse the CSV line to create an ExampleWritable object.
        // Note: a plain split(",") does not handle quoted fields containing commas.
        String[] parts = line.split(",");
        ExampleWritable example = new ExampleWritable();
        example.longAttr = Long.parseLong(parts[1]);
        example.stringAttr = parts[3];

        // Create a key and write the Key/Value pair to the output
        LongWritable key = new LongWritable(example.longAttr);
        writer.append(key, example);
    }
}

In the try-with-resources initialization block, we create a BufferedReader to read the CSV, and a Writer to write to the Sequence File. The Writer is created and configured through static methods of Hadoop’s SequenceFile class. We choose to have LongWritable as keys and ExampleWritable as values. We also choose “Block Compression”, which compresses batches of records together rather than each record individually, and is generally the most effective form of compression for Sequence Files.

Then all we need to do is loop through the CSV file lines, split each one and set the values on a new object before appending it to the Sequence File.

💡 In this example, we create new LongWritable and ExampleWritable instances in each loop. Actually you may also re-use the same 2 instances, because they are directly serialized in the Writer.append(key,value) method call in each iteration. This may slightly increase performance.

Reading a Sequence File

Here is a code snippet to read data from a Sequence File called example.seqfile :

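import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

Path inputFile = new Path("example.seqfile");

try (SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), SequenceFile.Reader.file(inputFile))) {
    // The key and value types must match the ones the file was written with
    // (LongWritable keys and ExampleWritable values in the previous snippet)
    LongWritable key = new LongWritable();
    ExampleWritable value = new ExampleWritable();

    // next() fills the same two objects on each call and returns false at the end of the file
    while (reader.next(key, value)) {
        System.out.println(key);
        // Only prints something readable if ExampleWritable overrides toString()
        System.out.println(value);
    }
}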

As you can see, reading is very simple. The Key/Value types and the compression are detected automatically. All Sequence Files contain this information in a header at the beginning of the file.

Various getters are available on the Reader object to find out what the types and compression are. All you need to do is provide objects with the matching writable key and value types to the reader.next(key,value) method. This method will use the readFields(in) method of your writable objects to populate their attributes.

Creating the “Donations” Sequence File

Java Code

Below are the classes I used to create the Sequence File for the “donations” dataset. They use all of the techniques described in the previous section.

You can click on each class name to view it on GitHub :

  • DonationWritable.java
    • Contains 18 fields from the CSV (leaving out 4 useless fields).
    • Has a parseLine(String line) method to populate fields from a CSV line.
  • DonationsWriter.java
    • Executable class which prints a few metrics and skips the first line of the CSV (containing column names).
    • Uses “Block Compression”.
    • Uses the donation_id field as the key, so the key type is Text.
  • DonationsReader.java
    • Executable file to print out all records, just for testing.

Here are the terminal commands for using these classes to create the Sequence File in HDFS from the CSV file, and then read it :

$ hadoop jar donors.jar data.seqfile.DonationsWriter /data/donors/opendata_donations.csv donors/donations.seqfile

15/12/17 17:46:11 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
15/12/17 17:46:11 INFO compress.CodecPool: Got brand-new compressor [.deflate]
1 million lines processed
2 million lines processed
3 million lines processed
4 million lines processed
5 million lines processed
Number of lines processed : 5069961
Number of errors : 438625
Took 722464 ms.


$ hadoop jar donors.jar data.seqfile.DonationsReader donors/donations.seqfile

15/12/17 18:02:25 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
15/12/17 18:02:25 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
15/12/17 18:02:25 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
15/12/17 18:02:25 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
15/12/17 18:02:25 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
Compressed ? true
00000ce845c00cbf0686c992fc369df4|project=7be7184572a67468a2ef4402e7de6c99|city=|total=25.00
0000174d1d38072889d47e51b587a10c|project=fbef21e4fdf303eebe6c48b99ef9de8f|city=San Francisco|total=2138.07
0000219e4b37d2504fb6b8c28e24a2d4|project=b4afbc05ce9ff8420c176810bd59db79|city=Ballwin|total=47.56

There are a number of errors when creating the file, but this is due to errors in the original CSV. The “comments” field of the CSV contains carriage returns, and it seems like they caused line breaks in the CSV, because we can sometimes see a comment continuing on the next line.
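One possible way to recover such broken rows, which is not what my DonationsWriter does, is to glue physical lines back together until the double quotes are balanced. Keep in mind that BufferedReader.readLine() treats a bare carriage return as a line terminator, so a carriage return inside a comment is enough to split a row in two. The sketch below assumes that the CSV wraps multi-line fields in double quotes and escapes inner quotes by doubling them (I did not verify this for the DonorsChoose files), and CsvRecordJoiner is a hypothetical helper name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: rebuilds logical CSV records that span several physical lines.
class CsvRecordJoiner {

    // Count the double-quote characters in a line
    static int countQuotes(String s) {
        int count = 0;
        for (int i = 0; i < s.length(); i++) {
            if (s.charAt(i) == '"') {
                count++;
            }
        }
        return count;
    }

    // Read physical lines and merge them while a quoted field is still open.
    // A record is complete when it contains an even number of quotes
    // (doubled quotes used as escapes keep the count even).
    static List<String> readRecords(BufferedReader reader) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int quotes = 0;
        try {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                if (current.length() > 0) {
                    // Continuation of an open record: restore a line break
                    // (the original terminator is lost, '\n' is used instead)
                    current.append('\n');
                }
                current.append(line);
                quotes += countQuotes(line);
                if (quotes % 2 == 0) {
                    records.add(current.toString());
                    current.setLength(0);
                    quotes = 0;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (current.length() > 0) {
            // Unbalanced quotes at the end of the input: keep what we have
            records.add(current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        String csv = "1,\"first line\rsecond line\",25.00\n2,\"ok\",47.56\n";
        for (String record : readRecords(new BufferedReader(new StringReader(csv)))) {
            System.out.println("[" + record + "]");
        }
    }
}
```

Each rebuilt record still has to be split into fields with a quote-aware parser, since a simple split(",") would also break on commas inside the comments.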

Compression Benchmark

Here is a small benchmark on the 3 different types of compression, using my mini cluster on the “donations” dataset :

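| Type               | Sequence File Size | HDFS Blocks | Write Time  | Read Time  |
|--------------------|--------------------|-------------|-------------|------------|
| None               | 1.19 GB            | 10          | 8 min 50 s  | 1 min 12 s |
| Record Compression | 1.01 GB            | 9           | 15 min 05 s | 3 min 52 s |
| Block Compression  | 493 MB             | 4           | 11 min 49 s | 1 min 18 s |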

The write and read times are an average of 2 executions, both executed from the master node.

Conclusion

Record Compression barely saves space here, reducing the file by only 1 block, and is about 3 times slower to read. So it’s not really a good idea to use it.

Block Compression, however, reduces the number of blocks from 10 to 4. The write time is a bit slower than without compression, but that’s only a one-time operation so it doesn’t really matter. What matters is that the read time is only slightly slower. This is a great deal, because it means that when working on this dataset in MapReduce, we will only need 4 mappers instead of 10, with only a very small decompression overhead.
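The block counts in the benchmark can be checked with a bit of arithmetic. The sketch below assumes an HDFS block size of 128 MB, which I did not state above but which is consistent with all three rows of the table, and one mapper per block as described in the previous paragraph. HdfsBlockMath is just a hypothetical name for the example.

```java
// Hypothetical helper: estimates how many HDFS blocks (and thus mappers) a file needs.
class HdfsBlockMath {

    // Number of blocks = file size divided by block size, rounded up
    static long blocksFor(double fileSizeMb, long blockSizeMb) {
        return (long) Math.ceil(fileSizeMb / blockSizeMb);
    }

    public static void main(String[] args) {
        long blockSizeMb = 128; // assumption: HDFS block size of 128 MB

        System.out.println("None   : " + blocksFor(1.19 * 1024, blockSizeMb) + " blocks");
        System.out.println("Record : " + blocksFor(1.01 * 1024, blockSizeMb) + " blocks");
        System.out.println("Block  : " + blocksFor(493, blockSizeMb) + " blocks");
    }
}
```

With these sizes the computation gives 10, 9 and 4 blocks, matching the table: the last block of each file is only partially filled, which is why shaving 0.18 GB off with Record Compression saves just one block.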

In the next post we will see how to use the Sequence File that we created in a MapReduce job.
