Coordination of distributed applications through Zookeeper

When I started to work in the field of Big Data, one of the most challenging things for me was to understand the distributed nature of Hadoop applications. Most software developers usually think about their products in terms of single program components represented by standalone applications. Every program is an independent monolithic unit which lives on its own machine, runs in its own process and is responsible for its own range of tasks. Big Data introduces another level of composition, where a single program can be distributed across the nodes of a cluster as a set of standalone services driven by some master service. The development of such a model creates new challenges related to the synchronization of these services and to keeping the overall application in a consistent state. This problem is common to every distributed system, and instead of reinventing a custom solution for each particular product of the Hadoop family, the community created a universal tool called Zookeeper. In this article I want to give you an overview of this application and show some examples of working with it.

If you look into the design overview of different Hadoop components you will see that most of them are represented as a number of services located on different physical machines and driven by one or two master daemons. For example, HBase consists of the Master Server, which keeps the overall metadata, and the actual data containers called Region Servers. The Storm framework is represented by the Nimbus, which manages all running tasks through slave Supervisor services. Zookeeper is used by such distributed Hadoop applications to synchronize the work between their individual parts.

In order to understand why we really need Zookeeper in Hadoop, let's take a look at the common problems of distributed applications:

  • Failure of the services – at every moment we should expect that either the master or any slave daemon can fail. We need to be able to recover the overall state of the application as soon as possible without any loss of data
  • Work balancing – we may need to balance all incoming tasks equally among the slave nodes of the application. In this case the master service should be able to identify the available capacity of the worker services in real time
  • Metadata storage – we should be able to keep summary information about task assignments and their progress, and allow end consumers to read this information

As you can see, all these problems underline the necessity of keeping some fault-tolerant context data outside the scope of the service daemons. Zookeeper provides a place for such storage and a universal API for accessing it. At a high level, in its simplest form it is a single service. In real environments Zookeeper is usually extended to several services which replicate the content between each other to provide fault tolerance.
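
From the client's point of view such an ensemble is addressed through a single connection string which lists all of its members; the client keeps a session against one server and transparently fails over to another one. Below is a minimal sketch with the native Java client, assuming a hypothetical three-node ensemble zk1/zk2/zk3 listening on the default port 2181:

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class EnsembleConnect {
    public static void main(String[] args) throws Exception {
        // Hypothetical ensemble: the client receives the whole list of servers
        // and keeps working as long as a quorum of them is alive
        String connectString = "zk1:2181,zk2:2181,zk3:2181";

        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(connectString, 15000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });

        connected.await();   // wait until the session is established
        System.out.println("Connected, session id 0x" + Long.toHexString(zk.getSessionId()));
        zk.close();
    }
}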

Zookeeper structures the data into a tree hierarchy similar to the Linux file system. Every piece of information is represented by a data register called a znode, which is an equivalent of a file system directory that can also store data. Every application which uses Zookeeper usually creates a custom root znode at the top level. Then the service daemons create their own znodes within this parent root node and maintain them by providing up-to-date information about their state. Here is an example of a typical model for some service:

[Figure: part10Zk.png – typical znode model for a service]

As you've probably noticed, znodes are the primary parts of this system model. Zookeeper provides clients with an API for creating, updating and deleting these items. There are two types of znodes:

  • Persistent nodes – clients create these units to keep some consistent information about the overall state of the application. The lifetime of such a node starts with the create operation and ends with the delete operation. A stop or failure of the original client does not affect this data. The deletion of the node can be performed by any authorized client through the API.
  • Ephemeral nodes – these nodes are tied to the lifetime of the session established by the client to Zookeeper. Once the connection ends, either due to a stop or a failure, Zookeeper detects this and deletes the node.

Besides, both persistent and ephemeral nodes can be extended to be sequential, so that they additionally contain an incrementing integer number which is appended to the path of the znode, making it unique.
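
Here is a minimal sketch of creating all three kinds of znodes through the native Java client; the paths /demoservice, /demoservice/worker-alive and /demoservice/task- are hypothetical names chosen just for this example:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeTypes {
    public static void main(String[] args) throws Exception {
        // In real code wait for the SyncConnected event first (see the connection sketch above)
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000, event -> { });

        // Persistent znode: survives the client session and is removed only by an explicit delete
        zk.create("/demoservice", "service metadata".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Ephemeral znode: disappears automatically when this session ends
        zk.create("/demoservice/worker-alive", "host-1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Sequential znode: Zookeeper appends an incrementing counter to the path,
        // e.g. /demoservice/task-0000000001
        String taskPath = zk.create("/demoservice/task-", "payload".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("Created sequential znode: " + taskPath);

        zk.close();   // the ephemeral node /demoservice/worker-alive is removed at this point
    }
}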

Every client can register a watcher for listening to the changes of a znode. This is the common publisher-subscriber pattern, where all updates within the original node are forwarded to all subscribed clients as notifications. Besides, every change of a znode affects its version, which is verified before each new update. This guarantees that every client interacts with correct data and keeps it in a consistent state.
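
Both mechanisms are available through the same client API. A short sketch, again assuming the hypothetical /demoservice znode from the previous example:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class WatchAndUpdate {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000, event -> { });

        // Read the current data and register a one-time watcher on /demoservice
        Stat stat = new Stat();
        byte[] data = zk.getData("/demoservice",
                (WatchedEvent event) -> System.out.println("znode changed: " + event.getType()),
                stat);
        System.out.println("data=" + new String(data) + ", version=" + stat.getVersion());

        // Conditional update: succeeds only if nobody has modified the znode since we read it;
        // otherwise Zookeeper rejects the call with a BadVersion error
        zk.setData("/demoservice", "new state".getBytes(), stat.getVersion());

        zk.close();
    }
}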

Accessing Zookeeper through Native ZkCli shell

In the HortonWorks sandbox the service content is located in the following directory:

ll /usr/hdp/2.5.0.0-1245/zookeeper/

Zookeeper usually opens port 2181 for the clients to access its API. The configuration of the service is available in the zoo.cfg file located in the conf directory of the service:

cat /usr/hdp/current/zookeeper-server/conf/zoo.cfg

clientPort=2181
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=/hadoop/zookeeper
autopurge.snapRetainCount=30
server.1=sandbox.hortonworks.com:2888:3888

To check that the service is up and running we can use telnet to send the special ruok ("are you ok") command and receive an imok response if the service is in a normal state:

telnet 127.0.0.1 2181
Trying 127.0.0.1…
Connected to 127.0.0.1.
Escape character is '^]'.
ruok
imok
Connection closed by foreign host.
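
The same four-letter check can be performed programmatically over a plain socket; a minimal sketch in Java:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class RuokCheck {
    public static void main(String[] args) throws Exception {
        // Send the "ruok" command exactly like the telnet session above
        try (Socket socket = new Socket("127.0.0.1", 2181)) {
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes());
            out.flush();

            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[4];
            int read = in.read(buffer);   // Zookeeper answers "imok" and closes the connection
            System.out.println(new String(buffer, 0, read));
        }
    }
}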

The Zookeeper Command Line Interface is the primary tool for direct interactions with the service. We can use it to connect to the server and to perform the most commonly used operations on the Zookeeper tree content:

/usr/hdp/current/zookeeper-client/bin/zkCli.sh -server 127.0.0.1:2181

 [zk: 127.0.0.1:2181(CONNECTED) 0]

  • ls – list content:

 [zk: 127.0.0.1:2181(CONNECTED) 1] ls /

[registry, brokers, storm, zookeeper, infra-solr, hbase-unsecure, admin, isr_change_notification, controller_epoch, hiveserver2, consumers, config]

  • create – create znode:

[zk: 127.0.0.1:2181(CONNECTED) 1] create /demoservice /

Created /demoservice

  • get – get information about znode:

get /demoservice

cZxid = 0x394
ctime = Sun Mar 26 22:25:54 UTC 2017

dataVersion = 0
ephemeralOwner = 0x0
dataLength = 1

  • set – updates the data within znode:

set /demoservice "Hello"
cZxid = 0x394
ctime = Sun Mar 26 22:25:54 UTC 2017
dataVersion = 1
dataLength = 7
[zk: 127.0.0.1:2181(CONNECTED) 18] get /demoservice
"Hello"
cZxid = 0x394
ctime = Sun Mar 26 22:25:54 UTC 2017 …

  • delete – removes znode from the tree:

delete /demoservice

Usage of Zookeeper by the Hadoop services

As I've mentioned before, a number of Hadoop services already use Zookeeper to store their data. If we look into the content of the root znode we will see a number of sub-nodes which are used by different Big Data applications:

[zk: 127.0.0.1:2181(CONNECTED) 4] ls /
[registry, controller, brokers, storm, zookeeper, infra-solr, hbase-unsecure, admin, isr_change_notification, controller_epoch, hiveserver2, consumers, config]

Kafka: the Kafka message broker uses Zookeeper to keep information about the topics, consumers and the elected controller:

Broker metadata, topic registration and partition info:

[zk: 127.0.0.1:2181(CONNECTED) 52] ls /brokers
[ids, topics, seqid]
[zk: 127.0.0.1:2181(CONNECTED) 73] get /brokers/ids/1001
{"jmx_port":-1,"timestamp":"1490568133059","endpoints":["PLAINTEXT://sandbox.hortonworks.com:6667"],"host":"sandbox.hortonworks.com","version":3,"port":6667}
[zk: 127.0.0.1:2181(CONNECTED) 53] ls /brokers/topics
[ATLAS_HOOK, ATLAS_ENTITIES]
[zk: 127.0.0.1:2181(CONNECTED) 54] get /brokers/topics/ATLAS_HOOK
{"version":1,"partitions":{"0":[1001]}}

Kafka leading Controller registration:

[zk: 127.0.0.1:2181(CONNECTED) 79] get /controller
{"version":1,"brokerid":1001,"timestamp":"1490568132898"}

Kafka data consumers:

[zk: 127.0.0.1:2181(CONNECTED) 89] ls /consumers
[ranger_entities_consumer, atlas]

HBase: HBase uses Zookeeper to keep the data about the master server, region servers, regions, namespaces, locks, transitions, replication and some other metadata. In some cluster distributions this data is stored in the /hbase znode. In HortonWorks /hbase-unsecure is used for the non-Kerberized version of the cluster and /hbase-secure for the Kerberized one. This information is used by the Master daemon, the Region Server daemons and HBase clients.

ls /hbase-unsecure
[replication, meta-region-server, rs, splitWAL, backup-masters, table-lock, flush-table-proc, region-in-transition, online-snapshot, switch, master, running, recovering-regions, draining, rolllog-proc, namespace, hbaseid, table]

Master information:

get /hbase-unsecure/master
{�master:16000��t�▒▒d�PBUF#sandbox.hortonworks.com�}����+�}

Region servers:

get /hbase-unsecure/rs
[sandbox.hortonworks.com,16020,1490569542725]

Tables:

[zk: 127.0.0.1:2181(CONNECTED) 107] ls2 /hbase-unsecure/table
[ATLAS_ENTITY_AUDIT_EVENTS, hbase:meta, iemployee, hbase:namespace, atlas_titan, hbase:backup, hbase:acl]

Hive: Hive uses Zookeeper to provide high availability and load balancing of its primary client API, the HiveServer2 application. When it is extended to multiple instances, Zookeeper is used to track the available instances and to provide clients with this information (see HiveServer2 through Zookeeper).

Information about running instances of HiveServer2:

[zk: 127.0.0.1:2181(CONNECTED) 117] ls /hiveserver2
[serverUri=sandbox.hortonworks.com:10000;version=1.2.1000.2.5.0.0-1245;sequence=0000000001]
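
Clients do not have to know a concrete HiveServer2 host: the Hive JDBC driver accepts a connection string that points at Zookeeper and resolves a live instance from the /hiveserver2 namespace shown above. A sketch, assuming the Hive JDBC driver is on the classpath and a hive user that is allowed to connect:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveServer2ViaZk {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // Service discovery URL: the driver asks Zookeeper for a registered HiveServer2
        // instance instead of connecting to a hard-coded host:port
        String url = "jdbc:hive2://sandbox.hortonworks.com:2181/default;"
                + "serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";

        try (Connection conn = DriverManager.getConnection(url, "hive", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW DATABASES")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}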

Storm: the Storm framework uses Zookeeper to coordinate the work of Nimbus with the running instances of Supervisors, perform the assignment of new topologies and handle the fault-tolerance policy:

[zk: 127.0.0.1:2181(CONNECTED) 122] ls /storm
[assignments, backpressure, nimbuses, logconfigs, leader-lock, storms, errors, supervisors, workerbeats, blobstore]

Information about running Nimbuses:

[zk: 127.0.0.1:2181(CONNECTED) 124] get /storm/nimbuses
[sandbox.hortonworks.com:6627]

Information about running Supervisors:

[zk: 127.0.0.1:2181(CONNECTED) 129] ls2 /storm/supervisors
[dd5488f3-0f28-46c3-aed8-4b8dd837ea84]

Kafka spouts create znodes to reconcile their work: in the Kafka spout configuration we have to provide the zkRoot and spoutId values which together represent a path in the Zookeeper hierarchy tree. Storm workers will use this node to track the actual offset of the spout:

[zk: 127.0.0.1:2181 (CONNECTED) 2] get /zkRoot-demo/spoutId-1/partition_0
{"topology":{"id":"video-topology-122-1488797968","name":"video-topology"},"offset":8076,"partition":0,"broker":{"host":"127.0.0.1","port":6667},"topic":"videotopic"}
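
For reference, this is how such a spout is typically configured with the storm-kafka API; the package names below correspond to Storm 1.x and the topic, zkRoot and spoutId values are taken from the listing above, so treat it as a sketch rather than a drop-in snippet:

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutOffsets {
    public static void main(String[] args) {
        // Zookeeper ensemble that also keeps the Kafka broker registrations
        BrokerHosts hosts = new ZkHosts("sandbox.hortonworks.com:2181");

        // zkRoot and spout id define where the consumed offsets are stored:
        // /zkRoot-demo/spoutId-1/partition_0, as in the listing above
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "videotopic", "/zkRoot-demo", "spoutId-1");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        // ... bolts and topology submission omitted
    }
}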

HDFS: the Hadoop Distributed File System uses Zookeeper to provide high availability of the NameNode service. When the Active NameNode is extended with a Standby NameNode, special components called ZKFailoverControllers are also installed on the target machines. Each ZKFC periodically pings its local NameNode and holds an ephemeral node in Zookeeper as long as the NameNode works normally. Once the primary service becomes unavailable, the node is dropped by the controller and Zookeeper performs another election of a new active NameNode:

[zk: 127.0.0.1:2181 (CONNECTED) 3] ls /hadoop-ha/alexcluster
[ActiveBreadCrumb, ActiveStandbyElectorLock]
