Monday, April 14, 2014

ETL with Hadoop

Here are a few pointers about how to do ETL with the common set of Hadoop tools.

Extraction / Ingestion

First comes the ingestion of data from the various data sources. The data is stored raw in HDFS, known as the Data Lake (Hortonworks) or the Enterprise Data Hub (Cloudera), and we simply project a descriptive schema onto it (the “schema on read” concept in Cloudera’s terms, also called “late binding” at Hortonworks). Caution: tools like Hive will simply bypass data that doesn’t match the schema, instead of warning you or stopping on an error as a traditional RDBMS or ETL tool would.
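As a quick illustration of schema on read, a Hive external table can simply be projected over raw files already sitting in HDFS; the path and column names below are hypothetical:

 -- Project a schema onto raw log files already landed in HDFS; nothing is moved
 -- or validated up front, the data is only interpreted at query time.
 CREATE EXTERNAL TABLE web_logs_raw (
   log_ts  STRING,
   host    STRING,
   request STRING,
   status  INT
 )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
 STORED AS TEXTFILE
 LOCATION '/data/raw/web_logs';

 -- Fields that don't parse as the declared type come back as NULL instead of failing the query.
 SELECT status, COUNT(*) FROM web_logs_raw GROUP BY status;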
Different tools come to mind for ingestion:
- Use Flume (Flume NG) for event-driven data (e.g. web logs; a typical use case is aggregating the logs of a collection of web servers into HDFS for later analysis). Roughly equivalent to using Apache Kafka with Camus.
- Use Sqoop for RDBMS data, generally via a JDBC connector. There are also specialized connectors, e.g. for Teradata (using the FastExport utility) and for mainframes.
- Use WebHDFS, which exposes a REST endpoint for moving data into Hadoop; a typical use case involves an ESB.

Load

A common question when loading data from an RDBMS is whether to do a full load or an incremental load.

The easiest case is when we can do a full data load. With incremental loads, however, there are three outstanding issues:


Seeding

- The initial load, or seeding, of the data. If the initial data volume is big, Sqoop will overwhelm the RDBMS by opening many parallel connections, especially if that database is serving an application under strict SLAs. It is better to take an initial data dump from the RDBMS and feed it into Hadoop in chunks.
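A minimal sketch of such a seeding step, assuming the dump has been split into delimited chunk files and copied to an HDFS staging directory (all paths, table and column names are hypothetical):

 -- Target table for the seeded data.
 CREATE TABLE orders (
   order_id    BIGINT,
   customer_id BIGINT,
   amount      DOUBLE,
   order_date  STRING
 )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

 -- Load each chunk as it lands; LOAD DATA only moves files inside HDFS, so the RDBMS is never touched again.
 LOAD DATA INPATH '/staging/orders/chunk_000.csv' INTO TABLE orders;
 LOAD DATA INPATH '/staging/orders/chunk_001.csv' INTO TABLE orders;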

Scheduling

- How do you know when the scheduled batch is ready? Based on the delta changes that have happened during the day? Based on a fixed time (e.g. “5am should leave enough time for the data to be ready”)? Or by polling the data source? None of these schemes is ideal. A better model is to use the event-driven notification mechanism built into HCatalog to signal when the data is ready; you can even chain processing steps off these events.

Appends

- Appending data in HDFS. As live data gets updated, how do we reflect that in HDFS, which by design holds immutable files? Firstly, Hive supports appends via partitions: INSERT INTO will append to a table or partition while keeping the existing data intact. If a partition column value is given explicitly, we call it a static partition; otherwise it is a dynamic partition, whose value is driven by the corresponding column of the SELECT statement. Secondly, updates can be supported by pairing Hive with HBase: HBase absorbs the continuous updates, and Hive queries sit on top of it. E.g., create your Hive table like this (a sketch of both approaches follows below):
 -- Table and column names are illustrative; ":key" maps to the HBase row key.
 CREATE TABLE customer_hbase (row_key STRING, name STRING, email STRING)
 STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
 WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name,cf:email")
 TBLPROPERTIES ("hbase.table.name" = "customer");
INSERT OVERWRITE statements against such a table then behave like upserts: rows with an existing row key are updated and new row keys are added (assuming row keys are unique).
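To make both approaches concrete, here is a minimal sketch in HiveQL (table and column names are hypothetical; the HBase-backed table is the one created above):

 -- 1) Dynamic partition append: the value of dt for each row comes from the last column of the SELECT.
 SET hive.exec.dynamic.partition=true;
 SET hive.exec.dynamic.partition.mode=nonstrict;
 INSERT INTO TABLE page_views PARTITION (dt)
 SELECT user_id, url, view_ts, to_date(view_ts) AS dt
 FROM page_views_staging;

 -- 2) Upsert via the HBase-backed table: matching row keys are overwritten, new ones are appended.
 INSERT OVERWRITE TABLE customer_hbase
 SELECT row_key, name, email FROM customer_updates;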

Orchestration

Orchestration today is typically done with Oozie, but Oozie workflows are very verbose XML; its intended replacement (mainly backed by Hortonworks) is Falcon.

Transform


Tool

Transformation can be done in Hive or Pig. Pig is a little more flexible and convenient in a data pipeline because it is a procedural language, whereas Hive is declarative: Pig lets you checkpoint intermediate data, which is convenient in case of failures, and it is also a little easier to integrate custom code with Pig than with Hive.
Of course, there are also programming-language frameworks such as Cascading and Scalding.


Data format

Sometimes it makes more sense to use a different format than the original one (usually text/CSV, JSON, or XML). For XML, for example, an Apache Avro container is typically used, since it is schema-rich and supports schema evolution. ORC and Parquet are the trend these days, as columnar storage is the most efficient choice for both storage footprint and query performance. From testing, Parquet seems a bit faster, while ORC carries built-in statistics; both are accessible across tools via HCatalog, so portability shouldn’t be an issue. Parquet, however, was also designed from the ground up to be language-independent.
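As a minimal sketch, converting a raw text table into a columnar copy can be done directly in Hive (table names are hypothetical; the same statement works with STORED AS PARQUET on recent Hive versions):

 -- Rewrite the raw text data into a columnar, compressed copy for analytics.
 CREATE TABLE web_logs_orc STORED AS ORC AS
 SELECT * FROM web_logs_raw;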