Apache Hive: the SQL Count of Monte Cristo
Project
I have been
working on an industrial-scale Hive project at a major Fortune 50 company
lately, and wanted to share my experience. In some ways, I believe my
experience is representative of where Hadoop adoption currently stands: the
environment I was in is neither a bleeding-edge startup trying every possible
new tool out there, nor the technology laggard typical of Midwest companies.
Technical environment
First, the state of the work environment I was in:
- Hadoop environment: a major Hadoop vendor's distribution, with the typical separate Development and Production clusters of several hundred nodes.
- Hadoop 1.x installed, though not on the latest build: Hive/Pig 0.13 on MapReduce 1.0, i.e. no YARN. Also, no Spark in sight.
- Heavy users of Hive and Pig, no custom MapReduce. A Java shop, with a little bit of Python. Datameer and Platfora installed, and other tools, like Alation, under evaluation. Not going to the cloud anytime soon; very concerned about data security.
- Data analysts not very aware of, or curious about, market tools, and more versed in SQL than in MapReduce. So essentially the work was no longer about Big Data, but about correctly translating the requirements into technical specs; all optimization techniques and Hadoop specifics were deferred to the MapReduce platform (i.e. default parameters left in place).
Problems
I encountered a few technical issues and wanted to note them here, since they
can be a common occurrence.
I hit an issue with a complex query involving a few joins that was overwriting
a table on top of itself ('INSERT OVERWRITE TABLE A … SELECT * FROM A ..'). The
problem was intermittent: it sometimes gave me a cryptic error upon overwriting,
even though the SELECT part of the query ran fine. Coincidentally, a colleague
also encountered a similar problem with Pig, where the query refused to run to
completion.
The solution was to save the result of the query into a temporary table (a
temporary set of tuples), and then write it back to the actual table
afterwards, i.e.:
-- to remedy a bug!
drop table if exists mytable_temp;
create table IF NOT EXISTS mytable_temp (
    ..  -- same schema as my actual table giving issues
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
ESCAPED BY '\\'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
;

-- Now run the actual query, inserting into our temp table
insert into table mytable_temp
select …
;

-- to remedy the bug ..
insert overwrite table mytable
select * from mytable_temp;
Best Practices
Some best practices I learned from working with, and churning out, a lot of
code:
- Try not to pass variables directly in the code, but rather upstream from your scheduler of choice:

<hdp:hive-server host="some-host" port="10001" properties-location="classpath:hive-dev.properties" configuration-ref="hadoopConfiguration">
    someproperty=somevalue
    hive.exec.scratchdir=/tmp/mydir
</hdp:hive-server>
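When the job is launched from the command line rather than through Spring, the same principle can be applied with Hive's variable substitution. A minimal sketch, where the script, variable, and table names are all made up:

-- myscript.hql : the run date comes from the scheduler, never hard-coded
INSERT OVERWRITE TABLE daily_summary PARTITION (ds = '${hivevar:run_date}')
SELECT account_id, SUM(amount)
FROM transactions
WHERE ds = '${hivevar:run_date}'
GROUP BY account_id;

The scheduler would then invoke it with something like: hive --hivevar run_date=2015-06-01 -f myscript.hql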
- Separate the Hive clauses (SELECT, FROM, etc.) from the column names, typically by putting them on different lines, for readability, just as in SQL.
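A small made-up example of the layout I mean:

SELECT
    o.order_id,
    o.customer_id,
    SUM(o.amount) AS total_amount
FROM
    orders o
GROUP BY
    o.order_id,
    o.customer_id
;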
- Ensure that after any manipulation of a column, you give it a name again, that is, qualify/alias the column, as below. You might get away with not doing it, but I ran into some cryptic issues in UNIONs because I hadn't declared all the aliases.

COALESCE(n.vendor_info_id, f.OPEN_Customer) AS OPEN_Customer
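To illustrate the UNION case: every branch of the UNION should expose the same explicitly aliased columns, including the computed ones. A minimal sketch with made-up tables (wrapped in a subquery, as Hive versions before 1.2 require for UNION ALL):

SELECT u.account_id, u.vendor_id
FROM (
    SELECT
        a.account_id                       AS account_id,
        COALESCE(a.vendor_id, a.backup_id) AS vendor_id
    FROM source_a a
    UNION ALL
    SELECT
        b.account_id                       AS account_id,
        b.vendor_id                        AS vendor_id   -- same alias in every branch
    FROM source_b b
) u
;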
- Some say that they don't "trust" Hive's built-in functions, and would rather just declare Hive tables and work mostly in Pig (via HCatalog). Other engineers do work in Hive QL, but for anything complicated would rather fall back to UDFs/UDTFs in a different language. I don't really agree with these views, and was pleased to see that most if not all of my requirements could be met in pure Hive QL. Take a look at this for example:
-- Let's only take 1 unique record for each account_id when there are multiple; we only need one.
-- We take whichever comes first.
insert into table memberdata
select T.cm_id, T.customer_id, T.record_id, T.account_id
from
(
    select
        row_number() over (partition by n.account_id order by n.record_id) as RANK,
        reflect("java.util.UUID", "randomUUID") AS cm_id,
        n.cus_id AS customer_id,
        n.record_id AS record_id,
        n.account_id as account_id
    from datasource n
) T
where T.rank = 1
;

The code above uses the row_number window function to essentially keep a single
record out of many repeated ones in a particular column, retaining the rest of
the data, by only keeping the first row found (rank = 1). A unique id is then
generated as the primary key for this record via the UUID Java class (through
reflect).
- A gotcha from the last bit of code: do not attempt to use Hive's built-in hash() function for this kind of key generation: it leads to collisions very rapidly (after a few hundred rows).
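If a deterministic key is needed instead of a random UUID, one workaround that I believe works on Hive 0.13 (which predates the built-in md5()/sha2() functions) is to go through reflect() and commons-codec, which is normally already on the Hadoop classpath. The columns below reuse the earlier example's datasource table:

select
    reflect("org.apache.commons.codec.digest.DigestUtils", "md5Hex",
            concat_ws('|', cast(n.account_id as string), cast(n.record_id as string))) as row_key,
    n.account_id,
    n.record_id
from datasource n;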
- Another example, deriving an average transaction size from an annualized spend figure and a payment frequency:
IF ( n2.final_spend is not null and opp.frequency is not null
     AND opp.average_transaction_size is null
     AND (vil.vendor is not null AND vil.stage <> 'SPEND FULFILLED'),
     CASE opp.frequency
         WHEN 'ANNUAL'      THEN n2.final_spend / 1
         WHEN 'SEMI-ANNUAL' THEN n2.final_spend / 2
         WHEN 'QUARTERLY'   THEN n2.final_spend / 4
         WHEN 'BI-MONTHLY'  THEN n2.final_spend / 6
         WHEN 'MONTHLY'     THEN n2.final_spend / 12
         WHEN 'BI-WEEKLY'   THEN n2.final_spend / 26
         WHEN 'WEEKLY'      THEN n2.final_spend / 52
         WHEN 'DAILY'       THEN n2.final_spend / 365
     END,
     opp.average_transaction_size) as average_transaction_size
- When joining multiple tables at once, the order of the joins matters, as stipulated in the docs ("Joins are NOT commutative!", https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins). So either the order has to put the LEFT JOINs first, followed by the INNER JOINs, or, as an alternative, use subqueries:
Select ..
FROM
(   select
        n.vendor_id, n.location_id, n.cm_id,
        inc.final_spend as final_spend,
        endor.endorsement_id as endorsement_id,
        inc.ap_file_id,
        pers.person_id as person_id
    from
        myrecords inc
    -- there should always be a match, so no OUTER JOIN
    JOIN
        (select o.cm_id, v.record_id, v.vendor_id, v.location_id
         from rim v
         join members o
           on v.record_id = o.record_id) n
      ON inc.record_id = n.record_id
    LEFT OUTER JOIN vendor_contact_person pers
      ON ..
    -- for checking if this vendor has an endorsement that exists, for 'top vendor flag'
    LEFT OUTER JOIN Endorsement endor
      on (n.vendor_id = endor.vendor_id
          and n.location_id = endor.location_id
          and n.cm_id = endor.cm_id)
) n2
FULL OUTER JOIN
..
As you can see, the FROM clause queries against a derived table composed of the
different JOINs, aliased n2.
UDFs and UDTFs
A coworker
had to create a UDTF (a UDF that generates a table) and chose Python to do so;
I hadn't realized that a UDF/UDTF can now be written in pretty much any language
for Hive, as long as it works within the constraints of Hadoop Streaming: the
program must essentially be able to act as its own mapper or reducer in the
pipeline, reading its input and writing its output the way Hadoop expects. Be
warned, however, that performance won't be as good as writing it in Java.
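For reference, this kind of streaming script is typically wired in through Hive's TRANSFORM clause. A minimal sketch, with a hypothetical Python script and hypothetical tables; the script reads tab-separated lines on stdin and may emit zero or more tab-separated lines per input row:

-- ship the (hypothetical) script with the job
ADD FILE explode_tags.py;

INSERT INTO TABLE tag_events
SELECT TRANSFORM (event_id, tags)
       USING 'python explode_tags.py'
       AS (event_id STRING, tag STRING)
FROM raw_events;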
Workflow for batch processing
There are a
few options for this, mentioned everywhere. On this project we used Spring Batch,
which was essentially just an XML wrapper around the scripts we were writing,
whether in Pig, Hive, or shell (called scriptlets). It is of the form:
<hdp:hive-tasklet id="hive-script">
    <hdp:script>myHivescript.hql</hdp:script>
    <hdp:script location="classpath:org/company/hive/script.q" />
    <hdp:parameters>
        set hive.metastore.warehouse.dir=/opt/hive/warehouse;
    </hdp:parameters>
</hdp:hive-tasklet>
What it essentially gives you is a way to control the flow of the pipeline. It
has built-in retry of a tasklet on error, but 99% of the time this is
completely useless, since the cause of the error is the data or the script,
not the infrastructure itself; Hadoop already reruns a failed task on a
different node by default. The same can be said of Hortonworks' Oozie, however.
The other pain point was having to comment out tasklets (in the XML config
file) whenever we wanted to test only a subset of them, which is probably not
the optimal way to do this.
Testing
Which
brings the conversation to testing: after a few projects on Big Data batch
systems, I am still not aware of a good testing tool that alleviates some of
the pain of doing system testing of Hadoop pipelines.
In my experience, testing should be done in two stages:
- With a small subset of data, to test each script against the business rules and ensure each column returns the appropriate value (functional testing).
- With a larger data set, to test the record counts, making sure the JOINs and FILTERs, as well as the OVERWRITEs into Hive tables, behave as expected. A proper SELECT COUNT(primary_key) should be run on each table, since no integrity constraint is typically set up in Hive or Pig; see the sketch after this list.
- Following this, the results should be shared with a business analyst, who should be the SME on the data domain.
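As an illustration of the count checks in the second point, a couple of validation queries of the kind I have in mind, reusing the table names from the earlier example. Comparing total rows against distinct keys is a cheap way to catch duplicates introduced by a bad JOIN, since Hive enforces no primary key:

-- the two numbers should match if account_id really is a unique key
select
    count(*)                   as row_count,
    count(distinct account_id) as distinct_keys
from memberdata;

-- and the total should reconcile against the source table
select count(*) as source_rows from datasource;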
Tools
Typically,
on Big Data systems, the code has to be run and tested on the cluster itself;
there is no way (or only a very limited one, e.g. for checking the syntax of a
function, or when writing a UDF) to write code locally. The way we wrote code
fell into two categories:
- Write code in Emacs/Vim, directly on the cluster. This works relatively well,
but lacks the capabilities of an IDE to check the syntax.
- Use the Sublime Text editor, with Hive/Pig syntax checking, on local
machines, with a plug-in to FTP the code directly to the cluster. This proved
relatively elegant and worked pretty well once installed. The installation was
not for the faint of heart, however.
On a related note, across my numerous projects I have unfortunately never seen
anyone use GUIs like Hue for writing Big Data code. Which brings me to the fact
that the data frame / notebook approach of Pandas or Zeppelin is really useful!