Structuring Hadoop data through Hive and SQL

In this article I would like to start getting you acquainted with the Hadoop services which can heavily simplify the process of working with the data within the cluster. We have already played around with the fundamental part of Hadoop – HDFS. But as you may have noticed, it is quite complicated to work with the data in that way, especially considering that every new operation upon the data requires another MapReduce job which has to be properly implemented, tested, deployed, executed and verified. Besides, the MapReduce approach follows a batch processing model, while some solutions may rely on real-time data access, which is beyond the scope of this framework. In some cases this technique certainly should be applied, for example to implement complicated operations on the data, but most scenarios usually boil down to basic and generic operations like searching, grouping or aggregating the information. In order to simplify such functionality the Hadoop community invented several solutions which help end users move to a higher level of abstraction and start using simpler mechanisms for querying the data. One of them is called Hive.

Introducing SQL through Hive

Most software developers, especially at the early stages of their career, usually go through different relational databases while constructing their solutions. Indeed, this data model proved its usefulness, as it provides a strong mechanism for storing the data in a valid and consistent state and gives a common, simple technique for working with this data through SQL. But at the same time such a model carries certain limitations and performance impacts related to the support of ACID guarantees, transaction isolation levels and the normalized structure of the data. Hadoop, on the other side, originally worked with non-normalized information represented by collections of files spread across a distributed file system, and it supported a single mechanism for performing operations upon this data – MapReduce jobs. But over time new use-cases appeared which demanded structuring of the data and access to it through simpler methods. Facebook started to work on a new project called Hive (later heavily backed by Hortonworks and others), while Cloudera started the development of an alternative product called Impala. These two solutions have certain differences in their implementation, but in general they both give end consumers the ability to structure the data within HDFS and to perform SQL-based queries upon it. As we started to work with the Hortonworks distribution of the cluster, which includes the Hive option for SQL-related tasks, Impala is beyond the scope of this article; but as the Impala community site puts it – "For Apache Hive users, Impala utilizes the same metadata, ODBC driver, SQL syntax, and user interface as Hive—so you don't have to worry about re-inventing the implementation wheel".

At a high level, Hive consists of the following primary components:

  • HDFS with YARN – Hive uses the Hadoop file system for the actual storage of the data and the YARN framework for executing operations upon it
  • MetaStore – contains the definition of the data storage, which describes the actual databases, tables, partitions, columns and data types within Hive. Behind the scenes this component keeps all of its information in a relational DB like Postgres, MySQL, Derby or Oracle, consumed through the DataNucleus ORM, while all the real data is located in HDFS
  • Driver and Compiler – these two parts are responsible for parsing the actual SQL queries from the client, mapping them to the information from the MetaStore and translating them into a set of YARN jobs represented by a single execution plan (the EXPLAIN sketch after this list shows how to inspect such a plan)
  • Execution engine – uses the execution plan to perform the actual scheduling of the jobs through YARN and to retrieve the results
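
A simple way to watch the Driver and Compiler at work is the EXPLAIN statement, which prints the execution plan instead of running the query. A minimal sketch, using the test table that we will create later in this article:

hive> EXPLAIN SELECT position, count(*) FROM test GROUP BY position;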

Besides this, different versions of Hive extend the model with extra components:

  • HiveServer1, HiveServer2 – services which provide extended functionality for direct access to the data storage through the Hive Thrift API. These components play the role of proxies which allow connections to the data store from ODBC and JDBC clients. HiveServer2 has some advanced features compared to HiveServer1, like concurrent connections, authentication of the original users and better support of the JDBC and ODBC connectors (a connection sketch follows this list)
  • WebHCatalog – a service which allows access to the Hive data storage through a RESTful API. It also allows Kerberos authentication using the SPNEGO mechanism
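
To make these two services a bit more tangible, here is roughly how they are consumed. This is only a sketch: the host, the ports (10000 for HiveServer2 and 50111 for WebHCatalog are the usual defaults) and the user name are assumptions that depend on your environment:

# JDBC connection to HiveServer2 through the Beeline client
beeline -u jdbc:hive2://localhost:10000 -n hive

# listing Hive databases through the WebHCatalog REST API
curl -s 'http://localhost:50111/templeton/v1/ddl/database?user.name=hive'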

This is the general overview of the Hive architecture:

[Diagram: general overview of the Hive architecture]

Hive keeps all the data within databases. Each database consists of a set of tables. They are separated into two categories:

  • External tables – data which already exists at some location outside Hive; it could be, for example, a file in HDFS or an HBase table. Hive has a special operation which allows you to describe the data schema of such resources and persist this information in the MetaStore. As a result the Hive engine is able to make correct assumptions about the physical content of these resources and can generate proper execution plans. Hive uses the CREATE EXTERNAL TABLE operation to link this data
  • Internal tables – contain data fully managed by Hive. It is located in a special place in HDFS called the Hive Warehouse. Such data still physically exists in HDFS, but we can select different options for how we want to store it. We can keep it as a set of plain text files, or the rows can be part of a single key-value based sequence file. More advanced options like the ORC, Avro or Parquet formats add features to our tables such as compression, ACID support or extra performance. Besides that, we can optimize some of them with additional configuration like bucketing and partitioning (the DESCRIBE FORMATTED sketch below shows how to check which category a table belongs to)
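
Both kinds of tables will appear later in this article. A handy way to check which category a particular table belongs to is DESCRIBE FORMATTED, which reports the table type (EXTERNAL_TABLE or MANAGED_TABLE) among other details:

hive> DESCRIBE FORMATTED test;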

Working with external data through Hive shell

Now that we have some basic understanding of how things work in Hive, let's go and look at the real implementation of these components in our cluster. If you open the Hive link in the Ambari UI, you'll be able to see the list of services which are running in the environment:

[Screenshot: Hive services in the Ambari UI]

As you can see, my version of the cluster uses MySQL as the data storage for the Metastore. Let's look inside the actual database directly from the Linux shell of our Sandbox:

mysql

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| hive               |
| mysql              |
| oozie              |
| ranger             |
| ranger_audit       |
+--------------------+

mysql> use hive;

mysql> show tables;
+---------------------------+
| Tables_in_hive            |
+---------------------------+
| BUCKETING_COLS            |
| COLUMNS_V2                |
| PARTITIONS                |
| ROLES                     |
| TBLS                      |
| VERSION                   |
+---------------------------+

I didn't show the list of all the tables within the database; I just wanted to point out the instances which hold the information described in the theoretical part. Now let's create a new file and push it into HDFS:

hdfs dfs -mkdir /tmp/hiveext

vim test.txt

1,oleksii,yermolenko,developer
2,donald,trump,president
3,justin,biber,singer
4,mike,jordan,spotsman

:wq!

hdfs dfs -put test.txt /tmp/hiveext/

After this let's try to attach this file to Hive. This can be achieved either with the direct Hive CLI or with a more advanced option – the Beeline CLI. The first tool works with the MetaStore and the HDFS data directly, while Beeline goes through the HiveServer2 service, which means that you'll be able to follow the proper security policy (a Beeline connection sketch was shown earlier). It does not make much difference for a single sandbox environment, but in real clusters Beeline is the correct way to go. In my example I will use the Hive CLI through the hive command. I'm going to create a new table upon this file and describe its data schema:

hive 

hive> CREATE EXTERNAL TABLE test(id INT, firstname STRING, lastname STRING, position STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/tmp/hiveext';

hive> INSERT INTO test VALUES (5,'alec','baldwin','actor');

hive> SELECT * FROM test WHERE ID>3;

5 alec baldwin actor
4 mike jordan spotsman
Time taken: 0.315 seconds, Fetched: 2 row(s)

As you can see, our data has been linked, modified and filtered using pure SQL syntax. The interesting thing is how Hive handled the INSERT operation, as external resources ideally should not be modified from Hive:

 hdfs dfs -ls /tmp/hiveext/

-rwxr-xr-x 3 hive hadoop 17 2017-02-15 22:05 /tmp/hiveext/000000_0
-rw-r--r-- 3 hdfs hdfs 101 2017-02-15 21:58 /tmp/hiveext/test.txt

hdfs dfs -cat /tmp/hiveext/000000_0

5,alec,baldwin,actor

The insert operation created a separate file for the new record, so that our original file could remain unmodified. Now let's look into the metadata of this resource in our Metastore in MySQL:

mysql> select * from TBLS;

| 320 | 1487178138 | 1 | 0 | hive| 0 | 325 | test | EXTERNAL_TABLE | NULL  | NULL |

mysql> select * from COLUMNS_V2 WHERE id=320;

| 320 | NULL | firstname | string | 1 |
| 320 | NULL | id | int | 0 |
| 320 | NULL | lastname | string | 2 |
| 320 | NULL | position | string | 3 |
+-------+---------+--------------------+---------------+-------------+

The Metastore now contains some descriptive information about our external table. It will be used to generate proper execution plans from the SQL-based queries, which will then be transmitted to YARN for scheduling the actual jobs.

Another important feature of external tables is that deleting them does not affect the source data. The DROP TABLE operation only removes the records from the Metastore. Data which was created after the original linking operation will still exist after this:

hive -e 'DROP TABLE test'

mysql> select * from COLUMNS_V2 WHERE id=320;

ERROR 1054 (42S22): Unknown column ‘id’ in ‘where clause’

 hdfs dfs -ls /tmp/hiveext/

-rwxr-xr-x 3 hive hadoop 17 2017-02-15 22:05 /tmp/hiveext/000000_0
-rw-r--r-- 3 hdfs hdfs 101 2017-02-15 21:58 /tmp/hiveext/test.txt

Now if you recreate this table, it will contain all the information which existed before the deletion:

hive> CREATE EXTERNAL TABLE test(id INT, firstname STRING, lastname STRING, position STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/tmp/hiveext';

Driving a custom Warehouse through Hive

External tables are good for accessing different types of resources using SQL-based syntax, but ideally they should be limited to read operations. If you want to drive your data yourself, it is better to create separate internal tables for it in the service Warehouse folder. Hive provides a large variety of options for configuring such tables. Some techniques provide a better level of compression, others improve performance. I will describe some of them with examples of their usage. First of all I want to create four different tables in Hive:

hive> CREATE TABLE testTxtPart(id INT, firstname STRING, lastname STRING) PARTITIONED BY (position STRING);

hive> CREATE TABLE testTxtBucket(id INT, firstname STRING, lastname STRING, position STRING) CLUSTERED BY(lastname) INTO 4 BUCKETS;

hive> CREATE TABLE testOrc(id INT, firstname STRING, lastname STRING, position STRING) STORED AS ORC tblproperties ("orc.compress"="NONE");

hive> CREATE TABLE testAvro(id INT, firstname STRING, lastname STRING, position STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat';

If you now look into the Metastore you'll be able to find new records with the definitions of these tables:

mysql> select * from TBLS;

| 323 | 1487181993 | 1 | 0 | hive | 0 | 328 | testorc | MANAGED_TABLE | NULL | 
| 329 | 1487237901 | 1 | 0 | hive | 0 | 335 | testtxtpart | MANAGED_TABLE | NULL |
| 332 | 1487238950 | 1 | 0 | hive | 0 | 339 | testtxtbucket | MANAGED_TABLE | NULL | 
| 333 | 1487239555 | 1 | 0 | hive | 0 | 340 | testavro | MANAGED_TABLE | NULL |

The actual storage for all these tables is located in the HDFS Warehouse directory. This path is defined by the hive.metastore.warehouse.dir setting in hive-site.xml. In my case it is /apps/hive/warehouse:

 hdfs dfs -ls /apps/hive/warehouse

drwxrwx--- - hive hadoop 0 2017-02-16 05:06 /apps/hive/warehouse/testavro
drwxrwx--- - hive hadoop 0 2017-02-16 05:00 /apps/hive/warehouse/testorc
drwxrwx--- - hive hadoop 0 2017-02-16 04:57 /apps/hive/warehouse/testtxtbucket
drwxrwx--- - hive hadoop 0 2017-02-16 04:38 /apps/hive/warehouse/testtxtpart
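
If you are not sure which path your installation uses, Hive itself can print the effective value of the setting (a quick check from the shell):

hive -e 'SET hive.metastore.warehouse.dir;'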

These tables are represented by standalone directories in the file system. They are located at the top level of the Warehouse by default, but we could create dedicated Database entities for them, which would probably be the better option in real environments. Now let's look at the differences in the actual structure and file content of these tables.
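
Before diving in, here is a quick sketch of the database option mentioned above; the names are hypothetical. Hive creates a matching .db directory under the warehouse and places the table folders inside it:

hive> CREATE DATABASE staging;
hive> CREATE TABLE staging.testOrcStaged(id INT) STORED AS ORC;

hdfs dfs -ls /apps/hive/warehouse/staging.db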

Partitioned text table – allows the data to be split among several locations based on the values of a single parameter or a set of parameters. If, for example, we know that our data is evenly distributed over a COUNTRY attribute and that most of our queries filter on it, we can define partitions upon it. Hive will create a separate directory for each individual value of this attribute, which can significantly improve the performance of the queries that use this parameter. A partitioned table creates a separate directory in the root table folder for every new value of the partition parameter. Let's import all developers from the external test table into the developer partition of the testTxtPart table and check the actual content from HDFS:

hive> INSERT INTO TABLE testTxtPart PARTITION (position='developer') select id, firstname, lastname from test where position='developer';

hdfs dfs -ls /apps/hive/warehouse/testtxtpart/position=developer

-rwxrwxrwx   3 hive hadoop 0 2017-02-16 04:39 /apps/hive/warehouse/testtxtpart/position=developer/000000_0

hdfs dfs -cat /apps/hive/warehouse/testtxtpart/position=developer/000000_0

1oleksiiyermolenko
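
Because the partition value is encoded in the directory layout rather than stored in the data files, Hive can list the partitions and prune them at query time. A small sketch (output omitted):

hive> SHOW PARTITIONS testTxtPart;
hive> SELECT * FROM testTxtPart WHERE position='developer';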

Bucketing text table – the approach is similar to partitioning, but when we know that the list of possible values of our partitioning column could be large, we can map it onto a limited number of buckets. In this case each value is processed through a hash function which determines the bucket for the end data. Such a technique can also give performance benefits for certain types of queries. From the start a bucketed table creates a set of bucket files within the root table folder equal to the number we specified in the CREATE TABLE query. On each insert operation the hash function defines the exact bucket file for writing the data:

hive> INSERT INTO TABLE testTxtBucket select id, firstname, lastname,position from test;

hdfs dfs -ls /apps/hive/warehouse/testtxtbucket

hive hadoop 0 2017-02-16 04:56 /apps/hive/warehouse/testtxtbucket/000003_0
hive hadoop 25 2017-02-16 04:56 /apps/hive/warehouse/testtxtbucket/000002_0
hive hadoop 52 2017-02-16 04:56 /apps/hive/warehouse/testtxtbucket/000001_0
hive hadoop 45 2017-02-16 04:56 /apps/hive/warehouse/testtxtbucket/000000_0

hdfs dfs -cat /apps/hive/warehouse/testtxtbucket/000001_0

4mikejordanspotsman
3justinbibersinger
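
Because the bucket for each row is fixed by the hash of lastname, a query can be restricted to a subset of the buckets. A minimal sketch using Hive's bucket sampling syntax:

hive> SELECT * FROM testTxtBucket TABLESAMPLE(BUCKET 2 OUT OF 4 ON lastname);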

ORC table – the Optimized Row Columnar format of storing the data. It provides ACID capabilities, improvements in the speed of writing the data and significant improvements in the speed of reading it. Besides that, it offers a good level of compression of the actual content. An ORC table keeps the data in files within special groups called stripes. Each stripe has indexes, metadata and the actual data. If you look into the content of an ORC file you will see our information, but it will be decorated with additional attributes used by the Hive engine to provide the advanced features mentioned earlier:

hive> INSERT INTO TABLE testOrc select id, firstname,lastname,position from test;

 hdfs dfs -cat /apps/hive/warehouse/testorc/000000_0

ORC P
alecoleksii6P
$ ” baldwin
yermolenkoBP 
actospotsmanJPF▒F▒alecoleksiidonaldjustinmikeFGf@baldwinyermolenkotrumpbiberjordanFzU`actordeveloperpresidentsingerspotsmanFY▒▒ merica/New_York
g P P “
alecoleksii6P “
baldwin
yermolenkoBP “
actospotsmanJ▒
▒u ▒(“$

We can convert this metadata into a human-readable format with a special Hive CLI option:

hive --orcfiledump /apps/hive/warehouse/testorc/000000_0

Rows: 5
Compression: NONE
Type: struct<_col0:int,_col1:string,_col2:string,_col3:string>

Stripe Statistics:
Stripe 1:
Column 0: count: 5 hasNull: false
Column 1: count: 5 hasNull: false min: 1 max: 5 sum: 15
Column 2: count: 5 hasNull: false min: alec max: oleksii sum: 27
Column 3: count: 5 hasNull: false min: baldwin max: yermolenko sum: 33
Column 4: count: 5 hasNull: false min: actor max: spotsman sum: 37

File Statistics:
Column 0: count: 5 hasNull: false
Column 1: count: 5 hasNull: false min: 1 max: 5 sum: 15
Column 2: count: 5 hasNull: false min: alec max: oleksii sum: 27
Column 3: count: 5 hasNull: false min: baldwin max: yermolenko sum: 33
Column 4: count: 5 hasNull: false min: actor max: spotsman sum: 37

Stripes:
Stripe: offset: 3 data: 117 rows: 5 tail: 134 index: 132
Stream: column 0 section ROW_INDEX start: 3 length 8
Stream: column 1 section ROW_INDEX start: 11 length 20
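
Note that the ACID capabilities mentioned above are not switched on simply by choosing ORC. In this generation of Hive a transactional table additionally has to be bucketed and explicitly marked as transactional, and the corresponding transaction manager settings must be enabled on the server. A hedged sketch of such a definition (the table name and bucket count are arbitrary):

hive> CREATE TABLE testOrcAcid(id INT, firstname STRING, lastname STRING, position STRING)
CLUSTERED BY (id) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');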

Avro table – keeps the data in the Avro format, which also provides good performance characteristics and a good compression level. But the main feature of this structure is that it supports data schema evolution, which is currently missing in the other approaches. The end Avro table is also represented by files in the root folder. At the beginning of the file content you will be able to see the data schema header. You can redefine this schema without any extra re-export and re-import steps. Avro is probably the best option for managing information with a dynamic structure:

hive> INSERT INTO TABLE testAvro select id, firstname, lastname,position from test;

hdfs dfs -cat /apps/hive/warehouse/testavro/000000_0

Objavro.schema▒{"type":"record","name":"testavro","namespace":"default","fields":[{"name":"id","type":["null","int"],"default":null},{"name":"firstname","type":["null","string"],"default":null},{"name":"lastname","type":["null","string"],"default":null},{"name":"position","type":["null","string"],"default":null}]}0Ӻ▒S▒▒▒▒▒fҨ7▒
▒ alecbaldwin
actoroleksiiyermolenkodeveloper
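
Since the Avro schema of this table is derived from the Hive column definitions, one simple way to evolve it (a sketch of the general idea, not something taken from the original session, and worth verifying on your Hive version) is to add a new nullable column; rows written before the change simply return NULL for it:

hive> ALTER TABLE testAvro ADD COLUMNS (country STRING);
hive> SELECT id, firstname, country FROM testAvro;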

Parquet table – another popular format with good compression and performance characteristics. The content of Parquet also has a columnar structure, but the internal implementation is significantly different compared to the other formats. Driven by Cloudera, it is often proposed as an alternative to the Hortonworks-backed ORC format. Probably for this reason its support is missing in our Sandbox by default.
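
For completeness, on distributions where Parquet support is bundled, such a table is typically declared with a dedicated storage clause. This is only a sketch and, as noted above, it will not run on this Sandbox out of the box:

hive> CREATE TABLE testParquet(id INT, firstname STRING, lastname STRING, position STRING)
STORED AS PARQUET;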

Conclusion

Hive provides a great mechanism for structuring and processing the data within Hadoop. It heavily simplifies access to the data by providing a well-known SQL-based level of abstraction. Besides, it can significantly lower the amount of knowledge end users need in order to work with Big Data. In my next articles I want to continue getting you acquainted with other techniques and frameworks used in the world of Big Data for managing and structuring the information inside the cluster.
