I recently worked on a project involving Apache Pig. I’ve
been using Hadoop for quite some time (~3 years), and this was the first time I
was actually delving into Pig. Here are some of my notes and comments about the
tool.
The Good
A tool for developers
Pig is really a great tool for developers: it is a scripting
language that lets you code your ETL pipeline as an abstraction layer above
Hadoop. It is better than Hive in that you can debug your script
step by step, with help from plug-ins such as PigPen. It is the scripting
language missing in Datameer, when the number of worksheets in your ETL becomes
unmanageable (IBM’s BigSheets at least translates your sheets into Pig).
Pig is truly integrated in the Hadoop ecosystem: it can
use the latest storage formats such as Parquet,
can be coded in the Hue
environment, and is even being ported to other frameworks like Spark (Spork).
Here is an example of productionizing Pig that shows
how well integrated it is with the rest of the tools:
1. Create/Load your data in HDFS, i.e.:
1|2002|matt|lieber|..
and save it in say, /hdfs/data/mydata/mydata.txt (or move it from your local directory via hadoop fs -copyFromLocal). In this case this is pipe-separated data.
2. Create your Hive table script, in say mytable.hql:
drop table if exists ${hiveconf:SCHEMANAME}.mytable;
create external table ${hiveconf:SCHEMANAME}.mytable (
a_m string,
name_sk string,
...
spcl_pgm_cd string
) row format delimited fields terminated by '|' location '${hiveconf:HDFSDIR}';
3. Run it: hive -f mytable.hql -hiveconf SCHEMANAME=myschema -hiveconf HDFSDIR='/hdfs/data/mydata';
4. You can now access the data in this Hive table via your Pig script, assuming HCatalog is installed (it is by default enabled in both CDH and HDP these days):
pig -f myscript.pig -p SCHEMANAME=myschema -p PROCESS_DATE=20140416 -useHCatalog
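For illustration, myscript.pig could start along these lines. This is a minimal sketch, not the actual project script: the HCatLoader class name is the one HCatalog used around Pig 0.12, the filter on a_m and the output path are made up, and only the table and column names come from the Hive script above.

```pig
-- myscript.pig (hypothetical contents)
-- $SCHEMANAME and $PROCESS_DATE are filled in from the -p options on the command line.
mydata = LOAD '$SCHEMANAME.mytable' USING org.apache.hcatalog.pig.HCatLoader();

-- The schema comes from the Hive metastore, so columns can be used by name.
-- Filtering on a_m is only an illustration.
kept = FILTER mydata BY a_m IS NOT NULL;
slim = FOREACH kept GENERATE a_m, name_sk, spcl_pgm_cd;

-- Hypothetical output directory, one per processing date.
STORE slim INTO '/hdfs/data/mydata_out/$PROCESS_DATE' USING PigStorage('|');
```

No schema is declared in the script itself; that is the part HCatalog takes care of.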
Pig versions
I used Pig 0.12, which contains a lot of new built-in
functions and operations, and I was able to completely avoid writing UDFs (which
are a dicey choice in my opinion: you don’t necessarily know how to optimize
them, and any change in your business requirements means
recompiling and retesting your Java/streaming code).
Obviously the latest version of Pig supports Hadoop 2.0.
Documentation is pretty good, as long as you get the right
version! A Google search doesn’t necessarily point you to the right version.
For reference 0.12 is here.
The Bad
Operations and Built-in functions
There are surprisingly few built-in common functions that
you truly need in Pig: 90% of the time for my ETL, I didn’t need any of the
fancy new operations; instead, I used FOREACH, conditional expressions (the ?: operator), JOIN,
COGROUP and GROUP. That’s it.
Which is good and bad: it is simple to learn, but it can
get amazingly complex and cumbersome to use.
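To give an idea of how far those few operators go, here is a minimal sketch. The paths, relation names and fields are all made up for illustration and have nothing to do with the project data.

```pig
-- Hypothetical inputs: paths, relation names and fields are made up for illustration.
orders    = LOAD '/hdfs/data/orders' USING PigStorage('|')
            AS (order_id:chararray, cust_id:chararray, amount:long);
customers = LOAD '/hdfs/data/customers' USING PigStorage('|')
            AS (cust_id:chararray, name:chararray);

-- FOREACH plus the ?: conditional to derive a new column
tagged = FOREACH orders GENERATE order_id, cust_id, amount,
         (amount >= 1000L ? 'large' : 'small') AS bucket;

-- JOIN on the customer key; cust_id now exists twice and needs the :: prefix
joined = JOIN tagged BY cust_id, customers BY cust_id;

-- GROUP and aggregate; name and amount are unique after the join, so no prefix is needed
by_cust = GROUP joined BY (tagged::cust_id, name);
totals  = FOREACH by_cust GENERATE FLATTEN(group) AS (cust_id, name),
          SUM(joined.amount) AS total;
```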
An example: my project had to do with performing ETL
across some tables from an old Teradata system and ingesting the result into
HDFS files instead.
One specific requirement had to do with performing a join
between 2 tables, deduplicating, then comparing the values of column c1 with each other; if
they are equal, map column c2’s value to a new table; if not, map the sum of column c3’s
values to that new table.
Here is the code in Pig:
-- Join A_SRVC and A_SRVC_FIN on A_SK, A_SRVC_SK, ID, MOD_ID, SRVC_LINE_NUM
joined = JOIN srvc by (a_sk, a_srvc_sk, id, mod_id, srvc_line_num)
, srvc_fin by (a_sk, a_srvc_sk, id, mod_id, srvc_line_num);
-- If multiple records present, pick the one with the latest A_SRVC_FIN.SRC_UPDT_DTTM
uniq_ltst_clm_detail_srvc_fin_grp = GROUP joined BY (srvc_fin::a_srvc_fin_sk,
srvc::a_sk, srvc::clm_id,
srvc::mod_id, srvc::srvc_line_num);
derive_unique_fin_dedup = FOREACH uniq_ltst_clm_detail_srvc_fin_grp {
ordered_data = ORDER joined by srvc_fin::src_updt_dttm DESC;
limit_data = LIMIT ordered_data 1;
GENERATE FLATTEN(limit_data);
};
-- First Calculate cast to int to be able to perform the sum
derive_unique_srvc_fin_dedup2 = FOREACH derive_unique_fin_dedup
GENERATE *, (int)srvc_fin::srvc_line_item_chrg_amt as Chrg_amt_int;
derive_unique_srvc_fin_dedup_grp = group derive_unique_srvc_fin_dedup2 BY (limit_data::srvc_fin::a_srvc_fin_sk,
limit_data::srvc::a_sk,
limit_data::srvc::id,
limit_data::srvc::mod_id,
limit_data::srvc::srvc_line_num);
-- Calculate the sum ahead of time
Srvc_line_item_amt = FOREACH derive_unique_srvc_fin_dedup_grp
GENERATE FLATTEN($1),SUM(derive_unique_srvc_fin_dedup2.Chrg_amt_int) AS Chrg_amt;
-- Calculate whether or not we have all the values equal to each other, and flatten everything
Compare_Charge_amt_dst_grp = GROUP Srvc_line_item_amt BY (srvc_fin::a_srvc_fin_sk,
med_claim_srvc::a_med_clm_sk, srvc::clm_id,
srvc::mod_id, srvc::srvc_line_num);
Compare_Charge_amt = FOREACH Compare_Charge_amt_dst_grp
{
Compare_Charge_amt_dst = DISTINCT Srvc_line_item_amt.(derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt);
GENERATE FLATTEN($1),
(COUNT(Compare_Charge_amt_dst)==1 ? 'y' : 'n') as Charge,
FLATTEN(Srvc_line_item_amt.(derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt)), FLATTEN(Srvc_line_item_amt.Chrg_amt);
};
Compare_Charge_amt2 = GROUP Compare_Charge_amt ALL;
Compare_Charge_amt3 = FOREACH Compare_Charge_amt2
GENERATE FLATTEN($1);
-- Final calculation/output
Compare_Charge_amt4 = FOREACH Compare_Charge_amt3
GENERATE (Compare_Charge_amt.Charge=='y' ?
Compare_Charge_amt::null::derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt :
(chararray)Compare_Charge_amt::null::Chrg_amt);
As you can see in the code, I first needed to calculate the SUM ahead of time, then work out whether the values were all equal to each other (via a COUNT==1 comparison on the DISTINCT values, neat trick!), and FLATTEN everything beforehand; otherwise not everything would be at the same 'level': some data was in bags, some in tuples, and some were scalars. Then I had to re-FLATTEN everything and finish the calculation. If there is a better/more elegant way to do this, I am all ears, but essentially my problem stems from the fact that after multiple groupings and joins, the data is deeply nested, needs a lot of dereferencing and is not easy to get to in Pig. FLATTEN only takes a plain bag or tuple as its argument, *not* an operation on them (i.e. you cannot do FLATTEN(A.a == 3 ? 'y' : 'n')), which makes things difficult.
In any case, DESCRIBE is your friend: it shows you what sort of data structure you are ending up with.
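One shape that might cut down on the re-flattening is to do the comparison and both aggregations inside a single nested FOREACH. This is only a sketch under assumptions: the input is taken to be already joined and deduplicated, the names t, k and mapped are made up, and using MAX to pick "the" c2 value when all c1 values match is my guess at the requirement, not the project's actual rule.

```pig
-- Hypothetical, already joined and deduplicated input: k is the grouping key,
-- c1 is the column to compare, c2 and c3 are the candidate output columns.
t = LOAD '/hdfs/data/joined_dedup' USING PigStorage('|')
    AS (k:chararray, c1:chararray, c2:long, c3:long);

grp = GROUP t BY k;
DESCRIBE grp;  -- shows the bag of t tuples nested under each group key

result = FOREACH grp {
    c1_vals = DISTINCT t.c1;  -- one entry per distinct c1 value in the group
    GENERATE group AS k,
             -- all c1 values equal: take c2 (MAX picks one value; an assumption),
             -- otherwise: take the sum of c3
             (COUNT(c1_vals) == 1 ? MAX(t.c2) : SUM(t.c3)) AS mapped;
};
```

Both branches of the conditional are declared as long here so that they have the same type.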
The Ugly
Error messages
Error messages in Pig are, in about half of the cases, unfriendly and unhelpful. If the problem goes beyond a syntax error or a wrong disambiguation, Pig will throw a cryptic message that just tells you: "something is wrong, figure it out".
Example of an error:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1200: Pig script failed to parse:
<line 37, column 32> expression is not a project expression: (Name: UserFunc(org.apache.pig.builtin.CONCAT) Type: null Uid: null)
That actually meant that my CONCAT function was not being given the right set of arguments.
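One way to run into argument problems with CONCAT: as far as the 0.12 documentation goes, CONCAT in that version takes exactly two expressions of the same type, so building a string from three pieces means nesting the calls. Whether this was the cause of the error above is an assumption; the relation and field names below are made up.

```pig
-- Hypothetical relation; path and fields are made up for illustration.
people = LOAD '/hdfs/data/people' USING PigStorage('|')
         AS (first_name:chararray, last_name:chararray);

-- Two arguments per call, so a three-part string is built by nesting.
names = FOREACH people GENERATE CONCAT(first_name, CONCAT(' ', last_name)) AS full_name;
```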
Dereferencing
Dereferencing is mostly a pain to work with; see my previous code. Again, plug-ins in Eclipse will help, but still, it makes the code rather less readable. Consider this code:
Compare_Charge_amt_dst = DISTINCT Srvc_line_item_amt.(derive_unique_srvc_fin_dedup2::limit_data::srvc_fin::tot_chrg_amt);
Here I am disambiguating my fields via '::', inside the dereferencing operator '.( )'. It took me *a whole day* to find out how to write this syntax.
PigUnit
Seriously, I have to go back to Java to implement my
PigUnit tests?
And surprise, PigUnit doesn’t support HCatalog, which
means that I have to load my complete schema “by hand”, all 12 tables with ~50
fields each. Not great.
Error Handling
There is no good way to handle errors in Pig (other than by using the nice SPLIT operator based on conditions). I thought that Apache Falcon, the new data management solution, would help, but it is only useful for general coordination of data pipelines (i.e. CRON-like workflows via Oozie) and lifecycle management. It cannot even take in error messages from Pig and create notifications via JMS at this point (May 2014)!
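The SPLIT approach can be sketched as follows: route rows that fail a check into a reject relation and store them, rather than letting them fail or vanish downstream. The input path, the amt column, the integer check and the reject path are all made up for illustration.

```pig
-- Hypothetical input: path, columns and the reject location are made up for illustration.
input_rows = LOAD '/hdfs/data/charges' USING PigStorage('|')
             AS (a_m:chararray, name_sk:chararray, amt:chararray);

-- Route each row by whether amt looks like an integer; nulls go to the reject branch.
SPLIT input_rows INTO
    good_rows IF (amt IS NOT NULL AND amt matches '-?[0-9]+'),
    bad_rows  IF (amt IS NULL OR NOT (amt matches '-?[0-9]+'));

-- Keep the rejects for later inspection instead of silently dropping them.
STORE bad_rows INTO '/hdfs/data/charges_rejects' USING PigStorage('|');

-- Only the rows that passed the check are cast and carried forward.
clean = FOREACH good_rows GENERATE a_m, name_sk, (long)amt AS amt;
```

This only covers data errors that can be expressed as a condition; it does nothing for job-level failures.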
Conclusion
It has been a nice but steep road to Pig enlightenment; my ETL pipeline works, but I wonder whether Cascading or a similar package might have been a more elegant solution. Don't get me wrong, Pig is a nice tool - but like anything,
Hi..could you point me on how to write pigunit for a script that uses Hcatalog for loading? Any examples that I can refer to?
Abhijit, last time i checked, you can't :-(
So what i last did is, to load the data in Pig by entering the full path on HDFS. Unfortunately that is ugly, but it works ..
So lets say, I have :
A = LOAD '$DB_AND_TABLE' USING org.apache.hcatalog.pig.HCatLoader();
You changed that to what?
Something like this:
a= LOAD '/path/to/File.txt' using PigStorage(',') as (c1:chararray,c2:chararray,c3:chararray);