In this post I will review Scala in a little more depth and attempt to demonstrate why it is a natural fit for Spark in the context of distributed systems.
This post is derived largely from what I learned in this video, as well as from various other sources, and aims to capture and reconcile that knowledge.
Why is Scala a natural fit for distributed programming?
Scala offers concurrent programming, Java interoperability and functional programming out of the box. In addition, the Collections API is a first-class citizen in Scala, with a comprehensive set of data structures, support for functional transformations, and immutability.
Another perk vis-à-vis distributed programming is that Scala can essentially go from sequential, to parallel, to distributed programming with the same API.
Here is an example:
Example of sequential code (in Scala):
scala> List(1,3,5,7).map(_ * 2)
res0: List[Int] = List(2, 6, 10, 14)
map() (a pure Scala API) is an inherently parallelizable operation: the data can be divided into splits and the mapped function applied to each split independently.
Example of parallel code (in Scala):
scala> List(1,3,5,7).par.map(_ * 2)
res0: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(2, 6, 10, 14)
.par in Scala lets you parallelize a collection across your machine's cores, essentially swapping the sequential work for core-distributed processing of that collection.
Example of distributed code (in Spark):
scala> intRDD.map(_ * 2).take(100)
The code is essentially the same, but this time the collection is an RDD. An RDD can be obtained by applying parallelize() (a Spark API, not to be confused with par()) to a plain Scala collection, this time taking advantage of the distributed environment. See the section on laziness for an example.
Also, an action (take() in our example) needs to be applied to the data in order to get results, due to Scala's inherent laziness (more on this later).
In distributed mode, map() produces a new RDD as its result.
Map functions must be serializable so they can be shipped over the network. Note that an RDD is immutable, i.e. its state cannot be changed over time; it can be discarded if needed to free up some memory. In short, an RDD has the following properties: immutability, iterability, serializability, distribution, and laziness.
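As a minimal sketch of these properties in action (defining the intRDD used above via parallelize; the res numbers depend on your shell session):
scala> val intRDD = sc.parallelize(List(1, 3, 5, 7))  // the source data is never mutated
scala> val doubled = intRDD.map(_ * 2)                // returns a new RDD; no work happens yet
scala> doubled.take(100)                              // the action triggers the actual computation
res1: Array[Int] = Array(2, 6, 10, 14)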
Spark operations
Within an RDD (transformations): map, filter, groupBy, sample
Cross RDDs: e.g. union, join, cogroup
RDD actions: reduce, count, collect, take, foreach
RDD optimizations: coalesce (similar to SQL's COALESCE), pipe, repartition
A few of these are sketched below.
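A quick sketch on a toy RDD (the data and variable names here are made up; res numbers depend on your session):
scala> val nums = sc.parallelize(1 to 100)
scala> val evens = nums.filter(_ % 2 == 0)      // transformation within an RDD
scala> evens.sample(false, 0.1, 42)             // another transformation: ~10% sample, without replacement
scala> evens.coalesce(2)                        // optimization: shrink the number of partitions
scala> evens.count()                            // action: this is what actually runs the job
res2: Long = 50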
Laziness of Scala
Scala is inherently lazy; that is, the computation is not triggered
until you ask for results. The key to laziness is that the tail of the data is
not evaluated.
For example, a Stream is a lazy list:
scala> List(1,3,5,7).toStream
res0: scala.collection.immutable.Stream[Int] = Stream(1, ?)
scala> res0.map(_ * 2)
res1: scala.collection.immutable.Stream[Int] = Stream(2, ?)
i.e. the tail is not evaluated until you ask for it (for example via toList).
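For example, forcing the stream above (a small sketch; the res names depend on your session):
scala> res1.toList
res2: List[Int] = List(2, 6, 10, 14)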
This is really useful in Spark: it allows Spark to do as little work as possible, so less memory is used for intermediate results. Also, Spark can optimize the execution plan once all the transformations are known. E.g., all the map steps can be executed within the same stage, very much like Tez does in the M/R world.
As an aside, this is also a problem: when the computation runs as a highly optimized bundle, it is not easy to debug. Thankfully, some Spark profilers are starting to hit the market.
parallelize() converts its argument into an RDD. Example:
scala> sc.parallelize(1 to 10).map(x => x * 2)
res6: org.apache.spark.rdd.RDD[Int] = MappedRDD[2]
i.e. you don't get the result, just the mapped RDD to be evaluated. Each transformation is a wrapper RDD, and is a step within Spark's lineage, kept for recovery purposes.
As said earlier, an RDD action such as count() will force the computation.
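A quick sketch of both points (toDebugString is a standard RDD method that prints the lineage; exact output varies by Spark version):
scala> val mapped = sc.parallelize(1 to 10).map(x => x * 2)
scala> mapped.toDebugString   // prints the chain of wrapper RDDs, i.e. the lineage
scala> mapped.count()         // the action finally forces the computation
res7: Long = 10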
Types of caching
Different storage-level settings let you cache data in memory (the default mode), on disk, or in a combination of the two, optionally in serialized form.
On the other hand, Tachyon, a memory-centric distributed file system, is an experimental configuration mode that works off-heap, is resilient to worker failures, and shows a lot of promise.
Spark SQL boasts a different caching mechanism altogether, working as an efficient compressed, columnar in-memory cache (using techniques like dictionary compression, a la Parquet/ORC). It uses less memory than Java serialization and is faster.
Also, Spark caching has a configurable TTL for data, after which old references are cleaned up.
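As a hedged sketch of picking a storage level explicitly (StorageLevel is part of the public Spark API; the file path is hypothetical):
scala> import org.apache.spark.storage.StorageLevel
scala> val lines = sc.textFile("/tmp/myfile.csv")
scala> lines.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized in memory, spilling to disk when memory runs short
scala> lines.count()                                    // the first action materializes the cache
// lines.cache() would be the shorthand for the default MEMORY_ONLY level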
Grouping and sorting
How to implement TopK in pure Scala:
scala> val words = Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")
words: Seq[String] = List(Apple, Bear, Tahoe, a, b, c, Apple, Apple, Bear, c, c)
scala> val b = words.groupBy(x => x)
b: scala.collection.immutable.Map[String,Seq[String]] = Map(Bear -> List(Bear, Bear), a -> List(a), Apple -> List(Apple, Apple, Apple), b -> List(b), c -> List(c, c, c), Tahoe -> List(Tahoe))
scala> val c = b.map { case (word, words) => (word, words.length) }
c: scala.collection.immutable.Map[String,Int] = Map(Bear -> 2, a -> 1, Apple -> 3, b -> 1, c -> 3, Tahoe -> 1)
scala> val c1 = c.toSeq
c1: Seq[(String, Int)] = ArrayBuffer((Bear,2), (a,1), (Apple,3), (b,1), (c,3), (Tahoe,1))
scala> val d = c1.sortBy(_._2).reverse.take(2)
d: Seq[(String, Int)] = ArrayBuffer((c,3), (Apple,3))
I.e. starting from a sequence of words, first create a Map by way of a group-by. Then map each word to its count, convert to a Seq, sort by count (the second element of each tuple) and reverse. Here we take the top 2.
In Spark – method 1
scala> val words = sc.parallelize(Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")) // sc refers to the SparkContext
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val b = words.map((_, 1)) // transforms each word into a (K, V) tuple where K = word, V = 1
b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[2]
scala> val c = b.groupByKey
c: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = MappedValuesRDD[5]
scala> val d = c.map { case (word, counts) => (word, counts.sum) }.sortBy(_._2, false).take(2) // counts and sorts; sortBy() is another network shuffle
First, parallelize the Seq into an RDD. Then transform each word into a (K, V) tuple where V = 1. Group all instances of the same word onto the same node using groupByKey; a distributed network shuffle happens here. Then, with a map, count and sort the entries.
Caution: it's not words.map(_, 1):
scala> val b = words.map(_, 1)
<console>:14: error: missing parameter type for expanded function ((x$1) => words.map(x$1, 1))
       val b = words.map(_, 1)
                         ^
but words.map((_, 1)), because that is equivalent to words.map(x => (x, 1)):
scala> val b = words.map((_, 1))
b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[2]
In Spark – method 2
scala> val words = sc.parallelize(Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")) // sc refers to the SparkContext; parallelize() converts the Seq into an RDD
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val b = words.map((_, 1)).reduceByKey(_ + _) // reduceByKey = groupByKey + a local reduce on each node
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14]
scala> val c = b.map { case (word, count) => (count, word) }
c: org.apache.spark.rdd.RDD[(Int, String)] = MappedRDD[15]
scala> c.top(2) // top() avoids a global sort by taking the top items from each node and merging them at the driver
res4: Array[(Int, String)] = Array((3,c), (3,Apple))
Similar, but more optimized, via the use of reduceByKey and
top.
Perform some ETL
Let’s now review how to perform a simple ETL use case.
In Scala:
scala> io.Source.fromFile("/tmp/myfile.csv").getLines.map(_.split(",")).
         map(a => (a(0), a(2), a(5), a(6))).
         filter(_._2 contains "2014").
         take(20)
Iterator[(String, String, String, String)]
Read file, extract a few fields, filter by date
Parallel ETL in Scala:
scala> (1 to 4).par.flatMap(a => io.Source.fromFile("/tmp/myfile.csv").getLines).
         map(_.split(",")).
         map(a => (a(0), a(2), a(5), a(6))).
         filter(_._2 contains "2014").
         take(20)
// Breaks down the file into multiple chunks, converts this into a big stream, and then runs it in parallel
ParSeq[(String, String, String, String)]
Spark ETL
scala> sc.textFile("/tmp/myfile.csv").map(_.split(",")).
         map(a => (a(0), a(2), a(5), a(6))).
         filter(_._2 contains "2014").
         take(20)
The code is the same, except for how the data is initially loaded.
Summary
This should provide a good explanation of why Scala and Spark are a good match. A good tutorial on Spark is available at http://databricks.com/spark/developer-resources. Of note, a nice way to work with Spark, a la IPython, is to use it in a notebook fashion.