Getting data into HBase and consuming it through native shell and Hive

If you followed my previous articles, by this stage you should have a general understanding of the primary components of the Hadoop ecosystem and the basics of distributed computation. But in order to implement performant processing we first need to prepare a strong data foundation for it. Hadoop provides a number of solutions for this purpose, and the HBase data store is one of the best products in this ecosystem for organizing large amounts of data in a single place. In this article I want to cover some techniques of working with HBase – how to import data, how to read it through the native API and how to simplify its consumption through another Hadoop product called Hive.

Getting data into HBase

Before we start working with the storage, the first thing we should do is put information inside it. I usually divide the typical scenarios of getting data into an HBase database into two categories:

Batch upload – used when we already have some amount of data inside the cluster and we want to upload it into HBase for further manipulations. There are several ways to do this:

  1. Loading through a MapReduce job – in this option we create a MapReduce job with a single Map step which uses the HBase client to perform a put operation on every input key-value pair. This approach gives the developer a certain level of control over the incoming data, so some transformation steps can be performed to fully fit the data model. See example here.
  2. Bulk upload using the ImportTsv tool – HBase provides a special utility called importtsv which performs a bulk insert of the content of files directly into the storage. This option is faster than the previous one, but it assumes that the data is already prepared for getting into the database and does not require transformations (a minimal invocation is sketched right after this list). There is a nice example available on the HortonWorks site provided by Ned Shawa.
  3. Import through Pig – for those of you who feel comfortable with the Pig Latin language, the utility allows you to upload data from different sources directly into the data store by means of the HBaseStorage library. The description is available here.
  4. Upload through Phoenix – Phoenix is a special service running on top of HBase which allows you to work with the storage in a SQL-based fashion. Phoenix provides its own mechanisms for uploading data into the database, described here.
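
To give a feel for option 2, here is a minimal sketch of an importtsv invocation. The separator and column list below are assumptions for a comma-separated file whose first field becomes the row key and whose remaining fields map to already existing column families; adjust them to your actual layout:

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=',' -Dimporttsv.columns='HBASE_ROW_KEY,primary:CaseNumber,primary:Type' crime /tmp/testdata.csv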

Real-time upload – when we need to follow real-time processing scenarios where the data is pushed into HBase continuously, Hadoop provides a few options for this:

  1. Streaming through Storm – the Storm framework allows us to listen for messages from different sources and process them through predefined workflows. This service contains a special HBase bolt which encapsulates the HBase client for performing put operations. I’ve provided an example in one of my articles.
  2. Streaming through Spark – Spark is another option for organizing real-time data processing. A nice example can be found in a MapR article about streaming data into HBase.

I mentioned the most popular and commonly used approaches, but this is not the full list of possible options, as the ecosystem keeps introducing new tools and frameworks which cover existing scenarios in more advanced ways.

Importing Dataset through Pig

For our example we need some test data to upload into the storage. I’ve chosen the Crimes – 2001 to present dataset for this purpose. It contains information about incidents of crime (with the exception of murders, where data exists for each victim) that occurred in the City of Chicago from 2001 to present, minus the most recent seven days. The approximate size of the file is about 800MB, so it will take some time to fetch it from the source and push it inside the cluster:

wget -O testdata.csv 'https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD'

After we have downloaded the file, let’s prepare the storage for its content. We will create a table with three column families which logically group the common attributes of the information:

hbase shell

create 'crime','primary','secondary','location'
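
A quick sanity check inside the shell confirms the table and its column families were created as expected (output omitted for brevity):

hbase(main):001:0> describe 'crime'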

hdfs dfs -put testdata.csv /tmp

Our batch file is in place, so now we need to import this data into HBase. Like any lazy developer, I always try to go from the simplest option to the most complicated one. For HBase data import I follow this priority: ImportTsv => Pig => MapReduce (I left Phoenix and Spark outside the scope of this article).

Using ImportTsv is a matter of one command, but I can’t guarantee that the file keeps the same structure for every row. As a result I could lose some data, because the MapReduce job created by the utility will not be able to map the data correctly for lines with a different number of columns. To solve this problem I will use Pig Latin commands for my import operation. Pig is another product of the Hadoop ecosystem which performs batch-based operations by means of a predefined querying syntax. All interactions with the Pig engine go through a special shell called Grunt. In our case we will perform two commands – load only consistent data from the csv file and put it into HBase:

pig

grunt> data = load '/tmp/testdata.csv' using PigStorage(',') AS (
ID: chararray, CaseNumber: chararray, Date: chararray, Block: chararray, IUCR: chararray, Type: chararray, Description: chararray, Location: chararray, Arrest: chararray, Domestic: chararray, Beat: chararray, District: chararray, Ward: chararray, CommunityArea: chararray, FBICode: chararray, xCoord: chararray, yCoord: chararray, year: chararray);
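
Optionally, we can drop obviously broken lines before storing the data. This is a minimal sketch based on the assumption that a missing ID marks a malformed row; more elaborate checks (column counts, date formats) could be added in the same way:

grunt> data = FILTER data BY ID IS NOT NULL;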

grunt> STORE data INTO 'hbase://crime'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'primary:CaseNumber primary:Date primary:Block primary:IUCR primary:Type primary:Description location:LocationDescription secondary:Arrest secondary:Domestic secondary:Beat secondary:District secondary:Ward location:CommunityArea secondary:FBICode location:xCoord location:yCoord primary:year');

Now let’s calculate the total number of rows in our new table:

hbase shell

hbase(main):002:0> count 'crime'

2720737 row(s) in 148.2380 seconds

Almost 3 million records is nothing by the standards of modern Big Data clusters, but for a local instance this set is sufficient for the performance analysis we are going to do.

Reading the data in HBase: Get vs Scan

HBase is a key-value store, so the main aim of the database is to provide real-time access to information by a unique key. For this purpose HBase has the Get operation – “Used to perform Get operations on a single row.”

Scans, on the other hand, retrieve a collection of rows by a number of attributes specified in filters. The trade-off is increased time due to partial region scans or full table scans – “All operations are identical to GET with the exception of instantiation. Rather than specifying a single row, an optional startRow and stopRow may be defined. If rows are not specified, the Scanner will iterate over all rows.” Scan operations work in terms of filters which define search patterns over the data. There is a nice overview of the different types of filters available on the Cloudera blog.
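
When the row keys of interest are known to fall within a range, a bounded scan avoids touching the whole table. Below is a minimal HBase shell sketch; the start and stop keys are just illustrative values, and keep in mind that HBase orders keys lexicographically, not numerically:

hbase(main):003:0> scan 'crime', { STARTROW => '9940600', STOPROW => '9940700' }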

Retrieval by key: if we perform a get request against our crime table we can see that it completes almost instantly:

hbase(main):004:0> get 'crime','9940612'

COLUMN CELL
primary:CaseNumber timestamp=1492040011520, value=HV142083

16 row(s) in 0.0290 seconds

Retrieval by a single non-key column value: now let’s try a scan that retrieves a single row:

hbase(main):004:0> scan 'crime', { LIMIT => 1, COLUMNS => 'primary:CaseNumber', FILTER => "ValueFilter( =, 'binaryprefix:HV142083' )" }

ROW COLUMN+CELL
8482013 column=primary:CaseNumber, timestamp=1492040011520, value=HV142083
1 row(s) in 2.7920 seconds

As you can see, it took about 3 seconds to perform the scan compared to the 29 milliseconds for the get. The difference keeps increasing once you start introducing other attributes into the scan operations.

Retrieval by multiple column values: now let’s use a set of SingleColumnValueFilter filters to find data by two attributes:

hbase(main):004:0> scan 'crime', { FILTER => "SingleColumnValueFilter('primary', 'CaseNumber', =, 'binary:HV142083') AND SingleColumnValueFilter('primary', 'Type', =, 'binary:THEFT')" }

ROW COLUMN+CELL
8482013 column=location:CommunityArea, timestamp=1492040011520, value=38
…..
1 row(s) in 48.6850 seconds

Since the command has to evaluate both column-value filters against every row of the table, in my case the time increased to almost 50 seconds.
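
One way to soften the cost is to restrict the scan to the columns the filters actually need, so HBase does not have to return every qualifier of every matching row. A possible variant of the previous command is sketched below; I have not benchmarked it here, so treat the gain as an assumption:

hbase(main):005:0> scan 'crime', { COLUMNS => ['primary:CaseNumber', 'primary:Type'], FILTER => "SingleColumnValueFilter('primary', 'CaseNumber', =, 'binary:HV142083') AND SingleColumnValueFilter('primary', 'Type', =, 'binary:THEFT')" }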

Accessing HBase through Hive

Indeed, the HBase documentation states that the row key is the primary search parameter of every table and that its proper construction is the key factor of good performance. But sometimes we need to access data by some search logic which goes beyond the boundaries of the key attribute. In the RDBMS world secondary indexes play the helper role for this problem, but HBase has no native support for such functionality.

Another thing which should be taken into consideration is the syntax. HBase has a custom API which allows us to explore the data, but for those of you who are familiar with SQL it will take time to get comfortable with this filtering technique.

The Hadoop ecosystem contains several options which help deal with these two problems, and one of them could be familiar to you from my previous articles. It is called Hive. This product is especially useful when you follow the model of a distributed warehouse where different pieces of information reside in different sources. Hive lets us build a link to an HBase table and consume it through familiar SQL-based syntax. In order to build such a link we need to properly define the mapping to the HBase table’s column families and their qualifiers:

hive

hive> CREATE EXTERNAL TABLE ecrime (key int, Block string, CaseNumber string, EventDate string, Description string, IUCR string, Type string, Arrest string, Beat string, District string, Domestic string, FBICode string, Ward string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,primary:Block,primary:CaseNumber,primary:Date,primary:Description,primary:IUCR,primary:Type,secondary:Arrest,secondary:Beat,secondary:District,secondary:Domestic,secondary:FBICode,secondary:Ward")
TBLPROPERTIES ("hbase.table.name" = "crime", "hbase.mapred.output.outputtable" = "crime");

In this example we created an external Hive table on top of our HBase table. Hive will use a special interpreter to translate HQL queries into the set of scans and filters mentioned earlier, but now you can perform SQL-based queries upon the exact same data.

hive> SELECT * FROM ecrime WHERE key='9940612';

Time taken: 1.316 seconds, Fetched: 1 row(s)

hive> SELECT * FROM ecrime WHERE caseNumber='HV142083' LIMIT 1;

//about 35 seconds

As you can see, search by key is still fast, while the second query took about 35 seconds on my machine to return. That is slower than our first scan, but the query plan gives us some explanation:

hive> explain SELECT * FROM ecrime WHERE caseNumber='HV142083' LIMIT 1;

Stage-0
Fetch Operator
limit:1
Limit [LIM_3]
Number of rows:1
Select Operator [SEL_2]
outputColumnNames: ["_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", "_col10", "_col11", "_col12"]
Filter Operator [FIL_5]
predicate: (casenumber = 'HV142083') (type: boolean)
TableScan [TS_0]
alias:ecrime

According to the plan the query performs three operations – a full table scan, retrieval of the satellite data from the other qualifiers, and a limit to the first result. We can optimize this by narrowing the select list of our request to a single qualifier, which avoids fetching the other units of data from the HBase table and saves us some time:

hive> SELECT key FROM ecrime WHERE caseNumber='HV142083' LIMIT 1;

//about 7 seconds 

Creating index through ORC Hive table

When the amount of data grows towards what we would genuinely call Big Data, we start looking for optimization techniques to reduce the total time of our queries. As I’ve mentioned earlier, HBase does not support secondary indexes, but we can borrow this piece of functionality from another framework – Hive, for example. The idea of this approach is to create a native lookup table in Hive, use the indexing capabilities of that application to optimize queries over this information, and then join back to the HBase external table by the key attribute. We will use the ORC file format for storing this data as it automatically provides a very performant indexing mechanism on three different levels:

  • file level – statistics about the values in each column across the entire file
  • stripe level – statistics about the values in each column for each stripe
  • row level – statistics about the values in each column for each set of 10,000 rows within a stripe

hive> CREATE TABLE hcrime (key int, caseNumber string, type string) STORED AS ORC;

hive> INSERT INTO TABLE hcrime SELECT key, casenumber,type FROM ecrime;

As you can see, we can query these attributes in a much more performant fashion:

hive> select * from hcrime where casenumber='HV142083' and type='THEFT';
OK
8482013 HV142083 THEFT

Time taken: 0.22 seconds, Fetched: 1 row(s)
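
If the lookup column has high cardinality, as CaseNumber does, ORC bloom filters can make the row-group index even more selective. A hedged variant of the table definition is sketched below; the table name hcrime_bf is hypothetical and I have not benchmarked the gain on this dataset:

hive> CREATE TABLE hcrime_bf (key int, caseNumber string, type string) STORED AS ORC TBLPROPERTIES ("orc.bloom.filter.columns"="casenumber", "orc.create.index"="true");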

Now we can join with this table and use it as our filter for the HBase data:

hive> SELECT e.key FROM hcrime h inner join ecrime e on e.key = h.key where h.casenumber='HV142083' and h.type='THEFT';

I need to mention that such a query brings extra overhead, because it is executed as a set of MapReduce jobs, so we pay an additional price for their scheduling in YARN. That is why for small amounts of data you will probably not see significant benefits from this approach. Another drawback is that you now have to maintain an extra table which duplicates part of the original one and keep it up to date, either through some periodic MapReduce job or, if you follow a streaming approach, by writing the data to both places at once. But once the amount of information starts growing, you will see significant benefits from this index data whenever you perform non-key-based queries against the storage.
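
One simple, if heavyweight, way to keep the lookup table in sync is to periodically rebuild it from the external HBase table, for example from a scheduled job. This is just a sketch of that idea, not an incremental solution:

hive> INSERT OVERWRITE TABLE hcrime SELECT key, casenumber, type FROM ecrime;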
