Wednesday, May 16, 2018

Quick overview of Kubernetes

These are the notes that I took while learning more about Kubernetes.

High-level overview

Most software today runs as multiple processes, often spread across distributed systems, and keeping track of all of them is a challenge. At a high level, Kubernetes helps run your software and processes on a cluster of computers, treating the cluster as one entity. Kubernetes manages the processes and ensures they stay running.
Kubernetes (K8s) originated at Google and is inspired by Google's internal Borg system: Google had already built this kind of infrastructure, and released K8s as an open-source project.


K8s uses Docker as its primary container format (among other, less popular ones). A container gives the developer a hermetically sealed box for their processes. The context for these processes is always the same, which allows the package/container to run on different machines and always give the same result. K8s' role is to keep track of these processes, ensure they stay up, and help them find each other.
K8s can run in different environments: on any major cloud provider (GCP, Azure, AWS), but also on-premises or in a hybrid environment, consistently, since K8s is open-source software. So in theory there is no vendor lock-in, and K8s workloads can be moved (gradually or not) from one provider to another.


K8s is set up declaratively, in a config file (e.g. the version of the software to run, the number of instances, the desired state, etc.). The number of processes is a dial or knob that can be turned for scaling purposes, which is just a matter of changing the config file.
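To make the "dial" concrete, here is a minimal sketch, in Scala (the language used for the code elsewhere on this blog), of the idea behind declarative configuration: you only state how many replicas you want, and a control loop works out what to start or stop. All names (`DesiredState`, `Pod`, `reconcile`) are hypothetical; in real K8s the desired state is declared in a YAML or JSON manifest (e.g. the `replicas` field of a Deployment) and Kubernetes' own controllers do the reconciling.

```scala
// Hypothetical model of declarative configuration, NOT the Kubernetes API:
// the user only declares the desired state; a control loop compares it with
// what is actually running and emits the actions needed to converge.
final case class DesiredState(app: String, version: String, replicas: Int)
final case class Pod(id: String, app: String, version: String)

sealed trait Action
final case class Start(app: String, version: String) extends Action
final case class Stop(podId: String) extends Action

def reconcile(desired: DesiredState, running: Seq[Pod]): Seq[Action] = {
  // Only pods of the declared app and version count toward the desired replicas.
  val matching = running.filter(p => p.app == desired.app && p.version == desired.version)
  val diff = desired.replicas - matching.size
  if (diff > 0) Seq.fill(diff)(Start(desired.app, desired.version)) // scale up
  else matching.takeRight(-diff).map(p => Stop(p.id))               // scale down (empty if diff == 0)
}
```

Turning the dial from 2 to 5 replicas is just a change to `DesiredState`; the loop figures out that 3 pods must be started.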


Scheduling in a distributed environment means consistently running copies of an instance. The scheduler loads each service onto a machine that is not too busy, which is like playing a multi-dimensional game of Tetris with resources: workloads have oddly shaped combinations of CPU / memory / disk requirements, and they are packed onto the nodes that run your software for maximum efficiency.
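The Tetris metaphor boils down to multi-dimensional bin packing. Below is a minimal first-fit sketch, assuming each node and workload is described only by CPU (millicores), memory (MB) and disk (GB); the real K8s scheduler filters and scores nodes on many more criteria, so `Resources` and `schedule` are hypothetical and only illustrate the idea.

```scala
// Hypothetical resource vector for a node (free capacity) or a workload (request).
final case class Resources(cpuMillis: Int, memMb: Int, diskGb: Int) {
  def fits(req: Resources): Boolean =
    req.cpuMillis <= cpuMillis && req.memMb <= memMb && req.diskGb <= diskGb
  def minus(req: Resources): Resources =
    Resources(cpuMillis - req.cpuMillis, memMb - req.memMb, diskGb - req.diskGb)
}

/** First-fit bin packing: for each workload, in order, returns the index of the
  * first node that still has room in every dimension, or None if none fits. */
def schedule(nodes: Vector[Resources], workloads: Seq[Resources]): Seq[Option[Int]] =
  workloads.foldLeft((nodes, Vector.empty[Option[Int]])) { case ((free, placed), req) =>
    val idx = free.indexWhere(_.fits(req))
    if (idx >= 0) (free.updated(idx, free(idx).minus(req)), placed :+ Some(idx)) // consume capacity
    else (free, placed :+ None)                                                  // unschedulable
  }._2
```

First-fit is the simplest packing heuristic; it leaves "holes" that a smarter scorer (best-fit, or spreading for resilience) would avoid, which is exactly the trade-off a production scheduler has to make.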

Rolling updates can be managed the same way: the developer can roll out a new definition of her software for a container, gradually adding instances of the new version of her app while dialing down the old version until it is fully replaced. So upgrades, as well as rollbacks, can happen seamlessly.
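The arithmetic of a rolling update can be sketched as a plan of intermediate pod counts, assuming one old pod is retired for each new pod started (`step` pods at a time). Real Deployments tune this with their `maxSurge` and `maxUnavailable` settings; `rollingUpdate` is a hypothetical helper, not a K8s API.

```scala
// Hypothetical rolling-update plan: returns the (oldVersionPods, newVersionPods)
// counts after each step, swapping `step` pods at a time so that total capacity
// stays at `replicas` throughout. Reversing the plan gives a rollback.
def rollingUpdate(replicas: Int, step: Int = 1): Seq[(Int, Int)] = {
  require(replicas >= 0 && step > 0, "replicas must be >= 0 and step > 0")
  val newCounts = (0 until replicas by step) :+ replicas // always end fully migrated
  newCounts.map(n => (replicas - n, n))
}
```

For example, `rollingUpdate(3)` yields `(3,0), (2,1), (1,2), (0,3)`; reading the same plan backwards is the rollback.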
Interestingly, earlier data infrastructures like Hadoop, which tried to do it all including managing the infrastructure, now go through Docker as well: Apache Hadoop YARN can run Dockerized applications.

Service discovery

However, K8s is more than a scheduler, as it also performs service discovery. K8s intelligently routes traffic to services, which are typically labeled (e.g. backend, frontend) so that you can target them by label, a very powerful concept.
One example of this is the load balancer, which is also managed by K8s. The different parts of your system are usually given static names, so they can be found easily.
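Label-based targeting can be sketched as follows, assuming endpoints carry key/value labels the way Kubernetes pods do: a service selects every endpoint whose labels contain all of its selector's pairs, and a naive round-robin stands in for the load balancer. `Endpoint`, `selectEndpoints` and `roundRobin` are hypothetical names, not K8s APIs.

```scala
// Hypothetical sketch of label-based service discovery (not the Kubernetes API):
// a service targets every endpoint whose labels contain all of its selector's
// key/value pairs, similar to how a K8s Service selector matches pod labels.
final case class Endpoint(ip: String, labels: Map[String, String])

def selectEndpoints(selector: Map[String, String], endpoints: Seq[Endpoint]): Seq[String] =
  endpoints
    .filter(e => selector.forall { case (k, v) => e.labels.get(k).contains(v) })
    .map(_.ip)

// Naive round-robin over the selected endpoints, standing in for the load balancer.
// An empty endpoint list yields an empty iterator instead of looping forever.
def roundRobin(ips: Seq[String]): Iterator[String] =
  if (ips.isEmpty) Iterator.empty else Iterator.continually(ips).flatten
```

Because callers only know the selector (e.g. `app=backend`), pods can come and go without anyone updating hard-coded addresses.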


From its inception, Docker encouraged the design of stateless services.
Persistence and statefulness are an afterthought in the world of containers. This design works in favor of workload scalability and portability.
It is one of the reasons why containers are fueling cloud-native architectures, microservices, and web-scale deployments.
So, given that either the host can abruptly terminate or the container itself can fail, the state usually needs to be stored somewhere else, for example on a networked volume independent of both the host and the container.

A pod is the logical unit of deployment in Kubernetes.
A K8s volume is a directory with some data that is accessible to all the containers of a pod, and it is attached to the pod that encloses it. Data in the volume is preserved across container restarts, but for ephemeral volume types such as emptyDir, if the pod dies, the volume is gone.

Google Container Engine

GKE is the hosted version of K8s managed by Google, so K8s upgrades and cluster management, for example, are handled for you. What you get out of a hosted environment is:
- Dynamic creation and removal of machines
- APIs for controlling the cluster and the network.

Autoscaling is a major feature of GCP (and is also offered by other providers).

Monday, April 4, 2016

Review of the Strata San Jose 2016 conference in the context of IoT

IoT at Strata San Jose 2016


I had a different focus at Strata San Jose this year: instead of following the usual Hadoop-related news and products, I focused on the Internet of Things (IoT) and, by extension, streaming technology. It was pretty clear from the talks and from conversations with attendees that batch-oriented processing is becoming less of a focus and, as Tyler Akidau says, an artifact of history.
I attended as many IoT-related talks as I could. In capturing my findings below, I separated them into two parts: the business-level overview and the key points that need to be architected in an IoT system, and the technical streaming frameworks of choice. I tried to blend the insights from the speakers together, and to mention which talk each one came from.

The talks

I first attended Dunning's talks on messaging systems and the No Lambda talk, and took a look at what Pivotal says about IoT. I then attended Moty's talk from Intel and the talk about robots from Microsoft.
On Thursday I attended the second Intel talk, the talk from Ryft, and Capital One's talk about their architecture, which unfortunately was at the same time as Twitter's talk on Heron and Google's talk on Dataflow. I then attended (like a million other people) the "Streaming done right" talk by Flink!

The state of IoT

Every speaker addressed the state of the Internet of Things market first, with the same figure: 50 billion devices projected to be connected by 2020. The usual explanations behind the growth of IoT are well known:
- Moore's law, making chips smaller with more components every year
- Price decrease of components
- Distributed systems that parallelize tasks across multiple machines.
- AI algorithms are entering a golden age.
- Data growth/explosion

The hype curve?

Is IoT all hype? Most sensors (85%) are unconnected as of now, but 5.5 million new devices will get connected every day in 2016. Also, the "data is the new oil" adage continues to take hold: more and more data gets created, and its volume doubles every 2 years. So IoT does not seem to be just hype, and the use cases are many.

IoT systems definition

The use cases are endless, and they share common themes across all verticals: energy savings, security and monitoring.
Intel's definition of IoT is pretty fair: IoT use cases are usually time-series oriented, and are made up of 3 silos:
- The "things": end-points at the edge, like cars, wearables, gateway networks, sensors. This is where data acquisition happens.
- The network
- The cloud: where most of the analytics takes place. This is where real-time visualization, analysis, alerts take place.

The actual setup of an IoT system lives in its process-management workflow layer, where the rules are written, usually in a SQL-like declarative language.
The end goal of an IoT system is to create a predictive, and sometimes prescriptive platform.

The importance of edge nodes

Interestingly, Intel says (and Ryft said the same thing) that some of the analytics in an IoT architecture needs to happen at the edge: "40% of IOT generated data will be stored, processed, analyzed & acted upon at the edge."
The new velocity of data needs to be supported by near real-time responsiveness between capturing the data and driving an action. Ryft, as an example, talked about brick-and-mortar retail stores that need to correlate data about a shopper as fast as possible while he is in the store, in order to send him a relevant coupon.
Edge compute processing is also required to handle intermittent connectivity from decentralized data capture. Edge compute must help separate the signal from the noise in order to optimize bandwidth and responsiveness. Edge nodes should be able to receive updates from the cloud about what to filter out, and about changing definitions of anomalies or data of interest.
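The edge behavior described above (filter locally, forward only what matters, accept new filter definitions from the cloud) can be sketched as follows. This assumes the simplest possible definition of "data of interest", a min/max band per reading; `Reading`, `FilterRule` and `EdgeFilter` are hypothetical names, not part of any framework mentioned in the talks.

```scala
// Hypothetical edge-node filter: readings inside the "normal" band are dropped
// locally to save bandwidth, and only out-of-band readings are forwarded to the
// cloud. The cloud can push a new band at any time (changing what counts as an anomaly).
final case class Reading(sensorId: String, value: Double)
final case class FilterRule(min: Double, max: Double)

final class EdgeFilter(initial: FilterRule) {
  @volatile private var rule: FilterRule = initial

  // Called when the cloud sends an updated definition of "data of interest".
  def updateRule(newRule: FilterRule): Unit = { rule = newRule }

  // Returns only the readings worth sending upstream.
  def toForward(batch: Seq[Reading]): Seq[Reading] = {
    val r = rule // read once so the whole batch uses a consistent rule
    batch.filter(x => x.value < r.min || x.value > r.max)
  }
}
```

In practice the rule would come from a downlink channel and could be a small model rather than a fixed band, but the shape stays the same: cheap local filtering, with the cloud controlling what "interesting" means.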

Data Direction

Data flow generally needs to be bidirectional in an IoT system: from the edge nodes to the centralized cloud for aggregation, and back out. I believe not all IoT frameworks offer this at the moment.


Ryft emphasized the standard question: "what is the definition of near-real time?" Near-real time can mean anything from a couple of ms to a 5-minute response, depending on the use case. Capital One mentioned a throughput of about 2,000 events/second, which may sound small in the context of Big Data, but is not when you take into consideration that these events become features in a machine learning model that grows to 10M data dimensions. The Capital One team was looking for sub-40ms latency in their real-time fraud analysis use case.

The importance of Machine learning

Microsoft's talk was really about machine learning. The analytic output is no longer business intelligence (BI) aimed at human consumption, but increasingly machine learning working in near real time to optimize the output of an ecosystem of smart devices. The new data pipelines need to support predictive analytics and machine learning, with a flow back to the source devices, in order to optimize how an ecosystem of connected devices operates. The speaker approached how to choose the proper ML algorithm with 5 questions:
1/ How much? E.g. "What will be the total of sales next week?" aka regression.
2/ Which category? E.g. "Is it a cat or a dog?" aka classification.
3/ Which groups? E.g. "Which shoppers have similar tastes?" aka clustering.
4/ Is it weird? E.g. "Is this pressure unusual?" aka anomaly detection.
5/ Which action to take? E.g. "Should I brake or accelerate in response to that yellow light?" aka reinforcement learning.
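For the fourth question, the classic statistical baseline is a z-score check: is the new reading far from what this sensor usually reports? This is a minimal sketch of that baseline, not the method presented in the Microsoft talk; the function name and the default threshold of 3 standard deviations are assumptions.

```scala
// Hypothetical z-score anomaly check ("Is this pressure unusual?"): a reading is
// flagged when it lies more than `threshold` standard deviations away from the
// mean of a reference window of past readings.
def isAnomalous(history: Seq[Double], reading: Double, threshold: Double = 3.0): Boolean = {
  require(history.size >= 2, "need at least two reference readings")
  val mean = history.sum / history.size
  // Sample variance of the reference window.
  val variance = history.map(x => math.pow(x - mean, 2)).sum / (history.size - 1)
  val std = math.sqrt(variance)
  if (std == 0.0) reading != mean // flat history: any change is unusual
  else math.abs(reading - mean) / std > threshold
}
```

A fixed window like this catches sudden spikes but adapts poorly to a world that changes, which is one of the limitations the speaker raised next.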

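The speaker then listed some limitations of current approaches, reinforcement learning in particular:
- The algorithm assumes the world doesn't change.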
- Sensors/actors could themselves change.
- Reinforcement algorithm doesn't handle keeping goals.
- Typically the algorithm takes a lot of time to learn.
- Also, it doesn't always scale.

The speaker postulated that this is a major problem in IoT: we have all the components like the actuators/sensors, but are still missing the central brain to make this work.

More generally, machine learning was a major addition to most IoT systems described in the talks, reducing the need for manual rules and enabling advanced predictive analysis. An interesting point (mentioned by Intel) was that ML makes it possible to pinpoint slow changes over time, as opposed to "simple" anomalies, by looking at a combination of sensors rather than a single one.

Systems evolution

The systems built for IoT usually increase in complexity as they become more mature: they go from being able to visualize what is happening with the connected objects, to simple alerting, to complex rules, to predictive and then prescriptive analytics. The best systems (a la Microsoft) expose a model-as-a-service, ready to deploy. Intel talked about a marketplace of components in their solution, acting as an analytics toolkit.

Deep dive into architectures

Ted Dunning emphasized that developers need a reliable way to move data across different systems as it is generated, one event at a time; this is generally done via Kafka. Kafka was present in almost all of the IoT architecture diagrams shown.
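Kafka's core abstraction is an append-only log that each consumer reads at its own offset, which is what lets several downstream systems consume the same events independently. The in-memory sketch below illustrates that idea only: `EventLog` and `Consumer` are hypothetical classes, not Kafka's client API, and there is no persistence, partitioning or replication.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory model of a Kafka-style log: producers append events
// and get back the event's offset; the log itself never tracks who read what.
final class EventLog[A] {
  private val events = ArrayBuffer.empty[A]
  def append(event: A): Long = synchronized { events += event; events.size - 1L }
  def read(offset: Long, max: Int): Seq[A] = synchronized {
    events.slice(offset.toInt, offset.toInt + max).toList
  }
}

// Each consumer keeps its own offset, so consumers progress independently.
final class Consumer[A](log: EventLog[A]) {
  private var offset = 0L
  def poll(max: Int = 10): Seq[A] = {
    val batch = log.read(offset, max)
    offset += batch.size // commit: next poll continues after the last event read
    batch
  }
}
```

Because the log keeps the events and only the offsets differ, an alerting job and an archiving job can read the same sensor stream at their own pace.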

However, the real-time processing frameworks differed widely across companies: Intel's Moty was a big proponent of Akka for bidirectional communication between devices and the central processing, while Capital One's Ganelin preferred Apache Apex. He said they used a kind of "scientific method" approach to reach this conclusion, which is refreshing. Their criteria for a real-time processing platform were:
- Performance: under their specific conditions, they needed sub-40ms latency.
- Roadmap: they evaluated the future roadmap of each product. For example, Databricks has said that Spark Streaming may move beyond micro-batching in a future version.
- Community: community support had to be strong, development cannot happen in a vacuum.
- Enterprise readiness: the framework of choice had to support enterprise features, like security.

With that, Ganelin quickly listed what went wrong with frameworks other than Apex:
- Spark streaming is a non-starter as it uses micro-batching and thus is too slow.
- Storm: lack of scalability (non-elastic nodes), poorly supported failure handling, only at-least-once processing guarantees, non-dynamic topologies. Acknowledgements are sent from spout (source) to sink, which works well until a failure occurs: in that case the replay starts from the last good tuple, which delays the new data and creates cascading failures. Twitter doesn't use Storm anymore and created Heron (a different talk at the same time at Strata :-( ). Also, community support seems to be waning, even though Hortonworks says they will keep supporting Storm.

- Flink: improves usability compared to Storm and Spark Streaming. Failure handling doesn't use acknowledgements but checkpointing instead, which works better (like Spark Streaming), and keeps stateful snapshots. It also has exactly-once processing guarantees. However, it still lacks dynamic topologies, and is a young project with limited support at this point (April 2016).
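To contrast checkpointing with the acknowledgement-and-replay approach described for Storm, here is a single-operator sketch of the idea: snapshot the state together with the source offset, and on failure restore both before replaying. It illustrates the concept only; it is not Flink's API, and `Snapshot` and `runWithCheckpoints` are hypothetical.

```scala
// Hypothetical sketch of checkpoint-based recovery (the idea, not Flink's API):
// the operator periodically snapshots its state together with the source offset
// it has consumed up to. After a failure it restores the last snapshot and
// replays the source from that offset, so each event affects the state exactly once.
final case class Snapshot(offset: Int, sum: Long)

def runWithCheckpoints(source: Vector[Int], checkpointEvery: Int, failAt: Option[Int]): Long = {
  var snapshot = Snapshot(0, 0L)
  var offset = 0
  var sum = 0L
  var failed = false
  while (offset < source.size) {
    if (failAt.contains(offset) && !failed) {
      // Simulated crash: in-memory state is lost, so roll back to the last snapshot.
      failed = true
      offset = snapshot.offset
      sum = snapshot.sum
    } else {
      sum += source(offset)
      offset += 1
      if (offset % checkpointEvery == 0) snapshot = Snapshot(offset, sum) // take a checkpoint
    }
  }
  sum
}
```

The key design point is that offset and state are restored together: replaying from the checkpoint while keeping the current (post-crash) state would double-count the replayed events, which is essentially what at-least-once delivery means for a stateful computation.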

- Apache Apex, on the other hand, is a real-time computation system based on YARN. Operability is a first-class citizen, with enterprise features. It supports dynamic topology modifications and deployments. It is the only project that has durable messaging queues between operators, checkpointed in memory as well as on disk. It supports dynamic scaling, where you can trade off latency against throughput.

Intel developed a home-grown IoT system made out of open-source components.
Interestingly, Mody from Intel says their customers pushed back on cloud-based systems and insisted on on-premises implementations instead. What made implementation a breeze was a modular architecture built on Docker and CoreOS, which made the application very portable across customers' IT infrastructure. This also enabled the duplication of components in the architecture: Mody called it a smart data pipe IoT platform.
Akka, mentioned earlier, was also a major benefit in their system, allowing for back pressure using reactive streams. Akka is a highly concurrent application framework for microservice-oriented architectures.
Since Intel started over 2 years ago, they incorporated Spark Streaming only later, and implemented their rules system, which is set up via a self-service UI.
Another talk referred to the SMACK architecture (Spark, Mesos, Akka, Cassandra, Kafka), which aims to replace Lambda architectures.

Downstream out-of-order processing is necessary if buffering prevents completely sequential delivery of sensor data. Hence the importance of frameworks like Flink.
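The usual technique for out-of-order data is event-time processing with a watermark: buffer incoming events, and release them in timestamp order only once no earlier event is expected anymore. This is a minimal sketch of that idea, not Flink's API; it assumes a fixed allowed lateness, and `SensorEvent` and `Reorderer` are hypothetical.

```scala
import scala.collection.mutable

final case class SensorEvent(timestamp: Long, value: Double)

// Hypothetical event-time reorderer: events are buffered and only released,
// sorted by their own timestamp, once the watermark (highest timestamp seen
// minus the allowed lateness) has passed them.
final class Reorderer(allowedLateness: Long) {
  // Min-heap on timestamp: reversing the ordering puts the oldest event at the head.
  private val buffer =
    mutable.PriorityQueue.empty[SensorEvent](Ordering.by[SensorEvent, Long](_.timestamp).reverse)
  private var maxSeen = Long.MinValue

  def onEvent(e: SensorEvent): Seq[SensorEvent] = {
    buffer.enqueue(e)
    maxSeen = math.max(maxSeen, e.timestamp)
    val watermark = maxSeen - allowedLateness
    val out = mutable.ListBuffer.empty[SensorEvent]
    while (buffer.nonEmpty && buffer.head.timestamp <= watermark) out += buffer.dequeue()
    out.toList
  }
}
```

The allowed lateness is the trade-off knob: a larger value tolerates more delayed sensor data but holds every result back longer.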

Data format

JSON has emerged as the preferred format to represent IoT data, as it is flexible enough for the variety of machine-generated information, and because it carries a description of its own structure. New data stores generally work well with JSON (HBase, Cassandra, DynamoDB, MongoDB).


A new security model must enable device authentication and authorization as well as encryption, according to Intel. However, as Ryft said, security is almost always talked about but rarely implemented in practice.

Anyway, thanks to the speakers for these enlightening talks!

Thursday, February 25, 2016

50 shades of Spark Streaming

I want to share my recent experience with a Spark Streaming implementation. My (poor) attempt at a funny post title aims to convey that there are a lot of nuances in working with a streaming application that need to be thought through, and I want to describe them through my example.
My streaming use case is from a medical device company that produces implantable mobile cardiac telemetry wearables. These devices generate sensor data, which are collected, processed and stored. This data is continuously streaming and needs to be analyzed and stored in near real-time. During the screening and the monitoring period various body vitals, like heartbeat, are obtained for each patient being monitored. Average heartbeat value needs to be calculated for each patient for each stage (a fixed period of time).
The overall data needs to be appropriately persisted into a data store, which must also coexist with a front-end layer that gives business users search and visualization access. We also need to use the proper Spark Streaming API to compute the aggregates. We will review these components one by one.

Storage component

Choice of architecture

Firstly, an interesting tidbit of information: the sensor data from patients is sent to a central NAS server, where it gets stored every hour into files of the same fixed size. This is a legacy system that should probably be replaced by a distributed messaging system like Kafka in the future.
We first considered a batch script, running on Hadoop for example, to process these data files from the NAS. However, as we learned more from our customer about the future of this application, it appeared that the hourly file frequency was arbitrary: as more patients ramped up, the system would need to keep up with the data volume and increase the file generation frequency. So instead of a batch data processing system, we chose Spark Streaming for this architecture. I believe this is typical of the current trend of blending batch (bounded) data processing with unbounded data processing, essentially seeing the existing hourly batching requirement as a special case of a streaming architecture.
Spark makes it easy to get data files from a folder:

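    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchSize))
    // Monitors the directory and picks up files newly created in it on each batch
    val lines = ssc.textFileStream("file:///spark/test/data/")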

Search layer

The next piece is to store the data in near real time. A search/indexing system is a good choice.
Elasticsearch is a great component for this, and its integration with Spark is well documented. After configuring the Elasticsearch connector within Spark, our index-saving code looks like the following:

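    import org.elasticsearch.spark.rdd.EsSpark
    // saveToEsWithMeta expects (metadata, document) pairs, e.g. (documentId, record)
    trialRecordDStream.foreachRDD(lineRDD =>
         EsSpark.saveToEsWithMeta(lineRDD, "trials/trialdata"))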


Finally, we can put Kibana on top of Elasticsearch as a visualization layer to represent the results. The overall architecture is: sensor files on the NAS -> Spark Streaming -> Elasticsearch -> Kibana.

Spark streaming specifics

The next piece was to design various computations on these biometrics, to be calculated in near real time, like the maximum/minimum and mean for a given patient. Maximum/minimum is pretty trivial (you just need to keep the biggest/smallest value seen so far), but calculating the average requires an aggregation: you need to keep more than the last value seen, namely at least a running sum and a count.
This requires some thought as to how the data is streaming into your application: does the result of your calculation have to be bounded to a certain window of time and discarded afterwards? Or do you need to keep this calculation current at all times?
In our case, the use case implied the latter.
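Keeping the calculation "current at all times" is cheap because max, min and mean can all be updated incrementally. A plain-Scala sketch (no Spark involved) of such a running summary follows; `RunningStats` and `emptyStats` are hypothetical names used only for illustration.

```scala
// Hypothetical running summary (plain Scala, no Spark): max, min and mean of a
// stream kept in constant memory. The mean only needs a running sum and count,
// so past values never have to be stored or read back.
final case class RunningStats(count: Long, sum: Long, min: Int, max: Int) {
  def add(value: Int): RunningStats =
    RunningStats(count + 1, sum + value, math.min(min, value), math.max(max, value))
  def mean: Double = if (count == 0) 0.0 else sum.toDouble / count
}

// Neutral starting point: any first value becomes both the min and the max.
val emptyStats: RunningStats = RunningStats(0L, 0L, Int.MaxValue, Int.MinValue)
```

For example, folding the heartbeats 72, 75, 80 and 70 into `emptyStats` gives min 70, max 80 and mean 74.25, without ever holding more than four numbers of state.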
A first approach for calculating an aggregation would be to collect the values for a given patient over time, store them all into the storage layer, then read them back every time to aggregate. Unfortunately this approach is wrong in two ways: first, resources are wasted on sending the values to storage and retrieving them back; second, this approach does not scale when the data is too big to read back.

Instead, Spark Streaming offers a more elegant solution for stateful aggregations: the updateStateByKey() API. It applies a given function to the new values and to the previous state of what you are calculating, so that the current calculation state is maintained. The state is also periodically saved to reliable storage to recover from potential failures, a process called checkpointing. However, this comes with a downside: in every batch, the update function runs over the entire state (every key ever seen), not just the keys that received new data, which leads to scalability issues.
A more scalable solution, introduced in Spark 1.6, is the new mapWithState() API, which only processes the keys that received new data (the delta), which is great. The new API also lets you provide an initial state RDD and set a timeout for idle keys.
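The scalability difference can be illustrated without Spark by counting how many times the update function has to run for one batch. The simulation below is a plain-Scala sketch of the two calling patterns (per existing key versus per new record), not Spark's API; both function names are hypothetical.

```scala
// Hypothetical plain-Scala simulation (not Spark's API): each function returns
// the new per-key state and how many times the update function ran for the batch.

// updateStateByKey-style: the update runs once for every key in the state,
// whether or not the batch contains new data for it.
def updateAllKeys(state: Map[String, Long], batch: Seq[(String, Long)]): (Map[String, Long], Int) = {
  val newValues = batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
  val keys = state.keySet ++ newValues.keySet
  val updated = keys.map(k => k -> (state.getOrElse(k, 0L) + newValues.getOrElse(k, 0L))).toMap
  (updated, keys.size)
}

// mapWithState-style: the update runs once per new record, touching only its key.
def updateTouchedKeys(state: Map[String, Long], batch: Seq[(String, Long)]): (Map[String, Long], Int) = {
  val updated = batch.foldLeft(state) { case (s, (k, v)) => s.updated(k, s.getOrElse(k, 0L) + v) }
  (updated, batch.size)
}
```

With 1,000 patients in the state and a batch containing readings for only 2 of them, both approaches end up with the same state, but the first runs the update 1,000 times and the second only twice.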

In our case, we calculate the aggregates for each record key, which identifies a unique combination of trial, patient id and visit. Our value is a SummaryRecord, defined as below:

    // Running aggregates for one patient visit; the mean is derived from the running sum and count
    case class SummaryRecord(trialId: String, patientId: String, visitName: String, max: Int, min: Int, sum: Long, count: Long, mean: Double)

Note that we store our aggregation values in this SummaryRecord, for each trialId_patientId_visitName key. There are a number of different ways to design these aggregations, but this way we have one clean (K,V) pair at the end, which we can directly store into our datastore.
So, first, we key the trial records we want to aggregate by this composite key (mapWithState() does the per-key grouping itself, so no separate groupByKey() is needed):

    import org.apache.spark.streaming.dstream.DStream
    // Re-key each (_, TrialRecord) tuple by trial, patient and visit, so that state is kept per patient visit
    val keyedTrialRecordDStream: DStream[(String, TrialRecord)] = trialRecordDStream.map { case (_, trialRecord) =>
      val key = trialRecord.trialId + "_" + trialRecord.patientId + "_" + trialRecord.visitName
      (key, trialRecord)
    }

Then we call mapWithState(), which in turn calls our groupSummaryRecords2 function for every new record:

    import org.apache.spark.streaming.StateSpec
    ssc.checkpoint(checkpointDir) // mapWithState needs a checkpoint directory; checkpointDir is a placeholder
    // mapWithState maintains the state of the summary object across batches, per key
    val stateSpec = StateSpec.function(groupSummaryRecords2 _)
    val summaryRecordUpdatedDStream = keyedTrialRecordDStream.mapWithState(stateSpec)
This returns a stream of (key, SummaryRecord) pairs, where the mean is kept up to date for each key.
Below is the gist of the function implementation that we pass to mapWithState().
The signature (this is Scala) looks a bit confusing at first: it takes the batch time, the key (one unique patient visit), the new value as an Option of a TrialRecord for that patient, and the State holding our SummaryRecord for that key as it evolves over time; it returns an Option of a pair of the same key associated with the new SummaryRecord for that key.
In the function, we essentially add the new value to the previous sum, increment the count, update the min/max, recompute the mean, and store everything both as the new state and in the returned summary record.

    import org.apache.spark.streaming.{State, Time}

    def groupSummaryRecords2(batchTime: Time, key: String, value1: Option[TrialRecord],
                             optionSummary: State[SummaryRecord]): Option[(String, SummaryRecord)] =
      value1.map { trialRecord => // value1 is None only when a key times out
        val v = trialRecord.value
        val updated =
          if (optionSummary.exists()) {
            val prev = optionSummary.get()
            val sum = prev.sum + v
            val count = prev.count + 1
            prev.copy(max = math.max(prev.max, v), min = math.min(prev.min, v),
                      sum = sum, count = count, mean = sum.toDouble / count)
          } else {
            // First record for this key: it is its own max, min and mean
            SummaryRecord(trialRecord.trialId, trialRecord.patientId, trialRecord.visitName,
                          max = v, min = v, sum = v, count = 1, mean = v)
          }
        optionSummary.update(updated) // persist the running summary for the next record/batch
        (key, updated)
      }
Notice that in this code, the mean is recomputed from the running sum and count for every new record that comes in, without keeping the past values around.
Once we are done, we store the raw data (Trial data) and summary data into ES, and later on can visualize the results with Kibana.

This is just one example of a transformation that Spark Streaming offers. The complete code is at . There are a number of other helper functions that Spark Streaming has in store as part of its API.
As always, please let me know if you have any questions about this.

Thursday, September 24, 2015

Apache Hive: the SQL Count of Monte Cristo



I have been working on an industrial-scale Hive project at a major Fortune 50 company lately, and wanted to share my experience. In some ways, I believe my experience is representative of where Hadoop adoption currently stands: the environment I was in was neither a bleeding-edge startup trying every new tool out there, nor the technology laggard typical of Midwest companies.

Technical environment

First, the state of the work environment I was in:
- Hadoop environments: adopted a major Hadoop vendor. Typical separate Development / Production clusters, of several hundred nodes.
- Hadoop 1.x installed; however, not on the latest build: using Hive/Pig 0.13 on MapReduce 1.0, i.e. no YARN. Also, no Spark in sight.
- Heavy users of Hive and Pig, no custom MapReduce. Java shop, with a little bit of Python. Installed Datameer and Platfora, and evaluating other tools, like Alation. Not going to the cloud anytime soon, very concerned about data security.
- Data analysts not very aware of or curious about market tools; more about SQL than MapReduce. So essentially the work was no longer about Big Data, but more about correctly translating the requirements into technical specs, with all optimization techniques and Hadoop specifics deferred to the MapReduce platform (i.e. using the default parameters).


I encountered a few technical issues, and wanted to note them, since they can be a common occurrence.

I encountered an issue with a complex query involving a few joins that overwrote the very table it was reading from ('INSERT OVERWRITE TABLE A … SELECT * FROM A ..'). The problem was intermittent: the query sometimes gave me a cryptic error upon overwriting (the SELECT part was running fine). Coincidentally, my colleague also encountered a similar problem with Pig, where the query refused to run to completion.
The solution was to save the result of the query in a temporary table/set of tuples, and then save it back to the actual table afterwards, i.e.:

    -- to remedy a bug!
    drop table if exists mytable_temp;
    create table if not exists mytable_temp (
      .. -- same schema as my actual table giving issues
    )
    row format delimited
    fields terminated by '\001'
    lines terminated by '\n';

    -- Now run the actual query, inserting into our temp table
    insert into table mytable_temp
    select  … ;
    -- to remedy the bug: copy the result back over the original table
    insert overwrite table mytable
    select * from mytable_temp;

Best Practices

Some best practices I learned from writing and churning through a lot of code:

- Try not to hard-code variables in the scripts, but rather pass them in upstream from your scheduler of choice:
    <hdp:hive-server host="some-host" port="10001" properties-location="" configuration-ref="hadoopConfiguration">
        someproperty=somevalue
        hive.exec.scratchdir=/tmp/mydir
    </hdp:hive-server>
- Put the Hive clauses (SELECT, FROM) and the column names on different lines for readability, same as in SQL.
- After any manipulation of a column, make sure to give it a name, i.e. alias the column as below. You might get away without doing it, but I encountered some cryptic issues in UNIONs because I hadn't aliased all columns.
    COALESCE(n.vendor_info_id, f.OPEN_Customer) AS OPEN_Customer
- Some say that they don't "trust" Hive's built-in functions, and would rather just declare Hive tables and work in Pig (via HCatalog) for the most part. Other engineers do work in HiveQL, but for anything complicated would rather go to UDFs/UDTFs in a different language. I don't really agree with these views, and was pleased to see that most if not all of my requirements could be done in pure HiveQL. Take a look at this for example:
    -- Let's only take 1 unique record for each account_id when there are multiple; we only need one.
    -- we take whichever.
    insert into table memberdata
    select T.cm_id, T.customer_id, T.record_id, T.account_id
    from
    (
    select
      row_number() over (partition by n.account_id order by n.record_id) as RANK,
      reflect("java.util.UUID", "randomUUID") AS cm_id,
      n.cus_id AS customer_id,
      n.record_id AS record_id,
      n.account_id as account_id
    from datasource n
    ) T
    where T.rank = 1;
  The code above uses the row_number() function to keep only one row out of many sharing the same value in a particular column (account_id), retaining the rest of the data, by only keeping the first row found (rank = 1). A unique id is then generated as the primary key for this record by calling Java's UUID.randomUUID() through reflect().
- A gotcha related to the last bit of code: do not use Hive's hash() function to generate keys, as it yields collisions very rapidly (after a few hundred rows).
- Another example, deriving an average transaction size from the final spend and the payment frequency:
    IF (n2.final_spend is not null AND opp.frequency is not null AND opp.average_transaction_size is null AND (vil.vendor is not null AND vil.stage <> 'SPEND FULFILLED'),
       CASE opp.frequency WHEN 'ANNUAL'      THEN n2.final_spend / 1
                          WHEN 'SEMI-ANNUAL' THEN n2.final_spend / 2
                          WHEN 'QUARTERLY'   THEN n2.final_spend / 4
                          WHEN 'BI-MONTHLY'  THEN n2.final_spend / 6
                          WHEN 'MONTHLY'     THEN n2.final_spend / 12
                          WHEN 'BI-WEEKLY'   THEN n2.final_spend / 26
                          WHEN 'WEEKLY'      THEN n2.final_spend / 52
                          WHEN 'DAILY'       THEN n2.final_spend / 365 END,
       opp.average_transaction_size) as average_transaction_size
- When joining multiple tables at once, the order of the joins matters, as stated in the documentation ("Joins are NOT commutative!"). So either the INNER JOINs have to come first, followed by the LEFT JOINs (an INNER JOIN placed after a LEFT JOIN can drop the rows the LEFT JOIN was meant to keep), or, as an alternative, use subqueries:
    Select ..
    FROM
    ( select
        n.vendor_id, n.location_id, n.cm_id, inc.final_spend as final_spend,
        endor.endorsement_id as endorsement_id, inc.ap_file_id,
        pers.person_id as person_id
      from myrecords inc
      -- there should always be a match, so no OUTER JOIN
      JOIN
        (select o.cm_id, v.record_id, v.vendor_id, v.location_id
         from rim v
         join members o
         on v.record_id = o.record_id) n
        ON inc.record_id = n.record_id
      LEFT OUTER JOIN vendor_contact_person pers
        ON …
      -- for checking if this vendor has an endorsement that exists, for 'top vendor flag'
      LEFT OUTER JOIN Endorsement endor
        on (n.vendor_id = endor.vendor_id and n.location_id = endor.location_id and n.cm_id = endor.cm_id)
    ) n2
    ..
As you can see, the outer FROM clause queries a derived table, n2, built from several JOINs.
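For readers more at home in Scala than HiveQL, the semantics of the row_number() deduplication query above can be sketched on in-memory rows: partition by account_id, keep the row with the smallest record_id (the row with rank = 1), and attach a fresh UUID as surrogate key. The case classes are hypothetical mirrors of the query's columns, and record_id is assumed to be numeric.

```scala
import java.util.UUID

// Hypothetical mirrors of the query's input (datasource) and output (memberdata) columns.
final case class SourceRow(cusId: String, recordId: Long, accountId: String)
final case class MemberRow(cmId: String, customerId: String, recordId: Long, accountId: String)

// Plain-Scala equivalent of:
//   row_number() over (partition by account_id order by record_id) ... where rank = 1
def firstPerAccount(rows: Seq[SourceRow]): Seq[MemberRow] =
  rows.groupBy(_.accountId)          // partition by account_id
    .values
    .map(_.minBy(_.recordId))        // order by record_id, keep the first row
    .map(r => MemberRow(UUID.randomUUID().toString, r.cusId, r.recordId, r.accountId)) // like reflect("java.util.UUID", "randomUUID")
    .toSeq
    .sortBy(_.accountId)             // deterministic output order for readability
```

A random UUID, unlike a 32-bit hash, makes accidental key collisions practically impossible, which is why it is preferable to hash() for surrogate keys.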

UDFs and UDTFs

A coworker had to create a UDTF (a user-defined table-generating function, which can emit several rows per input row) and chose Python to do so. I hadn't realized that such transformations can now be written in pretty much any language for Hive, through its TRANSFORM clause, as long as the script follows the constraints of Hadoop Streaming: the program must essentially be able to act as its own mapper or reducer in the pipeline, reading its input from stdin and writing its output to stdout. However, be warned that performance won't be as good as with Java.
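A minimal sketch of such a streaming-style script follows, written in Scala to stay consistent with the rest of this blog's code (the coworker's was in Python). By default Hive's TRANSFORM clause sends each row to the script as a TAB-separated line on stdin and reads TAB-separated lines back from stdout. The column layout (an id plus a comma-separated tags column) and the function names are assumptions for illustration.

```scala
import scala.io.Source

// Hypothetical UDTF-style transform: each stdin line is one row with
// TAB-separated columns (id, comma-separated tags); the script writes one
// output row per tag, i.e. it "explodes" the second column.
def explodeRow(line: String): Seq[String] = line.split("\t", -1) match {
  case Array(id, tags) => tags.split(",").toSeq.filter(_.nonEmpty).map(tag => s"$id\t$tag")
  case _               => Seq.empty // malformed rows are skipped in this sketch
}

// Entry point when the script is run by Hive: stream rows through explodeRow.
def runStreaming(): Unit =
  Source.stdin.getLines().flatMap(explodeRow).foreach(println)
```

Hive would invoke a script like this with something along the lines of `SELECT TRANSFORM(id, tags) USING '<script>' AS (id, tag) FROM <table>` after shipping it with `ADD FILE`; the script and table names are placeholders. Keeping the row logic in a pure function like `explodeRow` also makes the script testable locally, which helps with the testing pain described below.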

Workflow for batch processing

There are a few options for this, mentioned everywhere. On this project we used Spring Batch, which was essentially just an XML wrapper around the scripts we were writing, either in Pig, Hive or shell (called scriptlets). It is of the form:
    <hdp:hive-tasklet id="hive-script">
        <hdp:script location="classpath:org/company/hive/script.q" />
        <hdp:parameters>set hive.metastore.warehouse.dir=/opt/hive/warehouse;</hdp:parameters>
    </hdp:hive-tasklet>

What it essentially gives you is a way to control the flow of the pipeline. It has built-in retry of a tasklet on error, but 99% of the time this is useless, as the error is caused by the data or the script, not by the infrastructure itself, since Hadoop already reruns failed tasks on a different node by default. The same can be said of Apache Oozie, however. The other pain was having to comment out tasklets (in the XML config file) whenever we wanted to test a subset of them, which is probably not the optimal way to do this.


Which brings the conversation to testing: after a few projects on Big Data batch systems, I am still not aware of a good tool that alleviates the pain of system-testing Hadoop pipelines.
From my experience, testing should be done in two passes, followed by a business review:
- With a small subset of data, to test each script against the business rules and ensure each column returns the appropriate value (functional testing).
- With a larger data set, to test the record counts, validating JOINs and FILTERs as well as OVERWRITEs (in Hive tables). A proper SELECT COUNT(primary_key) should be run on each table, since no integrity constraints are typically set up in Hive or Pig.
- Following this, results should be shared with a business analyst who is the SME on the data domain.
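The count checks from the second pass can be expressed as a small report. In HiveQL they amount to comparing COUNT(*), COUNT(DISTINCT primary_key) and the number of NULL keys; the plain-Scala sketch below computes the same three numbers over an in-memory sample, with hypothetical names (`CountReport`, `countCheck`).

```scala
// Hypothetical count report: since Hive enforces no primary-key constraint,
// duplicate and NULL keys have to be detected explicitly after each step.
final case class CountReport(rows: Int, distinctKeys: Int, nullKeys: Int) {
  def duplicateKeys: Int = rows - nullKeys - distinctKeys
  def ok(expectedRows: Int): Boolean = rows == expectedRows && duplicateKeys == 0 && nullKeys == 0
}

// keys: the primary-key column of a table, with None standing for NULL.
def countCheck[K](keys: Seq[Option[K]]): CountReport =
  CountReport(keys.size, keys.flatten.distinct.size, keys.count(_.isEmpty))
```

Running the same report before and after each JOIN or OVERWRITE quickly shows where rows were duplicated (a fan-out join) or lost (an unintended INNER JOIN).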


Typically on Big Data systems, the code has to be run and tested on the cluster itself; there is no way (or only a very limited one, e.g. for checking the syntax of a function or when writing a UDF) to write and run code locally. The way we wrote code fell into two categories:

- Write code in Emacs/Vim, directly on the cluster! This works relatively well, but lacks any of an IDE's capabilities for checking syntax.
- Use the Sublime Text editor on local machines, with Hive/Pig syntax checking and a plug-in to FTP the code directly to the cluster. This proved relatively elegant and worked pretty well once installed. The installation was not for the faint of heart, however.
On a related note, in my numerous projects I have unfortunately never seen anyone use GUIs like Hue for writing Big Data code. Which brings me to the point that the data frame concept of pandas, and notebooks like Zeppelin, are really useful!