Streaming data to Hive using Spark

Real-time processing of data into a data store is probably one of the most common categories of scenarios that big data engineers meet while building their solutions. Fortunately, the Hadoop ecosystem provides a number of options for achieving this goal and for designing efficient and scalable streaming applications. In my previous articles I have already described one way of implementing such solutions using a Hadoop framework called Storm. Today I would like to tell you about an alternative approach which has become very popular among Big Data developers due to its simplicity and high efficiency – Spark Streaming.

Usually when people start learning the Spark framework, they have a lot of confusion about the proper ways of using this tool. Indeed, Spark covers a number of completely different scenarios. I would say that it is one of the most universal frameworks in the Hadoop stack. But in order to simplify things, you can divide Spark use cases into three general categories:

  • Data mining – investigation of the data from different sources and identification of its features and characteristics. Usually used by data scientists through PySpark or SparkR
  • Batching applications – one-time operations upon the data. Such applications can perform a one-time export of the data from one source into another or a transformation of existing data into a new format
  • Streaming applications – continuous processing of the data from pre-defined streaming sources. Such applications are event-driven solutions and they continuously listen for signals from real-time data producers like Kafka or Kinesis message brokers

Spark streaming application

As you’ve probably guessed, in this article I will cover the implementation of an application which falls into the last category of the list. At a high level, a Spark Streaming application represents a processing layer between a data producer and a data consumer (usually some data store):

Figure 1 – Streaming Spark architecture (from the official Spark site)

A developer creates a Spark Streaming application using a high-level programming language like Scala, Java or Python. Once the code is done, it is compiled into a package and submitted to the Spark execution engine using internal Spark tools. Depending on the configuration of the framework, the application can be launched either in standalone mode, mostly used for testing purposes, or in distributed mode. In the latter case it will follow a master-slave execution model and Spark will use a cluster management framework like YARN or Mesos in order to allocate the necessary resources for it. The application will then be submitted to the driver container which will launch and maintain the running workflow. Spark will create special services on the worker nodes called executors. The driver program will distribute the workflow execution between these executors using their CPUs and memory. Each application is isolated and does not share any state with other workflows.

One of the specific features of Spark Streaming is its micro-batch model of processing the data. Instead of pushing every new event into the streaming pipeline immediately after receiving it from the source, like the Storm framework does for example, Spark will aggregate events into a special entity called a batch. Then, depending on the property called the batch interval, Spark will push these batches through the workflow, parallelizing all calculations among the available executors. That is why Spark Streaming is often called a pseudo real-time processing framework. Such a technique introduces a certain latency into the application which should be considered at the design stage. But on the other hand this approach can bring benefits for certain types of operations like bulk inserts of the data into data storages.

Processing is the key point of a streaming application, and the main goal of Spark is to provide an API for the developer to wrap that processing logic, written in some high-level programming language, into a standalone unit and to spread it across the machines of a distributed cluster. Such a unit is usually composed of a number of components which encapsulate the overall logic:

  • Driver – the major component of the application which defines the workflow and runtime environment. It usually initializes a connection to the primary Spark engine called SparkSession. Then, using this connection and different configuration properties, the driver creates the key object of the application – the streaming context. It is responsible for starting and stopping the application and for the definition of the workflow
  • Receiver – the component responsible for listening for events from the data producers. The receiver aggregates the events using a special component called BlockGenerator, which knows how to generate the batches and at which frequency, and emits the batches in a special form called a DStream
  • Transformer (optional) – after the data has been received and normalized, we are ready to apply transformations upon it. The transformer component can modify the information, decorate it with other properties, calculate certain parameters or perform any other useful processing operations
  • Writer (optional) – after the data has been successfully handled and processed, we need to output the result. Normally we either emit it to some message queue or persist it to a data store like HBase, Hive or S3

The model I’ve just described is not a mandatory standard for Spark Streaming applications but just advice on how you can divide the logical content of the program module.

Building custom streaming application

Now let's try to write our custom application. We will use the favorite Word Count task to keep the internal logic simple, but we will extend it with a number of steps which will hopefully give a better understanding of processing data through the Spark Streaming framework. In this example we will use Scala. As I've mentioned earlier, Spark supports a number of languages, but internally it is written in Scala. That is why I personally think that this language most accurately reflects Spark's nature and provides native support for its functionality, without any extra translation performed by the interpreters of other languages.

Receiver

The entry point of the application is the receiver. The Spark library contains a number of predefined receivers which allow you to read data from the most widely used data producers like Kafka, Kinesis or Flume. But in order to show the general idea we will create our own custom receiver. It will watch for new files in a directory on the local disk and will push their content into the streaming pipeline. The Spark library defines an abstract base class called Receiver and we will extend its functionality by overriding the onStart and onStop abstract methods:

package myorg.demo

import java.nio.file._
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.io.Source

class SimpleFileReciever(location: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // onStart must not block, so the watch loop runs in its own thread
  override def onStart(): Unit = {
    new Thread("File Watcher") {
      override def run(): Unit = watch()
    }.start()
  }

  private def watch(): Unit = {
    // register a watcher for new files appearing in the target catalog
    val watcher = FileSystems.getDefault.newWatchService
    Paths.get(location).register(watcher, StandardWatchEventKinds.ENTRY_CREATE)

    println("Starting the receiver...")

    while (!isStopped()) {
      // wait up to 5 seconds for new file system events
      val key = watcher.poll(5, TimeUnit.SECONDS)
      if (key != null) {
        println("New events received...")

        for (event <- key.pollEvents.asScala) {
          val file = event.context().asInstanceOf[Path].getFileName.toString
          // push every line of the new file into the stream, then remove the file
          Source.fromFile(location + file).getLines().foreach(store)
          Files.delete(Paths.get(location + file))
        }
        key.reset()
      }
    }
  }

  override def onStop(): Unit = {
    println("Stopping the receiver...")
  }
}

There are two key things here which I want to point out. The first one is that we specify the storage level for our receiver with the StorageLevel.MEMORY_ONLY value. That means it will keep all incoming data within the memory of the worker process, which is the most efficient option. But if you expect a larger workload on your executor services from the data producers' side, you can switch to other levels. Another key thing is the call of the store method. This is where we push our data to the special Spark component called BlockGenerator. The purpose of this tool is to aggregate the incoming events and periodically publish them into a sequence of RDDs. Once an RDD batch has been generated and published, the BlockGenerator will start the creation of a new one. This loop will be repeated throughout the whole lifetime of the Spark Streaming application.
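
For example, a minimal sketch of a more resilient configuration (the class is the same as above; the only change is the storage level, trading some performance for serialization, disk spill-over and replication):

class SimpleFileReciever(location: String)
  // serialize the blocks, spill them to disk when memory is full and keep a replica on a second executor
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
  // ... same onStart/onStop implementation as above
}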

Transformer

Once the BlockGenerator has produced an RDD of data, we can perform the transformation upon it. Our example will execute the word counting logic. It will take an RDD of lines of a text file, iterate over them one by one, split each line by the space delimiter and calculate the number of occurrences of every word in the whole text. Then it will emit this information as a new RDD:

package myorg.demo

import org.apache.spark.rdd.RDD

// A function from an RDD of text lines to an RDD of (word, count) pairs
class Transformer extends (RDD[String] => RDD[(String, Int)]) {
  override def apply(rdd: RDD[String]) = {
    rdd.flatMap(rec => rec.split(" "))       // split every line into words
      .map(word => (word, 1))                // emit a counter for every word
      .reduceByKey((one, two) => one + two)  // sum the counters per word
  }
}
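
Since the Transformer is just a plain function over RDDs, it can be sanity-checked outside of the streaming pipeline. A minimal sketch, assuming a local SparkSession named spark is already available (the variable name is only illustrative):

val sample = spark.sparkContext.parallelize(Seq("to be or not to be", "to see or not to see"))
// prints pairs like (to,4), (be,2), (or,2), (not,2), (see,2) in arbitrary order
new Transformer()(sample).collect().foreach(println)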

Writer

The writer is the destination point of the streaming application. When the streaming context is started, Spark validates that the workflow ends with some output operation and throws an exception if this requirement is not met. Although a simple printout would be sufficient, in our case we will persist the results of the calculations into HDFS in Parquet format. All the data will be saved into one folder and every new RDD will generate a separate file in it. Later we will use Hive functionality to map the content of this folder to an external table:

package myorg.demo

import myorg.demo.StreamingApp.sc
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode

// A function which persists every (word, count) RDD into a Parquet folder on HDFS
class Writer extends (RDD[(String, Int)] => Unit) {
  override def apply(rdd: RDD[(String, Int)]) = {
    import sc.implicits._

    val result = rdd.toDF("Word", "Count")
    result.coalesce(1)                // produce a single file per batch
      .write
      .format("parquet")
      .mode(SaveMode.Append)          // keep the files of previous batches
      .save("hdfs://sandbox-hdp.hortonworks.com:8020/store/data.parquet")
  }
}

The reason we are using the Parquet format here is the extra benefits it gives us compared to a simple text representation. It is probably not a big deal for the task we are trying to solve, but for real production systems Parquet can bring huge benefits due to the compression and performance gains it introduces for storing the data.

Driver 

Now we can create the driver for our application. It will play the role of glue for the above components. Besides that, it will define some extra configuration properties to make our code compatible with the environment:

package myorg.demo

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingApp extends App {

  // the first argument is the local catalog which the receiver will watch
  private val path = args(0).toString + "/"

  val sc = SparkSession
    .builder()
    .appName("AppName")
    .config("spark.master", "local[*]")
    .getOrCreate()

  // do not create _SUCCESS metadata files in the output folder
  sc.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

  def createContext: StreamingContext = {
    // generate a new batch every 5 seconds
    val ssc = new StreamingContext(sc.sparkContext, batchDuration = Seconds(5))
    val inputStream = ssc.receiverStream(new SimpleFileReciever(path))
    val transformedStream = inputStream.transform(new Transformer)
    // the output operation: persist every non-empty batch to HDFS
    transformedStream.foreachRDD(rdd => if (!rdd.isEmpty()) new Writer()(rdd))

    ssc
  }

  val ssc = createContext

  ssc.start()
  ssc.awaitTermination()
}

Here we create the SparkSession which will establish the connection with the Spark engine. The first argument of the application is the folder where we will drop files for processing. Then we define Hadoop configuration properties for the application. In our case we set mapreduce.fileoutputcommitter.marksuccessfuljobs to false, which stops the Spark writer from creating the _SUCCESS metadata file in the destination folder. Otherwise we would get an exception on the Hive side at the external table creation stage. Developers can initialize other properties here as well or add extra configuration files missing from the local class path. For example, if you are going to work with HBase and Spark is not configured to reference its settings, you can add them with the following command straight from the driver:

sc.sparkContext.hadoopConfiguration.addResource("/etc/hbase/hbase-site.xml")

The same thing is relevant to libraries which the application could require for correct execution. You can add them in the driver as well using the addJar command:

sc.sparkContext.addJar("/libs/template.jar")

Running the application

Now let's compile the code and build the package. I was using IntelliJ and an SBT template for creating my application, so I will pack the project into a single jar file using the SBT shell:

[Screenshot: packaging the project from the SBT shell]
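
The same can be done from the command line; a minimal sketch, assuming a standard SBT project layout (the resulting jar name and location depend on your build.sbt):

sbt clean package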

Once the package is ready, I will copy it to my Hadoop Sandbox. There I will use the spark-submit tool in order to run the application:

[Screenshot: submitting the application with spark-submit]
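
A sketch of what that command can look like (the jar name is illustrative; the single application argument is the local catalog which the receiver will watch):

spark-submit --class myorg.demo.StreamingApp spark-streaming-demo_2.11-0.1.jar /temp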

Once I drop a text file into the /temp catalog of the local file system, my program will process it through the WordCount logic and will push the results into the predefined location on HDFS:

[root@sandbox-hdp ~]# hdfs dfs -ls /store/data.parquet
Found 2 items
-rw-r--r-- 1 root hdfs 1991 2017-12-03 15:54 /store/data.parquet/part-00000-50e3134a-0a67-4559-924b-f97aa1219946-c000.snappy.parquet
-rw-r--r-- 1 root hdfs 2297 2017-12-03 15:53 /store/data.parquet/part-00000-a8ea68f1-27f8-4ea0-989f-954c546d7501-c000.snappy.parquet

Accessing the data through Hive

The last thing left to do is to map the data to Hive. It will allow us to consume the information through a friendly UI and the well-known SQL syntax. Behind the scenes Hive will use all the benefits of distributed calculations upon the HDFS file system and MapReduce jobs. So all we need to do is to create an external table upon the /store/data.parquet catalog where the writer of the Spark application persists the results of the calculations:

[Screenshot: creating the external Hive table]
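
The DDL behind that step can look roughly like the following (the table name word_counts is just an illustration; the columns match the DataFrame columns produced by the Writer):

CREATE EXTERNAL TABLE word_counts (
  `word`  STRING,
  `count` INT
)
STORED AS PARQUET
LOCATION 'hdfs://sandbox-hdp.hortonworks.com:8020/store/data.parquet';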

Now we can run the queries upon the data:

[Screenshot: querying the word counts in Hive]
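
For example, since every micro-batch appends its own set of counts into the folder, aggregating per word gives the overall totals (again assuming the illustrative word_counts table from the previous step):

SELECT `word`, SUM(`count`) AS total
FROM word_counts
GROUP BY `word`
ORDER BY total DESC
LIMIT 10;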

Of course this scenario is very simple and I just wanted to show you one of the ways you can stream data directly into Hive using Spark. In real scenarios you will need to deal with such things as partitioning of the data into other catalogs, reliability of the receivers and checkpointing of the stages of the workflow. But going that way, Spark will provide you with everything necessary to build powerful applications where you will be able to process hundreds of terabytes of data per day in a reliable and efficient manner.
