Thursday, February 5, 2015

Navigating from Scala to Spark for distributed programming




In this post I will review Scala in a little more depth and attempt to demonstrate why it is a natural fit for Spark in the context of distributed systems.
This post is derived in particular from what I learned in this video, as well as from various other sources, and aims to capture and reconcile that knowledge.

Why is Scala a natural fit for distributed programming?


Scala offers concurrency, Java interoperability and functional programming out of the box. In addition, the Collections API is a first-class citizen in Scala, with its comprehensive list of available data structures, its ability to perform functional transformations, and its immutability. Another perk vis-à-vis distributed programming is that Scala lets you go from sequential, to parallel, to distributed programming with essentially the same API.

Here is an example:

Example of sequential code (in Scala):
scala> List(1,3,5,7).map(_ * 2)
res0: List[Int] = List(2, 6, 10, 14)


map() (a pure Scala API) is an inherently parallelizable operation: it divides and conquers the data into splits, applying the mapping function to each of them.


Example of parallel code (in Scala):
scala> List(1,3,5,7).par.map(_ * 2)
res0: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(2, 6, 10, 14)


par in Scala lets you parallelize a collection across your machine's cores, essentially swapping out the sequential work for core-distributed processing of that collection.


Example of distributed code (in Spark):
scala> intRDD.map(_ * 2).take(100)

The code is essentially the same, but this time the collection is an RDD. An RDD can be obtained by applying parallelize() (a Spark API, not to be confused with par) to a plain Scala collection, this time taking advantage of the distributed environment. See the section on laziness for an example.
Also, an action (take() in our example) needs to be applied in order to get results, because of lazy evaluation, which Spark inherits in spirit from Scala (more on this later).
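For reference, the intRDD used above can be created from a plain Scala collection with parallelize; a minimal sketch, assuming the Spark shell and its pre-built SparkContext sc:

scala> val intRDD = sc.parallelize(List(1, 3, 5, 7))
intRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12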

In distributed mode, map() produces a new RDD holding the result.
The functions passed to map must be serializable, since they are shipped over the network to the workers. Note that an RDD is immutable, i.e. its state cannot be changed over time; it can be discarded if needed to free up some memory. In summary, an RDD has the following properties: immutability, iterability, serializability, distribution, and laziness.
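To illustrate the serializability requirement, here is a sketch (Multiplier and Factor are made-up classes for this example, and intRDD is as above):

scala> class Multiplier(val factor: Int)        // a plain class: not Serializable
scala> val m = new Multiplier(2)
scala> intRDD.map(x => x * m.factor).collect()  // typically fails with
                                                // org.apache.spark.SparkException: Task not serializable

scala> case class Factor(value: Int)            // case classes are Serializable
scala> val f = Factor(2)
scala> intRDD.map(x => x * f.value).collect()   // works; returns Array(2, 6, 10, 14)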


Spark operations


Within an RDD (transformations):
map, filter, groupBy, sample

Across RDDs:
join, cartesian, cogroup (similar to Pig’s COGROUP; essentially a groupBy plus a join across 2 RDDs)

RDD actions:
reduce, count, collect, take, foreach

RDD optimizations:
coalesce (it shares a name with SQL’s COALESCE, but in Spark it reduces the number of partitions), pipe, repartition

A short sketch of each category follows.
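Here, nums, pairs and other are small RDDs invented for this sketch, assuming the Spark shell and its SparkContext sc:

scala> val nums  = sc.parallelize(1 to 10)
scala> val pairs = nums.map(n => (n % 3, n))          // a (K,V) pair RDD
scala> val other = sc.parallelize(Seq((0, "zero"), (1, "one"), (2, "two")))

scala> val evens  = nums.filter(_ % 2 == 0)           // within an RDD: a lazy transformation
scala> val joined = pairs.join(other)                 // across RDDs: shuffles to match up keys
scala> nums.reduce(_ + _)                             // action: runs the job and returns 55
scala> val fewer  = evens.coalesce(2)                 // optimization: fewer partitions, without a full shuffle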


Laziness of Scala


Scala has laziness built into its collections; for a lazy collection, the computation is not triggered until you ask for results. The key to laziness is that the tail of the data is not evaluated.
For example, a Stream is a lazy List:

scala> List(1,3,5,7).toStream
res0: scala.collection.immutable.Stream[Int] = Stream(1, ?)

scala> res0.map(_ * 2)
res1: scala.collection.immutable.Stream[Int] = Stream(2, ?)

i.e. the tail is not evaluated until you ask for it (for example via toList).
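Forcing the stream indeed evaluates the rest of it:

scala> res1.toList
res2: List[Int] = List(2, 6, 10, 14)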

This is really useful in Spark: it allows Spark to do as little work as possible, and less memory is used for intermediate results. Also, Spark can optimize the execution plan once all the transformations are known; e.g., all the map steps can be executed within the same stage, very much like Tez does in the MapReduce world.
As an aside, this is also a problem: when the computation runs as a highly optimized bundle, it is not easy to debug. Thankfully, some Spark profilers are starting to hit the market.
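Even without a profiler, RDD.toDebugString helps: it prints the lineage of transformations that an action would execute. A small sketch, again assuming the shell's SparkContext sc:

scala> sc.parallelize(1 to 10).map(_ * 2).map(_ + 1).toDebugString
// prints the chain of wrapper RDDs (the two maps on top of the parallelized
// collection) that Spark will run, and recover from, when an action fires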

parallelize will convert its argument into an RDD. Example:

scala> sc.parallelize(1 to 10).map(x => x * 2)
res6: MappedRDD[2]

i.e. you don’t get the result, just the mapped RDD, still to be evaluated. Each transformation is a wrapper RDD and constitutes a step in Spark’s lineage, used for recovery purposes.
As said earlier, an RDD action such as count will force the computation.
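For instance, continuing the sketch above, calling the count action on that mapped RDD forces the evaluation:

scala> res6.count()
res7: Long = 10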


Types of caching


Different storage levels let you cache data in memory (the default), serialized or not, and/or spill it to disk, or use a combination of the two.
Tachyon, a memory-centric distributed file system, is an experimental storage mode that works off-heap, is resilient to worker failures, and is showing a lot of promise.

Spark SQL boasts a different caching mechanism altogether: an efficient, compressed, columnar in-memory cache (using techniques such as dictionary compression, a la Parquet/ORC). It uses less memory than Java serialization, and is faster.

Also, Spark caching has a configurable TTL for data, after which old references are cleaned up.
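As a sketch of the storage options just described (the exact level names and configuration keys depend on the Spark version; the RDD here is just an example):

scala> import org.apache.spark.storage.StorageLevel
scala> val rdd = sc.parallelize(1 to 10)

scala> rdd.persist(StorageLevel.MEMORY_AND_DISK)  // or MEMORY_ONLY_SER (serialized, more compact),
                                                  // or cache(), shorthand for MEMORY_ONLY;
                                                  // an RDD keeps a single storage level at a time
scala> rdd.count()                                // the first action materializes the cache
scala> rdd.unpersist()                            // drop the cached data to free memory

// StorageLevel.OFF_HEAP is the experimental Tachyon-backed mode mentioned above,
// and spark.cleaner.ttl is the configuration key behind the TTL-based cleanup.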


Grouping and sorting


How to implement TopK  in pure Scala:

scala> val words = Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")
words: Seq[String] = List(Apple, Bear, Tahoe, a, b, c, Apple, Apple, Bear, c, c)

scala> val b = words.groupBy(x => x)
b: scala.collection.immutable.Map[String,Seq[String]] = Map(Bear -> List(Bear, Bear), a -> List(a), Apple -> List(Apple, Apple, Apple), b -> List(b), c -> List(c, c, c), Tahoe -> List(Tahoe))

scala> val c = b.map { case (word, occurrences) => (word, occurrences.length) }
c: scala.collection.immutable.Map[String,Int] = Map(Bear -> 2, a -> 1, Apple -> 3, b -> 1, c -> 3, Tahoe -> 1)

scala> val c1 = c.toSeq
c1: Seq[(String, Int)] = ArrayBuffer((Bear,2), (a,1), (Apple,3), (b,1), (c,3), (Tahoe,1))

scala> val d = c1.sortBy(_._2).reverse.take(2)
d: Seq[(String, Int)] = ArrayBuffer((c,3), (Apple,3))



I.e., starting from a sequence of words, first create a Map by way of a groupBy. Then map each word to its number of occurrences (the length of its group), convert to a Seq, sort by count (element #2 of each tuple), and reverse. Here we take the top 2.


In Spark – method 1

scala> val words = sc.parallelize(Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")) // sc refers to the SparkContext
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> val b = words.map((_, 1)) // transforms each word into a (K,V) tuple where K=word, V=1
b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[2]

scala> val c = b.groupByKey
c: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = MappedValuesRDD[5]

scala> val d = c.map { case (word, counts) => (word, counts.sum) }.  // count each group...
         sortBy(_._2, false).take(2)                                 // ...then sort; sortBy() is another network shuffle



First, parallelize the Seq into an RDD. Then transform each word into a (K,V) tuple where V=1. Group all instances of the same word onto the same node using groupByKey; a distributed network shuffle happens here. Finally, with a map, count and sort the entries.

Caution:
It’s not words.map(_, 1):

scala> val b = words.map(_,1)
<console>:14: error: missing parameter type for expanded function
((x$1) => words.map(x$1, 1))
val b = words.map(_, 1)
                  ^


But words.map((_, 1)), because that’s equivalent to words.map(x => (x, 1)). I.e.:

words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val b = words.map((_, 1))
b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[2]




In Spark – method 2

scala> val words = sc.parallelize(Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")) // sc refers to the SparkContext
// parallelize() converts the Seq into an RDD
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val b = words.map((_,1)).reduceByKey(_ + _) //reduceByKey = groupByKey + local reduce
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14]

scala> val c = b.map { case (word, count) => (count, word) }
c: org.apache.spark.rdd.RDD[(Int, String)] = MappedRDD[15]

scala> c.top(2) //top avoids the global sort by taking the top items from each node, and merging them at the driver.
res4: Array[(Int, String)] = Array((3,c), (3,Apple))

Similar, but more optimized, via the use of reduceByKey and top.


Perform some ETL


Let’s now review how to perform a simple ETL use case.

In Scala:

scala> io.Source.fromFile("/tmp/myfile.csv").getLines.map(_.split(",")).
         map(a => (a(0), a(2), a(5), a(6))).
         filter(_._2 contains "2014").
         take(20)
Iterator[(String, String, String, String)]

Read the file, extract a few fields, and filter by date.


Parallel ETL in Scala:

scala> (1 to 4).par.flatMap(a => io.Source.fromFile("/tmp/myfile.csv").getLines).
         map(_.split(",")).map(a => (a(0), a(2), a(5), a(6))).
         filter(_._2 contains "2014").
         take(20)

// Fans the read out over four parallel tasks, flattens the lines into one big parallel collection, and runs the rest of the pipeline in parallel
ParSeq[(String, String, String, String)]


Spark ETL

scala> sc.textFile("/tmp/myfile.csv").map(_.split(",")).
         map(a => (a(0), a(2), a(5), a(6))).
         filter(_._2 contains "2014").
         take(20)

The code is essentially the same! Only the initial loading of the data differs.
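To complete the pipeline, the result could also be written back out with saveAsTextFile; a sketch, with an invented output path and without the take(20) so the whole filtered RDD is saved:

scala> val filtered = sc.textFile("/tmp/myfile.csv").map(_.split(",")).
         map(a => (a(0), a(2), a(5), a(6))).
         filter(_._2 contains "2014")

scala> filtered.map(_.productIterator.mkString(",")).saveAsTextFile("/tmp/myfile_2014")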

 

Summary


This should provide a good explanation of why Scala and Spark are a good match. A good tutorial on Spark: http://databricks.com/spark/developer-resources. Of note, a nice way to work with Spark, a la IPython, in a notebook fashion, is to use this:


2 comments:

  1. Great post, Matt!

    BTW, the URL for that Spark tutorial has changed. It is at http://training.databricks.com/workshop/itas_workshop.pdf

    And in general we keep those kinds of resources linked from
    http://databricks.com/spark/developer-resources

    1. thanks for reviewing, Paco!
      I will change the link.
