How to load some Avro data into Spark
First, why use Avro?
The most basic format is CSV, which is not expressive and carries no schema with the data. A format that became popular after it is XML, which conveniently does associate a schema with the data and is widely used in Web Services and SOA architectures; unfortunately it is very verbose, and parsing it is memory intensive. At the other end of the spectrum sits JSON, which is popular because it is convenient and easy to learn. In a Big Data context, however, none of these formats is splittable, which makes them difficult to use, and putting a compression codec on top (Snappy, Gzip) does not solve the problem. Hence newer data formats have emerged.
Avro is widely used as a common serialization platform: it is interoperable across multiple languages, offers a compact and fast binary format, supports dynamic schema discovery (via its generic type) as well as schema evolution, and is compressible and splittable. It also supports complex data structures such as nested types.
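To make "dynamic schema discovery via the generic type" concrete, here is a minimal sketch that reads an Avro container file with the generic API and no compiled classes; the file name twitter.avro anticipates the example built below and is purely illustrative.

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// The writer's schema travels in the file header, so a generic reader
// discovers it at runtime without any generated classes.
val reader = new DataFileReader(new File("twitter.avro"),
  new GenericDatumReader[GenericRecord]())
println(reader.getSchema)                  // schema read from the file itself
while (reader.hasNext) {
  println(reader.next().get("username"))   // fields accessed by name
}
reader.close()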
Example code
Let’s walk through an example, creating an Avro schema and generating some data. In a real-world case, organizations usually have data in a more mundane format such as XML, and they will need to translate it into Avro with tools like JAXB. Let’s use this example, with this twitter.avsc schema:
{
  "type" : "record",
  "name" : "twitter_schema",
  "namespace" : "com.miguno.avro",
  "fields" : [
    { "name" : "username",
      "type" : "string",
      "doc"  : "Name of the user account on Twitter.com" },
    { "name" : "tweet",
      "type" : "string",
      "doc"  : "The content of the user's Twitter message" },
    { "name" : "timestamp",
      "type" : "long",
      "doc"  : "Unix epoch time in seconds" }
  ],
  "doc" : "A basic schema for storing Twitter messages"
}
and some data in twitter.json:
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
We will convert the data (in JSON) into the binary Avro format:
$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
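For reference, here is a rough sketch of what that conversion does, written in Scala against the generic Avro API; file names follow the example above, and this is an illustrative approximation rather than the avro-tools implementation.

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Parse the schema and set up a JSON decoder over the input text.
val schema  = new Schema.Parser().parse(new File("twitter.avsc"))
val json    = scala.io.Source.fromFile("twitter.json").mkString
val decoder = DecoderFactory.get().jsonDecoder(schema, json)
val reader  = new GenericDatumReader[GenericRecord](schema)

// Decode each JSON record and append it to an Avro container file.
val writer = new DataFileWriter(new GenericDatumWriter[GenericRecord](schema))
writer.create(schema, new File("twitter.avro"))
try {
  while (true) writer.append(reader.read(null, decoder))
} catch { case _: java.io.EOFException => () }   // end of the JSON input
writer.close()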
We will then generate the corresponding Java classes from the Avro schema:
$ java -jar /app/avro/avro-tools-1.7.7.jar compile schema /app/avro/data/twitter.avsc /app/avro/data/
Let’s now compile these classes and package them in a Jar:
$ CLASSPATH=/app/avro/avro-1.7.7-javadoc.jar:/app/avro/avro-mapred-1.7.7-hadoop1.jar:/app/avro/avro-tools-1.7.7.jar
$ javac -classpath $CLASSPATH /app/avro/data/com/miguno/avro/twitter_schema.java
$ jar cvf Twitter.jar com/miguno/avro/*.class
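As a quick sanity check before moving to Spark, the generated class can be read back with the SpecificDatumReader; a minimal sketch, assuming the standard Avro-generated getters (getUsername, getTweet) and the data file at /app/avro/data/twitter.avro.

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.specific.SpecificDatumReader
import com.miguno.avro.twitter_schema

// Read the container file back as typed twitter_schema records.
val reader = new DataFileReader(new File("/app/avro/data/twitter.avro"),
  new SpecificDatumReader[twitter_schema](classOf[twitter_schema]))
while (reader.hasNext) {
  val t = reader.next()
  println(t.getUsername + ": " + t.getTweet)   // typed getters from the generated class
}
reader.close()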
We can now fire up Spark, passing in the Jar we just created as well as the needed libraries (Hadoop and Avro):
$ ./bin/spark-shell --jars /app/avro/avro-mapred-1.7.7-hadoop1.jar,/app/avro/avro-1.7.7.jar,/app/avro/data/Twitter.jar
In the REPL, let’s retrieve our data, create an RDD from it, and then fetch one element:
scala> import com.miguno.avro.twitter_schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileWriter
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DatumWriter
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.io.NullWritable
val path = "/app/avro/data/twitter.avro"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable,
  AvroInputFormat[GenericRecord]](path)
avroRDD.map(l => new String(l._1.datum.get("username").toString)).first
This returns:
res2: String = miguno
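Building on the same RDD, here is a small sketch that projects all three fields at once (the tweets value name is illustrative); the field values are copied out of the Avro record inside the map, since Hadoop input formats may reuse the underlying objects.

// Project each record into a (username, tweet, timestamp) tuple.
val tweets = avroRDD.map { l =>
  val rec = l._1.datum
  (rec.get("username").toString,
   rec.get("tweet").toString,
   rec.get("timestamp").asInstanceOf[Long])
}
tweets.collect().foreach(println)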
A couple of notes:
- We are using the MR1 classes here, but the MR2 classes work the same way (with a slightly different API); see the sketch after this list.
- We are using GenericRecord as opposed to a Specific record because we generated the Avro schema (and imported it). More on this at http://avro.apache.org/docs/current/gettingstartedjava.html
- Note that even though the Avro classes were compiled in Java, you can import them in Spark since Scala runs on the JVM.
- Avro optionally lets you specify, via a key/value pair in the schema, the type to deserialize to on a per-element basis, which is convenient. See http://stackoverflow.com/questions/27827649/trying-to-deserialize-avro-in-spark-with-specific-type/27859980?noredirect=1#comment44240726_27859980
- There are plenty of other ways to do this, one being Kryo, and another via Spark SQL. The latter requires you to get a Spark SQL context (see https://github.com/databricks/spark-avro), as opposed to a pure Spark/Scala approach, but it may well become the best practice in the future.
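To illustrate the first note, here is a minimal sketch of the same read using the MR2 (mapreduce) classes already imported above; the avroRDD2 name is just a placeholder.

val avroRDD2 = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
  AvroKeyInputFormat[GenericRecord]](path)
avroRDD2.map(l => l._1.datum.get("username").toString).first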