Hadoop Basics VIII: Running SQL Queries with Hive

In this part, we will use Hive to execute all the queries that we have been processing since the beginning of this series of tutorials.

In nearly all parts, we have coded MapReduce jobs to solve specific types of queries (filtering, aggregation, sorting, joining, etc…). It was a good exercise to understand Hadoop/MapReduce internals and some distributed processing theory, but it required writing a lot of code. Hive can translate SQL queries into MapReduce jobs, giving us query results without writing any code.

We will start by installing Hive and setting up tables for our datasets, before executing our queries from the previous parts and seeing whether Hive can achieve better execution times than our hand-coded MapReduce jobs.

Hive Installation

To download and install Hive in your current directory on Linux:

$ wget http://www.eu.apache.org/dist/hive/hive-1.2.1/apache-hive-1.2.1-bin.tar.gz
$ tar -zxf apache-hive-1.2.1-bin.tar.gz
$ mv apache-hive-1.2.1-bin hive

Then edit your .bashrc file to add the following lines at the end:

# Hive Variables
export HIVE_HOME=/home/hduser/hive
export PATH=$PATH:$HIVE_HOME/bin

Then you need to prepare a few working directories in HDFS:

$ hdfs dfs -mkdir /tmp
$ hdfs dfs -mkdir -p /user/hive/warehouse
$ hdfs dfs -chmod g+w /tmp
$ hdfs dfs -chmod g+w /user/hive/warehouse

To run the Hive Server, use one of the following commands:

# Default interactive mode
$ hiveserver2

# As a background process, writing output to 'hive.log'
$ nohup hiveserver2 > hive.log &

Wait for the server startup to finish (about 30 seconds or so), and then you can connect to the server and execute commands from any host using the Beeline client (here from the same host):

$ beeline -u jdbc:hive2://localhost:10000

For development, it is also possible to run the server and client in the same process (embedded mode) by doing:

$ beeline -u jdbc:hive2://

Creating Tables

Since the beginning of this series we have used 2 datasets, Donations and Projects, which we have formatted as Sequence Files (see Part I). We will give these data files (donations.seqfile and projects.seqfile) to Hive so that it can create their corresponding “tables” which we will later query.

These tables, stored in the Hive metastore, contain only metadata about the datasets (table names, column information, etc…). They are used for interpreting queries and running the underlying MapReduce (or Tez or Spark, if configured) code. The real data returned by queries still comes from the original data files, our Sequence Files.

Managed or External

Tables in Hive can be either Managed or External.

Managed is the default type of table. When you pass a data file in HDFS to Hive to create a managed table, the file will be moved to the directory  /user/hive/warehouse/<table_name>/. Managed tables are meant to be used only by Hive. When you drop a managed table, everything is deleted, including your original data file.

External tables, on the other hand, can be used when you want to modify the data files outside of Hive. When you create an external table, the original datasets stay where they were. When an external table is dropped, only the table’s metadata is dropped. The data files still remain where they were in HDFS.

In my case, I have both sequence files in the /user/hduser/donors/ HDFS directory of the Hadoop user, and I would like to keep them there so that I can still run my MapReduce jobs from the previous parts. I also don't want to lose my data files each time I drop the tables to recreate them. So we will use external tables for both datasets. The only catch is that Hive expects a directory, not a file, when creating a table, so we first need to move each sequence file into its own directory:

$ hdfs dfs -mkdir donors/donations
$ hdfs dfs -mv donors/donations.seqfile donors/donations/

$ hdfs dfs -mkdir donors/projects
$ hdfs dfs -mv donors/projects.seqfile donors/projects/

Sequence Files with Hive

To read the data, Hive needs to know how to deserialize our sequence files, so we need to pass it our own SerDe (serializer/deserializer), which must extend Hive’s AbstractSerDe class.

I wrote the DonationSerDe class (view) for the Donations sequence file.

The important methods that it overrides are:

  • initialize(Configuration conf, Properties tbl): uses the table’s metadata to initialize 2 things:
    1. An ObjectInspector, which contains the data types of all columns.
    2. A List of Objects containing as many elements as there are columns in the table. We initialize all elements to null and call this list row.
  • deserialize(Writable writable): takes as a parameter the Hadoop Writable coming from the input format and returns all row values as a List of Objects.
    • In our case the input is a Sequence File record of type DonationWritable, whose values are mapped into the row object list.
  • getObjectInspector(): returns the ObjectInspector created during the initialize() method.

We will not write new entries to our datasets, so the serialization-related methods like serialize() and getSerializedClass() will never be called; I simply return null-ish values from them.

Creating the Donations Table

We first need to add our code’s jar file to Hive’s class path in order to let it use our SerDe classes.

Then we create our table using the following clauses:

  • ROW FORMAT SERDE to specify our SerDe class
  • STORED AS SEQUENCEFILE to use Sequence File data
  • LOCATION to define where our data files are (must be a directory)

The Hive DDL reference can be read here.

> add jar donors.jar;
INFO  : Added [donors.jar] to class path
INFO  : Added resources: [donors.jar]
No rows affected (0.454 seconds)

> list jars;
+-------------+--+
|  resource   |
+-------------+--+
| donors.jar  |
+-------------+--+
1 row selected (0.051 seconds)

> CREATE EXTERNAL TABLE donations (
    donation_id string,
    project_id string,
    donor_city string,
    donor_state string,
    donor_is_teacher string,
    ddate string,
    amount float,
    support float,
    total float,
    payment_method string,
    payment_inc_acct_credit string,
    payment_inc_campaign_gift_card string,
    payment_inc_web_gift_card string,
    payment_promo_matched string,
    via_giving_page string,
    ty_packet string,
    message string
  )
  ROW FORMAT SERDE 'data.hive.DonationSerDe'
  STORED AS SEQUENCEFILE
  LOCATION '/user/hduser/donors/donations';

No rows affected (0.561 seconds)

> SHOW TABLES;
+------------+--+
|  tab_name  |
+------------+--+
| donations  |
+------------+--+
1 row selected (0.215 seconds)
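
As a quick sanity check, DESCRIBE FORMATTED prints the table's metadata: its type (which should read EXTERNAL_TABLE here), its HDFS location and the SerDe library it uses. The exact layout of the output varies between Hive versions, so it is omitted here:

> DESCRIBE FORMATTED donations;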

When deserializing, our SerDe class returns an ArrayList with indexes 0 to 17, containing the values of the 18 fields (or columns). The indexes of those list entries are mapped to the column names defined during table creation. This means that we can choose to ignore some fields of the underlying Sequence File by simply not adding them to the row object list inside the deserialize() method. It also means that we can give our table columns any names we want; they don't have to match the field names of the Sequence File writables.

For example, in the table created above, I decided to ignore the field called “for_honoree” from the DonationWritable (view) sequence file object. I also decided to give the “thank_you_packet” field a shorter column name, “ty_packet”. Neither of these choices causes any trouble, which we can verify by executing the following query:

> SELECT ty_packet FROM donations LIMIT 1;
+------------+--+
| ty_packet  |
+------------+--+
| f          |
+------------+--+
1 row selected (0.622 seconds)

Creating the Projects Table

For the Projects dataset the procedure is the same. I created the ProjectSerDe class (view) which we will use to create the projects table.

Here is the SQL command to create the projects table:

> CREATE EXTERNAL TABLE projects (
    project_id string,
    teacher_acctid string,
    school_id string,
    school_latitude string,
    school_longitude string,
    school_city string,
    school_state string,
    school_country string,
    teacher_prefix string,
    primary_focus_subject string,
    primary_focus_area string,
    secondary_focus_subject string,
    secondary_focus_area string,
    resource_type string,
    poverty_level string,
    grade_level string,
    vendor_shipping_charges float,
    sales_tax float,
    payment_processing_charges float,
    fulfillment_labor_materials float,
    total_price_excluding_opt_donation float,
    total_price_including_opt_donation float,
    students_reached int,
    total_donations int,
    num_donors int,
    funding_status string,
    date_posted string,
    date_completed string,
    date_thank_you_packet_mailed string,
    date_expiration string
  )
  ROW FORMAT SERDE 'data.hive.ProjectSerDe'
  STORED AS SEQUENCEFILE
  LOCATION '/user/hduser/donors/projects';

No rows affected (0.569 seconds)

> SHOW TABLES;
+------------+--+
|  tab_name  |
+------------+--+
| donations  |
| projects   |
+------------+--+
2 rows selected (0.228 seconds)

Executing all Queries on our datasets

By default, Hive uses MapReduce as its processing engine to execute its queries. So let’s run all our queries from the previous parts (filtering, aggregating, sorting, joining, etc…) using Hive, and see how it does compared to the MapReduce code I manually wrote for each query.

Hive uses generic pre-built MapReduce code to execute queries, instead of generating a custom MapReduce job for each query. So unfortunately we won't be able to see exactly what it does for each query and compare it to my hand-coded jobs. However, we can still get a few hints: how many jobs were needed, how many mappers/reducers were used, how long it took, etc.
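
If you are curious about what Hive plans to do for a given query, the EXPLAIN statement prints the stage plan (the MapReduce stages and their operators) without executing anything. For example, on the aggregation query of the next section:

> EXPLAIN
  SELECT SUM(total) as sumtotal, UPPER(donor_city) as city
  FROM donations
  WHERE donor_is_teacher != 't'
  GROUP BY UPPER(donor_city)
  ORDER BY sumtotal DESC;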

Query from Part II: Aggregating/Sorting

The query we processed using MapReduce in Part II of this series was supposed to sum all donation amounts grouped by city, and display all city names with their sums in order of descending sum value. To solve this query, we created 2 MapReduce jobs (one for aggregation and the second one for sorting) which took respectively 1 min 25 sec and 1 min 02 sec, so a total of around 2 min 27 sec.

Here is how to execute this query in Hive and store all the results in a new table, called “hive_output_part2”:

> CREATE TABLE hive_output_part2
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  LOCATION '/user/hduser/donors/hive_output_part2' 
  AS
    SELECT SUM(total) as sumtotal, UPPER(donor_city) as city
    FROM donations 
    WHERE donor_is_teacher != 't'
    GROUP BY UPPER(donor_city)
    ORDER BY sumtotal DESC;

Here is the whole output I got from my Beeline console:

INFO  : Number of reduce tasks not specified. Estimated from input data size: 3
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
INFO  : number of splits:3
INFO  : Submitting tokens for job: job_1454508485700_0039
INFO  : The url to track the job: http://ubuntu0:8088/proxy/application_1454508485700_0039/
INFO  : Starting Job = job_1454508485700_0039, Tracking URL = http://ubuntu0:8088/proxy/application_1454508485700_0039/
INFO  : Kill Command = /home/hduser/hadoop/bin/hadoop job  -kill job_1454508485700_0039
INFO  : Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 3
INFO  : 2016-02-10 22:21:15,773 Stage-1 map = 0%,  reduce = 0%
INFO  : 2016-02-10 22:22:08,421 Stage-1 map = 11%,  reduce = 0%, Cumulative CPU 99.2 sec
INFO  : 2016-02-10 22:22:17,019 Stage-1 map = 44%,  reduce = 0%, Cumulative CPU 127.32 sec
INFO  : 2016-02-10 22:22:20,694 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 134.32 sec
INFO  : 2016-02-10 22:22:21,906 Stage-1 map = 78%,  reduce = 0%, Cumulative CPU 135.2 sec
INFO  : 2016-02-10 22:22:32,877 Stage-1 map = 89%,  reduce = 0%, Cumulative CPU 147.49 sec
INFO  : 2016-02-10 22:22:35,379 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 149.85 sec
INFO  : 2016-02-10 22:22:39,108 Stage-1 map = 100%,  reduce = 44%, Cumulative CPU 160.65 sec
INFO  : 2016-02-10 22:22:41,578 Stage-1 map = 100%,  reduce = 56%, Cumulative CPU 170.0 sec
INFO  : 2016-02-10 22:22:42,792 Stage-1 map = 100%,  reduce = 60%, Cumulative CPU 171.87 sec
INFO  : 2016-02-10 22:22:44,022 Stage-1 map = 100%,  reduce = 89%, Cumulative CPU 183.23 sec
INFO  : 2016-02-10 22:22:46,540 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 183.23 sec
INFO  : MapReduce Total cumulative CPU time: 3 minutes 3 seconds 230 msec
INFO  : Ended Job = job_1454508485700_0039
INFO  : Number of reduce tasks determined at compile time: 1
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
INFO  : number of splits:2
INFO  : Submitting tokens for job: job_1454508485700_0040
INFO  : The url to track the job: http://ubuntu0:8088/proxy/application_1454508485700_0040/
INFO  : Starting Job = job_1454508485700_0040, Tracking URL = http://ubuntu0:8088/proxy/application_1454508485700_0040/
INFO  : Kill Command = /home/hduser/hadoop/bin/hadoop job  -kill job_1454508485700_0040
INFO  : Hadoop job information for Stage-2: number of mappers: 2; number of reducers: 1
INFO  : 2016-02-10 22:23:16,180 Stage-2 map = 0%,  reduce = 0%
INFO  : 2016-02-10 22:23:46,453 Stage-2 map = 50%,  reduce = 0%, Cumulative CPU 13.39 sec
INFO  : 2016-02-10 22:23:47,715 Stage-2 map = 67%,  reduce = 0%, Cumulative CPU 14.73 sec
INFO  : 2016-02-10 22:23:48,945 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 17.38 sec
INFO  : 2016-02-10 22:24:10,960 Stage-2 map = 100%,  reduce = 71%, Cumulative CPU 25.33 sec
INFO  : 2016-02-10 22:24:13,383 Stage-2 map = 100%,  reduce = 98%, Cumulative CPU 31.32 sec
INFO  : 2016-02-10 22:24:14,616 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 32.61 sec
INFO  : MapReduce Total cumulative CPU time: 32 seconds 610 msec
INFO  : Ended Job = job_1454508485700_0040
INFO  : Moving data to: /user/hduser/donors/hive_output_part2 from hdfs://ubuntu0:9000/user/hive/warehouse/.hive-staging_hive_2016-02-10_22-20-50_281_4971139345555329337-4/-ext-10001
INFO  : Table default.hive_output_part2 stats: [numFiles=0, numRows=14966, totalSize=0, rawDataSize=321343]
No rows affected (207.86 seconds)

This query created 2 “stages”, which are MapReduce jobs. Both jobs appeared one after the other in the YARN jobs UI.

According to the console output, the 2 jobs took a total of 207.86 seconds. But using the same measurement as for my MR jobs, which is taking the “Elapsed” time from the YARN UI, the total of both jobs is on average 3 min 15 sec, around 45 seconds slower than the MapReduce jobs from the Part II post.

💡 Note that with our hand-coded MapReduce jobs, a human intervention was necessary after the first job in order to launch the second one, whereas Hive chains them automatically. This human time was not counted in the process (it could be fast or slow, depending on you!). It is also possible to chain MapReduce jobs using tools such as Apache Oozie.

The Combined Input Format

In the console output (and in the YARN UI), we can see that only 3 mappers were used. Our dataset is split into 4 blocks in HDFS, and when running our own MapReduce job, each mapper processed one block, so a total of 4 mappers were run. Hive used fewer mappers because it relies on a combining input format by default (CombineHiveInputFormat), which merges input splits to use fewer mappers when it estimates that this is optimal.

It can be deactivated by restarting HiveServer after adding the following configuration to HIVE_HOME/conf/hive-site.xml:

<property>
    <name>hive.input.format</name>
    <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
</property>
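
Alternatively, the same setting can usually be changed for a single Beeline session instead of editing hive-site.xml (I kept the default for the benchmarks in this post):

> set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;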

On this job, using the combined input format made the job a few seconds faster. So let’s leave it as it is, and let Hive try its best with its default behavior.

Viewing the Results

The results can then be viewed in the plain text file generated in the “hive_output_part2” folder in HDFS.

The results can also be queried through the newly created Hive table called “hive_output_part2”:

> SELECT * FROM hive_output_part2 LIMIT 5;
+-----------------------------+-------------------------+--+
| hive_output_part2.sumtotal  | hive_output_part2.city  |
+-----------------------------+-------------------------+--+
| 1.7220595933565181E8        |                         |
| 2.550429422512488E7         | NEW YORK                |
| 1.5451497673708646E7        | SAN FRANCISCO           |
| 6163195.398894411           | CHICAGO                 |
| 5085120.418014616           | SEATTLE                 |
+-----------------------------+-------------------------+--+
5 rows selected (0.531 seconds)

As you can see, the sums in the result look a bit odd. That is because the float data type holds approximate values only, as in most SQL databases. To calculate exact sums without floating-point approximation, you can cast the “total” float field to a decimal by replacing SUM(total) with SUM(cast(total as DECIMAL(7,2))) in the query.
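
For instance, the aggregation part of the query would become the following (same table and columns as before, only the SUM expression changes):

> SELECT SUM(cast(total as DECIMAL(7,2))) as sumtotal, UPPER(donor_city) as city
  FROM donations
  WHERE donor_is_teacher != 't'
  GROUP BY UPPER(donor_city)
  ORDER BY sumtotal DESC;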

Query from Part III: Secondary Sort

In part III of this series we implemented a secondary sort in order to sort by priority of 3 different fields. The solution was a single MapReduce job using a CompositeKey.

Using a single Reducer

Here is how I executed this query in Hive:

> CREATE TABLE hive_output_part3
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  LOCATION '/user/hduser/donors/hive_output_part3' 
  AS
    SELECT donation_id, donor_state, donor_city, total
    FROM donations
    WHERE donor_state <> '' AND donor_city <> ''
    ORDER BY lower(donor_state) ASC, lower(donor_city) ASC, total DESC;

Notice that I had to change IS NOT NULL into <> '' compared to the original query, because unlike Oracle, Hive does store empty strings and differentiates them from NULL.
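
To illustrate the difference, the two predicates below can match different rows in Hive, whereas Oracle would treat the empty string as NULL (illustrative queries only, output not shown):

> SELECT COUNT(*) FROM donations WHERE donor_city IS NULL;
> SELECT COUNT(*) FROM donations WHERE donor_city = '';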

Here are the console output and a preview of the results:

INFO  : Number of reduce tasks determined at compile time: 1
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
INFO  : number of splits:3
INFO  : Submitting tokens for job: job_1454508485700_0048
INFO  : The url to track the job: http://ubuntu0:8088/proxy/application_1454508485700_0048/
INFO  : Starting Job = job_1454508485700_0048, Tracking URL = http://ubuntu0:8088/proxy/application_1454508485700_0048/
INFO  : Kill Command = /home/hduser/hadoop/bin/hadoop job  -kill job_1454508485700_0048
INFO  : Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
INFO  : 2016-02-11 10:33:41,045 Stage-1 map = 0%,  reduce = 0%
INFO  : 2016-02-11 10:34:37,481 Stage-1 map = 11%,  reduce = 0%, Cumulative CPU 107.99 sec
INFO  : 2016-02-11 10:34:45,017 Stage-1 map = 33%,  reduce = 0%, Cumulative CPU 132.88 sec
INFO  : 2016-02-11 10:34:48,766 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 142.58 sec
INFO  : 2016-02-11 10:34:52,490 Stage-1 map = 78%,  reduce = 0%, Cumulative CPU 150.56 sec
INFO  : 2016-02-11 10:35:05,963 Stage-1 map = 89%,  reduce = 0%, Cumulative CPU 166.88 sec
INFO  : 2016-02-11 10:35:09,739 Stage-1 map = 89%,  reduce = 22%, Cumulative CPU 172.26 sec
INFO  : 2016-02-11 10:35:13,522 Stage-1 map = 100%,  reduce = 22%, Cumulative CPU 176.99 sec
INFO  : 2016-02-11 10:35:16,018 Stage-1 map = 100%,  reduce = 34%, Cumulative CPU 179.78 sec
INFO  : 2016-02-11 10:35:19,746 Stage-1 map = 100%,  reduce = 66%, Cumulative CPU 183.16 sec
INFO  : 2016-02-11 10:35:22,230 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU 186.56 sec
INFO  : 2016-02-11 10:35:28,441 Stage-1 map = 100%,  reduce = 68%, Cumulative CPU 197.28 sec
INFO  : 2016-02-11 10:35:32,266 Stage-1 map = 100%,  reduce = 71%, Cumulative CPU 201.1 sec
INFO  : 2016-02-11 10:35:34,781 Stage-1 map = 100%,  reduce = 75%, Cumulative CPU 204.47 sec
INFO  : 2016-02-11 10:35:38,576 Stage-1 map = 100%,  reduce = 79%, Cumulative CPU 207.93 sec
INFO  : 2016-02-11 10:35:41,048 Stage-1 map = 100%,  reduce = 83%, Cumulative CPU 211.3 sec
INFO  : 2016-02-11 10:35:44,765 Stage-1 map = 100%,  reduce = 87%, Cumulative CPU 214.76 sec
INFO  : 2016-02-11 10:35:47,200 Stage-1 map = 100%,  reduce = 91%, Cumulative CPU 218.15 sec
INFO  : 2016-02-11 10:35:50,793 Stage-1 map = 100%,  reduce = 95%, Cumulative CPU 221.5 sec
INFO  : 2016-02-11 10:35:53,254 Stage-1 map = 100%,  reduce = 99%, Cumulative CPU 225.06 sec
INFO  : 2016-02-11 10:35:55,736 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 226.35 sec
INFO  : MapReduce Total cumulative CPU time: 3 minutes 46 seconds 350 msec
INFO  : Ended Job = job_1454508485700_0048
INFO  : Moving data to: /user/hduser/donors/hive_output_part3 from hdfs://ubuntu0:9000/user/hive/warehouse/.hive-staging_hive_2016-02-11_10-32-57_883_1350326745524175387-1/-ext-10001
INFO  : Table default.hive_output_part3 stats: [numFiles=0, numRows=1499074, totalSize=0, rawDataSize=75679899]
No rows affected (181.605 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM hive_output_part3 LIMIT 10;
+-----------------------------------+--------------------------------+-------------------------------+--------------------------+--+
|   hive_output_part3.donation_id   | hive_output_part3.donor_state  | hive_output_part3.donor_city  | hive_output_part3.total  |
+-----------------------------------+--------------------------------+-------------------------------+--------------------------+--+
| c8e871528033bd9ce6b267ed8df27698  | AA                             | Canada                        | 100.0                    |
| 6eb5a716f73260c53a76a5d2aeaf3820  | AA                             | Canada                        | 100.0                    |
| 92db424b01676e462eff4c9361799c18  | AA                             | Canada                        | 98.36000061035156        |
| e0f266ed8875df71f0012fdaf50ae22e  | AA                             | Canada                        | 1.6399999856948853       |
| d9064b2494941725d0f93f6ca781cdc7  | AA                             | DPO                           | 50.0                     |
| 83b85744490320c8154f1f5bcd703296  | AA                             | DPO                           | 25.0                     |
| 7133a67b51c1ee61079fa47e3b9e5160  | AA                             | Fremont                       | 50.0                     |
| f3475f346f1483dfb57efc152d3fbced  | AA                             | Helsinki\, FINLAND            | 153.38999938964844       |
| 378ea8d5ce12c64d2ab77ec3151f8947  | AA                             | Helsinki\, FINLAND            | 117.61000061035156       |
| 220dd53c2d4d168e44cd9f7922d8c24c  | AA                             | Herring Cove\, NS\, Canada    | 150.0                    |
+-----------------------------------+--------------------------------+-------------------------------+--------------------------+--+
10 rows selected (0.796 seconds)

The MapReduce job executed by Hive used a single reducer, in order to have the data ordered on all 3 criteria, as we did in “Job 1” of part III.

The job still used only 3 mappers, and took only 2 min 47 sec, a lot faster than our 3 min 20 sec.

Using multiple Reducers

It is also possible to use multiple reducers, by replacing ORDER BY with SORT BY in the query. By doing this, Hive used 3 reducers instead of a single one.
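
The query then becomes the following (the output table name and location are simply new variants of the previous ones):

> CREATE TABLE hive_output_part3_sortby
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  LOCATION '/user/hduser/donors/hive_output_part3_sortby'
  AS
    SELECT donation_id, donor_state, donor_city, total
    FROM donations
    WHERE donor_state <> '' AND donor_city <> ''
    SORT BY lower(donor_state) ASC, lower(donor_city) ASC, total DESC;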

The results are stored in 3 output files, and each “state” can be found in more than one of those outputs. It looks like Hive did not use a specific partitioner based on “state” values, as we did in “Job 3” from Part III. So what Hive did is actually the same as our “Job 2”.

The job took only 2 min 20 sec, a bit faster than our 2 min 40 sec.
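
As a side note, Hive does offer a way to control which reducer each row goes to: DISTRIBUTE BY. Combined with SORT BY, it should reproduce the state-based partitioning of “Job 3” from Part III, keeping each state in a single output file. This is only a sketch under that assumption (not benchmarked here, and the table name is made up):

> CREATE TABLE hive_output_part3_dist
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  LOCATION '/user/hduser/donors/hive_output_part3_dist'
  AS
    SELECT donation_id, donor_state, donor_city, total
    FROM donations
    WHERE donor_state <> '' AND donor_city <> ''
    DISTRIBUTE BY donor_state
    SORT BY lower(donor_state) ASC, lower(donor_city) ASC, total DESC;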

Query from Part IV: Total Order Sorting

In Part IV we processed the same query as in Part III, but using a special technique involving a custom CompositeKeySamplingInputFormat, which performed reservoir sampling on the input and returned the samples in the form of composite keys. Using this input format enabled us to use Hadoop's TotalOrderPartitioner in our Secondary Sort job. As a result, the outputs of all 3 reducers were totally sorted across reducers. In other words, we managed to obtain totally sorted results, as in “Job 1” of Part III, while taking as little execution time as “Job 2”.

I don't think it is possible to do Total Order Sorting with multiple reducers in Hive; at least I haven't found out how to do it. Hive seems to leave us the choice between ORDER BY, which gives totally sorted results using a single reducer, and SORT BY, which uses multiple reducers but whose results are not totally sorted across reducers.

Query from Part V: Repartition Join

In Part V we used a basic reduce-side repartition join of our 2 datasets on the join key, project_id, which is the primary key of the projects dataset and a foreign key in the donations table.

Executing our join query in Hive automatically results in a repartition join.

CREATE TABLE hive_output_part5
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  LOCATION '/user/hduser/donors/hive_output_part5' 
  AS
    SELECT d.donation_id, d.project_id as join_fk, d.donor_city, d.ddate, d.total, p.project_id, p.school_city, p.poverty_level, p.primary_focus_subject
    FROM donations as d
    INNER JOIN projects as p
    ON d.project_id = p.project_id;

The MapReduce job executed by Hive used 4 mappers (instead of 5 for us), but used 3 reducers as we did, and gave exactly the same results (3 x 250 MB outputs).

It took 3 min 50 sec on average, which is only a little bit slower than our optimization using Projection by Text, which took 3 min 40 sec.

Query from Part VI: Replicated Join

In Part VI we processed the same query as in Part V, but using a map-side replicated join. By default, Hive automatically performs this optimization if the smallest table is under a certain threshold, defined by the property hive.mapjoin.smalltable.filesize, whose default value is 25 MB. Our projects dataset, which needs to be cached, is about 120 MB in HDFS, so we can raise the small-table file size limit to 200 MB by running this command in the Beeline console:

> set hive.mapjoin.smalltable.filesize = 200000000;

Now that we have set the new limit, the projects table will be considered a “small table” in joins. We simply run the same join query again and notice that this time no reducers are used in the executed MapReduce job, confirming that Hive performed a replicated (map-side) join.

The query takes around 4 min 15 sec using the replicated join in Hive, which is slower than our 3 min 30 sec using string projection in our cache.
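
As an aside, Hive also accepts an explicit MAPJOIN hint naming the table to cache, as an alternative to raising the small-table threshold. This is a hedged sketch: in recent Hive versions the hint is ignored unless hive.ignore.mapjoin.hint is set to false, and I did not benchmark this path:

> set hive.ignore.mapjoin.hint=false;

> SELECT /*+ MAPJOIN(p) */ d.donation_id, d.total, p.school_city
  FROM donations as d
  INNER JOIN projects as p
  ON d.project_id = p.project_id;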

Query from Part VII: Bloom Filters

Hive only supports Bloom Filters with the ORC file format. Since our tables are backed by Sequence Files, we can't use them here, so let's just execute the query without a Bloom Filter and see how fast it runs with the extra WHERE condition:

CREATE TABLE hive_output_part7
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  LOCATION '/user/hduser/donors/hive_output_part7' 
  AS
    SELECT d.donation_id, d.project_id as join_fk, d.donor_city, d.ddate, d.total, p.project_id, p.school_city, p.poverty_level, p.primary_focus_subject
    FROM donations as d
    INNER JOIN projects as p
    ON d.project_id = p.project_id
    WHERE LOWER(p.primary_focus_subject) LIKE '%science%';

Using a repartition join, Hive took 3 min 32 sec, which is a lot slower than the 2 min 00 sec we achieved by using the Bloom Filter. This was expected, because the filter cuts down the large amount of shuffled data, which is the main weakness of a repartition join.

Using a replicated join, Hive took 1 min 59 sec, which is quite close to our 1 min 48 sec using a repartition join without a Bloom Filter either.
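
For completeness, here is roughly what it would take to benefit from Bloom Filters in Hive: store the table as ORC and declare the relevant columns in its table properties. Note that ORC Bloom Filters are a storage-level index used to skip row groups on equality predicates, which is not quite the same trick as the join-side Bloom Filter of Part VII. This is only an untested sketch with a made-up table name:

> CREATE TABLE projects_orc
  STORED AS ORC
  TBLPROPERTIES ("orc.bloom.filter.columns"="project_id", "orc.bloom.filter.fpp"="0.05")
  AS SELECT * FROM projects;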

Benchmark Recap

Here is a recap of all the execution times for each query:

* We wrote a custom InputFormat to use with the TotalOrderPartitioner (see Part IV). This doesn't seem to be possible in Hive.
** The custom code used a Bloom Filter to cut down shuffled data. Couldn’t be done in Hive.

Conclusion

With Hive we could execute all of our queries without writing any code. However, our hand-coded optimized jobs managed to be slightly faster than Hive’s MapReduce jobs in nearly all cases.

And in some cases, where we applied more “advanced” techniques such as writing a custom InputFormat to combine Secondary Sort with Total Order Sorting, or using Bloom Filters, Hive was a lot slower. But our execution times do not count the time spent coding such low-level classes or creating the Bloom Filter. It mostly depends on your query usage: if you need to run the same or similar query 100 times a day, it could be worth working on these extra optimization steps; for a query run only once, you are of course better off sticking with Hive.

It's nice to see that getting our hands dirty in MapReduce code can still be rewarding, at least against Hive using MapReduce as its execution engine. Against Hive running on Tez or Spark we would probably lose, especially on such short jobs, unless we wrote our own Tez or Spark jobs.
