
Apache Spark/Cassandra 2 of 2

10 Feb 2016
Looking at Spark/Cassandra working together


Introduction

Last time I talked about how to install Apache Cassandra locally as a single node installation. As I said before, this is not what you would do for a production installation, where you would use a Cassandra cluster, but for learning purposes one node is just fine.

I have also talked about Apache Spark, which is a general purpose grid computation engine. In this article I will show you how you can use Apache Spark together with Apache Cassandra to save data from Apache Spark Resilient Distributed Datasets (RDDs) into Cassandra, and to retrieve data from Cassandra back into RDDs.

 

Where Is The Code?

You can grab the code from my github repository : https://github.com/sachabarber/CassandraSparkDemo

 

Why Would We Want To Save/Retrieve Data?

As I just stated, Apache Spark is a general purpose computation engine that works with worker nodes and RDDs, where the RDDs are processed in parallel across the available worker nodes. There is also the concept of a driver program, which triggers the computations; the worker nodes execute their portion of the calculations and hand the results back to the driver program, where they are assembled into the final result.

Since the driver program ends up with some result, it is not unreasonable to think that you may want to store the output of an expensive calculation. Conversely, we may wish to retrieve some previously stored values.

This is where Cassandra fits in. We can use Cassandra to save the results of an RDD, and we can also use Cassandra to populate an RDD.
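To make that concrete, here is a minimal sketch of the round trip (this assumes the DataStax connector introduced later in this article, a SparkContext named sc, and that the test.words table created in the initialisation section already exists):

import com.datastax.spark.connector._

// save the result of some Spark computation into Cassandra
val results = sc.parallelize(Seq(("spark", 1), ("cassandra", 2)))
results.saveToCassandra("test", "words", SomeColumns("word", "count"))

// later, populate a fresh RDD from that same Cassandra table
val wordsRdd = sc.cassandraTable("test", "words")
println(s"words stored so far : ${wordsRdd.count()}")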

I hope this makes some sense. To be honest, reading the first two articles in this series would be a good idea to get some grounding in what I am talking about here. I am telling you, these technologies are AWESOME, and if you are like me and consider yourself a polyglot, the fact that they are JVM based should not faze you. They are AWESOME and, in my humble opinion, you should try and learn them.

Anyway, the rest of this article will talk you through what the various saving/retrieving operations look like.

 

DataStax Cassandra Connector

In order to handle the saving/retrieving from Cassandra you will need to use the DataStax Cassandra Connector, which is a MUST. You can grab it from the Maven Central Repository using an SBT (SBT is the Scala build tool) file something like this:

 

name := "SparkCassandraDemoApp"

version := "1.0"

scalaVersion := "2.10.5"


libraryDependencies ++= Seq(
  "org.apache.spark"   % "spark-core_2.10"                % "1.4.1",
  "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.4.0"
)

This will bring in the Apache Spark libraries, and also the DataStax Cassandra Connector.

NOTE

The versions of Scala, Spark and the Cassandra connector are tightly coupled, so make sure you use compatible ones.

 

You can read more about the DataStax Cassandra Connector here; it has great docs:

https://github.com/datastax/spark-cassandra-connector

 

Using The Datastax Connector With Apache Spark

As with normal Apache Spark development, we have what is known as a driver program. This is the portion of code that orchestrates the distributed calculations across the workers, normally via the use of parallelize.

For this demo app I have not given you instructions on how to create an Apache Spark cluster or an Apache Cassandra cluster, but we will still parallelise occasionally, which for this demo simply means using the available cores on the machine running the driver program.

That said, most of the demo code contained in this article will simply be showing you how best to use the DataStax Cassandra Connector for Spark.
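Just to illustrate what parallelising locally looks like, here is a minimal sketch (assuming a SparkContext named sc is already in scope, as created in the driver program below):

// the data is split across the available local cores ...
val squares = sc.parallelize(1 to 10)
  .map(x => x * x)   // ... each element is squared in parallel on the workers ...
  .collect()         // ... and the results are gathered back in the driver program
println(squares.mkString(", "))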

 

I am using Scala as my language of choice here; the following is the complete driver program:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {

    val conf = new SparkConf(true)
      .setAppName("Simple Application")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    // Use local Spark (non clustered in this example) with 2 cores.
    // Note this relies on all the SBT dependencies being
    // downloaded to C:\Users\XXXXX\.ivy2 cache folder
    conf.setMaster("local[2]")
    implicit val sc = new SparkContext(conf)

    val tableReader = new CassandraTableReader()
    val tableWriter = new CassandraTableWriter()

    tableWriter.initialise()
    
    tableReader.readTestTableValues()
    tableWriter.writeTestTableValues()
    tableReader.readTestTableValues()
    tableReader.foreachTestTableValues()
    tableReader.getColumnAttributes()
    tableReader.getSets()
    tableReader.getUDT()
    tableReader.foreachSelectedTableColumnValues()
    tableReader.foreachFilteredTableColumnValues()
    tableReader.foreachTableRowCount()
    tableReader.foreachTableRowAsTuples()
    tableReader.foreachTableRowAsCaseClasses()
    tableReader.foreachTableRowAsCaseClassesUsingColumnAliases()
    tableReader.foreachTableRowAsTuples()
    tableWriter.saveCollectionOfTuples()
    tableReader.foreachTableRowAsTuples()
    tableWriter.saveCollectionOfCaseClasses()
    tableWriter.saveUDT()

    println("====== DONE ======")

    readLine()
  }
}

As can be seen, there is nothing too fancy going on in there: we simply run some initialisation code, and then we call functions that either read some data from Cassandra into a Spark RDD, or write some data from a Spark RDD into Cassandra.

As such I thought it would be a good idea to split the rest of the article up into 4 sections:

  • SparkContext
  • Initialisation
  • Reading from Cassandra into Spark RDDs
  • Writing from Spark RDDs into Cassandra

 

SparkContext

In order to do anything using the DataStax Cassandra Connector you will need to have a SparkContext available. Both of the classes the demo contains for reading/writing to Cassandra will need a SparkContext. So how does that happen? For the demo app this is as simple as using a Scala implicit val that is in scope.

/* SimpleApp.scala */

//Driver program creates the SparkContext

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {
    //conf is the SparkConf created as shown in the full driver program above
    implicit val sc = new SparkContext(conf)

    val tableReader = new CassandraTableReader()
    val tableWriter = new CassandraTableWriter()
    ....
    ....
    ....
  }
}


//Reader class needs the SparkContext
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

class CassandraTableReader()(implicit val sc : SparkContext) {
    ....
    ....
    ....
}


//Writer class needs the SparkContext
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

class CassandraTableWriter()(implicit val sc : SparkContext) {
    ....
    ....
    ....
}

 

Initialisation

In order to ensure that we have some tables set up to play with, there is an initialisation function that the driver program calls. That is essentially this code:

def initialise(): Unit =   {
  val conf = sc.getConf
  CassandraConnector(conf).withSessionDo { session =>

    //create keyspace
    session.execute("DROP KEYSPACE IF EXISTS test")
    session.execute("CREATE KEYSPACE test WITH REPLICATION = " +
      "{'class': 'SimpleStrategy', 'replication_factor': 1 }")

    //create table kv
    session.execute("DROP TABLE IF EXISTS test.kv")
    session.execute("CREATE TABLE test.kv(key text PRIMARY KEY, value int)")
    session.execute("INSERT INTO test.kv(key, value) VALUES ('key1', 1)")
    session.execute("INSERT INTO test.kv(key, value) VALUES ('key2', 2)")


    //create table words
    session.execute("DROP TABLE IF EXISTS test.words")
    session.execute("CREATE TABLE test.words(word text PRIMARY KEY, count int)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('elephant', 1)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('tiger', 12)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('snake', 5)")


    //create table emails
    session.execute("DROP TABLE IF EXISTS test.users ")
    session.execute("CREATE TABLE test.users  (username text PRIMARY KEY, emails SET<text>)")
    session.execute("INSERT INTO test.users  (username, emails) " +
      "VALUES ('sacha', {'sacha@email.com', 'sacha@hotmail.com'})")
    session.execute("INSERT INTO test.users  (username, emails) " +
      "VALUES ('bill', {'bill23@email.com', 'billybob@hotmail.com'})")

    //create address and company
    session.execute("DROP TYPE IF EXISTS test.address")
    session.execute("DROP TABLE IF EXISTS test.companies")
    session.execute("CREATE TYPE test.address (city text, street text, number int)")
    session.execute("CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>)")
    session.execute("INSERT INTO test.companies (name, address) VALUES ('company1', " +
      "{ city : 'London', street : 'broad street', number : 111 })")
    session.execute("INSERT INTO test.companies (name, address) VALUES ('company2', " +
      "{ city : 'Reading', street : 'rushmore road', number : 43 })")
  }
}

 

Reading From Cassandra Into Spark RDDs

This section outlines various ways of reading data from Cassandra.

 

Read Table Values

We have all probably used record sets that support getting column values by index or name. The DataStax connector also supports this, where you use a CassandraRow to get values.

 Assuming we have this table

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

Here is an example

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def readTestTableValues(): Unit = {
  println(s"In method : readTestTableValues")
  val rdd = sc.cassandraTable("test", "kv")
  val count = rdd.count
  val first = rdd.first()
  val sum = rdd.map(_.getInt("value")).sum
  println(s"============ RDD COUNT $count")
  println(s"============ RDD FIRST $first")
  println(s"============ RDD SUM $sum")
}
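As mentioned above, a CassandraRow can also be read by column index rather than by column name. A minimal sketch of that (assuming the same test.kv table, where key is the first column, and assuming your connector version exposes the index-based getters as well as the name-based ones):

def readTestTableValuesByIndex(): Unit = {
  val row = sc.cassandraTable("test", "kv").first()
  val keyByName  = row.getString("key") // access by column name
  val keyByIndex = row.getString(0)     // access by zero-based column index (assumed available)
  println(s"============ BY NAME $keyByName BY INDEX $keyByIndex")
}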

Foreach Table Values

We can also run a foreach loop against the values we retrieve from Cassandra.

Assuming we have this table

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

Here is an example of that

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def foreachTestTableValues(): Unit = {
  println(s"In method : foreachTestTableValues")
  val rdd = sc.cassandraTable("test", "kv")
  rdd.foreach(println)
}

Get Column Attributes

The CassandraRow object also exposes a number of useful attributes, such as columnNames and size, which we can use.

Assuming we have this table

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def getColumnAttributes(): Unit = {
  println(s"In method : getColumnValue")
  val rdd = sc.cassandraTable("test", "kv")
  val firstRow = rdd.first
  val colNames = firstRow.columnNames
  val colSize = firstRow.size
  val firstKey = firstRow.get[String]("key")
  val firstValue = firstRow.get[Int]("value")
  println(s"============ RDD COLUMN NAMES $colNames")
  println(s"============ RDD COLUMN SIZE $colSize")
  println(s"============ RDD FIRST KEY $firstKey")
  println(s"============ RDD FISRT $firstValue")

}

Get SETs

Cassandra supports SETs of objects within columns. Say we had this table definition in Cassandra

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) .....

It can be seen that we use a set<text> column to store emails. So how can we read sets? Luckily this is also very easy. Here is an example:

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def getSets(): Unit = {
  println(s"In method : getSets")
  val rdd = sc.cassandraTable("test", "users")
  val row = rdd.first()
  PrintHelper.printIt("getSets", "List[String]", row.get[List[String]]("emails"))
  PrintHelper.printIt("getSets", "IndexedSeq[String]", row.get[IndexedSeq[String]]("emails"))
  PrintHelper.printIt("getSets", "Seq[String]", row.get[Seq[String]]("emails"))
  PrintHelper.printIt("getSets", "Set[String]", row.get[Set[String]]("emails"))
}

Get UDTs

Cassandra also supports User Defined Types (UDTs), and as such the DataStax connector offers support for reading these.

Assuming we have a table definition that looks like this

CREATE TYPE test.address (
    city text,
    street text,
    number int
)


CREATE TABLE test.companies (
    name text PRIMARY KEY,
    address frozen<address>
) .....

We can then use this code to read from this table, and its UDT (address)

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def getUDT(): Unit = {
  println(s"In method : getUDT")
  val rdd = sc.cassandraTable("test", "companies")
  val row = rdd.first()
  val address: UDTValue = row.getUDTValue("address")
  PrintHelper.printIt("getUDT", "city", address.getString("city"))
  PrintHelper.printIt("getUDT", "street", address.getString("street"))
  PrintHelper.printIt("getUDT", "number", address.getString("number"))
}

Foreach Using Projections

Another popular idea is to use projections, where we create new objects containing only the properties/fields we are interested in. This is like a select(x => new { a = someProp, b = someOtherProp }) LINQ expression in .NET.

Let's see an example of this, shall we, where we query a table that has more columns than we want, but specify via the projection that we ONLY want the "username" column.

Let's assume we have this table

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) ....

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
def foreachSelectedTableColumnValues(): Unit = {
  println(s"In method : foreachSelectedTableColumnValues")

  val filteredColumnsRdd = sc.cassandraTable("test", "users")
    .select("username")

  filteredColumnsRdd.foreach(println)

  val row = filteredColumnsRdd.first()
  PrintHelper.printIt("foreachSelectedTableColumnValues", "username", row.getString("username"))
}
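It is worth noting that the select happens on the Cassandra side, so only the requested columns are transferred into Spark. You could achieve a similar projection with a plain RDD map, but that would pull the full rows across first; a small sketch of the contrast (same test.users table):

// server side projection : only the "username" column leaves Cassandra
val namesPushedDown = sc.cassandraTable("test", "users").select("username")

// Spark side projection : full rows are fetched, then trimmed in the workers
val namesInSpark = sc.cassandraTable("test", "users").map(row => row.getString("username"))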

Foreach Using Server Side Filtering

Another thing one would expect to be able to do is perform a server side filter (think WHERE clause in SQL land), which is applied at the Cassandra database level.

Let's assume we have this table

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) ....

 

This is actually fairly easy to do, as follows:

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
def foreachFilteredTableColumnValues(): Unit = {
  println(s"In method : foreachFilteredTableColumnValues")

  val filteredColumnsRdd = sc.cassandraTable("test", "users")
    .select("username")
    .where("username = ?", "bill")

  filteredColumnsRdd.foreach(println)

  val row = filteredColumnsRdd.first()
  PrintHelper.printIt("foreachFilteredTableColumnValues", "username", row.getString("username"))
}
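The important point is that the where clause is pushed down to Cassandra as a CQL predicate, so only the matching rows are ever shipped to Spark. You could instead filter in Spark, but that fetches everything first; a minimal sketch of the two approaches (same test.users table):

// pushed down to Cassandra as a CQL WHERE clause
val serverSideFiltered = sc.cassandraTable("test", "users")
  .where("username = ?", "bill")

// evaluated in Spark after fetching all the rows
val sparkSideFiltered = sc.cassandraTable("test", "users")
  .filter(row => row.getString("username") == "bill")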

Aggregation (Count)

Aggregation is also a typical use case, covering things like count/sum. This is not something you should try and do yourself; you should most definitely use the DataStax aggregation methods, as these may need to hit all the nodes in the Cassandra cluster to perform the aggregation. So leave this to the experts.

Let's assume we have this table

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) ....

 

Anyway, here is an example of count, where we use the cassandraCount method that the DataStax connector gives us.

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
def foreachTableRowCount(): Unit = {
  println(s"In method : foreachTableRowCount")

  val count = sc.cassandraTable("test", "users")
    .select("username")
    .cassandraCount()

  PrintHelper.printIt("foreachTableRowCount", "all users count", count)
}
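The difference from the plain RDD count used earlier is that cassandraCount() asks Cassandra to do the counting, rather than shipping every row into Spark just to count it. A small sketch of the contrast (same test.users table):

// counting performed by Cassandra, only the totals travel back to Spark
val countedInCassandra = sc.cassandraTable("test", "users").cassandraCount()

// counting performed by Spark, every row is fetched first
val countedInSpark = sc.cassandraTable("test", "users").count()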

Foreach Using Tuples

From time to time we may need to grab just a couple of columns, without wanting to create actual classes for those reads. Tuples are a great fit in these cases, and luckily they are well supported too.

Assuming we have this table

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) ....

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/4_mapper.md
def foreachTableRowAsTuples(): Unit = {
  println(s"In method : foreachTableRowAsTuples")

  val rdd = sc.cassandraTable[(String, Int)]("test", "words")
    .select("word", "count");
  val items = rdd.take(rdd.count().asInstanceOf[Int])
  items.foreach(tuple => PrintHelper.printIt("foreachTableRowAsTuples",
    "tuple(String, Int)", tuple))

  val rdd2 = sc.cassandraTable[(Int, String)]("test", "words")
    .select("count", "word")
  val items2 = rdd2.take(rdd2.count().toInt)
  items2.foreach(tuple => PrintHelper.printIt("foreachTableRowAsTuples",
    "tuple(Int, String)", tuple))
}

Foreach Using Case Classes

If, however, the reads we need to do are a bit more permanent, we could use classes to represent the retrieved data. Case classes are good for that.

Assuming we have this table

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) .... 

Here is an example

case class WordCount(word: String, count: Int)


//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/4_mapper.md
def foreachTableRowAsCaseClasses(): Unit = {
  println(s"In method : foreachTableRowAsCaseClasses")

  val items = sc.cassandraTable[WordCount]("test", "words")
    .select("word", "count").take(3)

  items.foreach(wc => PrintHelper.printIt("foreachTableRowAsCaseClasses",
    "WordCount(word : String, count : Int)", wc))
}

Column Aliases

Aliasing of columns can be quite useful. Here is an example of that, where we use "select" together with "as".

Assuming we have this table:

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/4_mapper.md
def foreachTableRowAsCaseClassesUsingColumnAliases(): Unit = {
  println(s"In method : foreachTableRowAsCaseClassesUsingColumnAliases")

  val items = sc.cassandraTable[WordCount]("test", "kv")
    .select("key" as "word", "value" as "count").take(1)

  items.foreach(wc => PrintHelper.printIt("foreachTableRowAsCaseClassesUsingColumnAliases",
    "WordCount(word : String, count : Int)", wc))
}

 

Writing From Spark RDDs Into Cassandra

This section outlines various ways of writing data to Cassandra.

 

Saving Collection Of Tuples

Scala supports the concept of a tuple, which may be something like (1, "cat"), i.e. a tuple of (Int, String). Tuples can have variable lengths, but for this demo we will be sticking with tuples with an arity of 2.

So if we had a table definition that looked like this

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) ....

We would then be able to save our tuples (with an arity of 2) to this table as follows:

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
def saveCollectionOfTuples(): Unit = {
  println(s"In method : saveCollectionOfTuples")

  val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
  collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
}

 

Saving Collection Of Case Classes

Scala also has the concept of case classes, which are classes that are kind of magical and implement a whole bunch of stuff for us (such as hashCode, equals etc.). These are quite often nicer to work with than tuples, as they are strongly typed, well known classes.

Say we have this Scala case class

case class WordCount(word: String, count: Int)

And we had a table definition that looked like this

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) ....

We could then save a collection of these using this code

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
def saveCollectionOfCaseClasses(): Unit = {
  println(s"In method : saveCollectionOfCaseClasses")

  val collection = sc.parallelize(Seq(WordCount("shark", 50), WordCount("panther", 60)))
  collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
}

 

Saving Collection Of User Defined Types (UDTs)

Cassandra also supports the notion of UDTs, which can be seen in the table definition below, where we have an "address" UDT column.

CREATE TABLE test.companies (
    name text PRIMARY KEY,
    address frozen<address>
) ....

So we should be able to store these UDTs using Scala. The way we work with UDTs in Scala is by using case classes. So for this example we would have the following case class

case class Address(city: String, street: String, number: Int)

Which we could then save to Cassandra like this:

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
def saveUDT(): Unit = {
  println(s"In method : saveUDT")

  val address = Address(city = "Oakland", street = "Broadway", number = 90210 )
  val col = Seq(("Oakland cycles", address))
  sc.parallelize(col).saveToCassandra("test", "companies", SomeColumns("name", "address"))
}

 

That's It

So that's it. Across this series I have given a run through of how to use Apache Spark to perform general purpose grid computing and how to install a local Cassandra instance, and this article has talked you through how to use the DataStax Cassandra Spark connector to save/retrieve data to/from Cassandra from/into Spark Resilient Distributed Datasets.

I would urge you all to try these tools out for yourself. Cassandra + Spark skills are very much in demand, certainly in the UK anyway, so from that point of view it is a no brainer.
