Monday, March 2, 2015

Converting Avro data to Parquet format in Hadoop

Update: this post is now part of the Cloudera blog, found at ow.ly/KAKmz

A customer of mine wants to take advantage of both worlds: work with his existing Apache Avro data, with all of the advantages that it confers, but take advantage of the predicate push-down features that Parquet provides. How to reconcile the two?
For more information about combining these formats, see this.

For a quick recap on Avro, see my previous post. While you are at it, see why Apache Avro is currently the gold standard in the industry.

What we are going to demonstrate here: how to take advantage of existing tools to convert our existing Avro format into Parquet, and make sure we can query that transformed data.

Parquet data


First let’s try to convert text data to Parquet, and read it back. Fortunately there is some code already from Cloudera to do this in Map Reduce
The code from Cloudera: https://github.com/cloudera/parquet-examples , and doc here lets you read and write Parquet data. Let’s try this.

First, let’s create some Parquet data as input. We will use Hive for this, by directly converting Text data into Parquet.

Parquet conversion


1. Let’s create a csv data example, and create a text table (here, just 2 columns of integers) in HDFS pointing to it:

create table mycsvtable (x int, y int)
row format delimited
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/home/cloudera/test/' OVERWRITE INTO TABLE mycsvtable;

2. Create a Parquet table in Hive, and convert the data to it:

create table myparquettable (a INT, b INT)
STORED AS PARQUET
LOCATION '/tmp/data';
insert overwrite table myparquettable select * from mycsvtable;


3.     You will need to add Hadoop and Parquet libraries relevant to the project in say, Eclipse for the code needed to be built; therefore, all of the links to the proper libs needed to be added. We then export the code as a JAR (File->Export as Running Jar) and run it outside of Eclipse (otherwise, some Hadoop security issues ensue that prevent you to run the code).


4.   Run the program (you could also run java instead of Hadoop if you copy the data from hdfs to local disk). The arguments are: inputData as Parquet / outputData as csv. We just want to ensure that we can read the Parquet data and display it.
$ sudo hadoop -jar ./testparquet.jar hdfs:///home/cloudera/test/data/000000_0 hdfs:///home/cloudera/test/dataparquet

 See result: (csv file):
$ more test/dataparquet2/part-m-00000
  1,2 3,4 5,6


Avro data conversion

Avro data example

Let’s get some Avro data example working, from this post.

Avro data generation


Interestingly Hive doesn’t let you load/convert csv data into Avro like we did in the Parquet example.  
Let’s walk through an example of creating an Avro schema with its IDL, and generating some data. Let’s use this example , with this twitter.avsc schema:

{
   "type" : "record",
   "name" : "twitter_schema",
   "namespace" : "com.miguno.avro",
   "fields" : [
{     "name" : "username",
       "type" : "string",
      "doc"  : "Name of the user account on Twitter.com"   },
 {
     "name" : "tweet",
     "type" : "string",
     "doc"  : "The content of the user's Twitter message"   },
 {
     "name" : "timestamp",
     "type" : "long",
     "doc"  : "Unix epoch time in seconds"   } ],
   "doc:" : "A basic schema for storing Twitter messages" }

and some data in twitter.json:        

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 } {"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }

We will convert the data (in Json) into binary Avro format.
$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro

Transformation from Avro to Parquet storage format


So essentially use the best of both worlds: take advantage of the Avro object model and serialization format of Avro, and combine it with the columnar storage format of Parquet.
First we will reuse our Avro data that was created earlier.

1. We will then take advantage of this code: https://github.com/laserson/avro2parquet to convert the Avro data to Parquet data. This is a map-only job that simply sets up the right input and output format according to what we want.

2. After compilation, let’s run the script on our existing Avro data:
$  hadoop jar avro2parquet.jar hdfs:///user/cloudera/twitter.avsc  hdfs:///user/cloudera/inputdir hdfs:///user/cloudera/outputdir

We get:

$ hadoop fs -ls /user/cloudera/outputdir
Found 3 items
-rw-r--r-- 
/user/cloudera/outputdir2/_SUCCESS
-rw-r--r--   1 cloudera cloudera
/user/cloudera/outputdir2/_metadata
-rw-r--r--   1 cloudera cloudera
/user/cloudera/outputdir2/part-m-00000.snappy.parquet

Note that the Avro schema is converted directly to a Parquet-compatible format.

3. Now let’s test our result in Hive. We first create a Parquet table (note the simple syntax in Hive 0.14+), then point to the data we just created via a LOAD command, and finally query our converted data directly.

hive> create table tweets_parquet (username string, tweet string, timestamp bigint)                                                                               STORED AS PARQUET;
OK
Ø  load data inpath '/user/cloudera/outputdir/part-m-00000.snappy.parquet' overwrite into table tweets_parquet;
Ø  Loading data to table default.tweets_parquet
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/tweets_parquet/part-m-00000.snappy.parquet': User does not belong to hive
Table default.tweets_parquet stats: [numFiles=1, numRows=0, totalSize=1075, rawDataSize=0]
OK
Time taken: 6.712 seconds
hive
Ø   
Ø   select * from tweets_parquet;
Ø  OK
Ø   
miguno Rock: Nerf paper, scissors is fine.   1366150681
BlizzardCS  Works as intended.  Terran is IMBA.      1366154481
Time taken: 1.107 seconds, Fetched: 2 row(s)
Parquet with Avro



Let’s see verify our Parquet schema now that it is converted; note that the schema still refers to Avro:

Ø   
Ø  $ hadoop parquet.tools.Main schema outputdir/part-m-00000.snappy.parquet
Ø   
Ø  message com.miguno.avro.Tweet {
Ø    required binary username (UTF8);
Ø    required binary tweet (UTF8);
Ø    required int64 timestamp;
}
Ø 
Ø   
Ø   
Ø  $ hadoop parquet.tools.Main meta outputdir/part-m-00000.snappy.parquet
Ø  creator:     parquet-mr
extra:
Ø         avro.schema = {"type":"record","name":"Tweet","namespace"
Ø  file schema: com.miguno.avro.Tweet
Ø   
------------------------------------------------------
Ø  username:
Ø      REQUIRED BINARY O:UTF8 R:0 D:0
tweet:
Ø      REQUIRED BINARY O:UTF8 R:0 D:0
timestamp:
Ø      REQUIRED INT64 R:0 D:0

row group 1: RC:2 TS:297
Ø  ---------------------------------------------------------username:
Ø       BINARY SNAPPY DO:0 FPO:4 SZ:67/65/0.97 VC:2 ENC:PLAIN,BIT_PACKED
Ø  tweet:
Ø       BINARY SNAPPY DO:0 FPO:71 SZ:176/175/0.99 VC:2 ENC:PLAIN
Ø  timestamp:
Ø      INT64 SNAPPY DO:0 FPO:247 SZ:59/57/0.97 VC:2 ENC:PLAIN,BIT_PACKED
Ø   
Ø  $

That concludes our exercise! Let me know if additional questions.

0 comments:

Post a Comment

Note: Only a member of this blog may post a comment.