Thursday, February 26, 2015

Hadoop world / Strata 2015 overview

A few notes about Strata 2015

I have been going to Strata for a few years now, so I am pretty familiar with the Hadoop vendors and offerings on show. Here are a few general thoughts about the event and what I noted.


A lot more companies/players in the Big Data space in general. Of note, in addition to the "regulars", there are a lot more niche players, and a few behemoths (HP, Intel, Microsoft) trying to capitalize on Hadoop.

Trending this year

Big data in the cloud

A few companies now offer Hadoop-as-a-service (as well as other frameworks) in the cloud, in addition to IT or application-level features: Altiscale, Qubole, Datameer, etc. Apparently they are all mostly doing well, and there is enough space to accommodate everyone. I heard Qubole in particular is doing well.

Separation of concerns/Specialization of Hadoop tools

It seems like vendors offer either a one-stop shop to Hadoop, like Business Intelligence/Analytics tools (Platfora, Pentaho, etc) with the standard advantages and shortcomings that an off-the-shelf product may imply, or very specialized tools, like data discovery (Tamr), data cleansing (Paxata) or visualization (Zoomdata). Pick your weapon!
Of note: why was Google not there?

Stream processing

More interestingly, batch analytics is becoming commoditized, with a number of tools available to perform this kind of processing. A newer type of application being proposed is the kind that offers near-real-time (NRT) stream processing. DataTorrent, RapidMiner, and especially Interana are amongst these companies. This is to counteract the fact that open source tools like Storm and Spark Streaming are not for the faint of heart to implement.

Data discovery

This is a new offering among startups: the ability to auto-discover your sources of data and manage them automatically. This is essentially what used to be called MDM and CDC in the "old" data warehouse world, and it is partially solved by tools like Apache Falcon in the downstream ecosystem of tools. See my post on this.
Instead, these companies (Tamr, Alation, Attivio) offer the ability to expose your data and the relationships within it, through a combination of automation and machine learning tools.

Data Science/Machine Learning

I was stunned by the proliferation of startups around data science: H2O, Dato, Skytree, Dataiku, etc. It seems like there is a lot of redundancy in the space. One company seemingly ahead of the pack: DataRobot, which apparently won some Kaggle competition.

Of note, but you knew that already: Spark is omnipresent.

My personal Awards

Best T-shirt: Datameer, Databricks
Best toys: DataRobot
Biggest booth for the smallest funding in a company: Tamr

Tuesday, February 17, 2015

Introduction to Prediction.IO, an open-source Machine Learning framework

 What is Prediction.IO in a nutshell?

            Building a machine learning application from scratch is hard; you need to have the ability to work with your own data and train your algorithm with it, build a layer to serve the prediction results, manage the different algorithms you are running and their evaluations, deploy your application in production, manage the dependencies with your other tools, etc. Prediction.IO is an open source Machine Learning server that addresses these concerns. It aims to be the “LAMP stack” for data analytics.

Current state of Machine Learning frameworks

            Let’s first review some of the tools that are currently popular in the Machine Learning (ML) community. Some widely used tools are: Mahout in the Hadoop ecosystem, MLlib in the Spark community, H2O, DeepLearning4j.

These APIs generally work great and provide implementations of the main ML algorithms. However, what is missing from a general standpoint in order to use them in a Production environment?
-       An integration layer to bring your data sources
-       A framework to roll a prototype into production
-       A simple API to query the results


            Let’s take a classic recommender as an example; usually predictive modeling is based on users’ behaviors to predict product recommendations.

We will convert the data (in Json) into binary Avro format.
// Read training data
val trainingData = sc.textFile("trainingData.txt").map(_.split(',') match {..})

which yields something like:
user1 purchases product1, product2
user2 purchases product2

Then build a predictive model with an algorithm:
// collaborative filtering algorithm
val model = ALS.train(trainingData, 10, 20, 0.01)

Then start using the model:
// use the trained model to generate recommendations
allUsers.foreach { user => model.recommendProducts(user, 5) }

This recommends 5 products for each user.

This code will work in a development environment, but wouldn’t work in production. Why?
- How do you integrate with your existing data?
- How do you unify the data from multiple sources?
- How to deploy a scalable service that responds to dynamic prediction query?
- How do you persist the predictive model, in a distributed environment?
- How to make your storage layer, Spark, and the algorithms talk to each other?
- How to prepare the data for model training?
- How to update the model with new data, without downtime?
- Where does the business logic get added?
- How to make the code configurable, reusable and manageable?
- How do we build these with separation of concerns (SoC), like the web development side of things?
- How to make things work in a real time environment?
- How do I customize the recommender on a per-location basis? How to discard data that is out of inventory?
- How about performing different tests on the algorithms you selected?

Prediction IO to the rescue!

Let’s address the above questions. Prediction.IO boasts an event server for storage that collects data (say, from a mobile app, web, etc) in a unified way, from multiple channels.

You can plug multiple engines within; each engine represents a type of prediction problem. Why is that important?
In a Production system, you will typically use multiple engines. E.g. the archetypal example of Amazon: if you bought this, recommend that. But you may also run a different algorithm on the front page for article discovery, and another one for email campaigns based on what you browsed, for retargeting purposes. Prediction.IO does that very well.

How to deploy a predictive model service? In a typical mobile app, the app will send user actions (the user behavior data) to the event server. Your prediction model will be trained on these, and the engine will be deployed as a Web service. So now your mobile app can communicate with the engine via a REST API. If this is not sufficient, there are SDKs available in different languages. The engine will return a list of results in JSON format.

[Figure: Prediction.IO interaction with a mobile app]

Prediction.IO manages the dependencies of Spark and HBase and the algorithms automatically. You can launch it with a one-line command.

The framework doesn’t act as a black box. Prediction.IO is one of the most popular ML products on GitHub (5000+ contributors).

The framework is open source and written in Scala, which takes advantage of the JVM and is a natural fit for distributed computing. R, in comparison, is not so easy to scale. Prediction.IO also uses Spark, currently one of the best distributed computing frameworks, which is proven to scale in Production. Algorithms are implemented via MLlib. Lastly, events are stored in Apache HBase as the NoSQL storage layer.

Preparing the data for model training is a matter of running the event server (launched via ‘pio eventserver’) and interacting with it by defining the action (e.g. change the product price), product (e.g. give a rating A for product x), product name, and attribute name, all in free format.

Building the engine is made easy because Prediction.IO offers templates for recommendation and classification. The engine is built on an MVC architecture, and has the following components:

- Data source: data comes from any data source, and is preprocessed automatically into the desired format. Data is prepared and cleansed according to what the engine expects. This follows the Separation of Concerns concept.
- Algorithms: ML algorithms at your disposal to do what you need; ability to combine multiple algorithms.
- Serving layer: ability to serve results based on predictions, and add custom business logic to them.
- Evaluator layer: ability to evaluate the performance of the prediction to compare algorithms.
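To make the separation of concerns between these components concrete, here is a minimal sketch in plain Scala. It is not the Prediction.IO API: every name in it (EngineSketch, DataSource, Algorithm, Serving, Popularity, InStockServing, run) is hypothetical and made up for illustration, the "algorithm" is a toy popularity ranking, and the evaluator layer is left out for brevity.

```scala
// Hypothetical sketch only: none of these traits come from Prediction.IO.
object EngineSketch {
  final case class Purchase(user: String, product: String)
  final case class Query(user: String, num: Int)

  // Data source: reads raw events and hands them over in the expected format.
  trait DataSource { def read(): Seq[Purchase] }

  // Algorithm: trains a model of type M and answers queries with it.
  trait Algorithm[M] {
    def train(data: Seq[Purchase]): M
    def predict(model: M, query: Query): Seq[String]
  }

  // Serving layer: applies business logic to the raw predictions.
  trait Serving { def serve(query: Query, predicted: Seq[String]): Seq[String] }

  // Toy algorithm: the "model" is the list of products ranked by purchase count.
  object Popularity extends Algorithm[Seq[String]] {
    def train(data: Seq[Purchase]): Seq[String] =
      data.groupBy(_.product).toSeq
        .map { case (product, ps) => (product, ps.size) }
        .sortBy { case (product, n) => (-n, product) } // most purchased first, ties by name
        .map(_._1)
    def predict(model: Seq[String], query: Query): Seq[String] = model
  }

  // Business logic: drop out-of-stock products, then keep the requested number.
  final class InStockServing(inStock: Set[String]) extends Serving {
    def serve(query: Query, predicted: Seq[String]): Seq[String] =
      predicted.filter(inStock).take(query.num)
  }

  // Wires the components together: read, train, predict, serve.
  def run[M](source: DataSource, algo: Algorithm[M], serving: Serving, query: Query): Seq[String] = {
    val model = algo.train(source.read())
    serving.serve(query, algo.predict(model, query))
  }

  // Same toy data as the recommender example earlier in this post.
  val demoSource: DataSource = new DataSource {
    def read(): Seq[Purchase] = Seq(
      Purchase("user1", "product1"),
      Purchase("user1", "product2"),
      Purchase("user2", "product2"))
  }
}
```

Because each piece sits behind its own trait, swapping the algorithm or changing the serving rule (for example a per-location inventory) does not touch the other components, which is the point of the layered design described above.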

Of note, MLlib has made some improvements to its API lately to address some of these concerns (e.g. creating an ML pipeline).

In summary, Prediction.IO believes the functions of an engine should be to:
-       Train deployable predictive model(s)
-       Respond to dynamic queries
-       Evaluate the algorithm being used

How to get started?

The best way to start is to:
-       Download the code from GitHub
-       Get one of the templates. Everything you need will be laid out and set up already that way, and the template can be modified according to your needs.

The whole stack can be installed in one line of code. You can then start and deploy the event server, and update the engine model with new data.

Thursday, February 5, 2015

Navigating from Scala to Spark for distributed programming

In this post I will review Scala in a little more depth, and attempt to demonstrate why it is a natural fit to use with Spark in the context of distributed systems.
This post is derived from what I learned in particular in this video, as well as various other places, and aims to capture and reconcile this knowledge.

Why is Scala a natural fit for distributed programming?

Scala offers concurrent programming, Java interoperability and functional programming out of the box. In addition, the Collections API is a first-class citizen in Scala, with its comprehensive list of available data structures, ability to perform functional transformations, and immutability. Another perk vis-à-vis distributed programming is that Scala can essentially go from sequential, to parallel, to distributed programming with the same API.

Here is an example:

Example of sequential code (in Scala):
scala> List(1,3,5,7).map(_ * 2)
res0: List[Int] = List(2, 6, 10, 14)

map() (a pure Scala API) is an inherently parallelizable operation: the data can be divided into splits, and the map function applied to each split independently.

Example of parallel code (in Scala):
scala> List(1,3,5,7).par.map(_ * 2)
res0: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(2, 6, 10, 14)

par in Scala lets you parallelize your collection across your machine's cores, essentially swapping sequential work for multi-core processing on that collection.

Example of distributed code (in Spark).
scala> rdd.map(_ * 2).take(100) // rdd stands for any RDD[Int]

The code is essentially the same, but the collection is an RDD this time. An RDD can be obtained by applying parallelize() (which is a Spark API, different from par) to a simple Scala collection, this time taking advantage of the distributed environment. See the section on Laziness for an example.
Also, an action (take() in our example) needs to be called on the data in order to get results, because RDD transformations are lazy (more on this later).

In distributed mode, map() produces a new RDD from the result.
Map functions must be serializable over the network. Note that an RDD is immutable, i.e. its state cannot be changed over time. It can be discarded if needed to free up some memory. An RDD has the following properties: immutability, iterability, serializability, distribution, and laziness.

Spark operations

Within an RDD:
map, filter, groupBy, sample

Across RDDs:
join, cartesian, cogroup (similar to Pig’s COGROUP; essentially a groupBy+join on 2 RDDs)

RDD actions:
reduce, count, collect, take, foreach

RDD optimizations:
coalesce (reduces the number of partitions; not related to SQL’s COALESCE function), pipe, repartition

Laziness of Scala

Scala is strict by default but supports lazy evaluation; that is, the computation is not triggered until you ask for results. The key to laziness is that the tail of the data is not evaluated.
For example, a stream is a lazy list:

scala> List(1,3,5,7).toStream
res0: scala.collection.immutable.Stream[Int] = Stream(1, ?)
scala> res0.map(_ * 2)
res1: scala.collection.immutable.Stream[Int] = Stream(2, ?)
i.e. the tail is not evaluated until you ask for it (via for example toList ).
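To see how much work laziness actually saves, here is a small sketch that counts how many elements the mapping function touches. It uses an Iterator rather than a Stream so that it behaves the same across Scala versions (Stream is deprecated in Scala 2.13 in favor of LazyList). The object and method names are made up for this illustration.

```scala
object LazyDemo {
  // Lazy: returns (first mapped element, number of elements the function was applied to).
  def lazyFirstDoubled(xs: List[Int]): (Int, Int) = {
    var evaluated = 0
    // Nothing is computed here: map on an Iterator only records the function.
    val doubled = xs.iterator.map { x => evaluated += 1; x * 2 }
    val first = doubled.next() // forces exactly one element
    (first, evaluated)
  }

  // Strict: List.map applies the function to every element before head is taken.
  def strictFirstDoubled(xs: List[Int]): (Int, Int) = {
    var evaluated = 0
    val doubled = xs.map { x => evaluated += 1; x * 2 }
    (doubled.head, evaluated)
  }

  def main(args: Array[String]): Unit = {
    println(lazyFirstDoubled(List(1, 3, 5, 7)))   // prints (2,1)
    println(strictFirstDoubled(List(1, 3, 5, 7))) // prints (2,4)
  }
}
```

Both versions return the same first element, but the lazy one evaluates a single element instead of four.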

This is really useful in Spark. It allows Spark to do as little work as possible, so less memory is used for intermediate results. Also, Spark can optimize the execution plan when all the transformations are known. E.g., all the map steps can be executed within the same phase, very much like Tez does in the M/R world.
As an aside, this is also a problem: if the computation runs as a highly optimized bundle, it is not easy to debug. Thankfully, some Spark profilers are starting to hit the market.

parallelize will convert its argument into an RDD. Example:

scala> sparkContext.parallelize(1 to 10).map(x => x * 2)
res6: MappedRDD[2]

i.e. you don’t get the result, just the mapped RDD to be evaluated. Each transformation is a wrapper RDD, and is a step within Spark’s lineage for recovery purposes.
As said earlier, an RDD action function such as count will force the computation.

Types of caching

Different configuration settings let you cache in memory (the default mode), on disk, or a combination of the two, optionally in serialized form.
On the other hand, Tachyon, a memory-centric distributed file system, is an experimental storage mode that works off-heap, is resilient to worker failures, and is showing a lot of promise.

Spark SQL boasts a different caching mechanism altogether, working as an efficient compressed, columnar in-memory cache (using techniques like dictionary compression, a la Parquet/ORC). It uses less memory than Java serialization, and is faster.

Also, Spark caching has a TTL that is configurable for data, after which old references are cleaned up.

Grouping and sorting

How to implement TopK in pure Scala:

scala> val words = Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")
words: Seq[String] = List(Apple, Bear, Tahoe, a, b, c, Apple, Apple, Bear, c, c)

scala> val b = words.groupBy(x => x)
b: scala.collection.immutable.Map[String,Seq[String]] = Map(Bear -> List(Bear, Bear), a -> List(a), Apple -> List(Apple, Apple, Apple), b -> List(b), c -> List(c, c, c), Tahoe -> List(Tahoe))

scala> val c = b.map { case (word, words) => (word, words.length) }
c: scala.collection.immutable.Map[String,Int] = Map(Bear -> 2, a -> 1, Apple -> 3, b -> 1, c -> 3, Tahoe -> 1)

scala> val c1 = c.toSeq
c1: Seq[(String, Int)] = ArrayBuffer((Bear,2), (a,1), (Apple,3), (b,1), (c,3), (Tahoe,1))

scala> val d = c1.sortBy(_._2).reverse.take(2)
d: Seq[(String, Int)] = ArrayBuffer((c,3), (Apple,3))

I.e. starting from a sequence of words, first create a Map by way of a group by. Map each word to its number of occurrences (the length of its group), then sort by that count (element #2 of each tuple in the new sequence) and reverse. Here we are taking the top 2.
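The steps above can also be folded into one small function. This is only a sketch: the object and method names are made up, and unlike the REPL session it breaks ties alphabetically so that the result does not depend on the iteration order of the intermediate Map.

```scala
object TopK {
  // Returns the k most frequent words with their counts, most frequent first.
  def topK(words: Seq[String], k: Int): Seq[(String, Int)] =
    words.groupBy(identity).toSeq
      .map { case (word, occurrences) => (word, occurrences.length) }
      .sortBy { case (word, count) => (-count, word) } // descending count, then word
      .take(k)

  def main(args: Array[String]): Unit = {
    val words = Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")
    println(topK(words, 2)) // Apple and c both occur 3 times; Apple sorts first
  }
}
```

Sorting on the negated count avoids the separate reverse step, at the cost of a slightly less obvious sort key.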

In Spark – method 1

scala> val words = sc.parallelize(Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")) //sc refers to SparkContext
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> val b = words.map((_, 1)) //transforms each word into a tuple, K,V where K=word, V=1
b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[2]
scala> val c = b.groupByKey
c: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = MappedValuesRDD[5]

scala> val d = c.map { case (word, counts) => (word, counts.sum) }. // counts, then sorts
sortBy(_._2, false).take(2) //sortBy() is another network shuffle

First, parallelize the Seq into an RDD. Then transform each word into a (K,V) tuple where V=1. Group all instances of the same word on the same node using groupByKey; this is where the distributed network shuffle happens. Then, using map, count the entries and sort them.

Note that the mapping step is not words.map(_, 1):

scala> val b = words.map(_, 1)
<console>:14: error: missing parameter type for expanded function
((x$1) => words.map(x$1, 1))
val b = words.map(_, 1)

but words.map((_, 1)), because that is equivalent to words.map(x => (x, 1)). I.e.:

words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val b = words.map((_, 1))
b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[2]

In Spark – method 2

scala> val words = sc.parallelize(Seq("Apple", "Bear", "Tahoe", "a", "b", "c", "Apple", "Apple", "Bear", "c", "c")) //sc refers to SparkContext
// parallelize() converts the Seq into an RDD
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val b = words.map((_, 1)).reduceByKey(_ + _) //reduceByKey = groupByKey + local reduce
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14]

scala> val c = b.map { case (word, count) => (count, word) }
c: org.apache.spark.rdd.RDD[(Int, String)] = MappedRDD[15]

scala> c.top(2) //top avoids the global sort by taking the top items from each node, and merging them at the driver.
res4: Array[(Int, String)] = Array((3,c), (3,Apple))

Similar, but more optimized, via the use of reduceByKey and top.
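To show why this version moves less data around, here is a plain-Scala simulation with no Spark involved. Partitions are modeled as ordinary Seqs, and all names (PartitionedTopK, localCounts, mergeCounts, topK) are made up for this sketch; it only mimics the idea behind reduceByKey and top, not how Spark implements them.

```scala
object PartitionedTopK {
  type Counts = Map[String, Int]

  // Local reduce: count words inside one partition, before anything is shuffled.
  def localCounts(partition: Seq[String]): Counts =
    partition.groupBy(identity).map { case (word, ws) => (word, ws.size) }

  // Merge two partial count maps by key (the work left after the shuffle).
  def mergeCounts(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (word, n)) => acc.updated(word, acc.getOrElse(word, 0) + n) }

  // top-style selection: each partition keeps only its k largest pairs,
  // then the few surviving candidates are merged and cut to k once more.
  def topK(partitions: Seq[Seq[(Int, String)]], k: Int): Seq[(Int, String)] = {
    val candidates = partitions.flatMap(_.sorted.reverse.take(k))
    candidates.sorted.reverse.take(k)
  }

  def main(args: Array[String]): Unit = {
    val p1 = Seq("Apple", "Bear", "Tahoe", "a", "b", "c")
    val p2 = Seq("Apple", "Apple", "Bear", "c", "c")
    val merged = Seq(p1, p2).map(localCounts).reduce(mergeCounts)
    val pairs = merged.toSeq.map { case (word, n) => (n, word) }
    println(topK(pairs.grouped(3).toSeq, 2)) // the two pairs with count 3: c, then Apple
  }
}
```

Only one (word, partial count) pair per distinct word leaves each partition, and only k candidates per partition reach the final merge, instead of every (word, 1) tuple and a full global sort.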

Perform some ETL

Let’s now review how to perform a simple ETL use case.

In Scala:

scala> io.Source.fromFile("/tmp/myfile.csv").getLines.map(_.split(",")).
map(a => (a(0), a(2), a(5), a(6))).
filter(_._2 contains "2014")
Iterator[(String, String, String, String)]
Read the file, extract a few fields, and filter by date.

Parallel ETL in Scala:

scala> (1 to 4).par.flatMap(a => io.Source.fromFile("/tmp/myfile.csv").getLines).
map(_.split(",")).map(a => (a(0), a(2), a(5), a(6))).
filter(_._2 contains "2014")

// Breaks down the file into multiple chunks, converts this into a big stream, and then runs it in parallel
// (as written, each of the 4 tasks reads the same file; in practice each task would read its own chunk)
ParSeq[(String, String, String, String)]

Spark ETL

scala> sc.textFile("/tmp/myfile.csv").map(_.split(",")).map(a => (a(0), a(2), a(5), a(6))).
filter(_._2 contains "2014")
The code is the same, except for loading the data initially!



This should provide a good explanation about why Scala and Spark are a good match. A nice tutorial is here: A good tutorial on Spark: Of note, a nice way to work with Spark, a la ipython in a notebook fashion, is to use this: