Counting and sorting words in Hadoop

At this stage you probably have a general idea of what Hadoop is from a technical point of view. But why do we really need such a huge and complicated platform for such simple things as searching, counting or sorting our data? According to research published by Cisco last year, annual global IP traffic will reach 2.3 zettabytes per year by 2020. Another forecast, made by International Data Corporation a few years ago, states that by 2020 we will have to operate with 44 zettabytes of data. Can we really handle such volumes with our current hardware and algorithms? Hadoop is probably the best attempt to handle that problem at this time.

There is quite an interesting competition in the world of Big Data called TeraSort. It appeared in 2008 with the general idea of generating, sorting and validating 1TB of data. At that time the best result was 3 minutes 48 seconds on a Hadoop cluster of 910 nodes. Since then the amount of data has grown to 100TB, and just a few months ago we got a new record: 100TB of data sorted in 98.8 seconds on a cluster of 512 nodes. The actual results are available on the Sort Benchmark home page.

The TeraSort package is actually part of the standard Apache Hadoop distribution, so you can evaluate the capacity of your own cluster by launching the exact same test from your local environment, just with a smaller amount of data. The teragen program takes the number of 100-byte rows to generate as its first argument, so 10,000,000 rows correspond to roughly 1GB. For example, connect to your cluster and run a command which generates 1GB of data, and then another command which sorts it:

/usr/hdp/current/hadoop-client/bin/hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples-*.jar teragen 10000000 /tmp/teragenout

 Job Counters
                Launched map tasks=2
                Other local map tasks=2
                Total time spent by all maps in occupied slots (ms)=17877
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=17877
                Total vcore-seconds taken by all map tasks=17877
                Total megabyte-seconds taken by all map tasks=91530240

/usr/hdp/current/hadoop-client/bin/hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples-*.jar terasort /tmp/teragenout /tmp/terasortout

 Job Counters
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=1
                Rack-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=16632
                Total time spent by all reduces in occupied slots (ms)=4461
                Total time spent by all map tasks (ms)=16632
                Total time spent by all reduce tasks (ms)=4461
                Total vcore-seconds taken by all map tasks=16632
                Total vcore-seconds taken by all reduce tasks=4461
                Total megabyte-seconds taken by all map tasks=85155840
                Total megabyte-seconds taken by all reduce tasks=22840320

In my test cluster it took about 18 seconds to generate 1GB of data and about 21 seconds to sort it. Of course such a result is far behind the results of the champions of this competition, but in general it is a good way to measure the capacity of the environment you are working on.
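
The benchmark also includes a validation step, which the same examples jar covers with the teravalidate program. A quick sketch of that step, assuming the output paths used above (the report directory /tmp/teravalidateout is just an example name):

# the report directory name is arbitrary
/usr/hdp/current/hadoop-client/bin/hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples-*.jar teravalidate /tmp/terasortout /tmp/teravalidateout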

Preparing the Java environment

Until now we have worked with the example MapReduce jobs embedded into the Hadoop distribution jars. But in order to start writing our own Hadoop packages we need a developer environment installed and properly configured. First we need to choose the language for our logic. For this purpose I'm going to use Java. As I've mentioned before, I came from the .NET world, and from my point of view Java and C# are very similar languages, so switching from one to the other is less painful than the Python or Scala options. Besides, Java is native to many Hadoop services, and with knowledge of Java you can sometimes look into their sources and understand how things work inside.

First of all we need to install an IDE on our primary system. For the Java environment Eclipse is one of the most popular development platforms. Let's download and install its latest version, called Neon, from here. Also make sure you have the Java SDK installed on your machine; alternatively you can download it here. Now let's create a new project for our MapReduce job: File => New => Project => Maven => Maven Project => check the Create a simple project checkbox:

[Figure: creating a new Maven project in Eclipse (part3Maven.png)]

We've chosen a Maven project instead of a standard Java project because it has certain advantages which greatly simplify the deployment process. Maven is a build management tool which allows you to create a flexible build workflow for your package. It introduces the concept of a driver file called pom.xml which contains all the information about how your application should be built. In this file you define the metadata of your program, its dependencies and their repositories, global properties, resources, extra build actions and other things that are usually handled at the build stage of a product. When your code is ready you run Maven to launch the build process, which follows the driver definitions and produces your application ready for end usage. By default Maven is missing from our sandbox, but if we are planning to build our sources in the Hadoop environment we can easily install it there:

cd  /usr/local

wget http://mirrors.whoishostingthis.com/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz

tar xzvf apache-maven-3.3.9-bin.tar.gz

export MAVEN_HOME=/usr/local/apache-maven-3.3.9
export PATH=$MAVEN_HOME/bin:$PATH
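
These exports only live for the current SSH session; if you want them to survive a reconnect, you can append the same lines to your shell profile, for example (a sketch assuming the default bash shell):

# persist the Maven variables for future sessions
echo 'export MAVEN_HOME=/usr/local/apache-maven-3.3.9' >> ~/.bashrc
echo 'export PATH=$MAVEN_HOME/bin:$PATH' >> ~/.bashrc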

Reconnect (or re-source your profile) and verify the installation:

mvn -version

Maven home: /usr/local/apache-maven-3.3.9
 Java version: 1.8.0_111, vendor: Oracle Corporation
 Java home: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-0.b15.el6_8.x86_64/jre
 Default locale: ru_RU, platform encoding: UTF-8
 OS name: "linux", version: "3.10.0-327.el7.x86_64", arch: "amd64", family: "unix"

Writing a custom MapReduce job

Now let's try to write our first Hadoop program, which will count the words in the files located in some folder of our HDFS. A generic MapReduce job consists of three parts, technically three classes: Mapper, Reducer and Driver. A variant with only a Mapper and a Driver is also possible. More complicated workflows can consist of MapReduce chains with several mappers and reducers led by a single driver class.

The Mapper part of our application performs three main tasks: filtering the input data so that we only operate with the information we really need, extracting the non-unique key attribute for this data, and finally sorting the result set by this key. Sometimes mappers also play the role of generators, creating some sort of satellite data and attaching it to the base information. In our counting example we will actually create such data. To make our mapper recognizable by the YARN scheduling system we need to derive it from the base class org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, which is an external dependency from the hadoop-mapreduce-client-core package. Besides that we are going to use a couple of classes from another Hadoop package, org.apache.hadoop.io. In order to import them into our project we will use Maven. Add this node into the pom.xml file of your project straight after the version tag:

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
  </dependency>
</dependencies>

Then right-click on your project => Run As… => Maven generate-sources, and you should see a Maven Dependencies node within the tree of your project. Now the body of our Mapper class:

package demo;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // split the incoming line into words and emit a (word, 1) pair for each of them
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

The logic here is quite straightforward: we pass the content of our file through the mapper line by line, split each line on the separator symbols into a collection of words using the StringTokenizer class, and then link each word with a piece of satellite data: the number one. For example, for the input line "to be or not to be" the mapper emits the pairs (to, 1), (be, 1), (or, 1), (not, 1), (to, 1), (be, 1). In our case the words play the role of the keys. The sorting part is performed by Hadoop automatically behind the scenes.

The Reducer part performs a unit of functionality upon the data prepared by the mapper. Before the reduce phase the Hadoop framework groups the input data by key and then processes each key, together with its attached collection of values, through the Reducer logic one by one. In our case we need to sum these values to get the total number of occurrences of each word in the input text; for example, the grouped input (be, [1, 1]) is reduced to the output pair (be, 2). Just as with the mapper, we need to inherit the primary reducer logic from the base Reducer class:

package demo;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum up all the counts emitted for this word and write the total
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

The Driver is the last part of our package. It is a launcher class with a main method which defines the configuration of the job. Here you need to tell the YARN scheduler:

  1. Where to take input data from
  2. What type of data it is going to be
  3. How this data will be transformed by mapper and reducer
  4. Where to save the result data produced by reducer

This is an example of our driver:

package demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount mapreduce job");
        job.setJarByClass(Driver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output locations are taken from the command line arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

As you can see, we defined all the mentioned settings in this method. Besides, pay attention that by means of the setJarByClass method we tell the MapReduce framework which class is the Driver for our job. We also register our reducer as a combiner, so partial sums are already calculated on the mapper side before the data is sent over the network. Now we need to build a single package with all these classes inside. We can do it either in our local OS directly from the IDE or in the Hadoop cluster. For the first option, right-click on the project => Run As… => Maven build… => set the goal to 'package' => click 'Run'. Our application will be created as a single jar file within the /target folder of the project. Now we need to copy it to our cluster. For this purpose I use a utility called WinSCP. I copied my demo-0.0.1-SNAPSHOT.jar package into the /tmp catalog of the Hadoop cluster.
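
Alternatively, since we have already installed Maven in the sandbox, we could copy the project sources to the cluster and build the jar there. A rough sketch, assuming a hypothetical project directory and host name:

# from the local machine: copy the sources to the cluster (path and host are just examples)
scp -r ./demo root@sandbox.hortonworks.com:/tmp/demo

# on the cluster: build the package; the jar appears in /tmp/demo/target
cd /tmp/demo
mvn clean package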

After this we can connect to our cluster and run the program. But first let's put some input data for our MapReduce job into HDFS:

hdfs dfs -put /usr/hdp/2.5.0.0-1245/spark/LICENSE /tmp
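
If you like, you can quickly check that the file has landed in HDFS before scheduling the job:

hdfs dfs -ls /tmp/LICENSE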

Now we are ready to use the yarn Hadoop utility to schedule the logic. We need to specify the fully qualified name of the main class of our package, the input data location and the output data location as parameters for the command:

yarn jar /tmp/demo-0.0.1-SNAPSHOT.jar demo.Driver /tmp/LICENSE /tmp/demoout

Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2052
Total time spent by all reduces in occupied slots (ms)=2035
Total time spent by all map tasks (ms)=2052
Total time spent by all reduce tasks (ms)=2035
Total vcore-milliseconds taken by all map tasks=2052
Total vcore-milliseconds taken by all reduce tasks=2035
Total megabyte-milliseconds taken by all map tasks=513000
Total megabyte-milliseconds taken by all reduce tasks=508750

It took 4 seconds to execute this job in my cluster. The output can be verified with the hdfs command:

hdfs dfs -cat /tmp/demoout/part-r-00000
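
Each line of the part-r-00000 file contains a word and its total count separated by a tab. If you want to glance at the most frequent words, you can pipe the output through the standard sort utility; and keep in mind that MapReduce refuses to overwrite an existing output directory, so before re-running the job you would need to remove it first:

hdfs dfs -cat /tmp/demoout/part-r-00000 | sort -k2,2 -nr | head -20

hdfs dfs -rm -r /tmp/demoout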

Counting words with Hive and Pig

As you may have noticed, writing MapReduce jobs is quite a time-consuming task, as you need to implement a separate unit of logic for every simple piece of functionality. Besides, you need to move applications between environments, deal with building the packages and bother with the dependencies. For lots of basic operations upon the data you can actually switch to a higher level of abstraction which will generate these MapReduce jobs for you automatically. In addition you will deal with a SQL-like syntax of querying data which is familiar to many developers. This can be done by means of two standalone Hadoop tools called Hive and Pig. The main difference between them is the way queries upon the data are written. Hive uses SQL syntax for working with the information. Pig uses a custom language called Pig Latin and introduces the concept of a batch scripting approach. Personally I find both solutions very interesting, with slightly different target use-cases. In the end both tools generate a set of MapReduce jobs (by default) scheduled by YARN. I will not go deep into the details, I just want to show how much easier it is to perform the word counting logic upon the exact same file using these tools.

Pig approach:

pig

A = load '/tmp/LICENSE';

B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;

C = filter B by word matches '\\w+';

D = group C by word;

E = foreach D generate COUNT(C), group;

dump E;

quit;
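
Instead of typing the statements in the interactive Grunt shell, you could also save them into a script file and submit it in one go:

# wordcount.pig contains the statements shown above; the file name and path are just examples
pig -f /tmp/wordcount.pig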

Hive approach:

hdfs dfs -mkdir /tmp/hivedemo
hdfs dfs -put /usr/hdp/2.5.0.0-1245/spark/LICENSE /tmp/hivedemo

hive

CREATE EXTERNAL TABLE IF NOT EXISTS words (line STRING)
ROW FORMAT DELIMITED
LINES TERMINATED BY '\n'
LOCATION '/tmp/hivedemo';

SELECT word, COUNT(*) FROM words LATERAL VIEW explode(split(line, ' ')) lTable as word GROUP BY word;
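
As with Pig, these Hive statements can be stored in a script file and executed without opening the interactive shell, for example:

# wordcount.hql contains the two statements above; the file name is arbitrary
hive -f /tmp/wordcount.hql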

Conclusion

In this article I showed how to create simple MapReduce jobs in Hadoop. This is a low-level paradigm which became one of the fundamental parts of Hadoop and distributed computing. Higher-level approaches like Pig or Hive help to simplify this process and sometimes even save the end developer from dealing with MapReduce directly. But there is still a number of tasks which can only be implemented by means of these fundamental techniques. A real challenge to the MapReduce approach was introduced by a different Hadoop processing tool called Spark, but that will be the topic of another article.
