Exploring Hadoop file system

HDFS is a core, fundamental component of Hadoop. This file system is designed to handle huge amounts of data. At first glance you may not notice many differences from a usual Linux file system, as it follows many POSIX conventions. But behind the scenes HDFS does a lot of extra work to provide stable and fast access to data that is stored across the different machines of a distributed cluster. It is a powerful data management mechanism that takes full responsibility for building optimal data flows according to the network topology of the cluster and automatically handles critical situations such as hardware failures. In this article I’ll give a general overview of the Hadoop file system and show some common techniques for working with the data inside it.

Master-slave data storage model

If we look at a generic file system model from a high level, we can point out two main components: an addressing table and the actual data containers. In Hadoop they are represented by the NameNode and a collection of DataNodes, which communicate with each other over TCP.

The NameNode plays the role of a gateway for all Hadoop clients on every operation with the data. It knows where all the information is located within the cluster by keeping and managing a special addressing component called the Namespace. It consists of an image of the entire file system called the FsImage, represented by a single file, and a journal of all changes called the LogEntry (the edit log). On startup the NameNode service reads the latest fsImage file and applies all recent changes from the logEntry to get an up-to-date state of the file system. This image is then loaded into memory, and on every request the NameNode uses it to provide clients with the exact location of the data within the cluster. Over time, changes in the file system create new entries in the logEntry, and at some stage the NameNode performs another merge with the latest fsImage file, producing a new up-to-date copy. This procedure is called Checkpointing.
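The checkpoint frequency is driven by a couple of standard HDFS properties, and you can query their effective values with the hdfs getconf command. The property names below are the stock Hadoop 2.x ones; I assume your distribution has not renamed them:

# Maximum time between two checkpoints, in seconds (3600 by default)
hdfs getconf -confKey dfs.namenode.checkpoint.period

# Number of uncheckpointed transactions that also forces a checkpoint
hdfs getconf -confKey dfs.namenode.checkpoint.txns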

The DataNode is the physical carrier of the information. This service stores all the data in special file blocks which physically reside on the local file system. The typical size of each block is 64 MB or 128 MB, but this value is configurable through the HDFS settings. In general a DataNode knows nothing about the file system as a whole. Its main purpose is to store data and to synchronize its own state with the NameNode. This is achieved by sending a special type of message called a BlockReport at regular intervals. This report contains information about all blocks available on the originating DataNode. The NameNode compares this information with its own data and reconciles the correct state of the system. Besides that, every 3 seconds (by default) the DataNode sends a special signal to the NameNode called a heartbeat, which confirms its availability and accessibility. The default replication factor in HDFS is 3, which means that each unit of information has 3 independent copies on different nodes across the cluster. When the heartbeat signal from a certain host disappears for some period of time, Hadoop starts a restoration process: it begins replicating all the data that existed on the missing node using the remaining copies. So if something goes wrong with any machine of the cluster, there will always be another two instances available to serve the data to clients, and in the short term the cluster will recreate the third one.
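You can observe this model from the command line with a couple of standard commands. Just as an illustration – the path in the second command is a placeholder, point it at any existing HDFS file:

# List live and dead DataNodes together with their capacity and block usage
hdfs dfsadmin -report

# Change the replication factor of a single file to 2 and wait until it is applied
hdfs dfs -setrep -w 2 /path/to/some/file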

Investigating local HDFS

Hadoop provides a set of commands for working with the HDFS file system. Most of them are available through a special hdfs utility located in the /usr/hdp/current/hadoop-hdfs-client/bin/ catalog of the cluster. In addition, there are two important configuration files located at /usr/hdp/current/hadoop-client/conf which contain the custom settings of the file system:

  • core-site.xml:

fs.defaultFS – this property holds the actual address of the file system which clients use to work with HDFS

  • hdfs-site.xml:

dfs.namenode.name.dir – location of the FsImage and edit log files used by the NameNode

dfs.datanode.data.dir – location of the block files in which the DataNode keeps the information

dfs.blocksize – the default size of the file blocks used by the DataNode to store the information

dfs.replication – the number of replicas each unit of information will have in HDFS

dfs.heartbeat.interval – the interval at which a DataNode sends a heartbeat signal to the NameNode to confirm that it is available and accessible
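Inside both files each setting is a plain <property> entry. Just as an illustration – the values below are the usual Hadoop 2.x defaults, your cluster may differ:

<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>
<property>
  <name>dfs.blocksize</name>
  <value>134217728</value> <!-- 128 MB -->
</property>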

Now let’s play around with these settings to understand how HDFS really works. Go to your hdfs-site.xml and find the dfs.namenode.name.dir setting value to figure out the actual location of these files:

cat /usr/hdp/current/hadoop-client/conf/hdfs-site.xml | grep "<name>dfs.namenode.name.dir</name>" -A1 | tail -n 1

<value>/hadoop/hdfs/namenode</value>

ll /hadoop/hdfs/namenode/current

hdfs hadoop 30030 Oct 25 07:49 edits_0000000000000000001-0000000000000000321
hdfs hadoop 1048576 Oct 25 08:15 edits_0000000000000000321-0000000000000005482
hdfs hadoop 1048576 Jan 29 19:32 edits_0000000000000005483-0000000000000005725
hdfs hadoop 5242880 Jan 31 02:02 edits_0000000000000005726-0000000000000039169
hdfs hadoop 9437184 Feb 5 05:35 edits_inprogress_0000000000000039170
hdfs hadoop 80052 Jan 29 19:18 fsimage_0000000000000005482
hdfs hadoop 83142 Feb 2 23:37 fsimage_0000000000000039169

Here we can see the actual state of our NameNode and can easily analyze its history. The first four files of this output are old edit logs of the LogEntry component. The fifth file contains the set of changes made since the last NameNode checkpoint. And the last two files are FsImage instances which contain the actual state of the file system at particular points in time. Internally they keep the file system metadata, including the mapping of files to their blocks. Now let’s restart the NameNode. This can be done with the stop-dfs.sh and start-dfs.sh commands, but I prefer using Ambari for this purpose:

[Screenshot: restarting the NameNode service from Ambari]

Let’s take a look at the changes in the NameNode catalog:

ll /hadoop/hdfs/namenode/current

hdfs hadoop 30030 Oct 25 07:49 edits_0000000000000000001-0000000000000000321
hdfs hadoop 1048576 Oct 25 08:05 edits_0000000000000000322-0000000000000004921
hdfs hadoop 8917 Oct 25 08:09 edits_0000000000000004922-0000000000000004992
hdfs hadoop 1048576 Oct 25 08:15 edits_0000000000000004993-0000000000000005482
hdfs hadoop 1048576 Jan 29 19:32 edits_0000000000000005483-0000000000000005725
hdfs hadoop 5242880 Jan 31 02:02 edits_0000000000000005726-0000000000000039169
hdfs hadoop 9437184 Feb 5 05:54 edits_0000000000000039170-0000000000000098632
hdfs hadoop 1048576 Feb 5 05:56 edits_inprogress_0000000000000098633
hdfs hadoop 83142 Feb 2 23:37 fsimage_0000000000000039169
hdfs hadoop 90945 Feb 5 05:54 fsimage_0000000000000098632

After the restart the NameNode synchronized the state of the old fsimage_0000000000000039169 FsImage file with the latest changes from the edits_inprogress_0000000000000039170 log file and created a new checkpoint for HDFS – the fsimage_0000000000000098632 file. Besides that, it started a new log file called edits_inprogress_0000000000000098633.

Alternatively, this operation can be performed with a set of commands, without restarting the NameNode:

hdfs dfsadmin -safemode enter

hdfs dfsadmin -saveNamespace

hdfs dfsadmin -safemode leave

Now let’s take a look at the content of our FsImage file:

cat /hadoop/hdfs/namenode/current/fsimage_0000000000000098632

…▒”P▒▒azure-storage-2.2.0.jar”ʾ▒▒▒*▒▒▒▒▒* ▒▒▒@)▒2 む▒▒▒▒aws-java-sdk-core-1.10.6.jar”վ▒▒▒*;▒▒▒* ▒▒▒@)▒2 䂀▒▒$hadoop-distcp-2.7.3.2.5.0.0-1245.jar”޾▒▒▒*׾▒▒▒* ▒▒▒@)▒2 傀▒▒▒commons-lang3-3.3.2.jar”▒▒▒▒*▒▒▒▒* ▒▒▒@)▒2 悀▒▒guava-11.0.2.jar”▒▒▒▒▒*▒▒▒▒* ▒▒▒@)▒2…

You will probably be able to see some fragments of file names along with other system-related data, but for the most part this information will not be readable by the end user. To convert it into a human-readable format, you can decode it into a separate file on the local file system, fs.xml in our case, using the hdfs oiv command:

hdfs oiv -p XML -i /hadoop/hdfs/namenode/current/fsimage_0000000000000098632 -o /fs.xml

cat /fs.xml

There is another command which allows you to perform the same decoding operation on the log files – hdfs oev:

hdfs oev -p XML -i /hadoop/hdfs/namenode/current/edits_0000000000000004922-0000000000000004992 -o /fslog.xml

cat /fslog.xml
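A quick way to skim the decoded image is to pull out just the file names it mentions. A minimal sketch, assuming the oiv XML layout wraps them in <name> tags:

# Show the first ten file or directory names found in the decoded image
grep -o '<name>[^<]*</name>' /fs.xml | head -n 10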

System metadata is interesting information from different points of view. But without the actual data it is useless, so let’s now try to find that piece within our DataNode. Such a task would be a bit more exciting in a real distributed environment with a higher replication factor, but in our sandbox everything is located in a single place without extra copies. To find the location of the actual data in the DataNode we can use the hdfs-site.xml configuration file mentioned earlier:

cat /usr/hdp/current/hadoop-client/conf/hdfs-site.xml | grep "<name>dfs.datanode.data.dir</name>" -A1 | tail -n 1

<value>/hadoop/hdfs/data</value>

find /hadoop/hdfs/data/current/ -maxdepth 3

current
current/VERSION
current/BP-1186293916-10.25.5.169-1427746975858
current/BP-1186293916-10.25.5.169-1427746975858/current
current/BP-1186293916-10.25.5.169-1427746975858/current/VERSION
current/BP-1186293916-10.25.5.169-1427746975858/current/finalized
current/BP-1186293916-10.25.5.169-1427746975858/current/rbw
current/BP-1186293916-10.25.5.169-1427746975858/current/dfsUsed
current/BP-1186293916-10.25.5.169-1427746975858/dncp_block_verification.log.prev
current/BP-1186293916-10.25.5.169-1427746975858/tmp

This structure seems more complicated compared to the NameNode catalog, but I want you to focus on the finalized folder. This is the actual container of the file blocks. It has a typical tree structure – a three-level directory hierarchy which keeps the data as leaves at the bottom level. Every block of data is represented by two files – the data container and its metadata:

ll /hadoop/hdfs/data/current/BP-1186293916-10.25.5.169-1427746975858/current/finalized/subdir18/subdir48

-rw-r--r-- 1 hdfs hadoop   19 Oct 24 10:54 blk_1074933958_1193680.meta
-rw-r--r-- 1 hdfs hadoop 1076 Oct 24 10:54 blk_1074933960
-rw-r--r-- 1 hdfs hadoop   19 Oct 24 10:54 blk_1074933960_1193682.meta
-rw-r--r-- 1 hdfs hadoop 1085 Oct 24 10:54 blk_1074933962
-rw-r--r-- 1 hdfs hadoop   19 Oct 24 10:54 blk_1074933962_1193684.meta
-rw-r--r-- 1 hdfs hadoop 1071 Oct 24 10:54 blk_1074933964

If you look inside one of the blk files you will see the actual content of the stored file:

cat /hadoop/hdfs/data/current/BP-1186293916-10.25.5.169-1427746975858/current/finalized/subdir18/subdir43/blk_1074932480

. 33 moreW▒▒▒2016-10-24 10:31:31,692 INFO  [LeaseRenewer:hbase@localhost] retry.RetryInvocationHandler: Exception while invoking renewLease of class ClientNamenodeProtocolTranslatorPB after 4 fail over attempts. Trying to fail over immediately.W▒▒▒Qjava.io.IOException: Failed on local exception: java.io.IOException: Couldn’t setup connectionconnection;org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)W▒▒▒7 at org.apache.hadoop.ipc.Client.call(Client.java:1431)

As you may imagine, it would be quite challenging to map the data from the NameNode to the DataNode manually. But Hadoop greatly simplifies this process, as it provides a set of tools which help us do it. Let’s create a file in HDFS and, instead of messing with the metadata manually, ask our NameNode where we can find this file by using the hdfs fsck command:

echo "Hello world" >> test.txt

hdfs dfs -put test.txt /tmp/

hdfs fsck /tmp/test.txt -files -blocks

/tmp/test.txt 12 bytes, 1 block(s):  OK
0. BP-1186293916-10.25.5.169-1427746975858:blk_1075191146_1451047 len=12 repl=1

Status: HEALTHY
 Total size:    12 B
 Total dirs:    0
 Total files:   1

Now let’s go to this location, find the block blk_1075191146 within the DataNode catalogs, and read its content:

find /hadoop/hdfs/data/current/BP-1186293916-10.25.5.169-1427746975858/current/finalized -name 'blk_1075191146*'

/hadoop/hdfs/data/current/BP-1186293916-10.25.5.169-1427746975858/current/finalized/subdir22/subdir29/blk_1075191146_1451047.meta
/hadoop/hdfs/data/current/BP-1186293916-10.25.5.169-1427746975858/current/finalized/subdir22/subdir29/blk_1075191146

cat /hadoop/hdfs/data/current/BP-1186293916-10.25.5.169-1427746975858/current/finalized/subdir22/subdir29/blk_1075191146

Hello world
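As a quick sanity check we can read the same file through HDFS itself and make sure its content matches what we found in the block file:

hdfs dfs -cat /tmp/test.txt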

Congratulations, we found our data and managed to access it directly from the Linux environment. Now you know how HDFS works at a low level.

Accessing HDFS through APIs

Using the hdfs command line utility can be quite handy in many situations, but when you start developing standalone applications which have to interact with HDFS directly, the best option is to use a native Hadoop interface such as the Java API. It allows you to handle the most commonly used file operations such as create, delete, or search.

In such an application we first need to set up the configuration and then initialize an instance of the file system. We also need to add a dependency on the org.apache.hadoop:hadoop-client package, which contains all the required functionality. So let’s create a new Maven application and try some basic operations with the file system. Don’t forget to add the Hadoop client dependency to the pom.xml file:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
</dependency>

In the source code we will first create the configuration for our file system and then create a new file in HDFS:

package demo.hdfs;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class myApp {

    public static void main(String[] args) throws IOException {
        // The Configuration object picks up core-site.xml and hdfs-site.xml from the classpath
        Configuration conf = new Configuration();
        Iterator<Entry<String, String>> iterator = conf.iterator();
        while (iterator.hasNext()) {
            Entry<String, String> entry = iterator.next();
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }

        FileSystem fs = FileSystem.get(conf);

        // Read the existing file from HDFS line by line
        StringBuilder text = new StringBuilder();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path("/tmp/test.txt"))))) {
            String line;
            while ((line = br.readLine()) != null) {
                text.append(line).append("\n");
            }
        }

        // Append a new line and save the result into a new HDFS file
        text.append("This is new File\n");
        try (FSDataOutputStream output = fs.create(new Path("/tmp/test2.txt"))) {
            output.write(text.toString().getBytes());
        }

        System.out.println(text.toString());
    }
}

When the code is ready, create the jar package and launch it from the cluster. You can verify that it works by checking the content of the /tmp catalog in HDFS, which should now contain the new file test2.txt.
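A couple of standard hdfs dfs commands are enough for this check:

# List the catalog and print the newly created file
hdfs dfs -ls /tmp
hdfs dfs -cat /tmp/test2.txt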

This code reads the file from HDFS, adds a new line at the end of its content, and saves the result into a new file. The logic is quite primitive and straightforward. But one interesting moment I want to point out here is the configuration part. When you debug the application from your local machine, you will probably see completely different output from the iterator which loops through all configuration properties compared to the output of the same code in the cluster environment. The reason is that when you create the Configuration object, it examines the application’s classpath and searches for the Hadoop configuration files. We have already met some of them before – core-site.xml and hdfs-site.xml. If the application manages to find these files, it loads all their properties into the instance of the class and uses them to properly configure the state of the FileSystem proxy object. To make these files available in the local environment you can manually copy them into the resources catalog. After this they become part of the local classpath and the Configuration constructor is able to find them.
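As an alternative to copying the files into the resources catalog, you can point the Configuration object at them explicitly with addResource. A minimal sketch, assuming the cluster configuration files were copied to /etc/hadoop/conf on the local machine – adjust the paths to your setup:

Configuration conf = new Configuration();
// Explicitly load the cluster configuration instead of relying on the classpath
conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
FileSystem fs = FileSystem.get(conf);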

Another category of dependencies which is mandatory for the normal execution of the package is the library dependencies. In our case it is org.apache.hadoop:hadoop-client. This package has its own tree of dependencies, and in order to run our application successfully it should be able to use them all. You can either bundle them into a single jar file called an uber jar using one of the Maven plugins (a minimal sketch of such a plugin configuration is shown after the command below), or include them directly in the package; the application will also work if they simply exist in its classpath. As another option you can launch your jar package using the hadoop utility, which internally has all the necessary configuration to find the required libraries for working with HDFS:

hadoop jar target/hdfs-0.0.1-SNAPSHOT.jar demo.hdfs.myApp
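For the uber jar approach mentioned above, a minimal maven-shade-plugin configuration could look like the sketch below; the plugin version is just an example, take whatever is current for you:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <!-- Repackage all runtime dependencies into the final jar during "package" -->
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>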

Secondary NameNode and StandBy NameNode

If you look at the HDFS service panel in Ambari you may notice such additional components as the Secondary NameNode and the Standby NameNode. There is a lot of confusion about these two parts of Hadoop, especially among people who have started to work with Big Data recently. I want to clarify the differences between them from the very start, as they play completely different roles in the work of the cluster:

Secondary NameNode: if you remember, when I described the work of the NameNode I mentioned the stage which merges the latest fsImage file with the edit logs – the procedure called Checkpointing. In real life such an operation can be quite time-consuming and resource-expensive, and it can have a real negative impact on the work of the NameNode service and the whole cluster. That is why at some point it was decided to split these two activities between separate services. Administrators usually install the Secondary NameNode on a different machine than the active NameNode, as this service can use significant amounts of memory to handle Checkpoint operations. In general such a step can greatly improve the work of the cluster, making it more stable and reliable by reducing the pressure on the key component of the Hadoop environment.

Standby NameNode: in old versions of Hadoop the NameNode was a single point of failure – when it went down, nobody could provide the clients with the information about the actual file locations in the DataNodes anymore. In the worst scenarios the failure of this service led to the loss of all data within the cluster. That is why the Hadoop community introduced a backup mechanism called the Standby NameNode. This service mirrors the functionality of the primary NameNode from a different host. The mandatory requirement for such a model is that both NameNodes of the cluster keep the edit log files in a single shared space, so that they can both merge them with their own fsImage instances. In case of a failure of the active NameNode, the Standby NameNode will have an up-to-date state of the file system; it will take over the role of the primary service and start providing the clients with all the necessary information.
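On an HA-enabled cluster you can check which of the two NameNodes is currently active with the hdfs haadmin utility. Here nn1 and nn2 are the logical NameNode ids defined in dfs.ha.namenodes.<nameservice> – your names may differ:

hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2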

 
