
Introduction to Apache Spark

1 Sep 2015
An introductory article on Apache Spark, with a demo app

Introduction

It has been a while since I have written a blog post, or an article for that matter. There is a good reason for this: I have started a new job in a domain that is brand new to me (I have mainly worked in FX), at a hedge fund / reinsurance firm, so I have been fairly tired. The reason I took this job is that, at the interview stage, they asked how I would feel about us not using .NET for everything (which they had done until then), and what my thoughts were about using a stack made up of things like this:

 

I was frankly over the moon. I had just finished a book on Apache Spark and REALLY wanted the chance to work with Scala / Cassandra, so I went for it.

This doesn't mean I don't do or LOVE .NET, heck I think .NET is the mutts nuts. There are however some great things going on out there in the non .NET / Microsoft space. Apache Spark is of particular interest to me.

This article will serve as a beginner's guide to Apache Spark. In fairness, Apache Spark has great documentation, which this article really just supplements. If, however, you have never heard of Apache Spark, you may like what you read. I certainly did the first time I read about it.

 

A Note About This Article

All examples and walkthroughs in this article and the demo app will be using Scala, as it is a nice modern OO/functional language.

 

Where Is The Code

You can grab a small IntelliJ IDEA Scala project from my GitHub repo : https://github.com/sachabarber/SparkDemo

 

What Is Spark

This is what the creators of Apache Spark have to say about their own work.

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

 

http://spark.apache.org/docs/latest/index.html (as of 24/08/15)

 

Want a bit more than just that? Try this:

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

Spark programming guide (as of 24/08/15)

 

If you are the sort that likes diagrams, here is one that may help

 

So that is what they have to say about it. Here are a couple of my own bullet points that may also help:

  • Spark provides a common abstraction : Resilient Distributed Datasets (RDDs)
  • Spark provides a LINQ like syntax that you can actually distribute across multiple worker nodes (LINQ like syntax across multiple nodes, think about that for a minute, that is like um WOW)
  • Spark provides fault tolerance
  • Spark provides the same API for running locally as it does within a cluster, which makes trying things out very easy
  • Spark provides a fairly simple API
  • Very active community
  • Put bluntly, it is a bloody brilliant product

 

The following diagram illustrates the current Apache Spark offering. Each of the dark blue blocks in the diagram is a slightly different offering, for example:

  • Spark SQL : Allows querying of structured data, such as JSON, as RDDs (which means you can effectively run RDD operations on JSON)
  • Spark Streaming : Allows the micro batching of data over time (window buffer, size buffer, sliding buffer etc.), where the data received in that buffer is presented as a list, which makes it very easy to deal with. Spark Streaming has many sources, such as
    • Sockets
    • Kafka
    • Flume
    • ZeroMQ

 

 

Underpinning all of these is the common abstraction, Resilient Distributed Datasets (RDDs). So let's carry on and talk a bit more about RDDs.

 

PS : In this article I WILL ONLY be covering Apache Spark basics and not each of the extra offerings above shown in the dark blue areas of the image.

 

How Does It Compare To Map/Reduce

Map Reduce

Map Reduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster. Conceptually similar approaches have been very well known since 1995 with the Message Passing Interface standard having reduce and scatter operations.

A Map Reduce program is composed of a Map() procedure that performs filtering and sorting (such as sorting students by first name into queues, one queue for each name) and a Reduce() procedure that performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). The "Map Reduce System" (also called "infrastructure" or "framework") orchestrates the processing by marshaling the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for redundancy and fault tolerance.

https://en.wikipedia.org/wiki/MapReduce (as of 24/08/2015)

 

Map Reduce typically runs as a sequence of these Map/Reduce stages, each of which would typically require its own hardware.
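To make the phases concrete, here is a minimal single-machine sketch in plain Scala of the student name example from the quote above. It only illustrates the programming model and says nothing about how Hadoop actually implements it. The names `NameFrequencies`, `mapPhase`, `shufflePhase` and `reducePhase` are made up for this example.

```scala
object NameFrequencies {
  // Map phase: emit a (key, value) pair for every input record
  def mapPhase(students: Seq[String]): Seq[(String, Int)] =
    students.map(name => (name, 1))

  // Shuffle phase: bring all the values for the same key together
  def shufflePhase(pairs: Seq[(String, Int)]): Map[String, Seq[Int]] =
    pairs.groupBy(_._1).map { case (name, group) => (name, group.map(_._2)) }

  // Reduce phase: summarise the values of each key, here by adding them up
  def reducePhase(grouped: Map[String, Seq[Int]]): Map[String, Int] =
    grouped.map { case (name, ones) => (name, ones.sum) }

  // Runs the three phases one after the other
  def run(students: Seq[String]): Map[String, Int] =
    reducePhase(shufflePhase(mapPhase(students)))
}
```

Calling `NameFrequencies.run(Seq("Ann", "Bob", "Ann"))` gives a map in which Ann is counted twice and Bob once. In a real Map/Reduce system each phase would run on many machines, with the intermediate pairs written out between stages.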

 

Spark

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

 

http://spark.apache.org/docs/latest/programming-guide.html#overview (as of 24/08/2015)

 

Spark is designed to keep its working data in memory on commodity grade hardware, and if the dataset fits into memory it is lightning fast. The Spark project claims that in this scenario it can be up to 100x faster than Map/Reduce, and up to 10x faster than Map/Reduce when the data has to live on disk.

Another area where Spark wins out is the RDD API, which is much richer than the Map/Reduce operations.

 

Let's see a diagram which tries to illustrate the differences between Spark and Map/Reduce. It can be seen that in Map/Reduce there is a staged approach of Map/Reduce, repeat.

In Spark, work is distributed to workers, each of which can do a portion of the work, and then the results are consolidated and brought back to the driver program, or possibly persisted to disk.

 

 

For more discussions on the differences, Google is a good bet.

 

 

RDD Transformations / Actions

As previously stated, one of the core concepts of working with Apache Spark is working with RDDs. Taking some information (quite shamelessly) from the Apache Spark docs:

 

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

 

RDD Operations (as of 24/08/15)
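The laziness described in the quote can be hard to picture. As a rough analogy only (this uses a standard Scala collection view rather than Spark, and the name `LazyAnalogy` is invented for the example), the sketch below counts how often the mapping function is called. Nothing runs when `map` is declared, only when a result is demanded.

```scala
object LazyAnalogy {
  // Returns (calls before the "action", calls after the "action", result)
  def demo(): (Int, Int, Int) = {
    var calls = 0

    // "Transformation": the view only remembers the function, it does not run it
    val doubled = (1 to 5).view.map { x => calls += 1; x * 2 }
    val callsBefore = calls

    // "Action": asking for a result forces the pending work to run
    val total = doubled.sum

    (callsBefore, calls, total)
  }
}
```

An RDD behaves in a similar spirit: `map` and `filter` only describe the work, while an action such as `count()` or `reduce` makes Spark actually schedule it.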

 

Let's examine a few of these

 

Creating RDDs

There are 2 ways to create RDDs: either by parallelizing some existing data in the driver program, or by referencing a dataset in an external storage system using one of the factory methods on the SparkContext object (such as textFile). We will now see examples of both.

 

Parallelize

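If we want to create an RDD from existing data that is already held in the driver program (say some values that were read from Cassandra), we could do something like this:

// an ordinary Scala collection held in the driver program
val data = Array(1, 2, 3, 4, 5)
// distribute it across the workers as an RDD (sc is the SparkContext)
val dataRdd = sc.parallelize(data)

This creates an RDD that can now be operated on in parallel. You can read more about this using the Spark documentation.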
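As a small sketch of what being operated on in parallel means in practice, the following assumes the spark-core 1.4.1 dependency used later in this article and a local master. The object name `ParallelizeDemo` and the partition count of 4 are arbitrary choices for illustration. The optional second argument to `parallelize` sets how many partitions the collection is cut into.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeDemo {
  // Returns (number of partitions, sum of the elements)
  def run(sc: SparkContext): (Int, Int) = {
    val data = Array(1, 2, 3, 4, 5)

    // Ask for 4 partitions explicitly; without the second argument Spark picks a default
    val dataRdd = sc.parallelize(data, 4)

    // reduce is an action: each partition is summed by a task,
    // then the partial sums are combined for the driver
    (dataRdd.partitions.length, dataRdd.reduce(_ + _))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ParallelizeDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

More partitions than cores is fine; Spark simply runs the tasks a few at a time.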

 

External Datasets

Spark also supports creating RDDs from external sources such as a Hadoop file system, the Amazon S3 file system, and Cassandra, to name a few. If you just want to try something out locally you can even use a local file system path. But remember this will not work if you choose to deploy to a cluster, unless the file is available at the same path on all nodes in the cluster.

 

The setup I am using for this article is a standalone Scala application running on Windows, and I am using Spark locally ONLY. That is, I am not connecting to a Spark cluster; I am parallelizing only across the cores that my laptop has, which for me is 2. As a result I am able to use a local file to create an RDD, but this would likely not work if I were to run my code against a cluster: a Spark cluster would typically be running on Linux while my Scala application is running on Windows, so the format of the file system path is different. We will go into how to create a Spark cluster later in the article. Granted, not in very much detail, but I will lead you to the water, and from there you may drink in the goodness of trying to set up a Spark cluster yourself.

 

Anyway, enough chit chat. How do you create an RDD from one of these external sources?

val txtFileRdd = sc.textFile("data.txt")

 

This example simply creates an RDD from a text file.

 

Now that you have seen how to create RDDs, we can move along to see how to use the RDDs to perform transformations and carry out actions.

 

 

RDD Transformations

There are many RDD transformations, and for a full list of them you should consult the following page of the Spark documentation : http://spark.apache.org/docs/latest/programming-guide.html#transformations

 

We will look at a few here, but essentially the name says it all: when you use a transformation, you are transforming the data in some way.

 

Filter

Description : Return a new dataset formed by selecting those elements of the source on which func returns true

Signature: filter(func)

//create an RDD using external data (i.e. the text file)
val textFileRDD = sc.textFile(someTextFilePath, 2).cache()

//filter example
val numAs = textFileRDD.filter(line => line.contains("a")).count()

 

Map

Description : Return a new distributed dataset formed by passing each element of the source through a function func

Signature: map(func)

//MAP example
val mapHoHo = textFileRDD.map(line => line + "HO HO")
println("HoHoHo line : %s".format(mapHoHo.first().toString()))

 

For more examples see this link : Spark transformations

 

 

RDD Actions

There are many RDD actions, and for a full list of them you should consult the following page of the Spark documentation :

http://spark.apache.org/docs/latest/programming-guide.html#actions

 

We will look at a few here. You may think of an action as performing some sort of final operation: it triggers the computation of the transformations that lead up to it and returns a result to the driver program. Examples are count(), which needs every partition of the dataset to be processed, and first(), which only needs enough of the data to produce one element. You may also find methods for saving an RDD, such as saveAsTextFile(..), which writes the contents of an RDD out to storage.

 

Collect

Description : Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data

Signature: collect()

//COLLECT ACTION example, note that filter() is actually a transformation
val numAsArray = textFileRDD.filter(line => line.contains("a")).collect()
println("Lines with a: %s".format(numAsArray.length))
numAsArray.foreach(println)
println("Lines with a as Array: %s".format(numAsArray.getClass().getTypeName()))

 

First

Description : Return the first element of the dataset (similar to take(1))

Signature: first()

//FIRST ACTION example
val firstLine = textFileRDD.first()
println("First Line: %s".format(firstLine))

 

Persist

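You may choose to persist an RDD to memory or disk. Strictly speaking persist() is not an action, as it only marks the RDD to be kept around once an action has computed it, but the two are often used together. There are lots of options for this, and many things to consider. For more examples see this link : Spark actions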

 


 

 

Shared Variables

Typically within Spark you will send an operation to a remote node for execution, where the node works on a separate copy of the data. Providing general purpose read/write variables across tasks/nodes would be inefficient. Spark does however provide 2 types of shared variables.

 

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method

http://spark.apache.org/docs/latest/programming-guide.html#shared-variables (as of 24/08/2015)

 

Here is a quick example:

// wrap the value once in the driver program
val broadcastVar = sc.broadcast(Array(1, 2, 3))
// .....
// read the cached copy wherever it is needed
broadcastVar.value
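The quick example above leaves out the part where the broadcast value is actually used inside a task. A minimal sketch of that part might look like the following. The lookup table and the names `BroadcastDemo` and `countryNames` are invented for illustration, and the code assumes the Spark 1.4.1 API used elsewhere in this article.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  def run(sc: SparkContext): Array[String] = {
    // A small read-only lookup table (hypothetical data) that every task needs
    val countryNames = sc.broadcast(Map("UK" -> "United Kingdom", "FR" -> "France"))

    // Tasks read the cached copy through .value rather than capturing the Map directly
    sc.parallelize(Seq("UK", "FR", "DE"))
      .map(code => countryNames.value.getOrElse(code, "unknown"))
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BroadcastDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

For a table this small the saving is negligible; broadcasting pays off when the value is large and used by many tasks.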

 

 

 

Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in Map Reduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. This can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

The code below shows an accumulator being used to add up the elements of an array:

 

http://spark.apache.org/docs/latest/programming-guide.html#shared-variables (as of 24/08/2015)

 

Here is a quick example:

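// a named accumulator shows up in Spark's UI
val accum = sc.accumulator(0, "My Accumulator")
// tasks may only add to the accumulator
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
// only the driver program can read the value
val accumulatedValue = accum.value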

 

 

Shuffle Operations

There are certain operations that will cause Spark to perform what is known as a "Shuffle". What this refers to is redistributing the data so that it is grouped differently across partitions, which means reading from all the partitions to bring together all the values for each key. This is a costly thing to do, as it involves disk I/O, network I/O and serialization.

Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

You can read more about this here : shuffle operations
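To see where a shuffle turns up in ordinary code, here is a small word count sketch. It assumes the Spark 1.4.1 API used elsewhere in this article, and the names `ShuffleDemo` and `wordCounts` are made up for the example. The `flatMap` and `map` steps work on each partition independently, whereas `reduceByKey` is the step that needs data moved between partitions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleDemo {
  def wordCounts(sc: SparkContext, lines: Seq[String]): Map[String, Int] = {
    val pairs = sc.parallelize(lines, 2)
      .flatMap(line => line.split(" "))  // no shuffle: each partition is processed on its own
      .map(word => (word, 1))            // no shuffle

    // All the values for a key must end up together, so this triggers a shuffle
    pairs.reduceByKey(_ + _).collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ShuffleDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(wordCounts(sc, Seq("a b a", "b a"))) finally sc.stop()
  }
}
```

`reduceByKey` combines values within each partition before anything is sent across the network, which is why it is usually a better fit for sums and counts than `groupByKey`.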

 

 

Persisting RDDs

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

Persisting RDDs (as of 24/08/15)
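As a minimal sketch of the above, assuming the Spark 1.4.1 API and with the name `PersistDemo` invented for the example, the code below marks an RDD for persistence and then runs two actions against it. `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`; here a different storage level is chosen just to show where the option goes.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistDemo {
  // Returns (number of elements, largest element)
  def run(sc: SparkContext): (Long, Int) = {
    val squares = sc.parallelize(1 to 1000).map(x => x * x)

    // Only marks the RDD for caching; nothing is computed or stored yet
    squares.persist(StorageLevel.MEMORY_AND_DISK)

    val count = squares.count() // first action: computes the partitions and caches them
    val max = squares.max()     // second action: can reuse the cached partitions

    squares.unpersist()         // release the cached partitions when finished
    (count, max)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PersistDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Without the `persist` call both actions would still give the same answers, but the `map` would be recomputed for each of them.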

 

 

 

 

 

How Do I Install Spark

Apache Spark itself is quite large, but luckily we can get up and running with just a bunch of JAR files. The easiest way to start out with Apache Spark (at least in my opinion) is to do the following:

  1. Download SBT (it's free)
  2. Download the 1.8 JDK (or later) (it's free)
  3. Download IntelliJ IDEA Community Edition (it's free)

 

You can of course download the actual pre-built Apache Spark distribution, which you can unpack into a folder of your choosing. The thing is, the Apache Spark team say that Apache Spark runs on Windows, but it doesn't run that well. You would typically run it on a Linux cluster. We will talk more about this later. But for now just remember that Apache Spark really does run MUCH better on a Linux VM/Box/Cluster, and you should ensure you do that for a real environment. For messing around you could use Windows, which we will also talk about later.

 

SBT

You will need to install the simple build tool, which you can grab from here:

http://www.scala-sbt.org/

If you have not heard of SBT or do not know what it is, you can think of it as a build tool and package management tool. In some ways it is similar to NuGet, though SBT actually uses Scala as the grammar/DSL in which to express your build requirements.

You may use it to scaffold a Scala project and also download a Scala project's dependencies. We will see more on this later.

 

JDK

Apache Spark programs are typically written in one of the following 3 programming languages

  1. Java
  2. Scala
  3. Python

For the top 2 we obviously need the JDK. So we need to grab it. You can grab it from the Oracle web site:

Java SDK Downloads

 

The typical Java 8 SDK installation should be fine, pick the one that you need for your environment

 

 

IntelliJ IDEA

You can download a free copy of IntelliJ IDEA from here : https://www.jetbrains.com/idea/download/

 

Dealing With Weird Hadoop Commons Issue On Windows

As I have stated in this article already, you would likely not run an Apache Spark cluster on Windows. However, you may want to run the driver program on Windows. I wanted to do this, and I got a weird error to do with a missing "winutils.exe" file.

I did a bit of Googling and it is quite easy to fix. You can get the fix from the chap's web site shown below, where he states the exact exception you may see.

 

.....and you will see the following exception:
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.


The reason is because spark expect to find HADOOP_HOME environment variable pointed to hadoop binary distribution, .........

Let’s download hadoop common binary, extract the downloaded zipped file to ‘C:\hadoop’ , then set the HADOOP_HOME to that folder.

 

http://ysinjab.com/2015/03/28/hello-spark/ (as of 24/08/2015)

 

The download you need to fix this is here :  http://www.srccodes.com/p/article/39/error-util-shell-failed-locate-winutils-binary-hadoop-binary-path

And the steps to follow are as stated above
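If you would rather not touch the machine's environment variables, a commonly used alternative is to set the `hadoop.home.dir` Java system property from the driver program before the SparkContext is created, as the Hadoop libraries look at that property before they look at HADOOP_HOME. The following is a sketch under assumptions: `C:\hadoop` is the example folder from the quote above and must contain `bin\winutils.exe`, and the names `HadoopHomeFix`, `resolve` and `install` are invented for illustration.

```scala
object HadoopHomeFix {
  // Picks the Hadoop home: an existing system property wins, then the
  // HADOOP_HOME environment variable, then the supplied fallback folder
  def resolve(prop: Option[String], env: Option[String], fallback: String): String =
    prop.orElse(env).getOrElse(fallback)

  // Call this before creating the SparkContext.
  // The default folder is only an example and must contain bin\winutils.exe
  def install(fallback: String = "C:\\hadoop"): Unit =
    System.setProperty(
      "hadoop.home.dir",
      resolve(sys.props.get("hadoop.home.dir"), sys.env.get("HADOOP_HOME"), fallback))
}
```

In the driver program you would call `HadoopHomeFix.install()` as the first line of `main`, before any Spark objects are constructed.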

 

 

Creating Our First Project

This section of the article is really the only original part of the entire article. In this section I will discuss how I myself got up and running as follows:

  • Spark in local mode on Windows
  • No cluster
  • Using Scala through IntelliJ IDEA

 

The good thing is that if you need to change to using a cluster, all you need to change is the master node connection string, and the rest will be the same.

 

Creating The Project Structure Using IntelliJ And SBT

In this section we will create a new Apache Spark application which has the following features

  • Runs on Windows through IntelliJ IDEA
  • Demonstrates the usage of SBT to manage dependencies
  • Demonstrates a few RDD transformations
  • Demonstrates a few RDD actions

 

Creating The Initial IntelliJ IDEA Project

We need to create a new project


 

We then need to choose SBT project


 

You should choose the following settings when you are creating the project


 

NOTE : It is quite important to pick a Scala version of 2.10.5. The current released build (at time of writing) of Apache Spark was v1.4.1, which is built against Scala 2.10.x by default, so you need to make sure you target Scala 2.10.5.

 

Once you have done that, you need to wait a while to let SBT do a few things. After a while you should see this sort of project structure

 

Modifying The SBT Requirements

So we now have a default SBT project. This is good, but now we need to get the actual Spark JAR files. This is done by modifying the main build.sbt file

So open that up, and change its contents to this:

 

name := "SparkDemo"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

 

Next open up the SBT Window, as shown below

 

Click on the refresh icon

 

This will fetch the Spark JARs you specified in the build.sbt file above, along with all of the dependencies that they in turn need.

This will take a LONG time, just let it do its thing.

 

When this is done you should do 2 things:

  1. Check that your SBT (Ivy) cache contains the Spark JARs. This is the directory "C:\Users\YOUR_USER\.ivy2\cache"
  2. You should make sure that the SBT cache is used by IntelliJ IDEA as a package source (similar to what you do with NuGet and Visual Studio if you have set up your own NuGet store)

 

This is done as follows from within IntelliJ IDEA.

  • Project Structure -> Library

Then follow the steps below, where you would add stuff from your own .ivy2 cache directory (for example "C:\Users\YOUR_USER\.ivy2\cache")

 


 

 

 

 

Scala Driver Simple App

Once you have gone through the pain of downloading the whole heap of dependencies you need to run Spark, you should be able to create a simple driver program like the one shown below.

The only thing that I have not explained so far in the code below, is the line

conf.setMaster("local[2]")

What this does is run Spark locally using 2 worker threads, one for each of the 2 cores my laptop has. This line is what you would change if you were running the driver program against a cluster.

You can read more about this at this URL : Spark Master Urls. It's an old page but the data is still valid
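One way to avoid hard-coding the master, so that the same driver can be pointed at a cluster later, is to take it from the command line. This is only a sketch: the name `MasterUrl` is invented, and falling back to `local[2]` simply mirrors the setup used in this article.

```scala
object MasterUrl {
  // Uses the first command-line argument as the master URL when present,
  // otherwise falls back to local mode with 2 worker threads
  def fromArgs(args: Array[String], default: String = "local[2]"): String =
    args.headOption.map(_.trim).filter(_.nonEmpty).getOrElse(default)
}
```

The driver would then call `conf.setMaster(MasterUrl.fromArgs(args))`. Passing nothing keeps it local, while passing something of the form `spark://HOST:PORT` (with the host and port of your own standalone master) would target a cluster.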

 

Anyway here is the example driver program

 

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {

    // Should be some file on your system
    val someTextFilePath = "C:/Temp/SomeTextFile.txt"

    val conf = new SparkConf().setAppName("Simple Application")

    //use local Spark (non clustered in this example). Note this relies
    // on all the SBT dependencies being downloaded to
    // C:\Users\XXXXX\.ivy2 cache folder
    conf.setMaster("local[2]")
    val sc = new SparkContext(conf)

    //create an RDD using external data (i.e. the text file)
    val textFileRDD = sc.textFile(someTextFilePath, 2).cache()

    //FILTER TRANSFORMATION example, note that count() is actually an action
    val numAs = textFileRDD.filter(line => line.contains("a")).count()
    val numBs = textFileRDD.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

    //MAP TRANSFORMATION example, note that first() is actually an action
    val mapHoHo = textFileRDD.map(line => line + "HO HO")
    println("HoHoHo line : %s".format(mapHoHo.first().toString()))



    //COLLECT ACTION example, note that filter() is actually a transformation
    val numAsArray = textFileRDD.filter(line => line.contains("a")).collect()
    println("Lines with a: %s".format(numAsArray.length))
    numAsArray.foreach(println)
    println("Lines with a as Array: %s".format(numAsArray.getClass().getTypeName()))


    //FIRST ACTION example
    val firstLine = textFileRDD.first()
    println("First Line: %s".format(firstLine))

    // wait for Enter so the console output can be read, then shut Spark down
    readLine()
    sc.stop()
  }
}

 

 

Further Readings

You can carry on reading more about these things using these links

 

That's It

Anyway, that is all I wanted to say this time. I hope that those of you who had not come across these things before got a little something out of it.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
