Hadoop Basics VIII: Running SQL Queries with Hive

In this part, we will use Hive to execute all the queries that we have been processing since the beginning of this series of tutorials.

In nearly all parts, we have coded MapReduce jobs to solve specific types of queries (filtering, aggregation, sorting, joining, etc…). It was a good exercise to understand Hadoop/MapReduce internals and some distributed processing theory, but it required writing a lot of code. Hive can translate SQL queries into MapReduce jobs, giving us query results without writing any code.

We will start by installing Hive and setting up tables for our datasets, before executing our queries from previous parts and seeing whether Hive achieves better execution times than our hand-coded MapReduce jobs.

Hive Installation

To download and install Hive in your current directory on Linux:
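
Something along these lines should work, assuming the Hive 1.2.1 release from the Apache archive (the version is an assumption, pick whichever release matches your Hadoop version):

  wget https://archive.apache.org/dist/hive/hive-1.2.1/apache-hive-1.2.1-bin.tar.gz
  tar -xzf apache-hive-1.2.1-bin.tar.gz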

Then edit your .bashrc file to add the following lines at the end:
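
For example, assuming the install directory from the previous step (adjust HIVE_HOME to your actual path):

  export HIVE_HOME=$HOME/apache-hive-1.2.1-bin
  export PATH=$PATH:$HIVE_HOME/bin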

Then you need to prepare a few working directories in HDFS:
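
These are the standard scratch and warehouse directories described in the Hive documentation:

  hdfs dfs -mkdir -p /tmp
  hdfs dfs -mkdir -p /user/hive/warehouse
  hdfs dfs -chmod g+w /tmp
  hdfs dfs -chmod g+w /user/hive/warehouse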

To run the Hive Server, use one of the following commands:
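
For example, either of these (they are equivalent):

  $HIVE_HOME/bin/hiveserver2
  # or
  $HIVE_HOME/bin/hive --service hiveserver2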

Wait for the server startup to finish (about 30 seconds or so), and then you can connect to the server and execute commands from any host by using the Beeline client (here from the same host):
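
Assuming HiveServer2 is listening on its default port 10000:

  # -u is the JDBC connection URL, -n the user name (hduser in my setup)
  beeline -u jdbc:hive2://localhost:10000 -n hduser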

For development, it is also possible to run the server and client in the same process by doing:
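
Beeline can start an embedded HiveServer2 in the same process by connecting to an empty JDBC URL:

  beeline -u jdbc:hive2://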

Creating Tables

Since the beginning of this series we have used 2 datasets, Donations and Projects, which we have formatted as Sequence Files (see Part I). We will give these data files (donations.seqfile and projects.seqfile) to Hive so that it can create their corresponding “tables” which we will later query.

These tables, stored in the Hive metastore, contain only metadata about the datasets (table names, column information, etc…). They are used for interpreting the queries and running the underlying MapReduce (or Tez or Spark if configured) code. The real data which will be returned from queries is still from the original data files, our Sequence Files.

Managed or External

Tables in Hive can be either Managed or External.

Managed is the default type of table. When you pass a data file in HDFS to Hive to create a managed table, the file will be moved to the directory /user/hive/warehouse/<table_name>/. Managed tables are meant to be used only by Hive. When you drop a managed table, everything is deleted, including your original data file.

External tables, on the other hand, can be used when you want to modify the data files outside of Hive. When you create an external table, the original datasets stay where they were. When an external table is dropped, only the table’s metadata is dropped. The data files still remain where they were in HDFS.

In my case, I have both sequence files in the /user/hduser/donors/ HDFS directory of the Hadoop user, and I would like to keep them there to still be able to run my MapReduce jobs from the previous parts. So we will use external tables for both datasets. I also don't want to lose my data files each time I drop the tables to recreate them if needed. The only problem is that Hive takes a directory instead of a file when creating a table. So we need to move the sequence files to their own individual directories first:
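
For example (the names of the two subdirectories are my choice, anything works as long as each file ends up alone in its own directory):

  hdfs dfs -mkdir /user/hduser/donors/donations
  hdfs dfs -mkdir /user/hduser/donors/projects
  hdfs dfs -mv /user/hduser/donors/donations.seqfile /user/hduser/donors/donations/
  hdfs dfs -mv /user/hduser/donors/projects.seqfile /user/hduser/donors/projects/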

Sequence Files with Hive

To read the data, Hive needs to know how to deserialize our sequence files, so we need to pass it our own SerDe (serializer/deserializer), which must extend Hive’s AbstractSerDe class.

I wrote the DonationSerDe class (view) for the Donations sequence file.

The important methods that it overrides are:

  • initialize(Configuration conf, Properties tbl): uses the table’s metadata to initialize 2 things:
    1. An ObjectInspector, which contains the data types of all columns.
    2. A List of Objects containing as many elements as there are columns in the table. We initialize all elements to null and call this list row.
  • deserialize(Writable writable): takes the Hadoop Writable from the input format as a parameter and returns all row values as a List of Objects.
    • In our case the input is a Sequence File record of type DonationWritable, and we map its values into the row object list.
  • getObjectInspector(): returns the object inspector created in the initialize() method.

We will not write new entries to our datasets, so the serialization-related methods like serialize() and getSerializedClass() will never be called. I simply return null-ish values for them.
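
To make this more concrete, here is a simplified, hypothetical sketch of what such a SerDe can look like. This is not the actual DonationSerDe linked above: it assumes a Hive 1.x/2.x-era AbstractSerDe, shortens the column list to 3 fields, and assumes getter names on the DonationWritable from Part I.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Properties;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.serde2.AbstractSerDe;
  import org.apache.hadoop.hive.serde2.SerDeException;
  import org.apache.hadoop.hive.serde2.SerDeStats;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.io.Writable;

  public class DonationSerDeSketch extends AbstractSerDe {

      private ObjectInspector inspector;
      private List<Object> row;

      @Override
      public void initialize(Configuration conf, Properties tbl) throws SerDeException {
          // The real SerDe reads column names/types from the table properties (tbl);
          // they are hard-coded here for brevity.
          List<String> columnNames = Arrays.asList("donation_id", "city", "total");
          List<ObjectInspector> columnOIs = Arrays.asList(
                  PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                  PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                  PrimitiveObjectInspectorFactory.javaFloatObjectInspector);
          inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
          // One slot per column, all initialized to null.
          row = new ArrayList<>();
          for (int i = 0; i < columnNames.size(); i++) {
              row.add(null);
          }
      }

      @Override
      public Object deserialize(Writable writable) throws SerDeException {
          // The input is one record of our sequence file; the getter names are assumed.
          DonationWritable donation = (DonationWritable) writable;
          row.set(0, donation.getDonationId());
          row.set(1, donation.getCity());
          row.set(2, donation.getTotal());
          return row;
      }

      @Override
      public ObjectInspector getObjectInspector() throws SerDeException {
          return inspector;
      }

      // Serialization is never used in this tutorial, so these return null-ish values.
      @Override
      public Writable serialize(Object obj, ObjectInspector oi) throws SerDeException {
          return null;
      }

      @Override
      public Class<? extends Writable> getSerializedClass() {
          return null;
      }

      @Override
      public SerDeStats getSerDeStats() {
          return null;
      }
  }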

Creating the Donations Table

We first need to add our code’s jar file to Hive’s class path in order to let it use our SerDe classes.
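
For example, from the beeline console (the jar path and name are placeholders for wherever your compiled project jar lives):

  ADD JAR /home/hduser/donors-hadoop.jar;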

Then we create our table using a CREATE EXTERNAL TABLE statement (see the sketch after this list) with the following clauses:

  • ROW FORMAT SERDE to specify our SerDe class
  • STORED AS SEQUENCEFILE to use Sequence File data
  • LOCATION to define where our data files are (must be a directory)
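
Here is a sketch of the statement. The SerDe's package name and most column names are assumptions, and the column list is abbreviated; the real table declares every column handled by the SerDe:

  CREATE EXTERNAL TABLE donations (
    donation_id STRING,
    project_id STRING,
    city STRING,
    state STRING,
    ty_packet STRING,
    -- ... the other columns mapped by the SerDe ...
    total FLOAT
  )
  ROW FORMAT SERDE 'data.serde.DonationSerDe'   -- assumed package name
  STORED AS SEQUENCEFILE
  LOCATION '/user/hduser/donors/donations';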

The full Hive DDL reference is available in the Hive documentation.

When deserializing, our SerDe class returns an arraylist with indexes 0 to 17, containing the values for each of the 18 fields (or columns). The indexes of those list entries are mapped to the column names defined during table creation. This means that we can choose to ignore a few fields from the underlying Sequence File by simply not adding them to the row object list inside the deserialize method. It also means that we can give any names we want to our table columns; they don't have to be the same names as the fields in the Sequence File writables.

For example, in the table created above, I have decided to ignore the field called "for_honoree" from the DonationWritable (view) sequence file object. I have also decided to rename the "thank_you_packet" field to a shorter column name, "ty_packet". None of these operations causes any trouble, as we can verify by executing the following query:
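
For instance, any small projection on the new column names will do:

  SELECT donation_id, city, ty_packet, total
  FROM donations
  LIMIT 10;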

Creating the Projects Table

For the Projects dataset the procedure is the same. I created the ProjectSerDe class (view) which we will use to create the projects table.

The SQL command to create the projects table follows the same pattern:
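
Again a sketch, with assumed column names and SerDe package:

  CREATE EXTERNAL TABLE projects (
    project_id STRING,
    school_city STRING,             -- hypothetical column names
    primary_focus_subject STRING,
    -- ... the other columns mapped by ProjectSerDe ...
    total_price FLOAT
  )
  ROW FORMAT SERDE 'data.serde.ProjectSerDe'   -- assumed package name
  STORED AS SEQUENCEFILE
  LOCATION '/user/hduser/donors/projects';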

Executing all Queries on our datasets

By default, Hive uses MapReduce as its processing engine to execute its queries. So let’s run all our queries from the previous parts (filtering, aggregating, sorting, joining, etc…) using Hive, and see how it does compared to the MapReduce code I manually wrote for each query.

Hive uses its generic pre-built MapReduce code to execute queries, instead of generating custom MapReduce jobs per query. So unfortunately we won't be able to understand exactly what it does for each query and compare it to my hand-coded jobs. However, we can still see a few hints: how many jobs were needed, how many mappers/reducers were used, how long they took, etc.

Query from Part II: Aggregating/Sorting

The query we processed using MapReduce in Part II of this series was supposed to sum all donation amounts grouped by city, and display all city names with their sums in order of descending sum value. To solve this query, we created 2 MapReduce jobs (one for aggregation and the second one for sorting) which took respectively 1 min 25 sec and 1 min 02 sec, so a total of around 2 min 27 sec.

Here is how to execute this query in Hive and store all the results in a new table, called “hive_output_part2”:
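
Something along these lines; the exact projection may differ, but the shape is a CREATE TABLE ... AS SELECT with a GROUP BY and an ORDER BY:

  CREATE TABLE hive_output_part2
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE
  AS
  SELECT city, SUM(total) AS sum_total
  FROM donations
  GROUP BY city
  ORDER BY sum_total DESC;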

Click below to view the whole output I got from my beeline console:

This creates 2 "stages", which are MapReduce jobs. These 2 jobs also appeared one after the other in the YARN jobs UI.

According to the console output, the 2 jobs took a total of 207.86 seconds. But using the same measurement I used for my MR jobs, which is to take the "Elapsed" time from the YARN UI, the total of both jobs is on average 3 min 15 sec, which is about 45 seconds slower than the MapReduce jobs from the Part II post.

💡 Note that when using our hand-coded MapReduce jobs, manual intervention was necessary after the first job to start the second job, whereas Hive does it automatically. This manual time was not counted in our measurements (it could be fast or slow, depending on you!). It is also possible to chain MapReduce jobs using tools such as Apache Oozie.

The Combined Input Format

In the console output (and in the YARN UI), we can see that only 3 mappers were used. Our dataset is split into 4 blocks in HDFS, and when running our own MapReduce job each mapper processed one block, so a total of 4 mappers were run. Hive used fewer because it relies on its CombineHiveInputFormat by default (built on Hadoop's CombineFileInputFormat), which combines input splits to use fewer mappers when it believes that is optimal.

It can be deactivated by restarting HiveServer2 after adding the following configuration to $HIVE_HOME/conf/hive-site.xml:
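
The relevant property is hive.input.format; setting it back to the plain HiveInputFormat disables split combining:

  <property>
    <name>hive.input.format</name>
    <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
  </property>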

On this job, using the combined input format made the job a few seconds faster. So let’s leave it as it is, and let Hive try its best with its default behavior.

Viewing the Results

The results can then be viewed in the plain text file generated in the “hive_output_part2” folder in HDFS.
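
For example, assuming the default warehouse location and the usual single output file name:

  hdfs dfs -cat /user/hive/warehouse/hive_output_part2/000000_0 | head -n 20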

The results can also be queried through the newly created Hive table called "hive_output_part2":
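
Or directly in beeline, for example:

  SELECT * FROM hive_output_part2 LIMIT 20;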

As you can see, the sums in the result look a bit weird. That is because the float data type stores approximate values only, as in most SQL databases. To calculate exact sums without floating-point approximation, you can cast the "total" float field to a decimal by replacing SUM(total) with SUM(CAST(total AS DECIMAL(7,2))) in the query.

Query from Part III: Secondary Sort

In part III of this series we implemented a secondary sort in order to sort by priority of 3 different fields. The solution was a single MapReduce job using a CompositeKey.

Using a single Reducer

Here is how I executed this query in Hive:
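
A hedged reconstruction of what this looks like; the selected columns and the exact filter may differ from the original Part III query, but the important part is the three-level ORDER BY:

  CREATE TABLE hive_output_part3
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE
  AS
  SELECT donation_id, state, city, total
  FROM donations
  WHERE state <> '' AND city <> ''
  ORDER BY state, city, total DESC;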

Notice that I had to change IS NOT NULL into <> '' compared to the original query, because unlike Oracle, Hive does store empty strings and differentiates them from NULL.

Click below to display the console output and preview of the results:

The MapReduce job executed by Hive used a single reducer, in order to have the data ordered on all 3 criteria, as we did in “Job 1” of part III.

The job still used only 3 mappers, and took only 2 min 47 sec, a lot faster than our 3 min 20 sec.

Using multiple Reducers

It is also possible to use multiple reducers, by replacing ORDER BY with SORT BY in the query. By doing this, Hive used 3 reducers instead of a single one.
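
In other words, the same statement with only the final clause changed, for example:

  SELECT donation_id, state, city, total
  FROM donations
  WHERE state <> '' AND city <> ''
  SORT BY state, city, total DESC;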

The results are stored in 3 output files, and each "state" can be found in more than one of those outputs. It looks like Hive did not use a partitioner based on "state" values, as we did in "Job 3" of Part III. So what Hive did is actually equivalent to our "Job 2".

The job took only 2 min 20 sec, a bit faster than our 2 min 40 sec.

Query from Part IV: Total Order Sorting

In Part IV we processed the same query as in Part III, but using a special technique involving a custom CompositeKeySamplingInputFormat which performed reservoir sampling on the input and returned the samples in the form of composite keys. Using this input format enabled us to use Hadoop's TotalOrderPartitioner in our Secondary Sort job, so that the outputs of all 3 reducers were totally sorted across each other. In other words, we managed to obtain totally sorted results, like in "Job 1" of Part III, whilst taking as little execution time as "Job 2".

I don't think that it is possible to do Total Order Sorting with multiple reducers in Hive. Anyway, I haven't found out how to do it. It seems that Hive wants us to either use ORDER BY to get totally sorted results with a single reducer, or use SORT BY to get multiple reducers, in which case the results are not totally sorted.

Query from Part V: Repartition Join

In Part V we used a basic reduce-side repartition join of our 2 datasets on the joining key, project_id, which is the primary key of the projects dataset and a foreign key in the donations table.

Executing our join query in Hive automatically executes a Repartition Join.
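
A sketch of the join query; the columns projected from each table are assumptions:

  CREATE TABLE hive_output_part5
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE
  AS
  SELECT d.donation_id, d.city, d.total, p.project_id, p.school_city
  FROM donations d
  JOIN projects p ON (d.project_id = p.project_id);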

The MapReduce job executed by Hive used 4 mappers (instead of 5 for us), but used 3 reducers as we did, and gave exactly the same results (3 x 250 MB outputs).

It took 3 min 50 sec on average, which is only a little bit slower than our optimization using Projection by Text, which took 3 min 40 sec.

Query from Part VI: Replicated Join

In Part VI we processed the same query as in Part V, but using a map-side replicated join. By default, Hive automatically does this optimization if the smallest table is under a certain threshold, defined in the property hive.mapjoin.smalltable.filesize, which has a default value of 25 MB. Our projects dataset, which needs to be cached, is about 120 MB in HDFS, so we can raise the small-table filesize limit to 200 MB by running this command in the beeline console:
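
The value is in bytes, so roughly:

  -- hive.auto.convert.join should already be true by default on recent Hive versions
  SET hive.mapjoin.smalltable.filesize=200000000;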

Now that we have set the new limit, the projects table will be considered a "small table" in joins. We simply have to run the same join query again and notice that this time no reducers are used in the executed MapReduce job, confirming that Hive performed a replicated join.

The query takes around 4 min 15 sec using the replicated join in Hive, which is slower than our 3 min 30 sec using string projection in our cache.

Query from Part VII: Bloom Filters

Hive only supports Bloom Filters with the ORC file format. With our Sequence Files we can't use one, so let's just execute the query without a Bloom Filter and see how fast it runs with the extra WHERE condition:

Using a repartition join with Hive, it took 3 min 32 sec. It is a lot slower than the 2 min 00 sec we achieved by using the Bloom Filter. This was expected because the filter helps cut down the large amounts of shuffled data, which is the main weakness of a repartition join.

Using a replicated join with Hive took 1 min 59 sec, which is quite close to our 1 min 48 sec using a repartition join without a Bloom Filter either.

Benchmark Recap

Here is a recap of all the execution times for each query:

* We wrote a custom-coded InputFormat to use with the TotalOrderPartitioner (see Part IV). This doesn't seem possible in Hive.
** The custom code used a Bloom Filter to cut down shuffled data. This couldn't be done in Hive.

Conclusion

With Hive we could execute all of our queries without writing any code. However, our hand-coded optimized jobs managed to be slightly faster than Hive’s MapReduce jobs in nearly all cases.

And in some cases, where we applied more "advanced" techniques such as writing a custom InputFormat to combine Secondary Sort with Total Order Sorting, or using Bloom Filters, Hive was a lot slower. But our execution times do not count the time spent coding such low-level classes or creating the Bloom Filter. It all mostly depends on your query usage. If you need to execute the same or a similar query 100 times a day, it could be worth working on these extra optimization steps. For a query run only once, you are of course better off sticking with Hive.

It's nice to see that getting our hands dirty in MapReduce code can still be rewarding. Well, at least against Hive using MapReduce as its execution engine. Against Hive running on Tez or Spark we would probably lose, especially on such short jobs. Unless we write our own Tez or Spark jobs.
