Wednesday, January 14, 2015

How to load some Avro data into Spark



First, why use Avro?


The most basic format would be CSV, which is not expressive and has no schema associated with the data.
A common format that became popular after this is XML, which conveniently has a schema associated with the data; XML is commonly used in Web Services and SOA architectures. Unfortunately it is very verbose, and parsing it is very memory intensive.
On the other end of the spectrum is JSON, which is very popular to use as it is convenient and easy to learn.
None of these formats are splittable in a Big Data context, which makes them difficult to process in parallel, and putting a compression codec on top of them (Snappy, Gzip) does not solve the problem.
Hence newer data formats have emerged.
Avro is widely used as a common serialization platform, as it is interoperable across multiple languages, offers a compact and fast binary format, supports dynamic schema discovery (via its generic type) and schema evolution, and is compressible and splittable. It also offers complex data structures like nested types.

Example code


Let’s walk through an example: we will define an Avro schema, generate some data, and load it into Spark. In a real-world case, organizations usually have data in a more mundane format such as XML, and they will need to translate it into Avro with tools like JAXB. Let’s use this example, with the following twitter.avsc schema:

{
  "type": "record",
  "name": "twitter_schema",
  "namespace": "com.miguno.avro",
  "fields": [
    {
      "name": "username",
      "type": "string",
      "doc": "Name of the user account on Twitter.com"
    },
    {
      "name": "tweet",
      "type": "string",
      "doc": "The content of the user's Twitter message"
    },
    {
      "name": "timestamp",
      "type": "long",
      "doc": "Unix epoch time in seconds"
    }
  ],
  "doc": "A basic schema for storing Twitter messages"
}

and some data in twitter.json:        

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 } {"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }

We will convert the data (in JSON) into the binary Avro format:
$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
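As an optional sanity check, avro-tools can also dump the binary file back to JSON; no schema argument is needed here since Avro data files embed their schema:

$ java -jar ~/avro-tools-1.7.7.jar tojson twitter.avro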

We will then compile the Avro schema into Java classes:

$ java -jar /app/avro/avro-tools-1.7.7.jar compile schema /app/avro/data/twitter.avsc /app/avro/data/


Let’s now compile the generated class and package it in a jar:

$ CLASSPATH=/app/avro/avro-1.7.7-javadoc.jar:/app/avro/avro-mapred-1.7.7-hadoop1.jar:/app/avro/avro-tools-1.7.7.jar
$ javac -classpath $CLASSPATH /app/avro/data/com/miguno/avro/twitter_schema.java
$ jar cvf Twitter.jar com/miguno/avro/*.class

We can now fire up Spark, passing in the Jar we just created as well as the needed libraries (Hadoop and Avro):

$ ./bin/spark-shell --jars /app/avro/avro-mapred-1.7.7-hadoop1.jar,/app/avro/avro-1.7.7.jar,/app/avro/data/Twitter.jar

In the REPL, let’s create an RDD from our Avro file and retrieve its first element:
scala>
import com.miguno.avro.twitter_schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileWriter
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DatumWriter
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroKey, AvroWrapper}
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable


val path = "/app/avro/data/twitter.avro"
// Read the file as (AvroWrapper[GenericRecord], NullWritable) pairs using the mapred (MR1) input format
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
// Extract the "username" field of the first record
avroRDD.map(l => new String(l._1.datum.get("username").toString)).first



This returns:

res2: String = miguno
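From here the records can be manipulated like any other RDD. As a small sketch (not part of the original walkthrough), here is one way to pull out (username, tweet) pairs and count tweets per user; the fields are converted to plain Strings inside the map, since the Avro wrapper objects themselves are not serializable:

// Convert each record's fields to plain Strings before any further processing
val tweetsByUser = avroRDD.map { case (wrapper, _) =>
  val record = wrapper.datum            // the underlying GenericRecord
  (record.get("username").toString, record.get("tweet").toString)
}

// Count how many tweets each user has in the file
tweetsByUser.countByKey()
// with the sample data above, this should return something like: Map(miguno -> 1, BlizzardCS -> 1)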


A couple of notes:
- We are using the MR1 classes, but the MR2 classes work the same (with a slightly different API).
- We are using GenericRecord as opposed to a specific record class, because we generated the Avro schema and imported it. More on this at http://avro.apache.org/docs/current/gettingstartedjava.html
- Note that even though the Avro classes were compiled in Java, you can import them in Spark, since Scala runs on the JVM.
- Avro optionally lets you specify, on a per-element basis in the schema (via a key/value pair), the type to deserialize to, which is convenient. See http://stackoverflow.com/questions/27827649/trying-to-deserialize-avro-in-spark-with-specific-type/27859980?noredirect=1#comment44240726_27859980
- There are plenty of other ways to do this: one is with Kryo, another via Spark SQL. The Spark SQL route requires a Spark SQL context (see https://github.com/databricks/spark-avro) rather than a pure Spark/Scala approach, but it may well become the best practice in the future; a sketch is shown below.
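For reference, here is a minimal sketch of that Spark SQL route with the databricks spark-avro package. This assumes its jar has been added to the spark-shell classpath; the avroFile API is the one documented in the spark-avro README at the time of writing and may change:

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._   // adds avroFile() to SQLContext

val sqlContext = new SQLContext(sc)
// Load the Avro file as a SchemaRDD; the schema is inferred from the file
val tweets = sqlContext.avroFile("/app/avro/data/twitter.avro")
tweets.registerTempTable("tweets")
sqlContext.sql("SELECT username, tweet FROM tweets").collect()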