The world of Big Data provides a wide variety of tools for organizing large amounts of data in a single place. But once we have pushed everything into place, we usually start looking for ways to extract value from this information. We involve statisticians and data experts to investigate our data, and they apply certain techniques to detect the distinctive features of that information. These features then allow us to build useful data models which help us improve the quality of our products. For example, we can build recommendations for our customers about which products they might like or dislike, or go even further and identify the best characteristics for our new products based on the current demands of our clients. Hadoop provides a set of tools for such operations, and at the moment Spark is probably one of the best options for data specialists working in the Big Data realm. In this article I want to give an overview of this product and show how to use it in the HortonWorks Hadoop distribution sandbox.
Why we need Spark
Historically, the evolution of distributed computation in the world of Big Data started with the MapReduce paradigm. It was implemented through a set of services which allowed developers to define operations upon the data in standalone units called jobs. These jobs were then scheduled by dedicated resource managers and distributed to the worker nodes, which performed the actual computations and saved their results to a distributed file system like HDFS. This model was well suited for simple operations upon the data, like aggregation or filtering. Over time the required complexity of such operations kept increasing, and the Hadoop community started to introduce new tools to simplify them within the existing paradigm. Hive and Impala brought SQL for working with the data; Pig added an extra layer of functional batch programming. But all of them still worked on top of the MapReduce approach, where the original queries were translated into a set of jobs which kept intermediate results on physical storage. That caused a certain bottleneck for both performance and convenience of working with the data, and at some stage the community started to develop an alternative to that approach. In 2012 it released a new tool called Spark. Compared to the previous technique, Spark allowed users to perform operations upon the data using modern functional programming languages like Scala and Python, which were already widely used by data experts. Even more important, the framework introduced in-memory data processing, which brought significant performance improvements over the existing MapReduce solutions. That is why Spark very quickly became one of the most popular products for data exploration.
Installing Spark locally
As I’ve mentioned earlier, Spark is a processing engine whose primary target is the Big Data scope. But in order to start working with this application it is not necessary to have a Hadoop cluster. You can install Spark in standalone mode on the local file system, and I’m going to show you how to do this. First make sure that you have all of its dependencies installed and configured:
- Java JDK – Spark requires Java to run
- Scala – Spark was originally written in Scala, so this part is also mandatory for the installation
- Python – Python is optional for Spark, but if you want to work with Spark using this language, you need to install it into the system first. Personally I prefer working with Spark in Python due to its convenience and simplicity in certain details
Once you have all prerequisites sorted out you can start the installation of the application:
- Step 1 – first you need to download the Spark distribution package from the official site (I’m using distribution version 2.0 for Hadoop 2.6), unpack the archive and set up the configuration variables – $SPARK_HOME/bin for Linux/Mac or %SPARK_HOME%\bin for Windows, where SPARK_HOME is the location of your extracted content:
- Step 2 – as you are using the local file system, you need a special tool called winutils.exe, available here. Besides that, you need to add its location to the environment variables of your system under the name HADOOP_HOME, so that Spark can properly pick it up:
- Step 3 – in case of an error related to a lack of permissions on the Hive temporary folder, you need to create a directory called tmp\hive on the drive with the Spark distribution and grant full permissions to it using the utility from step 2:
D:\winutils\bin\winutils.exe chmod 777 D:\tmp\hive
Now you can run a Spark test application to check that everything works properly:
run-example SparkPi 10
Pi is roughly 3.141063141063141
The test passed successfully, so it is time to start working with Spark.
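Under the hood, the SparkPi example estimates π with a Monte Carlo method: it throws random points into the unit square and counts how many land inside the inscribed quarter circle. As a rough illustration of that idea (plain Python here, without Spark’s parallelism, so the names and seed are my own):

```python
import random

def estimate_pi(samples, seed=42):
    """Monte Carlo estimate of pi: the fraction of random points in the
    unit square that fall inside the quarter circle, times 4."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / samples

pi_estimate = estimate_pi(100000)
# the estimate is close to pi but not exact, just like SparkPi's output
```

SparkPi distributes exactly this sampling loop across worker tasks and sums the per-task counts, which is why the run-example command takes the number of partitions (10 above) as its argument.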
Start Sparking from REPL
Spark provides four different APIs for working with the data – Java, Scala, Python and R. Each of them has certain benefits and disadvantages, but in general you can achieve the same end result using any of them. Native Spark tools are located in the ./bin catalog of the parent directory. There you can find the REPLs for all languages except Java. Java currently does not have its own REPL, but the JShell REPL is announced for the Java 9 release planned for this year, so Spark will probably soon cover this part as well. The different Spark REPLs can be launched directly from the command line:
- ./bin/spark-shell – Scala REPL
- ./bin/pyspark – Python REPL
- ./bin/sparkR – R REPL
In this article I’m going to use the Python REPL, but the examples are quite similar in the other languages, and all of them follow the same concepts and principles. To launch the Python REPL, run the pyspark tool from the command line:
  / __/__  ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
Using Python version 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016 20:42:59)
According to the main concepts of the framework, every Spark application represents a driver program which communicates with the Spark engine through a special object called SparkContext. This model is probably familiar to those people who have experience writing MapReduce jobs, which also contained a driver class defining the overall configuration and execution workflow.
In Spark all the functionality is based on its primary abstraction – the Resilient Distributed Dataset. The RDD is the fundamental core component of the framework and represents a read-only collection distributed across the nodes of the cluster. In general all operations upon RDDs are divided into two categories:
- transformations – create a new RDD from an external data source or from an existing RDD
- actions – evaluate the actual value or values of an RDD, or save them to an external source
In a typical Spark application you usually start by loading some input data, which automatically creates a new RDD for this information. At this stage it will not have any content, only the instructions for the import operation. That is why RDDs are usually called lazy objects. Then you perform certain transformations on this data. On every transformation Spark creates another RDD, and again it does not hold any data yet, just the instructions describing how to modify the incoming stream. At the last step, when you want to see the content of the RDD, you perform an action operation, and Spark translates all the accumulated RDD instructions into actual evaluations. At that stage Spark can optimize certain steps and introduce extra performance benefits. After the evaluation is complete, you can finally see the resulting data and start using it for the next stages.
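Python’s own lazy iterators give a rough analogy for this behavior (plain Python, not Spark – just an illustration of the idea that transformations only record instructions until an action forces the evaluation):

```python
# Plain-Python analogy for Spark's lazy evaluation.
# map() in Python 3 returns a lazy iterator: like an RDD transformation,
# it records what should happen but does not touch the data yet.

side_effects = []

def square(x):
    side_effects.append(x)   # lets us observe when work actually happens
    return x * x

lazy = map(square, [1, 2, 3])  # "transformation": nothing is evaluated yet
assert side_effects == []      # no element has been processed so far

result = list(lazy)            # "action": forces the whole pipeline to run
assert result == [1, 4, 9]
assert side_effects == [1, 2, 3]
```

The difference is that Spark records these instructions per RDD across a cluster and can optimize the whole chain before executing it, while a Python iterator simply defers the work on one machine.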
Now let’s see how this works in real life. Let’s build a simple word count application using Spark from our Python shell:
>>> textFile = sc.textFile("file:///d:/README.txt")
>>> textFile
d:/README.txt MapPartitionsRDD at textFile at null:-2
>>> counts = textFile.flatMap(lambda line: line.split(" ")) \
...     .map(lambda word: (word, 1)) \
...     .reduceByKey(lambda a, b: a + b)
>>> counts
PythonRDD at RDD at PythonRDD.scala:48
>>> counts.collect()
[(u'my', 1), (u'friend', 1), (u'hello', 1)]
On the first line we load the file into the textFile variable and receive an RDD without the actual file content, as the echoed value shows. Then we perform a set of transformation operations upon the data and see that the counts variable is still an RDD without any content. At the end we call the collect() action, and our evaluations begin. Once they complete, we can see the result of all the operations.
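For comparison, the same flatMap/map/reduceByKey pipeline can be expressed in plain Python (no Spark involved; the two-line sample input is a hypothetical stand-in for the README file):

```python
from collections import defaultdict

lines = ["hello my friend", ""]  # hypothetical stand-in for README.txt

# flatMap: split every line into words and flatten the result into one list
words = [w for line in lines for w in line.split(" ") if w]

# map: pair each word with the count 1
pairs = [(w, 1) for w in words]

# reduceByKey: sum the counts for each distinct word
counts = defaultdict(int)
for word, n in pairs:
    counts[word] += n

assert dict(counts) == {"hello": 1, "my": 1, "friend": 1}
```

The Spark version runs the same three steps, but each stage works on partitions of the data spread across the cluster, and nothing is computed until collect() is called.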
Running Spark from sandbox
In the previous example you saw how easily you can load data and perform calculations upon it using your local Spark instance. But the real might of Spark can be seen only when you start using it in a real distributed environment. Besides, Hadoop has an extra set of tools which simplify working with this application. First of all, let’s launch our sandbox and verify through the Ambari UI that Spark is up and running:
Now we could go directly to the shell REPL and start working in exactly the same fashion as we did before, but I want to point you to a more advanced option which can heavily simplify your coding at the early stage. This option is called Zeppelin. Zeppelin is another product of the Big Data community which allows you to interact with your data through different Hadoop services, and Spark is one of them. I will not go deep into the details of this project, as that would take another few articles, but I want to point out the main concepts:
- Zeppelin is a web application; it runs as a part of the Hadoop ecosystem but is still decoupled from the underlying data services
- Zeppelin uses interpreters. Each interpreter allows it to talk to a certain service. Spark is one of the Zeppelin interpreters
- Zeppelin contains notebooks. Every notebook is a container for a collection of operations performed by different interpreters
You can access this application directly from your browser using the sandbox Zeppelin URL:
Let’s now move our word count application from the last chapter into Zeppelin. Open Notebook => Create new note. Zeppelin will create a separate working sheet for you. In this sheet you can start creating paragraphs. Every paragraph represents a piece of certain logic; all together they form a complete program module. The set of interpreters available to the paragraphs of the notebook is defined in the notebook settings:
Your top interpreter is Spark, but you can change the order and introduce new instances following the official guides. To start using the Spark interpreter, let’s create a new paragraph and name it Data Import. The first run of the paragraph could take some time, as it has to allocate a separate container using YARN:
As a result of the execution of this paragraph you will see an RDD with the description of the original data source, similar to the one we saw in the Python REPL:
Now let’s create another paragraph with the transformation logic and the evaluation of the results. The data from the previous paragraph is available to the current one, so we will use our textFile variable here:
The last step is to present the data. Zeppelin provides a great variety of options for presenting the data, but to keep things simple we will focus on a table view:
That’s it. From my point of view this output is more user-friendly compared to the one we had in the pyspark REPL. More advanced scenarios in Zeppelin allow you to display data using charts, HTML and even dynamic Angular elements. The final notebook represents a complete summary of the data research, with all the intermediate steps and their visualization, and can easily be transferred elsewhere as a single program module.