First dive into Hadoop

In my first article I want to share my experience with the steps I took to start working in the world of Big Data. As a guy from the .NET stack, it was quite challenging for me to understand which technologies I should start studying to get into the world of Hadoop, and to identify the first practical steps needed to start working in this realm.

There is a huge amount of material about this topic on the Internet, but most of it starts making sense only once you get deeper into the field. For a beginner it is usually quite hard to catch the general idea of what Hadoop really is, especially when most of your experience is related to the Windows technology stack. To simplify things at the first stage, I usually tell people to think of Hadoop as just a number of daemons (services) running on different nodes (machines) connected with each other over the network. That is what it really is. Each type of daemon carries some piece of functionality and targets some specific purpose, but together they solve one general problem – processing huge amounts of data in a short time. The daemons are implemented in different languages like Java, Python or Scala, and almost all of them are completely open source. In a real environment each daemon is usually represented by a separate instance per node, and these instances communicate with each other directly through some API or indirectly through an intermediate daemon. A common feature most of them share is that they work in master/slave fashion: one node hosts a primary service which hands out tasks to slave daemons hosted on the other machines. Altogether they form a single Hadoop cluster.

The number of these daemons is relatively large, so it is usually confusing for beginners to point out the primary ones. Besides, sometimes folks say that something in Hadoop is quite obsolete and that you should not touch it at all. But personally I consider that the foundation of every technology is always important, and without it you will not be able to put all the parts together. So, in order not to overcomplicate things here, I want to start with the two primary parts of Hadoop which form its foundation – HDFS (Hadoop Distributed File System) and YARN (Yet Another Resource Negotiator).

Before moving forward I want to mention that most distributions of Hadoop are based on a Linux environment, so it really makes sense to put some effort into learning this OS. From personal experience I would recommend passing through some basic courses on Pluralsight:

I would also recommend passing some essential Linux certification at the end, as it really helps to fill the gaps and consolidate all the knowledge. Personally I chose LPI: Linux Essentials (010-150) for this purpose.

But let's get back to Hadoop. In general it has two primary parts which form the foundation of the overall technology, and I want to give a brief explanation of them:

  • HDFS – a file system distributed across the Hadoop cluster; some folks call it the heart of the technology. The main difference from other file systems is that this one is designed for storing very large files across different machines in a reliable fashion. HDFS works on top of the native file system of the hosting operating system, Linux in the common case. Following the basic master/slave approach, it is represented by two types of daemons – NameNode and DataNode. The first one is probably the most important and critical part of Hadoop, as it stores the metadata about all the data within the file system. In other words, if the NameNode fails, everything stops working. That is why it is really important to handle the maintenance of this daemon with proper fault-tolerance policies. The slaves are represented by the DataNode daemons hosted on different machines. They are responsible for the physical storage of the data and for providing access to it. HDFS follows a fault-tolerance policy where all the data is replicated across the DataNodes according to the replication factor, which usually equals three as a recommended value in production environments. That means each piece of data has three copies across the cluster, and each save operation consists of three write operations on different machines. But if one DataNode fails, the information will still be available to all clients from the other two DataNodes. Hadoop deals with this automatically without any manual intervention.
  • YARN – the second part of Hadoop, responsible for resource allocation and for scheduling tasks upon the data which exists in HDFS. Like HDFS, it is implemented as a set of daemons which work in master/slave fashion. In the old version of Hadoop they were represented by the Job Tracker (a master daemon which played the role of primary scheduler and resource allocator on some master node) and a collection of Task Tracker daemons on the slave nodes which did the actual work upon the data. This model coupled the concepts of scheduling, tracking and execution of tasks into a single component. As a result, all running tasks failed whenever the Job Tracker failed, which was especially critical for long-running tasks. Besides, that framework was tied to a single computing paradigm called MapReduce, while some solutions required more advanced approaches to working with the data. The next version of Hadoop resolved this gap by introducing a new resource-management and scheduling system called YARN. This framework also follows the master/slave approach – it is represented by the Resource Manager daemon on the master node and a set of Node Manager daemons on the child nodes. But in contrast to the Job Tracker and Task Tracker model, each new task scheduled within YARN is processed within an independent set of containers, scheduled and driven by a main container called the Application Master. The main purpose of the Resource Manager and Node Managers is to allocate these containers on proper nodes, track their state, reschedule them in case of failure and inform the end clients about their execution. The real work is now performed within the containers on the slave nodes, and even in case of a failure of the Resource Manager they will keep processing without any issues.
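To make the replication idea above more concrete, here is a toy Python sketch. It is purely illustrative – the round-robin placement policy, node names and block names are all made up for the example, and real HDFS block placement is far more sophisticated – but it shows why a replication factor of three lets the data survive the loss of a single DataNode:

```python
import itertools

REPLICATION_FACTOR = 3

def place_blocks(blocks, datanodes):
    """Assign each block to REPLICATION_FACTOR distinct nodes (toy round-robin policy)."""
    placement = {node: set() for node in datanodes}
    nodes = itertools.cycle(datanodes)
    for block in blocks:
        for _ in range(REPLICATION_FACTOR):
            placement[next(nodes)].add(block)
    return placement

datanodes = ["node1", "node2", "node3", "node4"]
placement = place_blocks(["blk_1", "blk_2", "blk_3"], datanodes)

# Simulate the failure of one DataNode: every block is still reachable
# through the replicas stored on the surviving nodes.
placement.pop("node2")
surviving = set().union(*placement.values())
print(sorted(surviving))  # → ['blk_1', 'blk_2', 'blk_3']
```

With a replication factor of one (as we will configure later for the single-node setup) the same simulation would lose data on any node failure, which is exactly why three is the recommended production value.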

After this piece of basic theory, in the next section of the article I would like to walk through the manual process of installation and configuration of HDFS and YARN in a Linux environment. I really recommend going through this procedure personally, as it gives a basic understanding of how things work at the low level. I will walk through the simplest option, where Hadoop works on a single node and shares the same resources within a single Java Virtual Machine. This is called Single Distribution Mode and is usually used for studying purposes. A more advanced approach is to run Hadoop in Pseudo-Distributed Mode, where the different daemons run in different JVM instances but still on the same machine. Most Hadoop vendors provide sandboxes with pre-installed and pre-configured Hadoop daemons running in this mode. Such a model usually suits local testing or studying purposes, but not operating on relatively big amounts of data. Besides, it has significant hardware requirements for the hosting machine. The real environment works in Fully Distributed Mode, where the cluster is configured across different machines with a set of different Hadoop daemons running in independent JVM instances.

Preparing the environment:

Before the installation of the Hadoop daemons we first need to prepare a suitable environment. For training purposes I would recommend using the Linux Ubuntu operating system hosted as a virtual machine in the VirtualBox application, which is available on the official VirtualBox site.

After downloading the distribution package and installing the application, we need to launch it and proceed through the configuration of a standalone virtual machine – Menu => Machine => New. Then we need to select the Linux Ubuntu image option and specify some suitable name:


In the next section we need to set a sufficient size of virtual memory and disk resources available for this instance. Depending on the number of daemons and secondary Linux utilities we want to install this number could vary, but for the current use case we need to share at least 2 GB of RAM and allocate at least 4 GB of physical disk space. After proceeding through the configuration options and creating a new virtual disk, we will be able to see it in the list of existing instances in the sidebar of the VirtualBox application.

Now we need to map our virtual machine to a real instance of the Linux OS. There are two ways of doing this:

  • install a custom Ubuntu OS from the original distribution package into the virtual machine: for this approach we first need to download the original Ubuntu distribution package image. After this we should start our virtual machine and select the downloaded iso file as a start-up disk:


The installation itself is quite straightforward. You should follow through the wizard and select some environment and configuration properties. This process is described in detail on the original Ubuntu site.

  • another option is to download a pre-configured instance and attach it to the virtual machine: such instances of the Ubuntu system can be found all over the Internet. As an option, one can be downloaded directly from the VirtualBox site from a separate Linux branch.

Configuration of ssh:

All interactions with the virtual machine could be done directly through the VirtualBox UI using the Ubuntu desktop. But I would recommend following the command-line approach from the start and using some remote ssh client to interact with the Linux environment. Personally I use the PuTTY SSH client for this purpose. But in order to make the ssh connection work in Ubuntu, we should modify some configuration settings. These may already be in place in pre-configured downloaded images, but in self-made images we usually need to make them on our own.

In order to implement these configuration changes, first we need to launch our virtual machine instance, connect to the Ubuntu desktop and open a command-line shell through the standard Ubuntu Terminal application:


Now we are in the bash shell, the management center of Linux. To set up the configuration for a remote ssh connection we need to perform the following steps:

  • install the ssh-server daemon in the Ubuntu environment from the Ubuntu packages repository using the terminal command line:

sudo apt-get install openssh-server

  • enable ssh port forwarding in the virtual machine – by default our Ubuntu instance has a NAT configuration applied to interact with the host system through a virtual network. But in order to enable external access to the local resources we need to forward the necessary ports in the settings of the virtual machine. For an ssh connection we need to forward port 22. To do this we need to add a new forwarding rule through the VirtualBox application – Machine => Settings… => Network => Port Forwarding:


After performing these steps you should be able to connect to the Ubuntu bash shell using a remote ssh connection under local user accounts.

Installing Hadoop

Finally we are at the point where we will perform the manual installation of the primary Hadoop daemons – HDFS and YARN. But before doing this we need to install another critical, well-known dependency – Java. Both HDFS and YARN are Java-based daemons, so we need to install this software into our environment. The process is very simple, as we need to run just one command:

sudo apt-get install default-jdk

As you might have noticed, we already used the apt-get command before, when we installed the ssh server. This is a native Linux Ubuntu tool for connecting to remote repositories and downloading program distribution packages from them. After downloading and installing Java you can check that it is now available in the system by executing the following command:

java -version
openjdk version "1.8.0_111"
OpenJDK Runtime Environment (build 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14)
OpenJDK 64-Bit Server VM (build 25.111-b14, mixed mode)

From the Linux perspective (and not only Linux) it is best practice to have separate accounts and groups for different types of tasks. So for our Hadoop cluster we need to create a separate group and a separate user who will be a member of this group and will be responsible for the Big Data functionality. Besides, we need some place in the physical file system where we will install our Hadoop daemons, and this place should be accessible by our Hadoop user. Let's create a hadoop group and an hdfs user for this purpose:

sudo addgroup hadoop

Adding group `hadoop' (GID 1001) ...

sudo adduser --ingroup hadoop hdfs

Adding user `hdfs' ...
Adding new user `hdfs' (1001) with group `hadoop' ...
Creating home directory `/home/hdfs' ...
Copying files from `/etc/skel' ...
Enter new UNIX password:
Retype new UNIX password:

sudo adduser hdfs sudo

You can verify that the user exists in the environment with the following command:

cat /etc/passwd | grep hdfs


In a Hadoop cluster the interactions between nodes are usually handled by means of ssh connections. Normally this procedure could require the input of a password (you probably did this before when connecting to the OS through the remote client). But for system users we need to use key-based authentication. Following this approach, our hdfs user will automatically be verified on each interaction without any additional password prompt. To generate such a key pair for our Hadoop account we need to perform the following steps:

su hdfs

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

Generating public/private rsa key pair.
Enter file in which to save the key (/home/hdfs/.ssh/id_rsa):
Created directory '/home/hdfs/.ssh'.
Your identification has been saved in /home/hdfs/.ssh/id_rsa.
Your public key has been saved in /home/hdfs/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:KS5KQAO4tD76WAsYhMdSrtDknWEWarix6Om20erdaZ0 hdfs@oerm-VirtualBox
The key's randomart image is:
+---[RSA 2048]----+
|o o =.           |
|+X = o           |
|B=X o            |
|*X.      .       |
|O     . S        |
|o=o  . .         |
|+++.. o .        |
|o*+o.o.E         |
|+*=..o           |
+----[SHA256]-----+

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Now it's time to get the Hadoop distribution archive, which is available on the official Apache site. We will use the wget Linux utility to download the Hadoop 2.6.0 package (old releases are kept in the Apache archive; the exact mirror URL may vary). After that we will extract its content into a new directory:

wget https://archive.apache.org/dist/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz
Connecting to ... connected.
HTTP request sent, awaiting response... 200 OK
Length: 195257604 (186M) [application/x-gzip]
Saving to: 'hadoop-2.6.0.tar.gz'

hadoop-2.6.0.tar.gz                                 100%[==================================================================================================================>] 186.21M  9.65MB/s    in 23s

2017-01-24 00:15:25 (8.16 MB/s) - 'hadoop-2.6.0.tar.gz' saved [195257604/195257604]

sudo mkdir /usr/local/hadoop

sudo tar xvzf hadoop-2.6.0.tar.gz -C /usr/local/hadoop/

sudo chown -R hdfs:hadoop /usr/local/hadoop

Now we need to perform the configuration steps for our Hadoop cluster to run it in Single Distribution Mode:

1). Modify the .bashrc script file: .bashrc is the initial bash script which is executed on every interactive shell session. An independent version of this file exists for each user of the system. Inside the file we can define global variables and the locations of applications which we want to make available in the environment. In our case we need to map the location of the Hadoop utilities so that they become available directly from the command line. I recommend using the vim editor for working with files directly from the bash shell. It can look really messy at first glance, but over time you will start feeling more and more comfortable with this amazing utility. You can find a separate section about the vim editor in the Pluralsight courses I've mentioned before:

sudo apt-get install vim

sudo vim ~/.bashrc

Go to the end of the file, switch to insert mode and add the following configuration commands:

export HADOOP_INSTALL=/usr/local/hadoop/hadoop-2.6.0
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
# make the Hadoop scripts available without specifying full paths
export PATH=$PATH:$HADOOP_INSTALL/bin:$HADOOP_INSTALL/sbin

Then re-establish the ssh connection (or re-read the file with source ~/.bashrc). After this you'll be able to run Hadoop commands directly from the shell without specifying their full path.

2). Modify hadoop-env.sh: Hadoop uses this bash script file to define custom environment variables and to affect the daemons' behavior. We need to update this script with the correct reference to the Java home directory:

update-alternatives --config java

There is only one alternative in link group java (providing /usr/bin/java): /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

vim /usr/local/hadoop/hadoop-2.6.0/etc/hadoop/hadoop-env.sh

Replace the line export JAVA_HOME=${JAVA_HOME} with the correct reference:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

3). Modify core-site.xml: this resource file is used by Hadoop at the launch stage. It contains global Hadoop properties which are used by different daemons. We need to set the name and address of our Hadoop file system here:

vim /usr/local/hadoop/hadoop-2.6.0/etc/hadoop/core-site.xml
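The exact content depends on your setup, but for a single-node installation the property placed between the configuration tags typically looks like the fragment below (hdfs://localhost:9000 is the conventional address for a local NameNode; adjust the host and port if yours differ):

```xml
<configuration>
  <!-- default file system address used by all Hadoop clients and daemons -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
```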

4). Modify hdfs-site.xml: this resource file contains the configuration of our HDFS. We need to override the default replication factor and set it to one, to avoid the overhead of writing the data several times in Single Distribution Mode:

vim /usr/local/hadoop/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
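A minimal hdfs-site.xml for a single-node setup would override just the replication factor (the default value is three, as described earlier):

```xml
<configuration>
  <!-- one copy of each block is enough on a single-node cluster -->
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
```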


At this stage we can launch the HDFS file system. To do this we first need to format our NameNode using the hadoop tool:

hadoop namenode -format

17/01/24 23:21:07 INFO common.Storage: Storage directory /tmp/hadoop-hdfs/dfs/name has been successfully formatted.

After this we can launch the NameNode and DataNode daemons using the start-dfs.sh script and then list the running JVM processes with jps:

start-dfs.sh

jps
17830 NameNode
18135 SecondaryNameNode
18237 Jps
17967 DataNode

I used the jps command to list the JVM processes on my Linux system. Here we can see three HDFS-related daemons – NameNode, DataNode and SecondaryNameNode. I will not go deep into describing the functions of the SecondaryNameNode; I just want to mention that at this stage we have our HDFS file system running and ready for work. To test this we can put some file from the Linux file system into the Hadoop file system using the Hadoop utility called hdfs:

hdfs dfs -put /usr/local/hadoop/hadoop-2.6.0/LICENSE.txt /

hdfs dfs -ls /

Found 1 items
-rw-r--r--   1 hdfs supergroup      15429 2017-01-24 23:55 /LICENSE.txt

The Hadoop file system looks quite similar to the Linux one. In general it follows the POSIX specifications, but trades off some of their requirements for extra performance benefits. Many native Linux commands for working with files are mirrored by the hdfs shell utility. So in order to show the content of the file we can apply the cat command through the hdfs utility:

hdfs dfs -cat /LICENSE.txt

So we have our Hadoop file system up and running, and now it is time to switch to its second component – YARN. We've already done all the required preparations, so we can just run the start-yarn.sh shell script and check with jps that our Resource Manager and Node Manager daemons appear on stage:

start-yarn.sh

jps
18897 NodeManager
19154 ResourceManager
19461 Jps
17830 NameNode
18135 SecondaryNameNode
17967 DataNode

Congratulations! You've just performed a Hadoop installation by yourself. Another way to check that things are running fine is to open the Web UIs provided by both HDFS and YARN. They are accessible at the following addresses:

HDFS: http://localhost:50070/


YARN Resource Manager: http://localhost:8088


If you remember, we've already uploaded a file into our file system through the hdfs Hadoop utility. Now let's try to perform some logic upon this file. Word counting is a common task which you can often meet in the Hadoop realm – you need to identify the unique words in some document and calculate their total counts. We will try to solve this task in our standalone cluster using the YARN component, which is configured to run MapReduce jobs.

MapReduce is a functional programming paradigm which states that each job can be performed through a number of relatively small tasks. Each task usually contains two phases – map and reduce. The map phase, performed by the mapper, generally loops through the input data and parses it into a collection of key-value pairs. Then the reduce phase, performed by the reducer, groups and sorts this collection by key and loops through the result set, performing some logic upon the collection of values for each unique key. In our word-counting example the mapper reads the input file line by line, splits each line into words and generates a key-value pair collection as an output for the reducer. The key of this collection is the word and the value is the number 1. Then the reducer groups the collection by these words and sums up their values. Let's say we have the string:

My name is Alex and this is my blog

Mapper stage will create the following output:

(My, 1), (name, 1), (is, 1), (Alex, 1), (and, 1), (this, 1), (is, 1), (my, 1), (blog, 1)

Reducer stage will group and sort the data by key:

(Alex, [1]), (My, [1]), (and, [1]), (blog, [1]), (is, [1, 1]), (my, [1]), (name, [1]), (this, [1])

The output of the job will be:

Alex 1, My 1, and 1, blog 1, is 2, my 1, name 1, this 1
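The mapper/reducer flow described above can be sketched in a few lines of Python. This is an illustration of the paradigm only – the in-memory sort stands in for Hadoop's shuffle phase, and the function names are made up for the example:

```python
from itertools import groupby

def mapper(line):
    # Map phase: emit a (word, 1) pair for every word in the line.
    return [(word, 1) for word in line.split()]

def reducer(pairs):
    # Shuffle/sort: order the pairs by key so equal words are adjacent,
    # then sum the values per unique key.
    pairs.sort(key=lambda kv: kv[0])
    return {word: sum(count for _, count in group)
            for word, group in groupby(pairs, key=lambda kv: kv[0])}

counts = reducer(mapper("My name is Alex and this is my blog"))
print(counts["is"])  # → 2: "is" appears twice in the input line
```

Note that the count is case-sensitive, which is why "My" and "my" end up as two separate keys – the real Hadoop wordcount example behaves the same way.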
The Hadoop distribution includes a number of examples which are available from the share directory. The word-counting example lives in the hadoop-mapreduce-examples-{version}.jar file. Let's launch it using the hadoop utility and specify our LICENSE.txt file in HDFS as the input data for the MapReduce job:

hadoop jar /usr/local/hadoop/hadoop-2.6.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /LICENSE.txt /output

This command will use the wordcount class from this package and will accept LICENSE.txt as an input parameter. Besides, we specify /output as the directory for the results of our job. The command should produce an output which gives us some general information about its execution:

17/01/25 18:27:23 INFO mapreduce.Job:  map 100% reduce 100%
17/01/25 18:27:23 INFO mapreduce.Job: Job job_local321869147_0001 completed successfully
17/01/25 18:27:23 INFO mapreduce.Job: Counters: 38
        File System Counters
                FILE: Number of bytes read=562980
                FILE: Number of bytes written=1076028
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=30858
                HDFS: Number of bytes written=8006
                HDFS: Number of read operations=13
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
        Map-Reduce Framework
                Map input records=289
                Map output records=2157
                Map output bytes=22735
                Map output materialized bytes=10992

Now we can look through the actual output of our job, which should exist in the location we've specified in the command:

hdfs dfs -cat /output/part-r-00000

"AS     4
"Contribution"  1
"Contributor"   1
"Derivative     1
"Legal  1
"License"       1
"License");     1
"Licensor"      1
"NOTICE"        1
"Not    1
"Object"        1
"Source"        1
"Work"  1
"You"   1
"Your") 1
"[]"    1
"control"       1
"printed        1
"submitted"     1
(50%)   1
(C)     1
(Don't  1


In this article I tried to give you the general idea of what Hadoop really is. Of course this is a very small part of the huge world of Big Data, and probably guys from Microsoft would find these techniques a bit complicated and messy. But from my personal experience I can say that over time you start feeling more and more comfortable in this environment, and each day things start to make more and more sense. In my next articles I will try to share my experience of working with such Hadoop tools as Hive, HBase, Spark, Storm, Ranger, Ambari and other products from the Hadoop stack. Hope you enjoyed it.
