I continue writing about my journey into the world of Big Data, and today I want to talk about one of the most popular Hadoop frameworks, Spark. Spark has one of the largest communities of contributors and is currently the de facto standard tool for processing large volumes of data in Hadoop. Even so, it can be quite challenging to find information about certain parts of it. Over the last few months I had to do a deep dive into one of the key components of a Spark Streaming application, the Receiver. It is the starting point of any streaming flow implemented with this framework. That is why it is important to know how it works and how to configure and tune it properly. In this article I’d like to explain how the Spark receiver works and share what I have learned about its configuration and tuning.
Spark streaming overview
Originally, Apache Spark was implemented as a batch-based alternative to another popular data processing framework, MapReduce. It greatly simplified the development of applications that process large volumes of data and improved performance by introducing in-memory distributed computing. At some point, however, users started asking for something Spark did not originally support: real-time processing of data. So the decision was made to create a new streaming component and integrate it into the existing batch-based architecture. This component is responsible for receiving events from different sources and grouping them into small chunks, called batches, at fixed time intervals. The existing Spark engine then pushes each batch through the predefined transformations and outputs. This process repeats continuously until the application is stopped.

The important point here is that Spark Streaming works with sequences of events combined into batches. That is why people often call Spark a pseudo-streaming framework, in contrast to alternatives like Storm or Flink, which process data on an event-by-event basis. However, those of you who already have experience with streaming applications may have noticed that very often we need to aggregate events at some stage anyway. For example, you may want better performance when writing data to an output Parquet file, or a summarized analysis of a chunk of video frames over some period of time. That is why Spark is often preferred for this sort of solution: it provides such batching out of the box.
The key component of a Spark Streaming application is the Receiver. It is responsible for opening connections to the sources, listening for events from them and aggregating the incoming data in memory. If the executor running the receiver runs out of memory, Spark can fall back to disk storage, provided the storage level chosen for the received data allows it (for example MEMORY_AND_DISK_SER_2, the default for several built-in receivers). However, this negatively impacts the overall performance of the application.
All incoming data is first aggregated within the receiver into chunks called blocks. After a preconfigured interval of time, called the batch interval, Spark logically groups these blocks into another entity called a batch. A batch holds references to all blocks formed by the receivers and uses this information to generate an RDD, Spark’s core abstraction on which the engine performs all operations on the data. Each partition of this RDD corresponds to one block generated by a receiver. A streaming application can have many receivers running on different physical nodes, so the data is distributed across the cluster from the very start. The batch interval is global for the whole application and is defined when the StreamingContext is created. The block interval is a receiver-side setting, configured through the spark.streaming.blockInterval property. By default, blocks are generated every 200 ms, but you can tune this value according to the nature of your data.
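To make the relationship between the two intervals concrete, here is a small Python sketch that computes how many blocks, and therefore RDD partitions, one batch contains. It is a simplified model, not Spark code, and the function name partitions_per_batch is hypothetical. It assumes that the batch interval is a multiple of the block interval and that every receiver cuts a block on every block interval. In practice a block is only created when there is buffered data, so the result is an upper bound.

```python
def partitions_per_batch(batch_interval_ms: int, block_interval_ms: int = 200,
                         num_receivers: int = 1) -> int:
    """Upper bound on the number of blocks (RDD partitions) in one batch.

    Simplified model: each receiver cuts one block per block interval,
    and each block becomes one partition of the batch's RDD.
    """
    if batch_interval_ms % block_interval_ms != 0:
        raise ValueError("model assumes batch interval is a multiple of block interval")
    blocks_per_receiver = batch_interval_ms // block_interval_ms
    return blocks_per_receiver * num_receivers


# A 2-second batch, the default 200 ms block interval and 3 receivers
print(partitions_per_batch(2000, 200, 3))  # 30
```

Since each partition is processed by one task, this number is also roughly how many tasks Spark schedules per batch for the received data. A larger blockInterval gives fewer, bigger tasks, while a smaller one gives more parallelism at the cost of more scheduling overhead.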
Spark is designed to handle large amounts of data, so it is important that the receiver does not become the bottleneck of the application. Whether you are implementing a custom receiver or using an existing one, you need to know two things about its implementation:
- A receiver extends Spark’s abstract Receiver class and implements the onStart() and onStop() methods. Typically, onStart() initializes connection pools and starts the threads that pull data from the streaming sources in an infinite loop; onStart() itself must not block, so the actual receiving happens on those threads. This is where you can tune performance by using multiple threads and by adjusting how often the sources are polled.
- The receiver decides where to call the store() method provided by the base class to hand the data over to Spark’s memory. There are two ways to use this method, and depending on which one you choose, you follow either a reliable or a non-reliable approach:
- Reliable – use store(ArrayBuffer) to persist multiple objects at a time. The call blocks until Spark has stored the whole block in memory (and replicated it, if the storage level uses replication), which protects the data against executor failures. Right after this call returns, you can send an acknowledgement to the data source, for example delete the message from an SQS queue or update the offset in a Kafka partition. This type of reliability does not cover Spark driver failures; for those scenarios you need more advanced techniques such as checkpointing.
- Non-reliable – use store(Object) to persist a single item. In this case the data does not become part of a block immediately; it is pushed into a temporary buffer instead. Internally, the receiver uses a BlockGenerator, which takes the contents of this buffer on every block interval and stores it in memory as a block. Keep in mind that if something goes wrong with the executor between pushing data into the buffer and storing it as a block, you will lose this data.
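The difference between the two approaches can be illustrated with a toy Python model. The ToyReceiver class and its methods are hypothetical and only mimic the data flow described above. A real receiver is written against Spark’s Scala or Java Receiver API and calls store() from the threads started in onStart().

```python
from typing import Any, List


class ToyReceiver:
    """Hypothetical model of the two ways a receiver hands data to Spark.

    This is NOT Spark's API; it only mimics the data flow described above.
    """

    def __init__(self) -> None:
        self.stored_blocks: List[List[Any]] = []  # data Spark has safely stored
        self._pending: List[Any] = []             # BlockGenerator-style buffer

    def store_many(self, items: List[Any]) -> None:
        # Reliable path: the whole buffer becomes a block immediately,
        # so it is safe to acknowledge the source after this returns.
        self.stored_blocks.append(list(items))

    def store_one(self, item: Any) -> None:
        # Non-reliable path: the item only lands in a temporary buffer.
        self._pending.append(item)

    def on_block_interval(self) -> None:
        # Called every block interval: flush the buffer into a new block,
        # but only if there is something in it.
        if self._pending:
            self.stored_blocks.append(self._pending)
            self._pending = []

    def crash(self) -> None:
        # Executor failure: anything still sitting in the buffer is gone.
        self._pending = []


acked = []
r = ToyReceiver()
messages = ["m1", "m2", "m3"]
r.store_many(messages)
acked.extend(messages)   # e.g. delete from SQS or commit the Kafka offset

r.store_one("m4")
r.on_block_interval()    # m4 is now part of a block
r.store_one("m5")
r.crash()                # m5 was never stored
print(r.stored_blocks)   # [['m1', 'm2', 'm3'], ['m4']]
```

In the model, m1, m2 and m3 can be acknowledged as soon as store_many returns, while m5, which was still in the buffer when the executor failed, is silently lost.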
The following high-level diagram shows how Spark gets data from input sources:
(Figure 1 – Spark Streaming)
Each receiver is wrapped in a component called a DStream (more precisely, a ReceiverInputDStream). It is the entry point of the streaming application and knows how to produce RDDs from the data collected by its receiver. A single application can have multiple DStreams targeting different sources.
Spark backpressure and rate limiting
Let’s say we’ve developed our application and started receiving data from streaming sources. But how can we keep track of the processing capacity our application uses? The main risk is that at some point we simply receive more data than we can handle. As a result, the receivers consume all available memory and the executors no longer have enough resources to perform transformations on the data. To avoid this, we can use a Spark feature called backpressure. It allows a Spark application to dynamically track whether the input volume is growing or shrinking and to adjust the maximum number of events each receiver may consume per second. After each batch, the algorithm recalculates the rate limit by analyzing statistics from previous batches and estimating the incoming volume. To enable this feature, explicitly set the spark.streaming.backpressure.enabled property to true in the Spark configuration.
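To get a feeling for how the rate limit adapts, here is a simplified Python model of a PID-style rate estimator. As far as I understand, Spark’s default estimator (PIDRateEstimator) works along these lines, but the class below is my own simplification, not Spark’s code. It keeps only the proportional and integral terms, and the gains of 1.0 and 0.2 and the minimum rate of 100 records per second are assumptions that mirror what I understand Spark’s defaults to be.

```python
from typing import Optional


class SimplePidRateEstimator:
    """Simplified, hypothetical model of a PID-style backpressure estimator."""

    def __init__(self, batch_interval_ms: int, proportional: float = 1.0,
                 integral: float = 0.2, min_rate: float = 100.0) -> None:
        self.batch_interval_ms = batch_interval_ms
        self.proportional = proportional
        self.integral = integral
        self.min_rate = min_rate
        self.latest_rate: Optional[float] = None  # records/second

    def compute(self, num_records: int, processing_delay_ms: int,
                scheduling_delay_ms: int) -> Optional[float]:
        """Return a new rate limit after a completed batch, or None if skipped."""
        if num_records <= 0 or processing_delay_ms <= 0:
            return None  # not enough information to estimate anything
        # How fast the last batch was actually processed (records/second).
        processing_rate = num_records / processing_delay_ms * 1000
        if self.latest_rate is None:
            # First completed batch: remember the measured rate, no estimate yet.
            self.latest_rate = processing_rate
            return None
        # Proportional term: gap between the previous rate and what we achieved.
        error = self.latest_rate - processing_rate
        # Integral term: backlog implied by the scheduling delay, as a rate.
        historical_error = scheduling_delay_ms * processing_rate / self.batch_interval_ms
        new_rate = max(self.latest_rate
                       - self.proportional * error
                       - self.integral * historical_error,
                       self.min_rate)
        self.latest_rate = new_rate
        return new_rate


est = SimplePidRateEstimator(batch_interval_ms=1000)
print(est.compute(num_records=5000, processing_delay_ms=1000, scheduling_delay_ms=0))     # None
print(est.compute(num_records=5000, processing_delay_ms=2000, scheduling_delay_ms=1000))  # 2000.0
```

In this run the second batch was processed at only 2500 records per second and waited a full second before being scheduled, so the model lowers the limit from 5000 to 2000 records per second.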
Now, with backpressure enabled, our application is protected from occasional spikes in incoming data. But what about the first launch of the application? Or imagine that we have stopped it for a couple of hours and want to run it again. In this case, the data sources may have accumulated a heavy backlog of messages, which could overwhelm our application on the very first batch. Backpressure can’t help here, because we haven’t processed anything yet and have no statistics that could be used to protect the receivers from taking too much. For this case, Spark provides another property, spark.streaming.backpressure.initialRate. It defines the maximum number of messages per second that each receiver can take during the first batch. On subsequent batches, Spark starts the analysis described earlier and applies the newly calculated rates. This property only takes effect when backpressure is enabled.
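A quick back-of-the-envelope calculation shows what initialRate buys you. The helper below is hypothetical and uses simple arithmetic. It computes the largest number of records the first batch can contain, assuming the limit applies per receiver and every receiver runs at the limit for the whole batch interval.

```python
def max_first_batch_records(initial_rate: int, batch_interval_ms: int,
                            num_receivers: int = 1) -> int:
    """Upper bound on records pulled in the first batch (simplified model).

    initial_rate is records/second per receiver, as with
    spark.streaming.backpressure.initialRate.
    """
    return initial_rate * batch_interval_ms * num_receivers // 1000


# 1000 records/s per receiver, 5-second batches, 2 receivers
print(max_first_batch_records(1000, 5000, 2))  # 10000
```

A reasonable choice is an initialRate for which this number comfortably fits into what the application can process within one batch interval. Once Spark has statistics from completed batches, the backpressure estimate takes over.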
These two techniques protect a Spark application at startup and during spikes on the producer side. But when some sources consistently send too much data, you can apply a global rate limit to your receivers using the spark.streaming.receiver.maxRate property. It sets the maximum number of messages each receiver can receive per second. Behind the scenes, Spark uses the lower of this value and the rate calculated by the backpressure algorithm. You still have to deal with the growing backlog of messages from the data producers, but at least your application will keep running without blocking any other streaming sources.
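The interaction between the static cap and the dynamic estimate boils down to taking the minimum of the two. The function below is a hypothetical model of that rule, not Spark’s code. It treats a missing or non-positive value as "no limit", which matches how spark.streaming.receiver.maxRate behaves when it is unset or set to 0 or a negative number.

```python
from typing import Optional


def effective_rate(max_rate: Optional[float],
                   estimated_rate: Optional[float]) -> Optional[float]:
    """Rate limit a receiver ends up with (simplified model; None = unlimited)."""
    # Ignore limits that are unset or non-positive, i.e. "no limit".
    candidates = [r for r in (max_rate, estimated_rate) if r is not None and r > 0]
    return min(candidates) if candidates else None


print(effective_rate(10_000, 25_000))  # 10000  (static cap wins)
print(effective_rate(10_000, 4_000))   # 4000   (backpressure estimate wins)
print(effective_rate(None, 4_000))     # 4000   (no maxRate configured)
print(effective_rate(None, None))      # None   (unlimited)
```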