Tuesday, March 31, 2015

How is fault tolerance handled in Spark streaming? An overview

I’ve been trying to get my head around fault tolerance in Spark Streaming, especially in light of the recent changes made to it. Below is my high-level understanding, based on conversations with a colleague.

First, the basics:

Spark Streaming components

Data model

All data is modeled as RDDs, which by design carry a lineage of deterministic operations, so any re-computation always leads to the same result. This serves the same purpose as Hadoop's fault tolerance for slave failures, though the mechanism is different.

  • An RDD is an immutable, deterministically re-computable, distributed dataset in Spark.
  • A DStream (discretized stream) is the abstraction Spark Streaming builds over RDDs: essentially a continuous sequence of RDDs, one per batch interval. Many of the same APIs that apply to RDDs also apply to DStreams.
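To make lineage-based recovery concrete, here is a toy model in plain Python. The `ToyRDD` class and its methods are hypothetical (this is not Spark's API): a dataset keeps its source partitions plus the chain of deterministic transformations applied to them, so a lost partition can be rebuilt by replaying that chain.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple

Partition = List[int]
Op = Callable[[Partition], Partition]


@dataclass(frozen=True)
class ToyRDD:
    """Immutable toy dataset: source partitions plus a lineage of deterministic ops."""
    source: Tuple[Tuple[int, ...], ...]  # original input, split into partitions
    lineage: Tuple[Op, ...] = ()         # transformations applied so far, in order

    def map(self, fn: Callable[[int], int]) -> "ToyRDD":
        # Returns a new dataset; this one is left unchanged (immutability)
        return ToyRDD(self.source, self.lineage + (lambda part: [fn(x) for x in part],))

    def filter(self, pred: Callable[[int], bool]) -> "ToyRDD":
        return ToyRDD(self.source, self.lineage + (lambda part: [x for x in part if pred(x)],))

    def compute_partition(self, i: int) -> Partition:
        # Rebuild partition i from its source data by replaying the lineage
        part = list(self.source[i])
        for op in self.lineage:
            part = op(part)
        return part


rdd = ToyRDD(((1, 2, 3), (4, 5, 6))).map(lambda x: x * 10).filter(lambda x: x > 15)

# Materialize both partitions, as if they lived on two worker nodes
materialized = {i: rdd.compute_partition(i) for i in range(2)}
# Simulate losing the node holding partition 1, then recompute it "elsewhere"
lost = materialized.pop(1)
materialized[1] = rdd.compute_partition(1)
print(materialized[1] == lost)  # True: deterministic ops give the same result
```

Because every operation is deterministic and the source is never mutated, replaying the lineage is all the recovery that is needed; no copy of the derived data has to be kept.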

Types of nodes

  • Worker node: a slave node that runs the application code on the cluster.
  • Driver node: runs the main program of the application. Similar to the ApplicationMaster in the Hadoop YARN world, the Driver owns the SparkContext, and hence all of the application's state.

Main components in a streaming application

  • Driver: conceptually akin to the master node in a Storm application.
  • Receiver: lives on a worker node and consumes the data from the source, similar to a spout in Apache Storm; built-in receivers for the common sources are available out of the box.
  • Executor: processes the data; conceptually similar to a bolt in Apache Storm.
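As a rough picture of how these components fit together: Spark Streaming chunks received data into blocks at a block interval, and the blocks received during one batch interval become the partitions of that batch's RDD, which executors then process. The sketch below models that flow with hypothetical functions (not Spark's API) and assumes the block interval evenly divides the batch interval.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Event = Tuple[int, str]  # (arrival time in ms, payload)


def receive(events: List[Event], block_interval_ms: int) -> Dict[int, List[str]]:
    """Toy receiver: chunk arriving events into blocks, one per block interval."""
    blocks: Dict[int, List[str]] = defaultdict(list)
    for t, payload in events:
        blocks[t - t % block_interval_ms].append(payload)
    return dict(blocks)


def discretize(blocks: Dict[int, List[str]], batch_interval_ms: int) -> Dict[int, List[List[str]]]:
    """Toy driver: group blocks into batches; a batch's blocks act as its partitions."""
    batches: Dict[int, List[List[str]]] = defaultdict(list)
    for start in sorted(blocks):
        batches[start - start % batch_interval_ms].append(blocks[start])
    return dict(batches)


def execute(partitions: List[List[str]], fn: Callable[[str], str]) -> List[List[str]]:
    """Toy executor: apply fn to every record, one task per partition."""
    return [[fn(x) for x in part] for part in partitions]


events = [(0, "a"), (150, "b"), (250, "c"), (1100, "d")]
batches = discretize(receive(events, block_interval_ms=200), batch_interval_ms=1000)
for batch_time, partitions in sorted(batches.items()):
    print(batch_time, execute(partitions, str.upper))
# 0 [['A', 'B'], ['C']]
# 1000 [['D']]
```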

Main steps in a Streaming application

A streaming application has essentially three steps, so it is essential to understand the record-processing guarantee (at-least-once, at-most-once, or exactly-once semantics) that applies at each step:
1. Receiving the streaming data
  • Depending on the kind of input source, either a reliable or an unreliable receiver is used at this step; e.g. a stream from a file (local or HDFS) is reliable, a Kafka stream is reliable, but data read directly from a socket connection is unreliable.
  • In Spark Streaming, data received by any receiver is by default replicated (in memory) to two worker nodes; a reliable receiver sends the acknowledgement to the source only after this replication. An unreliable receiver sends no acknowledgement, so data that was received but not yet replicated is lost if the receiver fails (i.e. at-most-once semantics).
  • If the Driver node fails, the SparkContext is lost, and with it all the received data held in memory. The initial remedy is Spark's own write-ahead log (WAL), but the cleaner way, if the data source allows it, is to simply reuse and consume the source's own log instead.

2. Transforming the data
  • At this stage we have exactly-once semantics thanks to the underlying RDD guarantees: if a worker node fails, the transformation is re-computed on another node where the data is replicated.

3. Outputting the transformed data
  • Output operations have at-least-once semantics: the transformed data may be written to an external system more than once if a worker fails. Achieving exactly-once semantics takes additional effort, and there are two approaches:
  • Idempotent updates: multiple attempts always write the same data, so a repeated write has no additional effect.
  • Transactional updates: all updates are made atomically, so each update is applied exactly once.
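Going back to step 1, the acknowledgement ordering of reliable vs. unreliable receivers can be sketched as a toy simulation. Both functions are hypothetical, and a "crash" is modeled as the receiver abandoning its work on one message and then restarting. Acknowledging only after replication means a crash can cause a redelivery, i.e. a duplicate (at-least-once); without acknowledgements, a message the receiver held but had not yet replicated is simply gone (at-most-once).

```python
from typing import List, Optional, Tuple


def reliable_receive(messages: List[str], crash_before_ack_on: Optional[str] = None) -> List[str]:
    """Toy reliable receiver: replicate to two nodes first, acknowledge afterwards."""
    source = list(messages)  # the source keeps each message until it is acked
    replicas: Tuple[List[str], List[str]] = ([], [])  # two toy worker nodes
    crashed = False
    while source:
        msg = source[0]  # read without removing it from the source
        for node in replicas:
            node.append(msg)  # 1) replicate
        if msg == crash_before_ack_on and not crashed:
            crashed = True  # receiver dies before acking; after restart the
            continue        # source redelivers msg, producing a duplicate
        source.pop(0)  # 2) acknowledge: the source may now drop msg
    return replicas[0]


def unreliable_receive(messages: List[str], crash_before_store_on: Optional[str] = None) -> List[str]:
    """Toy unreliable receiver: no acknowledgement; the source forgets messages once sent."""
    source = list(messages)
    replicas: Tuple[List[str], List[str]] = ([], [])
    while source:
        msg = source.pop(0)  # nothing to acknowledge: msg is gone from the source
        if msg == crash_before_store_on:
            continue  # receiver dies holding msg before replicating it
        for node in replicas:
            node.append(msg)
    return replicas[0]


print(reliable_receive(["a", "b", "c"], crash_before_ack_on="b"))      # ['a', 'b', 'b', 'c']
print(unreliable_receive(["a", "b", "c"], crash_before_store_on="b"))  # ['a', 'c']
```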

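The two output strategies can be contrasted with a toy external store (a hypothetical `ToySink` class, not a real client library). Idempotent updates overwrite a key derived from the data, so a re-run write is harmless; transactional updates record a batch identifier in the same transaction as the data, so a retried batch is recognized and skipped. The Spark Streaming programming guide suggests building such an identifier from the batch time and the partition index; the `("10:00", 0)` keys below are made-up stand-ins for that.

```python
from typing import Dict, Hashable, List, Set


class ToySink:
    """Toy external store used to contrast the two exactly-once output strategies."""

    def __init__(self) -> None:
        self.data: Dict[Hashable, int] = {}
        self.committed: Set[Hashable] = set()  # ids of batches already applied

    def put(self, key: Hashable, value: int) -> None:
        # Idempotent update: rewriting the same key with the same value changes nothing
        self.data[key] = value

    def commit(self, batch_id: Hashable, increments: Dict[Hashable, int]) -> bool:
        # Transactional update: apply increments and record batch_id together;
        # a retried batch finds its id already recorded and is skipped
        if batch_id in self.committed:
            return False
        staged = dict(self.data)
        for key, delta in increments.items():
            staged[key] = staged.get(key, 0) + delta
        # In this single-threaded toy, these two assignments stand in for
        # one atomic transaction in a real store
        self.data = staged
        self.committed.add(batch_id)
        return True


# Idempotent: a worker failure makes the output run twice, writing the same value twice
idempotent = ToySink()
for _ in range(2):
    idempotent.put(("10:00", 0), 42)  # key = (batch time, partition index)

# Transactional: a non-idempotent increment, retried after a failure, is applied once
transactional = ToySink()
applied: List[bool] = [transactional.commit(("10:00", 0), {"clicks": 5}) for _ in range(2)]

print(idempotent.data)              # {('10:00', 0): 42}
print(applied, transactional.data)  # [True, False] {'clicks': 5}
```

Without either mechanism, the retried increment would have left `clicks` at 10.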

Let’s say there is a batch of events, and one of the operations maintains a ‘global count’, i.e. a counter of the total number of events streamed so far. Suppose the node processing the batch goes down mid-way through the processing. What happens now?

Does the global count reflect the events that were processed ‘half way’?

Strictly speaking, for a global count there is a built-in global counter in Spark that takes care of this problem. But this is just an example, and in all other situations the lineage of transformations applied to the whole batch of data remedies this. As mentioned, RDD transformations are deterministically re-computable, so the re-computation gives the same resulting state. However, if the result also needs to be stored externally, that logic needs to be handled separately.
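A toy simulation of this answer (plain Python; the function names are hypothetical, not Spark's API): the per-batch count is computed from the batch's immutable input, and the running total only advances once a whole batch has succeeded. A mid-batch failure therefore never leaves ‘half way’ events reflected in the total; the batch is simply recomputed.

```python
from typing import List


class NodeFailure(Exception):
    """Raised to simulate the node processing a batch going down."""


def count_batch(batch: List[str], fail_after: int = -1) -> int:
    """Count the events of one batch; optionally fail part-way through."""
    partial = 0
    for i, _event in enumerate(batch):
        if i == fail_after:
            raise NodeFailure(f"node lost after {i} events")
        partial += 1
    return partial


def run_stream(batches: List[List[str]], failing_batch: int = -1, fail_after: int = 2) -> int:
    global_count = 0
    for idx, batch in enumerate(batches):
        try:
            delta = count_batch(batch, fail_after if idx == failing_batch else -1)
        except NodeFailure:
            # The partial count dies with the node; the batch is recomputed from
            # its immutable input, giving the same result as a failure-free run
            delta = count_batch(batch)
        global_count += delta  # state advances only once per completed batch
    return global_count


batches = [["e1", "e2", "e3"], ["e4", "e5", "e6", "e7"]]
print(run_stream(batches), run_stream(batches, failing_batch=1))  # 7 7
```

Writing `global_count` to an external store would still need the idempotent or transactional handling described in step 3.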

Monday, March 2, 2015

Converting Avro data to Parquet format in Hadoop

Update: this post is now part of the Cloudera blog, found at ow.ly/KAKmz

A customer of mine wants the best of both worlds: to keep working with his existing Apache Avro data, with all of the advantages it confers, while also taking advantage of the predicate push-down features that Parquet provides. How can we reconcile the two?
For more information about combining these formats, see this.

For a quick recap on Avro, see my previous post. While you are at it, see why Apache Avro is currently the gold standard in the industry.

What we are going to demonstrate here: how to use existing tools to convert our Avro data into Parquet, and make sure we can query the transformed data.

Parquet data

First, let’s try to convert text data to Parquet and read it back. Fortunately, Cloudera already has MapReduce code to do this: the code at https://github.com/cloudera/parquet-examples, and the doc here, let you read and write Parquet data. Let’s try this.

First, let’s create some Parquet data as input. We will use Hive for this, by directly converting Text data into Parquet.

Parquet conversion

1. Let’s create an example CSV data file and a text table (here, just 2 columns of integers) in HDFS pointing to it:

create table mycsvtable (x int, y int)
row format delimited
fields terminated by ','
stored as textfile;

LOAD DATA LOCAL INPATH '/home/cloudera/test/' OVERWRITE INTO TABLE mycsvtable;

2. Create a Parquet table in Hive, and convert the data to it:

create table myparquettable (a INT, b INT)
STORED AS PARQUET
LOCATION '/tmp/data';
insert overwrite table myparquettable select * from mycsvtable;

3. To build the code, add the relevant Hadoop and Parquet libraries to the project in your IDE (say, Eclipse). We then export the code as a JAR (File->Export->Runnable JAR file) and run it outside of Eclipse (otherwise, some Hadoop security issues prevent you from running the code).

4. Run the program (you could also run it with java instead of hadoop if you copy the data from HDFS to local disk). The arguments are the input data (Parquet) and the output directory (CSV). We just want to make sure we can read the Parquet data and display it.
$ sudo hadoop jar ./testparquet.jar hdfs:///home/cloudera/test/data/000000_0 hdfs:///home/cloudera/test/dataparquet

See the result (a CSV file):
$ more test/dataparquet2/part-m-00000
1,2
3,4
5,6

Avro data conversion

Avro data example

Let’s get an Avro data example working, based on this post.

Avro data generation

Interestingly, Hive doesn’t let you load/convert CSV data into Avro like we did in the Parquet example.
Let’s walk through an example of creating an Avro schema and generating some data. Let’s use this example, with this twitter.avsc schema (written in Avro’s JSON schema format):

{
   "type" : "record",
   "name" : "twitter_schema",
   "namespace" : "com.miguno.avro",
   "fields" : [
     { "name" : "username",
       "type" : "string",
       "doc"  : "Name of the user account on Twitter.com" },
     { "name" : "tweet",
       "type" : "string",
       "doc"  : "The content of the user's Twitter message" },
     { "name" : "timestamp",
       "type" : "long",
       "doc"  : "Unix epoch time in seconds" } ],
   "doc" : "A basic schema for storing Twitter messages"
}

and some data in twitter.json:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }
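Before running avro-tools, it can help to sanity-check that each JSON record matches the schema. The sketch below is a hypothetical, simplified check (not part of avro-tools or the Avro library): it copies the schema (minus its doc attributes) and data inline, and only checks the non-nullable `string` and `long` fields used here; any other Avro type is left unchecked.

```python
import json
from typing import Any, Dict, List

TWITTER_AVSC = """{
  "type": "record", "name": "twitter_schema", "namespace": "com.miguno.avro",
  "fields": [
    {"name": "username", "type": "string"},
    {"name": "tweet", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}"""

TWITTER_JSON = """\
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }
"""

LONG_MIN, LONG_MAX = -(2**63), 2**63 - 1  # range of an Avro long (signed 64-bit)


def check_record(record: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the record fits the schema."""
    problems = []
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        if name not in record:
            problems.append(f"missing field {name!r}")
            continue
        value = record[name]
        if ftype == "string" and not isinstance(value, str):
            problems.append(f"{name!r} should be a string")
        elif ftype == "long" and (
            isinstance(value, bool)  # JSON true/false must not pass as an integer
            or not isinstance(value, int)
            or not LONG_MIN <= value <= LONG_MAX
        ):
            problems.append(f"{name!r} should be a 64-bit integer")
        # Other Avro types are not checked in this sketch
    return problems


schema = json.loads(TWITTER_AVSC)
for line in TWITTER_JSON.splitlines():
    print(check_record(json.loads(line), schema))  # [] for both records
```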

We then convert the JSON data into binary Avro format:
$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
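The resulting twitter.avro is an Avro object container file: a header carrying the schema, followed by blocks of binary-encoded records separated by sync markers. To show why the binary records are so compact, here is a minimal sketch of how a single record body is encoded under the Avro specification's rules for `long` (zig-zag plus variable-length encoding) and `string` (length prefix plus UTF-8 bytes). It handles only those two types and omits the container header, blocks and sync markers; the function names are my own.

```python
from typing import Any, Dict, List


def encode_long(n: int) -> bytes:
    """Avro int/long: zig-zag encode, then emit 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)  # zig-zag: 0->0, -1->1, 1->2, -2->3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length (encoded as a long) followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_record(record: Dict[str, Any], fields: List[Dict[str, str]]) -> bytes:
    """Avro record: field encodings concatenated in schema order, no field names."""
    encoders = {"string": encode_string, "long": encode_long}
    return b"".join(encoders[f["type"]](record[f["name"]]) for f in fields)


FIELDS = [
    {"name": "username", "type": "string"},
    {"name": "tweet", "type": "string"},
    {"name": "timestamp", "type": "long"},
]
tweet = {"username": "miguno", "tweet": "Rock: Nerf paper, scissors is fine.", "timestamp": 1366150681}
print(encode_record(tweet, FIELDS).hex())
```

Note that field names never appear in the encoded bytes: a reader needs the schema (stored once in the file header) to decode the record.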

Transformation from Avro to Parquet storage format

The idea is to use the best of both worlds: take advantage of Avro’s object model and serialization format, and combine them with Parquet’s columnar storage format.
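To illustrate what the columnar side buys us, here is a toy model (hypothetical names, not the Parquet API) of the idea behind predicate push-down: records are pivoted into per-column chunks grouped into row groups, each chunk keeps min/max statistics, and a filter can skip whole row groups whose statistics rule them out while reading only the columns it needs. Real Parquet files add encodings, compression and page-level structure on top of this.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class ColumnChunk:
    values: List[Any]
    min_value: Any
    max_value: Any


RowGroup = Dict[str, ColumnChunk]


def to_row_groups(records: List[Dict[str, Any]], rows_per_group: int) -> List[RowGroup]:
    """Pivot row-oriented records into column chunks with min/max stats per row group.

    Assumes every record has the same fields.
    """
    groups = []
    for start in range(0, len(records), rows_per_group):
        rows = records[start:start + rows_per_group]
        group = {}
        for col in rows[0]:
            vals = [r[col] for r in rows]
            group[col] = ColumnChunk(vals, min(vals), max(vals))
        groups.append(group)
    return groups


def select_where_greater(groups: List[RowGroup], col: str, threshold: Any,
                         project: str) -> Tuple[List[Any], int]:
    """Return `project` values for rows where col > threshold, plus the number of skipped row groups."""
    result, skipped = [], 0
    for g in groups:
        if g[col].max_value <= threshold:
            skipped += 1  # push-down: statistics prove no row here can match
            continue
        # Only the two needed columns are touched in a group that might match
        for v, p in zip(g[col].values, g[project].values):
            if v > threshold:
                result.append(p)
    return result, skipped


tweets = [
    {"username": "miguno", "tweet": "Rock: Nerf paper, scissors is fine.", "timestamp": 1366150681},
    {"username": "BlizzardCS", "tweet": "Works as intended.  Terran is IMBA.", "timestamp": 1366154481},
]
groups = to_row_groups(tweets, rows_per_group=1)
print(select_where_greater(groups, "timestamp", 1366150681, "username"))  # (['BlizzardCS'], 1)
```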
First, we reuse the Avro data we created earlier.

1. We then use this code, https://github.com/laserson/avro2parquet, to convert the Avro data to Parquet data. This is a map-only job that simply sets up the right input and output formats for the conversion.

2. After compilation, let’s run the job on our existing Avro data:
$ hadoop jar avro2parquet.jar hdfs:///user/cloudera/twitter.avsc hdfs:///user/cloudera/inputdir hdfs:///user/cloudera/outputdir

We get:

$ hadoop fs -ls /user/cloudera/outputdir
Found 3 items
-rw-r--r--   1 cloudera cloudera
-rw-r--r--   1 cloudera cloudera

Note that the Avro schema is converted directly to a Parquet-compatible format.
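As a rough illustration of that translation, here is a hypothetical, simplified mapper (not the parquet-avro API) that renders a flat Avro record schema with non-nullable primitive fields as a Parquet message type, in the textual form parquet-tools prints. For the twitter.avsc fields it produces the same three field lines as the schema command output further down; unions (nullable fields), nested records, arrays, maps and logical types are not handled.

```python
import json
from typing import Dict, Optional, Tuple

# Avro primitive type -> (Parquet primitive type, original-type annotation)
AVRO_TO_PARQUET: Dict[str, Tuple[str, Optional[str]]] = {
    "boolean": ("boolean", None),
    "int": ("int32", None),
    "long": ("int64", None),
    "float": ("float", None),
    "double": ("double", None),
    "bytes": ("binary", None),
    "string": ("binary", "UTF8"),
}


def avro_to_parquet_message(schema: dict) -> str:
    """Render a flat Avro record schema as a Parquet message type (simplified)."""
    ns = schema.get("namespace")
    full_name = f"{ns}.{schema['name']}" if ns else schema["name"]
    lines = [f"message {full_name} {{"]
    for field in schema["fields"]:
        ftype = field["type"]
        if not isinstance(ftype, str) or ftype not in AVRO_TO_PARQUET:
            raise ValueError(f"unsupported Avro type for {field['name']!r}: {ftype!r}")
        # A non-union Avro field can never be null, hence "required"
        ptype, annotation = AVRO_TO_PARQUET[ftype]
        suffix = f" ({annotation})" if annotation else ""
        lines.append(f"  required {ptype} {field['name']}{suffix};")
    lines.append("}")
    return "\n".join(lines)


avsc = json.loads("""{"type": "record", "name": "twitter_schema", "namespace": "com.miguno.avro",
  "fields": [{"name": "username", "type": "string"},
             {"name": "tweet", "type": "string"},
             {"name": "timestamp", "type": "long"}]}""")
print(avro_to_parquet_message(avsc))
# message com.miguno.avro.twitter_schema {
#   required binary username (UTF8);
#   required binary tweet (UTF8);
#   required int64 timestamp;
# }
```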

3. Now let’s test our result in Hive. We first create a Parquet table (note the simple STORED AS PARQUET syntax, available in Hive 0.13+), then move the data we just created into it with a LOAD command, and finally query the converted data directly.

hive> create table tweets_parquet (username string, tweet string, timestamp bigint) STORED AS PARQUET;
hive> load data inpath '/user/cloudera/outputdir/part-m-00000.snappy.parquet' overwrite into table tweets_parquet;
Loading data to table default.tweets_parquet
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/tweets_parquet/part-m-00000.snappy.parquet': User does not belong to hive
Table default.tweets_parquet stats: [numFiles=1, numRows=0, totalSize=1075, rawDataSize=0]
Time taken: 6.712 seconds
hive> select * from tweets_parquet;
miguno Rock: Nerf paper, scissors is fine.   1366150681
BlizzardCS  Works as intended.  Terran is IMBA.      1366154481
Time taken: 1.107 seconds, Fetched: 2 row(s)

Parquet with Avro

Let’s verify our Parquet schema now that the data is converted; note that the schema still refers to Avro:

$ hadoop parquet.tools.Main schema outputdir/part-m-00000.snappy.parquet
message com.miguno.avro.Tweet {
  required binary username (UTF8);
  required binary tweet (UTF8);
  required int64 timestamp;
}
$ hadoop parquet.tools.Main meta outputdir/part-m-00000.snappy.parquet
creator:     parquet-mr
       avro.schema = {"type":"record","name":"Tweet","namespace"
file schema: com.miguno.avro.Tweet
username:
    REQUIRED INT64 R:0 D:0

row group 1: RC:2 TS:297
---------------------------------------------------------
username:
tweet:
     BINARY SNAPPY DO:0 FPO:71 SZ:176/175/0.99 VC:2 ENC:PLAIN
timestamp:
    INT64 SNAPPY DO:0 FPO:247 SZ:59/57/0.97 VC:2 ENC:PLAIN,BIT_PACKED
$

That concludes our exercise! Let me know if you have any questions.