Perfect Storm – real-time data streaming from .NET through Kafka to HBase, HDFS and Hive

In my previous articles I tried to give an overview of the primary Hadoop services responsible for storing data. With their help we can organize information into common structures and operate on it through tools like MapReduce jobs or higher-level abstractions such as Hive SQL or the HBase query language. But before doing any of this we obviously need to get our data into the cluster somehow. The simplest approach would be to copy the information between environments and run a set of commands from the various service-specific CLIs to do the import, or to launch bash scripts which partly automate this work. It would be much better, though, to have a common tool that lets us define workflows for such processes, so that individual units of information can be imported, transformed, aggregated or passed through some algorithm before they are actually persisted. A framework of this kind certainly has to be scalable and has to meet the general requirements of a distributed environment. In the Hadoop world this tool is called Storm, and from my point of view it is probably one of the most interesting and exciting parts of the Big Data ecosystem. In this article I want to give you an overview of it and share my experience of using it.

Storm initially appeared in 2011 as a standalone product developed by a company called BackType. In 2014 it moved to the Apache community and became one of its Top-Level Projects. Since then it has had a couple of stable releases, and nowadays you can find it in most distributions from the official Hadoop vendors. Software developers usually use Storm to encapsulate real-time processing workflows inside Hadoop, and data import is one such scenario. Using Storm we can receive information from external sources, apply different algorithms to it and either save it inside the cluster or send the results back to end consumers. The main goal of Storm is to provide a generic real-time processing mechanism for building such programs and running them across the nodes of the cluster in a reliable fashion.

From a technical perspective Storm operates on developer-defined workflow applications called Topologies. Each topology contains a set of components which are divided into two categories:

  • Spouts – represent the data sources for a topology. In the Java world a spout is a custom class which implements the IRichSpout interface. The standard Storm distribution provides a set of ready-to-use spouts which allow reading data from popular sources such as the Kafka message broker, the Kestrel queue or the HDFS file system. The main goal of this component is to read data from a source, convert it into tuples and send them to the next unit called a Bolt. Spouts can be reliable or unreliable. A reliable spout emits the data and waits for acknowledgement from all bolts that the tuples were processed successfully; otherwise it re-emits the information after some period of time. An unreliable spout just emits the data without bothering about its handling on the next stages. A minimal sketch of a custom spout follows right after this list.
  • Bolts – represent a single unit of logic applied to the data. In Java a bolt is a class which implements the IRichBolt interface or extends the BaseBasicBolt class. For example, bolts can save incoming tuples directly into Hadoop storage services like HDFS, HBase or Hive. Developers can create their own logic by implementing the methods of the base bolt interfaces and classes.
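
To make the spout contract more concrete, here is a minimal sketch of a custom reliable spout. The SentenceSpout class and its hard-coded sentences are purely illustrative and not part of the project we will build later in this article; the sketch just shows the methods Storm expects and how message ids enable the acknowledgement mechanism described above:

package storm;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private final String[] sentences = { "the quick brown fox", "jumps over the lazy dog" };
    private int index = 0;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Keep the collector so that nextTuple can emit data later
        this.collector = collector;
    }

    public void nextTuple() {
        String sentence = sentences[index % sentences.length];
        // Passing a message id as the second argument makes the spout reliable:
        // Storm will call ack() or fail() for this id once downstream processing finishes or times out
        collector.emit(new Values(sentence), index);
        index++;
        Utils.sleep(1000);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void ack(Object msgId) {
        // The tuple tree was fully processed - nothing to resend
    }

    public void fail(Object msgId) {
        // The tuple failed or timed out - a reliable spout would re-emit it here
    }
}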

Spouts and bolts are the core components of Storm, and together they allow building complicated conditional forking pipelines. In the simple case they are stateless, so every tuple that passes through the workflow has no relation to other tuples. More advanced scenarios in Storm allow keeping a global processing state of the data driven by the topology. In this case tuples are handled in batches and developers can implement operations such as aggregation, iteration or grouping over them in different parts of the pipeline. Such workflows are called Trident topologies.
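
For illustration, here is a rough sketch of what a Trident word-count pipeline could look like. The stream name, the Split function and the in-memory state factory are illustrative choices (this is not part of the word-count project built later), but they show how batched tuples can be grouped and aggregated into a persistent state:

package storm;

import org.apache.storm.topology.IRichSpout;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentWordCount {

    // Splits each incoming "sentence" tuple into separate "word" tuples
    public static class Split extends BaseFunction {
        public void execute(TridentTuple tuple, TridentCollector collector) {
            for (String word : tuple.getString(0).split("\\s+")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static TridentState buildWordCountState(TridentTopology topology, IRichSpout spout) {
        // Tuples are processed in small batches; groupBy plus persistentAggregate
        // maintains a running count per word, something plain stateless bolts cannot do
        return topology.newStream("sentences", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
    }
}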

Storm design

Storm follows the common master-slave model typically used in many Hadoop services. Behind the scenes it actively uses the Zookeeper coordination service to reconcile the work of its components. The framework consists of the following components:

  • Nimbus – the stateless master Storm service. It is responsible for validating topologies and submitting them to the slave nodes called Supervisors. Nimbus constantly tracks the health of the Supervisors and their running tasks. If the service goes down it does not affect any running processes, as they live on the slave nodes.
  • Supervisor – the slave Storm service which receives topology assignments from the master node and spawns the actual workflow instances called Workers. A Supervisor can schedule a limited number of such workers, defined by the supervisor.slots.ports property of its configuration. Technically, once a worker has been assigned, a crash of the Supervisor should not have any impact on the running process. But if the original service cannot recover for some period of time, Nimbus will resubmit the workflow to another Supervisor.
  • Worker – a JVM process which wraps and runs the workflows of spouts and bolts. Each worker uses a dedicated port. In case of a critical exception during the processing of tuples, the Supervisor will restart the worker. By default a worker runs one thread called an executor, with one container allocated for this executor called a task. Increasing the number of executors within a worker improves the level of parallelism of the topology by running bolts and spouts in separate threads. Tasks meanwhile play another specific role – they are used for testing and live rebalancing of running applications (see the snippet after this list).
  • Storm UI – a web application which allows tracking the state of the topologies running in the cluster. The portal provides general information about running tasks on its main page. More detailed information is secured through ACL authorization: each topology can define in its topology.users property a custom list of users who have permission to access the details. Usually Storm UI is hosted on the same node as the Nimbus service.
  • DRPC Server – a Storm process which allows consuming topologies through remote procedure calls in order to perform complicated operations. The server receives requests from clients, processes them through the running topologies and sends the results back to the clients.
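
To make the worker/executor/task terminology more tangible, here is a small illustrative snippet showing how they are requested for a topology. It is not part of the word-count project below; it reuses the TestWordSpout and TestWordCounter helper classes that ship with storm-core:

package storm;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.testing.TestWordCounter;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;

public class ParallelismDemo {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setNumWorkers(2);                                 // two worker JVM processes for the topology

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), 2);     // parallelism hint: 2 executors (threads)
        builder.setBolt("counter", new TestWordCounter(), 4)   // 4 executors for the bolt...
               .setNumTasks(8)                                 // ...sharing 8 task instances, so the bolt
               .shuffleGrouping("words");                      // can later be rebalanced to more executors

        StormSubmitter.submitTopology("parallelism-demo", conf, builder.createTopology());
    }
}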

Here is a high-level diagram of the Storm architecture:

[Diagram: high-level Storm architecture]

Now I want to show how to create topologies and process data through them. In our first topology we are going to subscribe to the Kafka message broker, which is part of the Hadoop distribution, get the data from it, perform our favorite word-counting operation on this information and save the results to HDFS, HBase and Hive.

Preparations

Before implementing the workflow, let's perform a few preparation steps which will allow us to process data through this pipeline:

  • Open the Kafka port on the Sandbox – this step is relevant for the versions of the Sandbox where Kafka's port 6667 is not reachable from the hosting environment. This problem existed in the latest versions of the Sandbox, which host Hadoop inside a Docker container. To open the port we need to perform the steps described in my answer to this question on the HortonWorks portal.
  • Create a new topic in the Kafka service – using the Kafka CLI, let's create a simple topic with one partition and a single replica on our server (an optional smoke test of the topic follows this list):
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stormwc
 
  • Create a new directory in HDFS for the output results:
 hdfs dfs -mkdir /stormwc
  • Create a table in Hive for the output results (the Hive bolt uses the Hive streaming API, which requires a transactional, bucketed ORC table):
hive -e 'CREATE TABLE stormwc (filename STRING, words STRING, count BIGINT)
 CLUSTERED BY (words)
 INTO 8 BUCKETS
 STORED AS ORC
 TBLPROPERTIES ("transactional"="true")';
  • Create a table in HBase for output results:
echo  'create "stormwc","cfwc"' | hbase shell
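
Optionally, before wiring the topology we can make sure the new topic actually accepts messages. Here is a minimal smoke test using the plain Kafka Java producer (the class name is mine; it assumes the broker is reachable on sandbox.hortonworks.com:6667, the port we opened in the first step):

package storm;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicSmokeTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) {
            // The key plays the role of a file name and the value the role of its content,
            // mirroring what the topology will expect later
            producer.send(new ProducerRecord<String, String>("stormwc", "test.txt", "hello storm")).get();
        }
    }
}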

Creation of the topology

Now that our external resources are ready, we can start writing our topology source code. The workflow is quite simple and follows this model:

[Diagram: word-count topology workflow]

Let's create a new Maven project in Eclipse and add the required dependencies to our pom.xml file. For HortonWorks distributions most of the referenced packages can be found in this Maven repository:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>demo</groupId>
<artifactId>stormwc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<repositories>
  <repository>
    <id>HDPReleases</id>
    <url>http://repo.hortonworks.com/content/repositories/public</url>
  </repository>
</repositories>
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>defaults.yaml</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
     </executions>
    </plugin>
  </plugins>
  <resources>
  <resource>
    <directory>/etc/hbase/conf</directory>
    <includes>
      <include>hbase-site.xml</include>
    </includes>
  </resource>
  <resource>
    <directory>/etc/hadoop/conf</directory>
    <includes>
      <include>hdfs-site.xml</include>
      <include>core-site.xml</include>
    </includes>
  </resource>
  <resource>
    <directory>/etc/hive/conf</directory>
      <includes>
        <include>hive-site.xml</include>
      </includes>
  </resource>
  </resources>
</build>
<dependencies>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.1.2.5.0.0-1245</version>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>1.0.1.2.5.0.0-1245</version>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.0.1.2.5.0.0-1245</version>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hive</artifactId>
    <version>1.0.1.2.5.0.0-1245</version>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.1.2.5.0.0-1245</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.2.5.0.0-1245</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.10.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.jboss.netty</groupId>
        <artifactId>netty</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>
</project>

As you’ve noticed, our pom.xml file contains a specific build section where it declares the maven-shade-plugin. This plugin extends the build stage with an extra operation which includes all the dependencies of the project in the final jar file, producing a so-called uber jar. One of the most important aspects of every Java application is the proper resolution of referenced packages. Once the JVM starts running it examines the CLASSPATH for the necessary dependencies; if it cannot find one of them, a ClassNotFoundException is thrown and the application crashes. In Hadoop things become more complicated, as each service runs in its own JVM on a separate node with a custom CLASSPATH, so every new dependency of our applications has to be propagated throughout the cluster. This becomes a real issue when you have several environments and for each new dependency you need to ask a system engineer to install it on every Hadoop node or to extend the service's CLASSPATH with new locations. The uber jar resolves this at build time by packaging all required modules directly with the program. The drawback is that you now have to operate with a huge archive of packages instead of a small application containing only the custom logic.

At this stage we can create the topology driver class where we declare the spout and the bolts along with some configuration properties. Nimbus will use this class to properly set up the workflow and submit it to the Supervisors. The driver should contain the main entry point method. In this method we define the workflow using the TopologyBuilder class, and at the end we submit the instance to Nimbus using the StormSubmitter.submitTopology method:


package storm;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringKeyValueScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import com.google.common.collect.Maps;

public class Driver {

    private static String TOPOLOGY_NAME = "WordCountTopology";

    private static final String ZK_SERVERS = "localhost:2181";

    private static final String HDFS_URL = "hdfs://sandbox.hortonworks.com";
    private static final String HDFS_CATALOG = "/stormwc";

    private static final String KAFKA_TOPIC = "stormwc";
    private static final String KAFKA_ZK_ROOT = "/stormspouts";
    private static final String KAFKA_ZK_ZNODE = "stormwckafkaspout";

    private static final String HIVE_METASTORE_URI = "thrift://localhost:9083";
    private static final String HIVE_DBNAME = "default";
    private static final String HIVE_TABLENAME = "stormwc";
    private static final String[] HIVE_COLUMNNAMES = { "filename", "words", "count" };

    private static final String HBASE_TABLENAME = "stormwc";
    private static final String HBASE_KEY = "filename";
    private static final String HBASE_COLUMNFAMILY = "cfwc";
    private static final String[] HBASE_COLUMNNAMES = { "words", "count" };

    public static void main(String[] args) throws Exception {

        int numSpoutExecutors = 1;

        KafkaSpout wcKafkaSpout = buildWordCountSpout();
        IBasicBolt wcCountBolt = CustomWordCountBolt.GetBoltInstance();
        HdfsBolt wcHdfsBolt = buildHdfsBolt();
        HBaseBolt wcHBaseBolt = buildHBaseBolt();
        HiveBolt wcHiveBolt = buildHiveBolt();

        // Wire the workflow: Kafka spout -> word count bolt -> three storage bolts in parallel
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("wcspout", wcKafkaSpout, numSpoutExecutors);
        builder.setBolt("wccountbolt", wcCountBolt).shuffleGrouping("wcspout");
        builder.setBolt("wcHdfsBolt", wcHdfsBolt).shuffleGrouping("wccountbolt");
        builder.setBolt("wcHBaseBolt", wcHBaseBolt).shuffleGrouping("wccountbolt");
        builder.setBolt("wcHiveBolt", wcHiveBolt).shuffleGrouping("wccountbolt");

        // The HBase bolt reads its configuration from the topology config under this key;
        // an empty map means the settings from hbase-site.xml on the classpath are used
        Config cfg = new Config();
        Map<String, String> HBConfig = Maps.newHashMap();
        cfg.put("HBCONFIG", HBConfig);

        StormSubmitter.submitTopology(TOPOLOGY_NAME, cfg, builder.createTopology());
    }

    private static KafkaSpout buildWordCountSpout() {
        SpoutConfig spoutCfg = new SpoutConfig(new ZkHosts(ZK_SERVERS), KAFKA_TOPIC, KAFKA_ZK_ROOT, KAFKA_ZK_ZNODE);
        // The scheme turns every Kafka key/value pair into a single map field on the emitted tuple
        spoutCfg.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());

        return new KafkaSpout(spoutCfg);
    }

    private static HdfsBolt buildHdfsBolt() {
        // Rotate output files once they reach 128 MB and flush to HDFS every 1000 tuples
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128.0f, Units.MB);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(HDFS_CATALOG);
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
        return new HdfsBolt().withFsUrl(HDFS_URL)
          .withFileNameFormat(fileNameFormat)
          .withRecordFormat(format)
          .withRotationPolicy(rotationPolicy)
          .withSyncPolicy(new CountSyncPolicy(1000));
    }

    private static HiveBolt buildHiveBolt() {
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields(HIVE_COLUMNNAMES));
        HiveOptions hiveOptions = new HiveOptions(HIVE_METASTORE_URI, HIVE_DBNAME, HIVE_TABLENAME, mapper);
        // Tiny transaction and batch sizes so that demo records become visible in Hive almost immediately
        hiveOptions.withTxnsPerBatch(2);
        hiveOptions.withBatchSize(2);
        hiveOptions.withIdleTimeout(2);

        return new HiveBolt(hiveOptions);
    }

    private static HBaseBolt buildHBaseBolt() {
        // Map the "filename" field to the row key and the remaining fields to columns in the cfwc family
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
            .withRowKeyField(HBASE_KEY)
            .withColumnFields(new Fields(HBASE_COLUMNNAMES))
            .withColumnFamily(HBASE_COLUMNFAMILY);

        return new HBaseBolt(HBASE_TABLENAME, mapper).withConfigKey("HBCONFIG");
    }
}

Driver.class

Our main method in the Driver class defines the workflow using a set of TopologyBuilder setSpout and setBolt calls. In addition, we specify a number of environment properties of our external sources so that the spout and bolts can work with them properly. The only thing left for us to do is to implement our CustomWordCountBolt. Instead of implementing the IRichBolt interface we can simplify the task by inheriting from BaseBasicBolt, which already provides part of the functionality. We only need to implement the execute method with our logic and define the output format of the tuples emitted by this bolt in the declareOutputFields method:


package storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomWordCountBolt extends BaseBasicBolt
{

    private static final Logger LOG = LoggerFactory.getLogger(CustomWordCountBolt.class);

    public static IBasicBolt GetBoltInstance(){
        return new CustomWordCountBolt();
    }

    private CustomWordCountBolt(){
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("filename", "words", "count"));
    }

    public void execute(Tuple tuple, BasicOutputCollector oc){
        LOG.info("New tuple received");
        // The Kafka spout scheme emits a map of message key (file name) to message value (file text)
        Map pairs = (Map)tuple.getValue(0);
        for (Object key: pairs.keySet())
        {
            String keystr = String.valueOf(key);
            String text = (String)pairs.get(key);

            Map<String, Integer> wordsMap = CountWords(text.split("\\s+"));

            // Sum up the per-word counts to get the total number of words in the file
            int total = 0;
            for (Map.Entry<String, Integer> entry : wordsMap.entrySet())
            {
                total = total + entry.getValue();
                LOG.info(keystr + ": " + entry.getKey() + ": " + entry.getValue());
            }
            oc.emit(new Values(keystr, text, total));
        }
    }

    private Map<String, Integer> CountWords(String[] words) {
        Map<String, Integer> map = new HashMap<String, Integer>();
        for (String s : words) {

            if (!map.containsKey(s)) {
                map.put(s, 1);
            } else {
                int count = map.get(s);
                map.put(s, count + 1);
            }
        }
       return map;
    }
}

CustomWordCountBolt.class

The topology code is done. Let's now copy the program to the Sandbox, submit it using the storm jar command and validate it with the storm list command:

storm jar /Demo/Part7StormWc/target/stormwc-0.0.1-SNAPSHOT.jar storm.Driver

storm list

Topology_name Status Num_tasks Num_workers Uptime_secs
——————————————————————-
WordCountTopology ACTIVE 0 0 44
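
As a side note, during development it is often easier to run the same topology in local mode before submitting it to the cluster. Here is a minimal sketch which, assuming the builder and cfg objects from the Driver class above, could replace the StormSubmitter call:

import org.apache.storm.LocalCluster;
import org.apache.storm.utils.Utils;

// Run the topology inside an in-process cluster instead of submitting it to Nimbus
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, cfg, builder.createTopology());
Utils.sleep(60000);                        // let it process tuples for a minute
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();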

We have created our first word-counting topology and verified that it is up and running. Now let's create an application which will push data into our topic.

Sending data from .NET to Storm through Kafka

As I’ve mentioned before, I come from the .NET technology stack, and when I was taking my first steps in Hadoop it was quite challenging to find proper solutions for connecting these two worlds. At this stage I consider the usage of message brokers to be probably one of the best options currently available for this task. Now I want to show you how to implement a simple Kafka producer directly from a .NET WinForms application which selects a file and sends it to the stormwc topic in Hadoop. Create a new WinForms application and add the Microsoft Kafka NET NuGet package to the project. The following method shows how to send a file to the Hadoop Kafka topic:


private async Task AddFileToKafka(byte[] filename, byte[] message, string topic)
{
    // The broker endpoint is the Kafka port we opened on the Sandbox during the preparation steps
    KafkaOptions options = new KafkaOptions(new Uri("http://sandbox.hortonworks.com:6667"));
    using (BrokerRouter router = new BrokerRouter(options))
    using (Producer client = new Producer(router))
    {
        var topicMetas = router.GetTopicMetadata(topic);

        // The file name travels as the message key and the file content as the value,
        // which is exactly what CustomWordCountBolt expects on the Storm side
        var responses = await client.SendMessageAsync(topic,
        new[] {
            new KafkaNet.Protocol.Message
            {
                 Key = filename,
                 Value = message
            }});

        ProduceResponse response = responses.FirstOrDefault();
        MessageBox.Show(String.Format("File added to the queue - partition {0} offset {1}",
                                       response.PartitionId,
                                       response.Offset));
    }
}

Now let's subscribe to our Storm topology log file and see what happens once we push some data into the topic from our program:

tail -f /var/log/storm/workers-artifacts/WordCountTopology-1-1488441386/6700/worker.log

2017-03-02 08:13:59.853 s.CustomWordCountBolt [INFO] New tuple received
2017-03-02 08:13:59.854 s.CustomWordCountBolt [INFO] C:\test.txt: This: 1
2017-03-02 08:13:59.856 s.CustomWordCountBolt [INFO] C:\test.txt: is: 1
2017-03-02 08:13:59.857 s.CustomWordCountBolt [INFO] C:\test.txt: my: 1
2017-03-02 08:13:59.858 s.CustomWordCountBolt [INFO] C:\test.txt: text: 1

And here is the Hive bolt section of the log:

2017-03-02 08:27:24.368 o.a.h.h.q.i.o.WriterImpl [INFO] ORC writer created for path: hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/stormwc/delta_0000293_0000294/bucket_00006 with stripeSize: 8388608 blockSize: 268435456 compression: ZLIB bufferSize: 32768
2017-03-02 08:27:24.404 o.a.s.h.b.HiveBolt [INFO] acknowledging tuples after writers flushed

If everything went well, we can find this data in the HDFS, HBase and Hive storage. The content can be checked directly through the services' CLIs:

hdfs dfs -cat /stormwc/wcHdfsBolt-4-0-1488443173720.txt

C:\test.txt|This is my text|4

hive -e 'select * from stormwc'

C:\test.txt    This is my text     4

Time taken: 0.407 seconds, Fetched: 4 row(s)

echo  'scan "stormwc"' | hbase shell

ROW COLUMN+CELL
 C:\test.txt column=cfwc:count, timestamp=1488460779930, value=\x00\x00\x00\x04
 C:\test.txt column=cfwc:words, timestamp=1488460779930, value=This is my text
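
As an alternative to the HBase shell, the same row can be read back programmatically. This is a small sketch assuming the HBase 1.x client API, which is already on our classpath through the storm-hbase dependency:

package storm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseResultCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();   // picks up hbase-site.xml from the classpath
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("stormwc"))) {
            // Fetch the row written by the HBase bolt, keyed by the original file name
            Result row = table.get(new Get(Bytes.toBytes("C:\\test.txt")));
            byte[] words = row.getValue(Bytes.toBytes("cfwc"), Bytes.toBytes("words"));
            System.out.println(Bytes.toString(words));
        }
    }
}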

Our first topology is ready and working, and the sources are available on GitHub.
