Monday, November 17, 2014

Productionalization of Hadoop: Data governance


Now that Hadoop has matured, it is natural to ask how it can grow from a proof-of-concept (POC) project into a fully integrated part of the enterprise. In this post we will review what it takes to “productionalize” Hadoop and the typical tasks involved.
Implementing a new product in the enterprise usually means adhering to the rules your IT team has laid out, often for regulatory compliance. Outside of the specific constraints of Hadoop, a non-comprehensive list of needs goes like this:

- Error handling and failure recovery.
- Logging and monitoring.
- Data security (encryption, access authorization...).
- Separate Production vs. UAT environments (or data folder structures), and deployment automation.
- Easy access to data for production support, for investigation purposes.
- Disaster recovery (DR).
- Master data management (MDM): consistent reference and master data across systems.
- Data quality: enforcement through metadata-driven rules, hierarchies/attributes.
- A testing and integration cycle: from unit testing to user acceptance testing.
- Multi-tenancy, with the assumption that the product of choice is shared across projects; attention must be given to storage and processing capacity planning.
- Business process integration: policies around data frequencies and source availability.
- Lifecycle management: data retention, purge schedule, storage, archival.
- Metadata: data definition, catalog, lineage.

Now let’s review this list in the context of Hadoop. A number of these items are actually addressed by the underlying framework (regardless of the vendor); others are handled by additional third-party components. Some concepts are newer than others. Here they are in the form of a table:

- Error handling: Hadoop Core, Falcon
- Logging and monitoring: Ambari, Cloudera Manager
- Security: Sentry, Kerberos, Knox, Ranger, Dataguise
- Automated deployment: Ambari, Cloudera Manager
- Disaster recovery: Cloudera Backup and Disaster Recovery, Falcon
- Master data management: Pentaho Kettle, Talend
- Data quality: Pentaho Kettle, Talend
- Testing and integration: Apache MRUnit, PigUnit
- Multi-tenancy support: YARN, Hadoop schedulers
- Data impact monitoring: Cloudera Navigator, Falcon, Ambari, Cloudera Manager
- Infrastructure monitoring: Ganglia, Nagios, Ambari, Cloudera Manager
- Lifecycle management: Falcon, Cloudera Navigator
- Metadata management: HCatalog, Cloudera Navigator (search, classification), Falcon (tag search)
- Tagging and search: Cloudera Navigator, Falcon

As you see, some components are common across vendors for the most part (like security or monitoring) and can be leveraged from the vendor’s UI offering (Cloudera Manager, Ambari). Some are offered by third-party vendors, like MDM and data quality. However, a number of these features are covered by tools unique to each vendor (although sometimes offered as open-source products). In this post we will cover Apache Falcon as it relates to some unique aspects of productionalization of a Hadoop cluster; unsurprisingly, its concepts are also somewhat similar to what Cloudera Navigator offers.

Why Falcon?

After a POC, data has landed, scripts in Hive and Pig have been written, data has been joined between disparate systems, and some aggregations are showing in a BI tool. The pipeline of work manifests itself as a set of data flows.
However, once this data pipeline needs to be put into Production, it generally needs to follow data governance requirements. This generally materializes as a set of Oozie workflows, plus the need to orchestrate other tools such as DistCp or Sqoop.

Apache Falcon attempts to answer data governance requirements such as:
- Data impact analysis: what happens if some data feed gets moved? If someone changes these files, who is going to be impacted?
- Monitoring: there is a need to monitor not just the infrastructure, but also the full data pipeline, as well as its ownership.
- Late data handling: data never arrives perfectly on time, due to the variety of different sources; how do we deal with this?
- Replication and retention of data: there are generally different replication policies for different data sets, i.e. raw data vs. cleansed data vs. production data.
- Auditing for compliance.

These typically complicate the simple data pipeline. They cannot simply be translated into a one-size-fits-all template; there needs to be some higher-level tool to answer these questions. (Of note, Cloudera Navigator handles most of these requirements except late data handling.)

Falcon automatically generates Oozie workflows to answer these requirements, based on higher-level XML workflows. This dramatically simplifies the process of answering these requirements.


Falcon poses the concepts of:

- A Cluster, which represents the interfaces to the JobTracker and NameNode.
- A Feed, which is the data itself.
- A Process, which consumes or produces feeds.

So essentially the user defines these entities in XML, then manipulates them to create modular, reusable data pipelines.
The higher-order XML tags in Falcon currently cover out-of-the-box (OOTB) policies like replication, retention, and late data handling configuration. They are also extensible, allowing solutions for encryption or general external transformations. Falcon also allows engines other than the OOTB ones, like Spring Batch instead of Oozie, or Cascading instead of Hive/Pig.
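To make this concrete, here is a sketch of a feed entity with retention and replication policies. The element names follow Falcon's published feed schema, but the cluster names, paths, dates, and schema file are all made up for illustration:

```xml
<feed name="cleansedClicks" description="cleansed click-stream data" xmlns="uri:falcon:feed:0.1">
  <!-- how often new instances of this feed arrive -->
  <frequency>hours(1)</frequency>
  <!-- late data handling policy: accept data up to 4 hours late -->
  <late-arrival cut-off="hours(4)"/>
  <clusters>
    <cluster name="primaryCluster" type="source">
      <validity start="2014-11-01T00:00Z" end="2016-11-01T00:00Z"/>
      <!-- retention policy: keep 90 days of raw instances, then purge -->
      <retention limit="days(90)" action="delete"/>
    </cluster>
    <!-- a second, target cluster implies replication (e.g. for DR) -->
    <cluster name="drCluster" type="target">
      <validity start="2014-11-01T00:00Z" end="2016-11-01T00:00Z"/>
      <retention limit="months(12)" action="delete"/>
    </cluster>
  </clusters>
  <locations>
    <location type="data" path="/data/clicks/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
  </locations>
  <ACL owner="etl-user" group="analytics" permission="0740"/>
  <schema location="/schemas/clicks.avsc" provider="avro"/>
</feed>
```

From this declarative definition alone, Falcon derives the Oozie coordinators for replication, eviction, and late-data reprocessing; none of that has to be hand-written.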


Monitoring of the data pipeline can be run as a combination of Falcon and Ambari. Ambari’s UI lets you supervise the infrastructure, but also surfaces the Falcon alerts for pipeline start/end or errors, pipeline run history, and scheduling.

Tracing: Lineage, Tagging, Search, Auditing

Tracing in Falcon allows you to visualize the different Falcon components (feeds, etc.) of the pipeline linked together. This addresses the case where you want to make changes to a certain step and need a visual overview of the steps involved; in Falcon this is called lineage information, represented visually and stored internally in a graph database. This answers the impact analysis requirement.

Tagging attaches key/value pairs to feeds and processes, e.g. owner=X, data source=DW. This is widely used to record ownership, business value, and the external source system or destination.
It also allows you to perform searches, for example to retrieve all of the feeds that are tagged “secure” and are owned by X.

Auditing simply refers to logging all changes to the pipeline, i.e. the action taken along with the user who took it.

Falcon user flow

In a nutshell, the process and user flow for working in Falcon goes:

1/ Define your pipeline: feeds, processes, etc., written as XML.
2/ Submit these entities from the Falcon CLI; the Falcon server validates the specifications given.
3/ Launch and schedule; the Oozie workflows are generated.
4/ Manage the workflow: at this point the user can check, suspend, or resume the pipelines for updates.
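Assuming the entity definitions were saved as cluster.xml, feed.xml, and process.xml, and a process named clickAggregator (both names hypothetical), the CLI interaction for these steps would look roughly like this sketch:

```
# 2/ validate and register the entities with the Falcon server
falcon entity -type cluster -submit -file cluster.xml
falcon entity -type feed    -submit -file feed.xml
falcon entity -type process -submit -file process.xml

# 3/ generate the underlying Oozie workflows and start them
falcon entity -type process -name clickAggregator -schedule

# 4/ manage: check status, suspend, resume
falcon entity -type process -name clickAggregator -status
falcon entity -type process -name clickAggregator -suspend
falcon entity -type process -name clickAggregator -resume
```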


The Falcon server is essentially a centralized orchestration framework. It saves the XML definitions and handles JMS notifications (via ActiveMQ), letting clients subscribe and get notified about the pipeline. The Falcon server itself can be managed through Ambari.

Closing thoughts on Falcon

Apache Falcon (and, on the Cloudera side, Navigator) aims to answer the data governance requirements mandated by data stewards. The higher-level definition and language of Falcon for its entities (processes, feeds) is a step in the right direction. In practice, Falcon is still relatively immature at this point (Nov 2014): the lineage UI is very limited and its REST API hard to use, JMS notifications are limited to completed events only, the late-data handling API is too coarse, and there is no Sqoop support. But I believe these limitations are only temporary.

Monday, November 10, 2014

An overview of Apache Spark

Overview of Apache Spark


            It is a well-known fact that Map Reduce is not good at multi-stage queries, and is a rather cumbersome solution for iterative applications such as interactive queries and some machine learning algorithms. The motivation for creating Spark centered on these areas. Spark’s goal is two-fold:

- Have primitives for an in-memory Map Reduce-like engine, for iterative algorithm support.
- Not be restricted to just Map Reduce: replace that model with a DAG engine instead of discrete M/R steps. Spark provides a set of operators that are higher-level, expressive, and offer clean APIs.

The philosophy of Spark (read: business model?) is that, unlike other open-source projects, Spark was built and marketed to get maximum exposure and be popularized as a useful and notable distributed framework replacing Map Reduce, as opposed to a university research project made open-source after the fact with no real support.
In light of this, Spark offers powerful standard libraries, a relational data model, and canned machine learning algorithms that we’ll review in more detail. Another definite advantage is Spark’s unified support across many environments: it is cloud-friendly and runs under several resource managers (YARN, Mesos, or its own standalone manager). One of the biggest advantages Spark has to offer is a unified API and framework across data analysis contexts: online, offline, machine learning, and graph.

Spark aims to be compatible with Hadoop’s Input and Output format APIs, meaning a user can plug Spark into an existing Hadoop cluster and use it directly on the existing data; Spark supports HDFS, but also HBase, S3, Cassandra, etc.

In terms of support, Spark releases are pretty regimented (every 3 months) and are defined by time, not by scope; again, this is to give predictability to users and a sense of stability to the project (as opposed to Hadoop in its earlier years), and probably also to drive user adoption. Suffice it to say Apache Spark is the most active project in the Apache foundation as of Nov 2014, and currently boasts exponential growth. And for good reason: Spark broke the performance record for the 100 TB sorting benchmark, in 23 minutes on about 200 machines (Oct 2014), as a mostly network-bound job; comparatively, Hadoop doesn’t take advantage of the hardware resources as efficiently. It is worth noting that this was not done using only memory (100 TB cannot easily be stored in memory), but actually spilling to disk, which shows Spark is also performant using disk.

Map Reduce versus Spark

            In Map Reduce (MR), data is written back to HDFS between iterations. Bear in mind that MR was designed 10+ years ago when memory was expensive. Consider a set of Hive queries on the same data:

HDFS input
read -> query1
read -> query2

- The ‘read’ portion is considerably slow in MR, due to 3 phases: replication, serialization, and disk I/O. In fact, on average 90% of the time is spent on these phases instead of computing the actual algorithm from the query itself! The same principle applies to some machine learning algorithms, like gradient descent (which essentially runs a for-loop to descend to the local minimum).

Besides the in-memory primitives and the new operators, Spark’s architecture is often optimized to avoid multiple passes on the data; so sometimes a Map + Reduce (group by) + Reduce (sort) is done in one pass in Spark, faster than MR even without using the in-memory paradigm.
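As a loose analogy in plain Python (illustrative only, not Spark code): chaining lazy generators fuses several logical steps into a single physical pass over the data, much as Spark pipelines transformations within a stage:

```python
# Plain-Python analogy: three logical steps, one physical pass over the data.
# (A toy illustration; real Spark fuses transformations inside a stage.)

lines = ["error: disk full", "ok", "error: timeout", "ok", "error: disk full"]

# Lazy "transformations": nothing is computed yet.
mapped = (line.lower() for line in lines)                # like map()
filtered = (line for line in mapped if "error" in line)  # like filter()

# A single "action" pulls each element through the whole chain once.
counts = {}
for line in filtered:  # one pass over the data
    counts[line] = counts.get(line, 0) + 1

print(counts)  # {'error: disk full': 2, 'error: timeout': 1}
```

Nothing is materialized between the map and filter steps; each record flows through the whole chain exactly once, which is the property that lets Spark avoid re-reading data between stages of the same pipeline.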

Advantages of Spark

-       In practice, it is said that Spark is 10x faster than Hadoop on average.
-       Agility compared to the monolithic aspect of Hadoop: Spark allows rapid changes, thanks to loading the data into memory and interacting with it in a rapid manner. The shell (REPL) is great to test things out.
-       Data scientists & non-data engineers can use Spark through Python.
-       Newer platform with multiple tools like Machine learning, Graph and streaming included, with strong community support.
- Scala is superior for data processing thanks to its higher level of abstraction. Although Spark supports Java, it is recommended to use Scala with Spark, as non-functional programming makes the coding less intuitive and lower-level. Using Scala also makes debugging easier. A combination of an IDE (for API autocompletion and data typing) and the REPL (interactive shell) is actually best for efficiency.
-       Databricks makes cluster provisioning very easy.

However Spark is definitely less mature than Hadoop, is more bug-prone, and doesn’t have a good solution for managing and monitoring the system.

Spark architecture

Architecturally Spark is made up of 2 concepts:

- Resilient Distributed Datasets (RDDs): a collection of Java objects spread across the cluster, as a distributed collection/dataset. This gives direct access to a higher-level API. An RDD is split into partitions; each partition must fit on one node, and a node hosts multiple partitions. An RDD is typed like a Scala collection (i.e. RDD[Int]); for example, reading a text file (line input) in Spark returns an RDD[String].
- A DAG execution engine, with a Master-Slave architecture. The master is called the Driver, the slaves the Executors/Workers (2 processes). Executors are where computation runs and data is cached. They run even when there are no running jobs, which avoids the JVM start-up time incurred by MR’s task trackers. The con is that you get a fixed number of Executors, remedied only if you use YARN’s Resource Manager to allocate resources dynamically. The Driver controls the program flow and executes the steps.
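The RDD ideas above (partitioned data, lazy transformations, a recorded lineage) can be sketched in a few lines of plain Python. This is a toy model for intuition, not Spark's implementation:

```python
# Toy model of an RDD: partitioned data plus a lineage of lazy transformations.
# A lost partition can be rebuilt by replaying the lineage over its base data.

class MiniRDD:
    def __init__(self, partitions, lineage=None):
        self.partitions = partitions  # list of lists (one list per "node")
        self.lineage = lineage or []  # ordered record of transformations

    def map(self, f):
        # Lazy: record the step, compute nothing yet.
        return MiniRDD(self.partitions, self.lineage + [("map", f)])

    def filter(self, p):
        return MiniRDD(self.partitions, self.lineage + [("filter", p)])

    def compute_partition(self, i):
        # Replay the lineage over base partition i (also how recovery works).
        data = self.partitions[i]
        for op, f in self.lineage:
            data = [f(x) for x in data] if op == "map" else [x for x in data if f(x)]
        return data

    def collect(self):
        # The "action": compute every partition and gather the results.
        return [x for i in range(len(self.partitions)) for x in self.compute_partition(i)]

rdd = MiniRDD([[1, 2, 3], [4, 5, 6]]).map(lambda x: x * 10).filter(lambda x: x > 20)
print(rdd.collect())             # [30, 40, 50, 60]
print(rdd.compute_partition(1))  # rebuilding "lost" partition 1: [40, 50, 60]
```

The last line is the fault-tolerance story in miniature: nothing about partition 1 was cached, yet it is reconstructed deterministically from the base data and the recorded lineage.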

It should be noted that Spark also has an elastic scaling feature for ETL jobs, à la EMR, where you configure a minimum/maximum number of executor instances.

Fault Tolerance

            Fault tolerance is provided by having the RDDs track the series of transformations making up the data processing, and recompute the lost data in case of failure, through the operation log called the lineage. This is the 'resilient' part of the RDD acronym. It is worth noting that this lineage feature is nothing new: it actually comes from the M/R paradigm; i.e. if a map task fails, the whole Hadoop job doesn't fail, but instead the task is recomputed on a different node. However, whereas in M/R the computation is segregated by job (a data analysis formed of multiple steps such as filter, group, etc. is implemented as separate M/R jobs, so there is no built-in resiliency across jobs), in Spark the computation is much more fluid and resiliency works across the steps of the data analysis.
Also, the data can be replicated twice in memory, in case an Executor fails. However, if the replica is not fully written before the node fails, data may be lost.
Provisioning Spark on YARN allows the Resource Manager to spin up the Spark Master fail-over proxy (and the same applies in Mesos) and offer HA; another way is to set up ZooKeeper for Spark’s Driver in standalone mode, to mitigate some of the resiliency problems in case the Driver goes down.

Using memory

            Spark uses memory for fast computation. However, if memory is unavailable, Spark will gracefully spill to disk. The strategy used for this is Least-Recently-Used (LRU): the dataset that has been used least recently is spilled to disk.
Currently the Spark user has to specify which data sets (once processed) should be kept in memory. Automatic, adaptive caching in memory is currently a subject of research and is not possible today.
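The LRU bookkeeping can be illustrated in plain Python. This simulates only the eviction policy, not Spark's actual memory manager, and the dataset names are made up:

```python
from collections import OrderedDict

# Simulate an in-memory cache holding at most 2 datasets;
# on overflow, the least-recently-used dataset is "spilled" to disk.
cache, spilled, capacity = OrderedDict(), [], 2

def access(name, data=None):
    if name in cache:
        cache.move_to_end(name)  # mark as most recently used
        return cache[name]
    cache[name] = data
    if len(cache) > capacity:
        victim, _ = cache.popitem(last=False)  # evict the LRU entry
        spilled.append(victim)
    return data

access("raw", [1, 2, 3])
access("cleansed", [2, 3])
access("raw")               # touch "raw" so it becomes most recently used
access("aggregates", [5])   # overflow: "cleansed" is now the LRU victim
print(spilled)              # ['cleansed']
```

Even though "raw" was cached first, touching it again saved it from eviction; recency of use, not insertion order, decides what spills.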


            Security features like Kerberos can be set up on Spark as long as Spark is used in a YARN configuration.

Spark ecosystem

Spark essentially expanded its ecosystem of tools to provide a one-stop shop for all kinds of analytics. Let’s review these components.

Spark SQL

Spark SQL is a complete (and faster) rewrite of what used to be Shark, a replacement for Hive built as a tool on top of Spark with relational semantics. Spark SQL is implemented to be compatible with Hive, so existing Hive tables can still be used within Spark SQL.
Spark SQL does not cache Hive records as Java objects, which would incur too much overhead. Instead it uses column-oriented storage with primitive types (int, string, etc.), similar to Parquet or ORC, with the same advantages: faster response time due to only scanning the needed columns, automatic selection of the best compression algorithm per column, etc.
The icing on the cake is that Spark SQL code can be mixed with “pure” Spark code, and one can be called from the other. The Spark/Scala console also makes this easy.
Spark SQL is compatible with Hive 0.13.
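The column-store advantage described above can be shown with a toy layout comparison in plain Python (not Spark SQL's actual in-memory format; the records are made up):

```python
# The same three records in two layouts.
rows = [
    {"user": "a", "age": 31, "country": "FR"},
    {"user": "b", "age": 25, "country": "US"},
    {"user": "c", "age": 40, "country": "FR"},
]

# Column-oriented layout: one array of primitive values per column.
columns = {
    "user":    ["a", "b", "c"],
    "age":     [31, 25, 40],
    "country": ["FR", "US", "FR"],
}

# "SELECT avg(age)": the row store touches every field of every record,
# while the column store scans a single contiguous array of ints.
avg_from_rows = sum(r["age"] for r in rows) / len(rows)
avg_from_cols = sum(columns["age"]) / len(columns["age"])
print(avg_from_rows, avg_from_cols)  # 32.0 32.0
```

Both layouts give the same answer, but the columnar one reads only the bytes of the `age` column, which is also why per-column compression (all values share one primitive type) works so well.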

Spark Streaming

Spark was extended to perform stream computations. The way this works: Spark Streaming runs a series of small batch jobs over a (configurable) period of time, also called micro-batching. The state of the RDDs for these jobs is kept in memory.
The lowest latency in Spark Streaming is on the order of seconds, not less; this usually accommodates 90% of streaming use cases. Storm, on the other hand, can handle discrete events in a flow of steps, akin to a CEP system in terms of speed of processing.
Regarding fault tolerance, Spark Streaming offers a write-ahead log for full HA operation. Comparatively, in Storm, if the supervisor fails, the data gets reassigned and replayed, at the cost of having processing done twice.
Usually streaming frameworks are comprised of a pipeline of nodes; each node maintains a mutable state, which is lost if the node fails. So in Storm, for example, each record is in the worst case processed at least once (possibly multiple times). A remedy to this in Storm is to use the Trident API, which works as micro-batching and offers transactions to update state. However, this comes with lower throughput.
Also, Storm has a lower-level API than Spark Streaming, no built-in concept of look-back aggregation, and no easy way to combine batch with streaming.

Technically, Spark Streaming offers a new interface, the DStream, to deal with streaming, as well as a new operator to work on a timed window: reduceByWindow() for incremental computation, taking the window length and the sliding interval as arguments. This can also be run on a per-key basis.
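The semantics of a windowed reduce over micro-batches can be simulated in plain Python. This is a toy model: Spark computes the windows incrementally over a live DStream, whereas this simply recomputes each window over a fixed list of batches:

```python
# Plain-Python simulation of a windowed reduce over micro-batches
# (window length = 3 batches, sliding interval = 1 batch).
batches = [[1, 2], [3], [4, 5], [6], [7]]  # each inner list = one micro-batch

def reduce_by_window(batches, window, slide):
    results = []
    # Emit one result each time a full window of batches is available.
    for end in range(window, len(batches) + 1, slide):
        window_data = [x for b in batches[end - window:end] for x in b]
        results.append(sum(window_data))  # the "reduce" function
    return results

print(reduce_by_window(batches, window=3, slide=1))  # [15, 18, 22]
```

Each output covers the last 3 micro-batches and slides forward by one batch, which is exactly the look-back aggregation pattern the text contrasts with Storm.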
One of the advantages of Spark Streaming is code reuse and intermixing with standard Spark code, i.e. the ability to mix batch (offline) with real-time (online) computing.
Some ML algorithms, like K-means, are also available as libraries to be used online.


Spark was extended to add a graph-processing library.