Thursday, September 24, 2015

Apache Hive: the SQL Count of Monte Cristo



Project


            I have been working on an industrial-scale Hive project at a major Fortune 50 company lately, and wanted to share my experience. In some ways, I believe it is representative of the current state of Hadoop adoption: the environment I was in was neither a bleeding-edge startup trying every new tool out there, nor the technology laggard typical of Midwest companies.

Technical environment


First, the state of the work environment I was in:
-       Hadoop environments: the company adopted a major Hadoop vendor. Typical separate Development / Production clusters, of several hundred nodes each.
-       Hadoop 1.x installed, though not on the latest build: Hive/Pig 0.13 on MapReduce 1.0, i.e. no YARN. Also, no Spark in sight.
-       Heavy users of Hive and Pig, no custom MapReduce. Java shop, with a little bit of Python. Datameer and Platfora installed, and other tools, like Alation, under evaluation. Not going to the cloud anytime soon; very concerned about data security.
-       Data analysts not very aware of, or curious about, the tools on the market. More about SQL than MapReduce. So essentially the work was no longer about Big Data, but about translating the requirements into technical specs correctly; all optimization techniques and Hadoop specifics were deferred to the MapReduce platform (i.e. the default parameters were left in place).

Problems


I encountered a few technical issues, and wanted to note them here, since they may be a common occurrence.

I encountered an issue with a complex query involving a few joins that was overwriting the very table it read from (‘INSERT OVERWRITE TABLE A … SELECT * FROM A ..’). The problem was intermittent: sometimes I got a cryptic error at the overwrite step, even though the SELECT part of the query ran fine. Coincidentally, my colleague encountered a similar problem with Pig, where the query refused to run to completion.
The solution was to save the result of the query in a temporary table/set of tuples, and then copy it back to the actual table afterwards, i.e.:


-        -- to remedy a bug!
-        drop table if exists mytable_temp;
-        create table IF NOT EXISTS mytable_temp (
-        .. -- same schema as the actual table giving issues.
-        )
-        ROW FORMAT DELIMITED
-        FIELDS TERMINATED BY '\001'
-        -- backslash as the escape character (it must differ from the line terminator)
-        ESCAPED BY '\\'
-        LINES TERMINATED BY '\n'
-        STORED AS TEXTFILE
-        ;

-        -- Now run the actual query, inserting into our temp table
-        insert into table mytable_temp
-        select  …
-        ;
-        -- to remedy the bug: copy the result back into the actual table
-        insert overwrite table mytable
-        select * from mytable_temp;


Best Practices


Some best practices I learned from working with and churning out a lot of code:

-       Try not to hard-code variables in the scripts; pass them in upstream, from your scheduler of choice:
-        <hdp:hive-server host="some-host" port="10001" properties-location="classpath:hive-dev.properties" configuration-ref="hadoopConfiguration">
-           someproperty=somevalue
-           hive.exec.scratchdir=/tmp/mydir
-        </hdp:hive-server>
-       Separate the Hive clauses (SELECT, FROM) from the column names (typically by putting them on different lines) for readability, same as in SQL.
-       Ensure that after any manipulation of a column you give the result an explicit name, that is, alias the column with AS, as in the line below. You might get away with not doing it, but I encountered some cryptic issues in UNIONs because I hadn’t named all columns.
-        COALESCE(n.vendor_info_id, f.OPEN_Customer) AS OPEN_Customer
-       Some say that they don’t “trust” Hive’s built-in functions, and would rather just declare Hive tables and work within Pig (via HCatalog) for the most part. Other engineers do work in HiveQL, but for anything complicated would rather write UDFs/UDTFs in a different language. I don’t really agree with these views, and was pleased to see that most if not all of my requirements could be met in pure HiveQL. Take a look at this for example:
-         
-        -- Keep only one record per account_id when there are several; we only need one.
-        -- We take the first one by record_id.
-        insert into table memberdata
-        select T.cm_id, T.customer_id, T.record_id, T.account_id 
-        from
-        (
-        select 
-         row_number() over (partition by n.account_id order by n.record_id) as RANK,
-          reflect("java.util.UUID", "randomUUID") AS cm_id,
-          n.cus_id AS customer_id,
-          n.record_id AS record_id,
-          n.account_id as account_id
-        from datasource n
-        ) T
-        where T.rank = 1
-        ;
-       The code above uses the row_number() function to keep a single row out of the many that share the same account_id, retaining the rest of the columns, by only taking the first row found (rank = 1). A unique id is then generated as the primary key for this record via Java’s UUID class.
-       As a gotcha related to the last bit of code: do not attempt to use Hive’s hash() function to generate such keys. It returns a 32-bit integer and, in my case, ran into collisions very rapidly (after a few hundred rows).
-       Another example:
-        IF ( n2.final_spend is not null and opp.frequency is not null AND opp.average_transaction_size is null AND (vil.vendor is not null AND vil.stage <> 'SPEND FULFILLED'),
-           CASE opp.frequency WHEN 'ANNUAL'  THEN n2.final_spend / 1
-                          WHEN 'SEMI-ANNUAL' then n2.final_spend / 2
-                          WHEN 'QUARTERLY' THEN n2.final_spend / 4
-                          WHEN 'BI-MONTHLY' THEN n2.final_spend / 6
-                          WHEN 'MONTHLY' THEN n2.final_spend / 12
-                          WHEN 'BI-WEEKLY' THEN n2.final_spend /  26
-                          WHEN 'WEEKLY' THEN n2.final_spend /  52
-                          WHEN 'DAILY' THEN n2.final_spend /  365 END,
-            opp.average_transaction_size) as average_transaction_size
-       When joining multiple tables at once, the order of the joins matters, as stipulated in the doc (“Joins are NOT commutative!”, https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins ). So either put the LEFT JOINs first, followed by the INNER JOINs, or as an alternative use subqueries:
-        Select ..
-        FROM
-        ( select
-        n.vendor_id, n.location_id, n.cm_id, inc.final_spend as final_spend, 
-          endor.endorsement_id as endorsement_id, inc.ap_file_id,
-         pers.person_id as person_id
-        from
-        myrecords inc
-        -- there should always be a match, so no OUTER JOIN
-        JOIN
-        (select o.cm_id, v.record_id, v.vendor_id, v.location_id from rim v
-        join members o
-        on v.record_id = o.record_id) n
-        ON inc.record_id = n.record_id
-        LEFT OUTER JOIN vendor_contact_person pers
-        ON
-        -- for checking if this vendor has an endorsement that exists, for 'top vendor flag'
-        LEFT OUTER JOIN
-        Endorsement endor
-        on (n.vendor_id = endor.vendor_id and n.location_id = endor.location_id and n.cm_id = endor.cm_id)
-        ) n2
-        FULL OUTER JOIN
-        ..
As you can see, the FROM clause queries a derived table, aliased n2, that is itself built from several JOINs.
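
On the hash() gotcha from the list above, here is a minimal Python sketch of why a 32-bit hash makes a poor primary key. It assumes that Hive's hash() on an ASCII string behaves like Java's String.hashCode() (a 31-multiplier rolling hash); the function names `java_style_hash` and `collision_probability` are made up for this illustration and are not part of Hive.

```python
import math


def java_style_hash(text: str) -> int:
    """31-multiplier rolling hash, wrapped to a signed 32-bit integer.

    Assumption: this mirrors Java's String.hashCode() for ASCII input,
    which is what Hive's hash() is assumed to reduce to for such strings.
    """
    h = 0
    for ch in text:
        h = (h * 31 + ord(ch)) & 0xFFFFFFFF  # keep only the low 32 bits
    # Reinterpret the unsigned value as a signed 32-bit integer.
    return h - (1 << 32) if h >= (1 << 31) else h


def collision_probability(rows: int, bits: int = 32) -> float:
    """Birthday-bound estimate of at least one collision among `rows`
    values drawn uniformly from a space of 2**bits."""
    pairs = rows * (rows - 1) / 2
    return 1.0 - math.exp(-pairs / float(2 ** bits))


# Two different short keys that land on the same hash value.
print(java_style_hash("Aa"), java_style_hash("BB"))

# Even an ideal 32-bit hash is more likely than not to collide
# somewhere around 77,000 rows.
print(round(collision_probability(77163), 3))
```

The birthday estimate is the best case, for perfectly uniform values. Real keys are structured and similar to one another, so collisions such as "Aa" vs "BB" can appear much earlier, which may be part of what I observed. The UUID approach shown earlier avoids the problem because its key space is 128 bits wide.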

UDFs and UDTFs


            A coworker had to create a UDTF (a UDF that generates a table) and chose Python to do so; I hadn’t realized that a UDF/UDTF for Hive can now be written in pretty much any language, as long as it respects the constraints of Hadoop Streaming: the program must essentially be able to act as its own mapper or reducer in the pipeline, reading its input and writing its output within the Hadoop constraints. Be warned, however, that performance won’t be as good as with Java.
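
To make the streaming idea concrete, here is a small sketch of what such a Python script could look like. It is not my coworker's code: the script name `explode_tags.py`, the `--stream` flag, the column names and the comma-separated `tags` column are all hypothetical. The only thing it relies on is the streaming contract, where rows arrive on standard input as tab-separated lines and are emitted the same way on standard output.

```python
"""explode_tags.py (hypothetical name): emit one output row per tag.

Assumed usage from Hive, with made-up table and column names:
    ADD FILE explode_tags.py;
    SELECT TRANSFORM (account_id, tags)
    USING 'python explode_tags.py --stream'
    AS (account_id, tag)
    FROM some_table;
"""
import sys


def explode_row(line, sep="\t"):
    """Turn 'account_id<TAB>tag1,tag2' into one output line per tag."""
    account_id, tags = line.rstrip("\n").split(sep)
    # Skip empty tags so that an empty column yields no rows at all.
    return [account_id + sep + tag for tag in tags.split(",") if tag]


def main(stdin, stdout):
    """Read rows from stdin and write the exploded rows to stdout."""
    for line in stdin:
        for out in explode_row(line):
            stdout.write(out + "\n")


# Only consume standard input when explicitly asked to, so that the
# module can also be imported and tested locally.
if __name__ == "__main__" and "--stream" in sys.argv[1:]:
    main(sys.stdin, sys.stdout)
```

Because one input row can produce zero, one or many output rows, the script behaves like a UDTF. Keeping the row logic in `explode_row` means it can be tested on a laptop before being shipped to the cluster.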


Workflow for batch processing


            There are a few options for this, mentioned everywhere. On this project we used Spring Batch, which was essentially just an XML wrapper around the scripts that we were writing, whether Pig, Hive or shell scripts (called scriptlets). It is of the form:
<hdp:hive-tasklet id="hive-script">
    <hdp:script>myHivescript.hql</hdp:script>
    <hdp:script location="classpath:org/company/hive/script.q" />
    <hdp:parameters> set hive.metastore.warehouse.dir=/opt/hive/warehouse;/warehouse;
    </hdp:parameters>
 </hdp:hive-tasklet>

What it essentially gives you is a way to control the flow of the pipeline. It has a built-in retry of a tasklet on error, but 99% of the time this is completely useless: the error is caused by the data or the script, not by the infrastructure itself, since Hadoop already reruns failed tasks on a different node by default. The same can be said of Apache Oozie, however. The other pain point was having to comment out tasklets (in the XML config file) whenever we wanted to test a subset of them, which is probably not the optimal way to do this.

Testing


            Which brings the conversation to testing: after a few projects on Big Data batch systems, I am still not aware of a good testing tool that alleviates some of the pain of system testing Hadoop pipelines.
From my experience, testing should be done in two passes, followed by a review:
-       With a small subset of data, to test each script against the business rules, and ensure each column returns the appropriate value (functional testing).
-       With a larger data set, to test the counts of records, ensuring that JOINs, FILTERs and OVERWRITEs (in Hive tables) behave as expected. A proper SELECT COUNT(primary_key) should be run on each table, since no integrity constraint is typically set up in Hive or Pig.
-       Following this, results should be shared with a business analyst, who should be the SME on the data domain.
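
As an illustration of the count checks in the second pass, here is a sketch that flags NULL and duplicate keys. It runs against an in-memory SQLite database purely as a stand-in so that it works without a cluster; the helper names and the sample rows are invented for the example. The SELECT itself uses only COUNT and COUNT(DISTINCT), which Hive also supports.

```python
import sqlite3


def key_counts(conn, table, key):
    """Return (total_rows, non_null_keys, distinct_keys) for one table."""
    query = "SELECT COUNT(*), COUNT({k}), COUNT(DISTINCT {k}) FROM {t}".format(
        k=key, t=table
    )
    return conn.execute(query).fetchone()


def check_primary_key(conn, table, key):
    """List the integrity problems that a real primary key would forbid."""
    total, non_null, distinct = key_counts(conn, table, key)
    problems = []
    if non_null < total:
        problems.append("%d NULL keys" % (total - non_null))
    if distinct < non_null:
        problems.append("%d duplicate keys" % (non_null - distinct))
    return problems


# Stand-in data: one duplicated key and one NULL key.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE memberdata (cm_id TEXT, account_id TEXT)")
conn.executemany(
    "INSERT INTO memberdata VALUES (?, ?)",
    [("c1", "a1"), ("c2", "a2"), ("c2", "a3"), (None, "a4")],
)
print(check_primary_key(conn, "memberdata", "cm_id"))
```

Comparing the three counts before and after a JOIN or an OVERWRITE is a cheap way to spot rows that were multiplied or silently dropped.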

Tools


            Typically on Big Data systems, the code has to be run and tested on the cluster itself; there is no way, or only a very limited one (for example to check the syntax of a function, or when writing a UDF), to write and run code locally. The way we wrote code fell into two categories:

- Write code in Emacs/Vim, directly on the cluster. This works relatively well, but lacks the capabilities of an IDE for correcting syntax.
- Use the Sublime Text editor on local machines, with Hive/Pig syntax checking and a plug-in to FTP the code directly to the cluster. This proved relatively elegant and worked pretty well once installed. The installation was not for the faint of heart, however.
On a related note, in my numerous projects, I have unfortunately never seen anyone use GUIs like Hue for writing Big Data code. Which brings me to the point that the data frame concept of pandas or Zeppelin is really useful!