Wednesday, May 21, 2014

Apache Pig: the good, the bad, the ugly

I recently worked on a project involving Apache Pig. I’ve been using Hadoop for quite some time (~3 years), and this was the first time I was actually delving into Pig. Here are some of my notes and comments about the tool.

The Good

A tool for developers


Pig is really a great tool for developers; it is a scripting language that lets you code your ETL pipeline as an abstraction layer above Hadoop. It has an edge over Hive in that you can debug your script step by step, with help from plug-ins such as PigPen. It is the scripting language missing in Datameer, when the number of worksheets in your ETL becomes unmanageable (IBM’s BigSheets at least translates your sheets into Pig).
Pig is truly integrated in the Hadoop ecosystem: it can use the latest storage formats such as Parquet, can be coded in the Hue environment, and has even been ported to other frameworks like Spark (Spork).

Here is an example of productionizing Pig that shows how well integrated it is with the rest of the tools:



1. Create/load your data in HDFS, e.g.:

1|2002|matt|lieber|..
and save it in, say, /hdfs/data/mydata/mydata.txt (or move it from your local directory via hadoop fs -copyFromLocal). In this case the data is pipe-separated.

2. Create your Hive table script in, say, mytable.hql:

drop table if exists ${hiveconf:SCHEMANAME}.mytable;

create external table ${hiveconf:SCHEMANAME}.mytable
(
a_m string,
name_sk string,
...
spcl_pgm_cd string
)
row format delimited fields terminated by '|' location '${hiveconf:HDFSDIR}';

3. Run it: hive -f mytable.hql -hiveconf SCHEMANAME=myschema -hiveconf HDFSDIR='/hdfs/data/mydata'

4. You can now access the data in this Hive table from your Pig script, assuming HCatalog is installed (it is enabled by default in both CDH and HDP these days):


pig -f myscript.pig -p SCHEMANAME=myschema -p PROCESS_DATE=20140416 -useHCatalog
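For illustration, here is a minimal sketch of what myscript.pig could look like on the receiving end of that command. The contents are hypothetical: the output path is a placeholder, and the loader class name is the one quoted in the comments at the end of this post (newer HCatalog releases ship it as org.apache.hive.hcatalog.pig.HCatLoader).

```pig
-- myscript.pig: hypothetical contents, for illustration only.
-- SCHEMANAME and PROCESS_DATE are substituted from the -p options on the command line.

-- With -useHCatalog, field names and types come from the Hive table definition,
-- so the LOAD needs no AS (...) schema clause.
mytable = LOAD '$SCHEMANAME.mytable' USING org.apache.hcatalog.pig.HCatLoader();

-- Print the schema that Pig picked up from Hive.
DESCRIBE mytable;

-- Assumed output location, one directory per run date.
STORE mytable INTO '/hdfs/data/out/$PROCESS_DATE' USING PigStorage('|');
```

The point of going through HCatalog is that the schema lives in one place (the Hive table), so the Pig script does not have to repeat every column.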

Pig versions



I used Pig 0.12, which contains a lot of new built-in functions and operations, and I was able to completely avoid writing UDFs (which are a dicey choice in my opinion: you don’t necessarily know how to optimize them, and any change in your business requirements requires recompiling and retesting your Java/streaming code).
Obviously the latest version of Pig supports Hadoop 2.0.
Documentation is pretty good, as long as you get the right version! A Google search doesn’t necessarily point you to the right version. For reference, 0.12 is here.



The Bad


Operations and Built-in functions


There are surprisingly few built-in functions and operators that you truly need in Pig: 90% of the time for my ETL, I didn’t need any of the fancy new operations; instead, I used FOREACH, conditional expressions (the ?: operator), JOIN, COGROUP and GROUP. That’s it.
This is both good and bad: Pig is simple to learn, but it can get amazingly complex and cumbersome to use.
An example: my project involved performing ETL across some tables from an old Teradata system and ingesting the result into HDFS files instead.
One specific requirement was to join 2 tables, deduplicate, and then compare the values of one column (c1) with each other: if they are all equal, map column c2’s value to a new table; if not, map the sum of column c3’s values to that new table.
Here is the code in Pig:

-- Join A_SRVC and A_SRVC_FIN on A_SK, A_SRVC_SK, ID, MOD_ID, SRVC_LINE_NUM
joined = JOIN srvc BY (a_sk, a_srvc_sk, id, mod_id, srvc_line_num),
              srvc_fin BY (a_sk, a_srvc_sk, id, mod_id, srvc_line_num);

-- If multiple records are present, pick the one with the latest A_SRVC_FIN.SRC_UPDT_DTTM
uniq_ltst_detail_srvc_fin_grp = GROUP joined BY (srvc_fin::a_srvc_fin_sk,
    srvc::a_sk, srvc::id, srvc::mod_id, srvc::srvc_line_num);
derive_unique_fin_dedup = FOREACH uniq_ltst_detail_srvc_fin_grp {
    ordered_data = ORDER joined BY srvc_fin::src_updt_dttm DESC;
    limit_data = LIMIT ordered_data 1;
    GENERATE FLATTEN(limit_data);
};

-- Cast the charge amount to int first so that it can be summed
derive_unique_srvc_fin_dedup2 = FOREACH derive_unique_fin_dedup
    GENERATE *, (int)srvc_fin::srvc_line_item_chrg_amt AS Chrg_amt_int;

derive_unique_srvc_fin_dedup_grp = GROUP derive_unique_srvc_fin_dedup2 BY (
    limit_data::srvc_fin::a_srvc_fin_sk,
    limit_data::srvc::a_sk,
    limit_data::srvc::id,
    limit_data::srvc::mod_id,
    limit_data::srvc::srvc_line_num);

-- Calculate the sum ahead of time
Srvc_line_item_amt = FOREACH derive_unique_srvc_fin_dedup_grp
    GENERATE FLATTEN($1), SUM(derive_unique_srvc_fin_dedup2.Chrg_amt_int) AS Chrg_amt;

-- Work out whether all the values are equal to each other, and flatten everything
Compare_Charge_amt_dst_grp = GROUP Srvc_line_item_amt BY (srvc_fin::a_srvc_fin_sk,
    srvc::a_sk, srvc::id, srvc::mod_id, srvc::srvc_line_num);
Compare_Charge_amt = FOREACH Compare_Charge_amt_dst_grp {
    Compare_Charge_amt_dst = DISTINCT Srvc_line_item_amt.(derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt);
    GENERATE FLATTEN($1),
        (COUNT(Compare_Charge_amt_dst) == 1 ? 'y' : 'n') AS Charge,
        FLATTEN(Srvc_line_item_amt.(derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt)),
        FLATTEN(Srvc_line_item_amt.Chrg_amt);
};

Compare_Charge_amt2 = GROUP Compare_Charge_amt ALL;
Compare_Charge_amt3 = FOREACH Compare_Charge_amt2
    GENERATE FLATTEN($1);

-- Final calculation/output
Compare_Charge_amt4 = FOREACH Compare_Charge_amt3
    GENERATE (Compare_Charge_amt::Charge == 'y' ?
        Compare_Charge_amt::null::derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt :
        (chararray)Compare_Charge_amt::null::Chrg_amt);


As you can see in the code, I first had to calculate the SUM ahead of time, then work out whether the values were equal to each other (via a COUNT == 1 comparison on the DISTINCT values, neat trick!), and FLATTEN everything beforehand; otherwise not everything would be at the same 'level': some data was in a bag, some in tuples, and some was scalar. Then I had to FLATTEN everything again and finish the calculation.
If there is a better/more elegant way to do this, I am all ears, but essentially my problem stems from the fact that after multiple groupings and joins, the data is deeply nested and not easy to get to in Pig. FLATTEN only takes a plain bag or tuple as its argument, *not* an expression on one (i.e. you cannot write FLATTEN(A.a == 3 ? 'y' : 'n')), which makes things difficult.
In any case, DESCRIBE is your friend: it shows you what sort of data structure you are ending up with.
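Stripped of the project-specific names, the COUNT == 1 trick boils down to the sketch below. Everything in it is hypothetical (the input path, the relation names and the fields k, c1 and c3); it is only meant to show the shape of the nested FOREACH, not the code that ran in my pipeline.

```pig
-- Hypothetical input: one row per (k, c1, c3); path and field names are placeholders.
recs = LOAD '/path/to/recs.txt' USING PigStorage('|')
       AS (k:chararray, c1:chararray, c3:int);

by_key = GROUP recs BY k;

-- Inside the nested block, DISTINCT runs once per group. A count of 1 means
-- every non-null c1 in the group has the same value (COUNT skips nulls).
flagged = FOREACH by_key {
    c1_vals = DISTINCT recs.c1;
    GENERATE group AS k,
             (COUNT(c1_vals) == 1 ? 'y' : 'n') AS all_equal,
             SUM(recs.c3) AS c3_sum;
};

-- Check which fields ended up as scalars and which as bags.
DESCRIBE flagged;
```

Because the flag and the sum are both produced in the same GENERATE, they come out as scalars at the same level, which avoids one round of FLATTEN.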


The Ugly

Error messages

About half the time, error messages in Pig are unfriendly and unhelpful. For anything beyond a syntax error or a disambiguation problem, Pig throws a cryptic message that just tells you: "something is wrong, figure it out".
Example of an error:

 ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1200: Pig script failed to parse:
<line 37, column 32> expression is not a project expression: (Name: UserFunc(org.apache.pig.builtin.CONCAT) Type: null Uid: null)

That actually meant that my CONCAT function was not being given the right set of arguments.

Dereferencing

Dereferencing is mostly a pain to work with; see my previous code. Again, plugins in Eclipse will help, but it still makes the code rather less readable. Consider this line:

Compare_Charge_amt_dst = DISTINCT Srvc_line_item_amt.(derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt);


Here I am disambiguating my fields via '::' inside the parenthesized form of the dereference operator, '.( )'. It took me *a whole day* to find out how to write this syntax.


PigUnit

Seriously, I have to go back to Java to implement my PigUnit tests?
And surprise, PigUnit doesn’t support HCatalog, which means that I have to load my complete schema “by hand”: all 12 tables with ~50 fields each. Not great.


Error Handling

There is no good way to handle errors in Pig (other than using the nice SPLIT operator to route records based on conditions). I thought that Apache Falcon, the new data management solution, would help, but it is only useful for general coordination of data pipelines (i.e. cron-like workflows via Oozie) and lifecycle management. As of May 2014, it cannot even take in error messages from Pig and create notifications via JMS!
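As a rough idea of what SPLIT-based error handling can look like, here is a minimal sketch against the pipe-separated sample from step 1. The field names, the validation rule and the reject path are all assumptions made up for the example.

```pig
-- Hypothetical field names for the pipe-separated sample shown earlier.
raw = LOAD '/hdfs/data/mydata' USING PigStorage('|')
      AS (id:chararray, yr:chararray, first_name:chararray, last_name:chararray);

-- A chararray that cannot be parsed as an int casts to null instead of failing the job.
typed = FOREACH raw GENERATE id, yr, (int)yr AS yr_int, first_name, last_name;

-- Route rows that fail validation to a reject relation instead of dropping them silently.
SPLIT typed INTO good IF (id IS NOT NULL AND yr_int IS NOT NULL),
                 bad OTHERWISE;

-- Keep the rejects (with the original yr string) for later inspection.
STORE bad INTO '/hdfs/data/mydata_rejects' USING PigStorage('|');
```

This only covers bad records; it does nothing for job-level failures, which is where I was hoping a tool like Falcon would come in.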


Conclusion

It has been a nice but steep road to Pig enlightenment; my ETL pipeline works, but I wonder whether Cascading or a similar package might have been a more elegant solution. Don't get me wrong, Pig is a nice tool - but like anything, 

4 comments:

  1. Hi..could you point me on how to write pigunit for a script that uses Hcatalog for loading? Any examples that I can refer to?

  2. Abhijit, last time i checked, you can't :-(
    So what i last did is, to load the data in Pig by entering the full path on HDFS. Unfortunately that is ugly, but it works ..

  3. So lets say, I have :
    A = LOAD '$DB_AND_TABLE' USING org.apache.hcatalog.pig.HCatLoader();

    You changed that to what?

  4. Something like this:
    a= LOAD '/path/to/File.txt' using PigStorage(',') as (c1:chararray,c2:chararray,c3:chararray);
