Series Links
Table Of Contents
Last time I talked about how to install Apache Cassandra locally as a single-node installation. As I said before, this is not what you want for a production installation, where you would use a Cassandra cluster, but for learning a single node is just fine.
I have also talked about Apache Spark, which is a general purpose grid computation engine. Within this article I will show you how you can use Apache Spark with Apache Cassandra to save data from Apache Spark Resilient Distributed Datasets (RDDs) into Cassandra, and to read data from Cassandra back into RDDs.
Where Is The Code?
You can grab the code from my GitHub repository: https://github.com/sachabarber/CassandraSparkDemo
So as I just stated, Apache Spark is a general purpose computation engine, which works by having worker nodes and RDDs, where the RDDs are executed in parallel across the available worker nodes. There is also the concept of a driver program, which is expected to perform certain actions to execute the computations. This results in the worker nodes executing their portion of the calculations and handing that back to the driver program, where it is assembled as part of the final result.
Since the driver program will end up with some result, it is not unreasonable to think that you may want to store the results of some expensive calculation. Conversely, we may wish to retrieve some previously stored values.
This is where Cassandra fits in. We can use Cassandra to save the contents of an RDD, and we can also use Cassandra to populate an RDD.
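To make the driver/worker split a little more concrete, here is a minimal sketch of a self-contained Spark driver program. The object name, the numbers RDD and the squaring logic are purely illustrative and not part of the demo code.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TinyDriverSketch {
  def main(args: Array[String]) {

    // "local[2]" runs the workers as 2 local cores; on a real cluster this would point at the master
    val conf = new SparkConf(true)
      .setAppName("Tiny Driver Sketch")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    // The driver creates an RDD; the map work is split across the workers in parallel
    val numbers = sc.parallelize(1 to 100)
    val sumOfSquares = numbers.map(n => n * n).sum

    // sum brings the partial results from the workers back to the driver
    println(s"Sum of squares: $sumOfSquares")
    sc.stop()
  }
}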
I hope this makes some sense. To be honest, I think reading the first two articles in this series would be a good idea to get some grounding in what I am talking about here. I am telling you these technologies are AWESOME, and if you are like me and consider yourself a polyglot, the fact that they are JVM based should not faze you. They are AWESOME and you should try and learn them, in my humble opinion.
Anyway, the rest of this article will talk you through what the various saving/retrieving operations look like.
In order to deal with saving to and retrieving from Cassandra you will need to use the DataStax Cassandra Connector, which is a must. You can grab that from the Maven Central Repository using an SBT (SBT is the Scala build tool) file something like this:
name := "SparkCassandraDemoApp"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.4.1",
  "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.4.0"
)
This will bring in the Apache Spark libraries, and also the DataStax Cassandra Connector.
NOTE
The Scala version and the Spark/Cassandra connector versions are quite tightly coupled, so make sure you use compatible ones.
You can read more about the Apache Cassandra Connector here, it has great docs:
https://github.com/datastax/spark-cassandra-connector
As with normal Apache Spark development, we have what is known as a driver program. This is the portion of code that orchestrates the distributed calculations across the workers (normally via the use of sc.parallelize).
For this demo app I have not given you instructions on how to create an Apache Spark cluster or an Apache Cassandra cluster, but we will still parallelize occasionally, which for this demo simply means using the available cores on the machine running the driver program.
That said, most of the demo code contained in this article will simply be showing you how to best use the DataStax Cassandra Connector for Spark.
I am using Scala as my language of choice here, where the following is the complete driver program:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {

    // Tell the connector where to find the local Cassandra node
    val conf = new SparkConf(true)
      .setAppName("Simple Application")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    // Run Spark locally using 2 cores
    conf.setMaster("local[2]")

    implicit val sc = new SparkContext(conf)

    val tableReader = new CassandraTableReader()
    val tableWriter = new CassandraTableWriter()

    // Create the test keyspace/tables and seed them with some data
    tableWriter.initialise()

    tableReader.readTestTableValues()
    tableWriter.writeTestTableValues()
    tableReader.readTestTableValues()
    tableReader.foreachTestTableValues()
    tableReader.getColumnAttributes()
    tableReader.getSets()
    tableReader.getUDT()
    tableReader.foreachSelectedTableColumnValues()
    tableReader.foreachFilteredTableColumnValues()
    tableReader.foreachTableRowCount()
    tableReader.foreachTableRowAsTuples()
    tableReader.foreachTableRowAsCaseClasses()
    tableReader.foreachTableRowAsCaseClassesUsingColumnAliases()
    tableReader.foreachTableRowAsTuples()
    tableWriter.saveCollectionOfTuples()
    tableReader.foreachTableRowAsTuples()
    tableWriter.saveCollectionOfCaseClasses()
    tableWriter.saveUDT()

    println("====== DONE ======")
    readLine()
  }
}
As can be seen there is nothing too fancy going on in there; we simply run some initialisation code and then call functions that either read some data from Cassandra into a Spark RDD, or write some data from a Spark RDD into Cassandra.
As such I thought it would be a good idea to split the rest of the article up into 4 sections:
- SparkContext
- Initialisation
- Reading from Cassandra into Spark RDDs
- Writing from Spark RDDs into Cassandra
In order to carry out anything using the DataStax Cassandra Connector you will need to have a SparkContext available. Both of the classes that the demo contains for reading/writing to Cassandra need a SparkContext. So how does that happen? For the demo app this is as simple as using a Scala implicit val in scope.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {
    implicit val sc = new SparkContext(conf)
    val tableReader = new CassandraTableReader()
    val tableWriter = new CassandraTableWriter()
    ....
    ....
    ....
  }
}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

class CassandraTableReader()(implicit val sc : SparkContext) {
  ....
  ....
  ....
}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

class CassandraTableWriter()(implicit val sc : SparkContext) {
  ....
  ....
  ....
}
In order to ensure that we have some tables set up to play with, there is an initialisation function that the driver program calls, which is essentially this code:
def initialise(): Unit = {
  val conf = sc.getConf
  CassandraConnector(conf).withSessionDo { session =>
    session.execute("DROP KEYSPACE IF EXISTS test")
    session.execute("CREATE KEYSPACE test WITH REPLICATION = " +
      "{'class': 'SimpleStrategy', 'replication_factor': 1 }")

    session.execute("DROP TABLE IF EXISTS test.kv")
    session.execute("CREATE TABLE test.kv(key text PRIMARY KEY, value int)")
    session.execute("INSERT INTO test.kv(key, value) VALUES ('key1', 1)")
    session.execute("INSERT INTO test.kv(key, value) VALUES ('key2', 2)")

    session.execute("DROP TABLE IF EXISTS test.words")
    session.execute("CREATE TABLE test.words(word text PRIMARY KEY, count int)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('elephant', 1)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('tiger', 12)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('snake', 5)")

    session.execute("DROP TABLE IF EXISTS test.users")
    session.execute("CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>)")
    session.execute("INSERT INTO test.users (username, emails) " +
      "VALUES ('sacha', {'sacha@email.com', 'sacha@hotmail.com'})")
    session.execute("INSERT INTO test.users (username, emails) " +
      "VALUES ('bill', {'bill23@email.com', 'billybob@hotmail.com'})")

    // drop the table before the type, as a UDT cannot be dropped while a table still uses it
    session.execute("DROP TABLE IF EXISTS test.companies")
    session.execute("DROP TYPE IF EXISTS test.address")
    session.execute("CREATE TYPE test.address (city text, street text, number int)")
    session.execute("CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>)")
    session.execute("INSERT INTO test.companies (name, address) VALUES ('company1', " +
      "{ city : 'London', street : 'broad street', number : 111 })")
    session.execute("INSERT INTO test.companies (name, address) VALUES ('company2', " +
      "{ city : 'Reading', street : 'rushmore road', number : 43 })")
  }
}
This section outlines various ways of reading data from Cassandra.
We have probably all used record sets that support getting column values by index or by name. The DataStax connector also supports this, where you use a CassandraRow to get values.
Assuming we have this table
CREATE TABLE test.kv (
key text PRIMARY KEY,
value int
) ....
Here is an example
def readTestTableValues(): Unit = {
  println(s"In method : readTestTableValues")
  val rdd = sc.cassandraTable("test", "kv")
  val count = rdd.count
  val first = rdd.first()
  val sum = rdd.map(_.getInt("value")).sum
  println(s"============ RDD COUNT $count")
  println(s"============ RDD FIRST $first")
  println(s"============ RDD SUM $sum")
}
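As mentioned above, CassandraRow values can also be read by a 0-based column index rather than by name. Here is a minimal sketch of that, assuming the same test.kv table; the method name readTestTableValuesByIndex is just illustrative, and the column order is assumed to follow the table definition (key first, then value).

def readTestTableValuesByIndex(): Unit = {
  println(s"In method : readTestTableValuesByIndex")
  val rdd = sc.cassandraTable("test", "kv")
  val firstRow = rdd.first()
  // Column 0 is assumed to be "key" and column 1 to be "value"
  val firstKey = firstRow.get[String](0)
  val firstValue = firstRow.get[Int](1)
  println(s"============ RDD FIRST KEY (by index) $firstKey")
  println(s"============ RDD FIRST VALUE (by index) $firstValue")
}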
We can also run a foreach loop against the values we retrieve from Cassandra.
Assuming we have this table
CREATE TABLE test.kv (
key text PRIMARY KEY,
value int
) ....
Here is an example of that
def foreachTestTableValues(): Unit = {
  println(s"In method : foreachTestTableValues")
  val rdd = sc.cassandraTable("test", "kv")
  rdd.foreach(println)
}
The CassandraRow object also exposes a number of useful attributes, such as columnNames and size, which we can use.
Assuming we have this table
CREATE TABLE test.kv (
key text PRIMARY KEY,
value int
) ....
def getColumnAttributes(): Unit = {
  println(s"In method : getColumnAttributes")
  val rdd = sc.cassandraTable("test", "kv")
  val firstRow = rdd.first
  val colNames = firstRow.columnNames
  val colSize = firstRow.size
  val firstKey = firstRow.get[String]("key")
  val firstValue = firstRow.get[Int]("value")
  println(s"============ RDD COLUMN NAMES $colNames")
  println(s"============ RDD COLUMN SIZE $colSize")
  println(s"============ RDD FIRST KEY $firstKey")
  println(s"============ RDD FIRST VALUE $firstValue")
}
Cassandra supports SETs of objects within columns. Say we had this table definition in Cassandra
CREATE TABLE test.users (
username text PRIMARY KEY,
emails set<text>
) .....
Where it can be seen that we use a set<text> column to store the emails. How can we read sets? Luckily this is also very easy. Here is an example:
def getSets(): Unit = {
  println(s"In method : getSets")
  val rdd = sc.cassandraTable("test", "users")
  val row = rdd.first()
  PrintHelper.printIt("getSets", "List[String]", row.get[List[String]]("emails"))
  PrintHelper.printIt("getSets", "IndexedSeq[String]", row.get[IndexedSeq[String]]("emails"))
  PrintHelper.printIt("getSets", "Seq[String]", row.get[Seq[String]]("emails"))
  PrintHelper.printIt("getSets", "Set[String]", row.get[Set[String]]("emails"))
}
Cassandra also supports User Defined Types (UDTs), and as such the DataStax connector offers support for reading these.
Assuming we have a type and table definition that look like this
CREATE TYPE test.address (
city text,
street text,
number int
)
CREATE TABLE test.companies (
name text PRIMARY KEY,
address frozen<address>
) .....
We can then use this code to read from this table and its UDT column (address):
def getUDT(): Unit = {
  println(s"In method : getUDT")
  val rdd = sc.cassandraTable("test", "companies")
  val row = rdd.first()
  val address: UDTValue = row.getUDTValue("address")
  PrintHelper.printIt("getUDT", "city", address.getString("city"))
  PrintHelper.printIt("getUDT", "street", address.getString("street"))
  PrintHelper.printIt("getUDT", "number", address.getInt("number"))
}
Another popular idea is to use projections, where we create new objects containing only the properties/fields we are interested in. This is like a select(x => new { a = someProp, b = someOtherProp }) style LINQ expression in .NET.
Let's see an example of this, shall we, where we query a table that has more columns than we want, but we specify via the projection that we ONLY want to use the "username" column.
Let's assume we have this table
CREATE TABLE test.users (
username text PRIMARY KEY,
emails set<text>
) ....
def foreachSelectedTableColumnValues(): Unit = {
  println(s"In method : foreachSelectedTableColumnValues")
  val filteredColumnsRdd = sc.cassandraTable("test", "users")
    .select("username")
  filteredColumnsRdd.foreach(println)
  val row = filteredColumnsRdd.first()
  PrintHelper.printIt("foreachSelectedTableColumnValues", "username", row.getString("username"))
}
Another thing that one would expect to be able to do is to perform a server-side filter (think WHERE clause in SQL land), which is applied at the Cassandra database level.
Let's assume we have this table
CREATE TABLE test.users (
username text PRIMARY KEY,
emails set<text>
) ....
This is actually fairly easy to do, as follows:
def foreachFilteredTableColumnValues(): Unit = {
  println(s"In method : foreachFilteredTableColumnValues")
  val filteredColumnsRdd = sc.cassandraTable("test", "users")
    .select("username")
    .where("username = ?", "bill")
  filteredColumnsRdd.foreach(println)
  val row = filteredColumnsRdd.first()
  PrintHelper.printIt("foreachFilteredTableColumnValues", "username", row.getString("username"))
}
Aggregation is also a typical use case, for things like count/sum. This is not something you should try to do yourself; you should most definitely use the aggregation methods the DataStax connector provides, as these may need to hit all the nodes in the Cassandra cluster to perform the aggregation. So leave this to the experts.
Let's assume we have this table
CREATE TABLE test.users (
username text PRIMARY KEY,
emails set<text>
) ....
Anyway, here is an example of count, where we use the cassandraCount method that the DataStax connector gives us.
def foreachTableRowCount(): Unit = {
  println(s"In method : foreachTableRowCount")
  val count = sc.cassandraTable("test", "users")
    .select("username")
    .cassandraCount()
  PrintHelper.printIt("foreachTableRowCount", "all users count", count)
}
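For sums, as far as I know this version of the connector only pushes counting down to Cassandra (via cassandraCount), so a sum is typically done on the Spark side after selecting just the column you need. Here is a minimal sketch of that, assuming the test.words table created in the initialisation code; the method name is just illustrative.

def foreachTableColumnSum(): Unit = {
  println(s"In method : foreachTableColumnSum")
  // Only the "count" column is pulled back from Cassandra; Spark does the summing
  val total = sc.cassandraTable("test", "words")
    .select("count")
    .map(_.getInt("count"))
    .sum
  PrintHelper.printIt("foreachTableColumnSum", "sum of all word counts", total)
}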
From time to time we may need to just grab a couple of columns, and we don't want to create actual classes for these reads. Tuples are a great fit in these cases, and luckily this is well supported too.
Assuming we have this table
CREATE TABLE test.words (
word text PRIMARY KEY,
count int
) ....
def foreachTableRowAsTuples(): Unit = {
  println(s"In method : foreachTableRowAsTuples")
  val rdd = sc.cassandraTable[(String, Int)]("test", "words")
    .select("word", "count")
  val items = rdd.take(rdd.count().asInstanceOf[Int])
  items.foreach(tuple => PrintHelper.printIt("foreachTableRowAsTuples",
    "tuple(String, Int)", tuple))

  val rdd2 = sc.cassandraTable[(Int, String)]("test", "words")
    .select("count", "word")
  val items2 = rdd2.take(rdd2.count().asInstanceOf[Int])
  items2.foreach(tuple => PrintHelper.printIt("foreachTableRowAsTuples",
    "tuple(Int, String)", tuple))
}
If however the reads we need to do are a bit more permanent we could use classes to represent the retrieved data. Case classes are good for that.
Assuming we have this table
CREATE TABLE test.words (
word text PRIMARY KEY,
count int
) ....
Here is an example
case class WordCount(word: String, count: Int)
def foreachTableRowAsCaseClasses(): Unit = {
  println(s"In method : foreachTableRowAsCaseClasses")
  val items = sc.cassandraTable[WordCount]("test", "words")
    .select("word", "count").take(3)
  items.foreach(wc => PrintHelper.printIt("foreachTableRowAsCaseClasses",
    "WordCount(word : String, count : Int)", wc))
}
Aliasing of columns may also be quite useful. Here is an example of that, where we use "select" and "as".
Assuming we have this table:
CREATE TABLE test.kv (
key text PRIMARY KEY,
value int
) ....
def foreachTableRowAsCaseClassesUsingColumnAliases(): Unit = {
  println(s"In method : foreachTableRowAsCaseClassesUsingColumnAliases")
  val items = sc.cassandraTable[WordCount]("test", "kv")
    .select("key" as "word", "value" as "count").take(1)
  items.foreach(wc => PrintHelper.printIt("foreachTableRowAsCaseClassesUsingColumnAliases",
    "WordCount(word : String, count : Int)", wc))
}
This section outlines various ways of writing data to Cassandra.
Scala supports the concept of a tuple, which may be something like this: (1, "cat"), which would be a tuple of (Int, String). Tuples can have variable lengths, but for this demo we will be sticking with tuples with an arity of 2.
So if we had a table definition that looked like this
CREATE TABLE test.words (
word text PRIMARY KEY,
count int
) ....
We would then be able to save our tuple (with an arity of 2) to this table as follows:
def saveCollectionOfTuples(): Unit = {
  println(s"In method : saveCollectionOfTuples")
  val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
  collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
}
Scala also has the concept of case classes, which are classes that are kind of magical and implement a whole bunch of stuff for us (such as hashCode, equals, etc.). So these are quite often nicer to work with than tuples, as they are strongly typed, well known classes.
Say we have this scala case class
case class WordCount(word: String, count: Int)
And we had a table definition that looked like this
CREATE TABLE test.words (
word text PRIMARY KEY,
count int
) ....
We could then save a collection of these using this code
def saveCollectionOfCaseClasses(): Unit = {
  println(s"In method : saveCollectionOfCaseClasses")
  val collection = sc.parallelize(Seq(WordCount("shark", 50), WordCount("panther", 60)))
  collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
}
Cassandra also supports the notion of UDTs, which can be seen in the table definition below, where we have an "address" UDT column.
CREATE TABLE test.companies (
name text PRIMARY KEY,
address frozen<address>
) ....
So we should be able to store these UDTs using Scala. The way we work with UDTs in Scala is by using case classes. So for this example we would have the following case class
case class Address(city: String, street: String, number: Int)
Which we could then save to Cassandra like this:
def saveUDT(): Unit = {
  println(s"In method : saveUDT")
  val address = Address(city = "Oakland", street = "Broadway", number = 90210)
  val col = Seq(("Oakland cycles", address))
  sc.parallelize(col).saveToCassandra("test", "companies", SomeColumns("name", "address"))
}
So that's it. I have now given a run-through of how to use Apache Spark to perform general purpose grid computing, I have also shown you how to install a local Cassandra instance, and this article has talked you through how to use the DataStax Cassandra Spark connector to save/retrieve data to/from Cassandra from/into Spark Resilient Distributed Datasets.
I would urge you all to try these tools out for yourself. Cassandra + Spark are becoming very much in demand, certainly in the UK anyway, so from that point of view it is a no-brainer.