Monday, April 4, 2016

Review of the Strata San Jose 2016 conference in the context of IoT

IoT at Strata San Jose 2016

Introduction

I had a different focus this year in attending Strata San Jose: not following the usual Hadoop-related news or products, but instead focusing on the Internet of Things (IoT) and, by extension, streaming technology. It was pretty clear from the talks and from conversations with attendees that batch-oriented processing is becoming less of a focus and, as Tyler Akidau puts it, an artifact of history.
I attended as many IoT-related talks as I could, and in capturing my findings below, I separated them into two parts: the business-level overview and the key points that need to be architected into an IoT system, and the technical streaming frameworks of choice. I tried to blend the speakers' insights together and mention which talk each point comes from.

The talks

I first attended the talks by Dunning regarding messaging systems and the No Lambda talk, and also took a look at what Pivotal says about IoT. I then attended Moty's talk from Intel and the talk about robots from Microsoft.
On Thursday I attended the second Intel talk, the talk from Ryft, and Capital One's talk about their architecture, which was unfortunately scheduled at the same time as Twitter's talk about Heron and Google's talk about Dataflow. I then attended, like a million other people, the "Streaming done right" talk about Flink!

The state of IoT

Everyone addresses the state of the Internet of Things market first, with the same figure: 50 billion devices projected to be connected in 2020. The usual explanations behind the growth in IoT are known: 
- Moore's law, which keeps making chips smaller with more components every year
- The decreasing price of components
- Distributed systems that parallelize tasks across multiple machines
- AI algorithms entering a golden age
- The growth/explosion of data

The hype curve?

Is IoT all hype? Well, most sensors (85%) are unconnected as of now, but 5.5 million devices will get connected every day in 2016. Also, the "data as the new oil" adage seems to continue to take hold: more and more data gets created, and its volume doubles every two years. So IoT does not look like hype. In addition, the use cases are many.

IoT systems definition

The use cases are endless, but they have common themes: energy savings, security, and monitoring, across all verticals.
Intel's definition of IoT is pretty fair: IoT use cases are usually time-series oriented, and are made up of 3 silos:
- The "things": end-points at the edge, like cars, wearables, gateway networks, sensors. This is where data acquisition happens.
- The network
- The cloud: where most of the analytics takes place. This is where real-time visualization, analysis, and alerts happen.

The actual setup of an IoT system lives in its process management/workflow layer, where the rules are written, usually in a SQL-like descriptive language.
The end goal of an IoT system is to create a predictive, and sometimes prescriptive platform.

The importance of edge nodes

Interestingly Intel says that some of the analytics in an IoT architecture needs to happen at the edges (Ryft said the same thing): "40% of IOT generated data will be stored, processed, analyzed & acted upon at the edge."
The new velocity of data needs to be supported by providing near real-time responsiveness between capturing the data and driving an action. Ryft, as an example, talked about brick-and-mortar retail stores that need to correlate data about a shopper as fast as possible, while the shopper is still in the store, in order to send a relevant coupon.
Also, edge compute processing is required to handle intermittent connectivity from decentralized data capture. Edge compute must help filter signal from noise in order to optimize bandwidth and responsiveness. Edge nodes should be able to receive updates from the cloud about what to filter out and about changing definitions of anomalies or data of interest; a small sketch of this idea follows.
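
As a purely illustrative sketch (not code from any of the talks; names and threshold are made up), an edge filter with a cloud-updatable rule could look roughly like this in Scala:

    // Illustrative only: an edge node keeps only readings of interest and
    // accepts an updated rule pushed down from the cloud.
    object EdgeFilter {
      @volatile private var anomalyThreshold: Double = 100.0   // default "data of interest" rule

      // Called when the cloud pushes a new definition of what is anomalous
      def updateThreshold(newThreshold: Double): Unit = anomalyThreshold = newThreshold

      // Forward only the readings that exceed the current threshold, saving upstream bandwidth
      def filterForUpload(readings: Seq[Double]): Seq[Double] =
        readings.filter(_ > anomalyThreshold)
    }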

Data Direction

The direction of data generally needs to be bidirectional in an IoT system: from the edge nodes to the centralized cloud for aggregation, and back out. Not all IoT frameworks offer this at the moment, I believe.

Latency

Ryft emphasized the standard question of "what is the definition of near real time?"; in other words, near real time can mean anything from a couple of milliseconds to a 5-minute response, depending on the use case. Capital One mentioned a throughput of about 2,000 events/second, which may sound small in the context of Big Data, but is not when you consider that these events become features in a machine learning model that grows to 10M data dimensions. The Capital One team was looking for sub-40ms latency in their real-time fraud analysis use case.

The importance of Machine learning

Microsoft's talk was really about machine learning. The analytic output is no longer business intelligence (BI) aimed at human consumption, but increasingly machine learning for near real-time performance that optimizes the output of an ecosystem of smart devices. The new data pipelines need to support predictive analytics and machine learning, with a flow back to the source devices, in order to optimize how an ecosystem of connected devices operates. The speaker framed how to choose the proper ML algorithm with five questions:
1/ How much? I.e. "What will be the total of sales next week?" aka regression algorithm.
2/ Which category? I.e. "Is it a cat or a dog?" aka classification algorithm.
3/ Which groups? I.e. "Which shoppers have similar tastes?" aka clustering.
4/ Is it weird? I.e. "Is this pressure unusual?" aka anomaly detection.
5/ Which action to take? I.e. "Should I brake or accelerate in response to that yellow light?" aka reinforcement learning.

Challenges:
- The algorithm assumes the world doesn't change.
- Sensors/actuators could themselves change.
- A reinforcement learning algorithm doesn't handle keeping goals well.
- Typically the algorithm takes a lot of time to learn.
- Also, it doesn't always scale.

The speaker postulated that this is a major problem in IoT: we have all the components like the actuators/sensors, but are still missing the central brain to make this work.

More generally, machine learning was a major addition in most IoT systems described in the talks, reducing the need for manual rules and enabling advanced predictive analysis. An interesting point (mentioned by Intel) was that ML makes it possible to pinpoint slow changes over time, as opposed to "simple" anomalies, by looking at a combination of sensors rather than a single one.

Systems evolution

The systems built for IoT usually increase in complexity as they become more mature: they go from being able to visualize what is happening with the connected objects, to simple alerting, to complex rules, to predictive and then prescriptive analytics. The best systems (a la Microsoft) expose a model-as-a-service, ready to deploy. Intel talked about a marketplace of components in their solution, acting as an analytics toolkit.

Deep dive into architectures

Ted Dunning emphasized that developers need a reliable way to move data as it is generated across different systems, one event at a time; this is generally done via Kafka, which appeared in almost all of the IoT architecture diagrams shown. A minimal producer sketch follows.
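
To make that pattern concrete, here is a minimal, hypothetical Kafka producer sketch in Scala (broker address, topic name, and payload are made up, not from any talk) that publishes each sensor reading as its own event:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object SensorEventProducer extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")   // hypothetical broker address
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      // Publish each reading as a separate event, keyed by device id, as soon as it is generated
      producer.send(new ProducerRecord[String, String]("sensor-events", "device-42", """{"heartRate":72}"""))
      producer.close()
    }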

However, the real-time processing frameworks differed widely across companies; Intel's Moty was a big proponent of Akka for bidirectional communication between devices and the central processing, while Capital One's Ganelin preferred Apache Apex. He said they used some kind of "scientific method" approach to come to this conclusion, which is refreshing. Their criteria for a real-time processing platform were:
- Performance: under their specific conditions, they needed sub-40ms latency.
- Roadmap: they evaluated the future roadmap of each product. For example, Databricks has said that Spark Streaming may consider non-micro-batching in a future version.
- Community: community support had to be strong, development cannot happen in a vacuum.
- Enterprise readiness: the framework of choice had to support enterprise features, like security.

With that, Ganelin quickly listed what went wrong with frameworks other than Apex:
- Spark Streaming is a non-starter as it uses micro-batching and thus is too slow.
- Storm: lack of scalability (non-elastic nodes), failure handling not well supported, at-least-once processing guarantees, no dynamic topologies. Acknowledgements are sent from spout (source) to sink, which works well until a failure occurs: in that case the rollback starts from the last good tuple, which delays new data and creates cascading failures. Twitter doesn't use Storm anymore and created Heron (a different talk at the same time at Strata :-( ). Also, community support seems to be waning, even though Hortonworks says they will support Storm.

- Flink: has added usability compared to Storm and Spark Streaming. Failure handling doesn't use acknowledgements, but checkpointing instead (like Spark Streaming), which works better and keeps stateful snapshots. It also has exactly-once processing guarantees. However it still lacks dynamic topologies, and it is a young project with a lack of support at this point (April 2016).

- Apache Apex, on the other hand, is a real-time computation system based on YARN. Operability is a first-class citizen, with enterprise features. It supports dynamic topology modifications and deployments. It is the only project that has durable message queues between operators, checkpointed in memory as well as on disk. It supports dynamic scaling, where you choose between latency and throughput.

Intel developed a home-grown IoT system made out of open source components.
Interestingly, Moty from Intel said their customers pushed back on cloud-based systems and instead insisted on on-premise implementations. What made implementation a breeze was a modular architecture built on Docker and CoreOS, which made the application very portable across customer IT infrastructures. This enabled the duplication of components in the architecture: Moty called it a smart data pipe IoT platform.
Akka, mentioned earlier, was also a major benefit in their system, allowing for back pressure using Reactive Streams; Akka is a highly concurrent application framework suited to microservice-oriented architectures (a minimal sketch of this back-pressure behavior follows below).
Intel having started over two years ago, they incorporated Spark Streaming only later, and implemented their rules system, set up via a self-service UI.
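
To illustrate the back-pressure point, here is a minimal, purely illustrative Akka Streams sketch (not Intel's actual code; names and values are made up): a fast source of readings is automatically slowed to the pace of a deliberately slow consumer, because demand propagates upstream.

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}

    object BackPressureSketch extends App {
      implicit val system = ActorSystem("iot-pipe")
      implicit val materializer = ActorMaterializer()

      // A fast "device" source feeding a slow consumer: Akka Streams propagates demand
      // upstream, so the producer only emits as fast as the consumer can process.
      val readings = Source(1 to 1000)
      val toJson = Flow[Int].map(v => s"""{"heartRate":$v}""")

      readings.via(toJson).runWith(Sink.foreach { msg =>
        Thread.sleep(10)   // simulate a slow downstream system (e.g. a remote store)
        println(msg)
      })
    }
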
Another talk referred to the SMACK stack (Spark, Mesos, Akka, Cassandra, Kafka), aiming to replace Lambda architectures.

Downstream out-of-order processing is necessary when buffering prevents completely sequential delivery of sensor data, hence the importance of frameworks like Flink; the general idea is sketched below.
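
As a purely illustrative, framework-agnostic sketch (all names made up), out-of-order handling boils down to buffering readings until a watermark guarantees that nothing older can still arrive, then releasing them in event-time order:

    // Illustrative only: buffer out-of-order readings and release them in event-time order
    // once a watermark says no older reading can still arrive.
    case class Reading(deviceId: String, eventTime: Long, value: Double)

    class ReorderBuffer(allowedLatenessMs: Long) {
      private var pending = Vector.empty[Reading]
      private var maxEventTimeSeen = Long.MinValue

      // Offer one (possibly late) reading; returns the readings now safe to process, in order
      def offer(reading: Reading): Seq[Reading] = {
        pending = pending :+ reading
        maxEventTimeSeen = math.max(maxEventTimeSeen, reading.eventTime)
        val watermark = maxEventTimeSeen - allowedLatenessMs
        val (ready, stillPending) = pending.partition(_.eventTime <= watermark)
        pending = stillPending
        ready.sortBy(_.eventTime)
      }
    }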

Data format

JSON has emerged as the preferred way to represent IoT data, as it is flexible enough for the variety of machine-generated information and it carries a description of its own structure. Newer data stores generally work well with JSON (HBase, Cassandra, DynamoDB, MongoDB).
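
For instance, a hypothetical sensor payload (field names made up) can be parsed into a typed record with a JSON library such as json4s, one option among several:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    // Hypothetical shape of a single sensor reading
    case class Reading(deviceId: String, heartRate: Int, ts: Long)

    object SensorJsonExample extends App {
      implicit val formats: Formats = DefaultFormats

      // The payload describes its own structure: field names travel with the data
      val payload = """{"deviceId":"device-42","heartRate":72,"ts":1459756800}"""
      val reading = parse(payload).extract[Reading]
      println(reading)   // Reading(device-42,72,1459756800)
    }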

Security

New security models must enable authentication and authorization of devices as well as encryption, according to Intel. However, as Ryft noted, security is almost always talked about but rarely actually implemented in practice.


Anyway, thanks to the speakers for these enlightening talks!

Thursday, February 25, 2016

50 shades of Spark Streaming


I want to share my recent experience with a Spark Streaming implementation. My (poor) attempt at a funny post title aims to convey that there are a lot of nuances in working with a streaming application that need to be thought through, and I want to describe them through this example.
My streaming use case is from a medical device company that produces implantable mobile cardiac telemetry wearables. These devices generate sensor data, which are collected, processed and stored. This data is continuously streaming and needs to be analyzed and stored in near real-time. During the screening and the monitoring period various body vitals, like heartbeat, are obtained for each patient being monitored. Average heartbeat value needs to be calculated for each patient for each stage (a fixed period of time).
The overall data needs to be appropriately persisted into a data store, but it also needs to coexist with a front-end layer that gives business users search and visualization access. We also need to use the proper Spark Streaming API to compute this correctly. We will review these components one by one.

Storage component

Choice of architecture


Firstly, an interesting tidbit of information: the sensor data from patients is sent to a central NAS server, where it gets stored into files of the same fixed size every hour. This is a legacy system that should probably be replaced by a distributed messaging system like Kafka in the future.
We first considered a batch script to process these data files from the NAS, running for example on Hadoop. However, in discussing the future of this application with our customer, it appeared that the hourly file frequency was arbitrary, and that as more patients ramp up, the system would need to keep up with the data volume and increase the file generation frequency. So instead of a batch data processing system, we chose Spark Streaming for this architecture; I believe this is typical of the current trend that blends batch (bounded) data processing with unbounded data processing, essentially treating the existing hourly batching requirement as a special case of a streaming architecture.
Spark makes it easy to get data files from a folder:


    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchSize))
    val lines = ssc.textFileStream("file:///spark/test/data/")

Search layer


The next piece is to store the data in near real time. A search/indexing system is a good choice.
Elastic Search is a great component for this, and the integration point with Spark is well documented. So, once Elastic Search is configured within Spark, our index-saving code looks like the following:

   import org.elasticsearch.spark.rdd.EsSpark

   trialRecordDStream.foreachRDD { lineRDD =>
     // Index each micro-batch into the "trials/trialdata" index, keeping the document id metadata
     EsSpark.saveToEsWithMeta(lineRDD, "trials/trialdata")
   }


Visualization


And finally we can put Kibana as a visualization layer on top of Elastic Search to represent the results. The overall architecture looks like the below.


Spark streaming specifics


The next piece was to design various computations on the data to be calculated in near real time, like the maximum/minimum and mean of these biometrics for a given patient. Maximum/minimum is pretty trivial (you just need to keep the largest/smallest value seen so far), but calculating the average requires an aggregation (you need to maintain a running sum and count across all values, not just the last one).
This requires some thoughts as to how the data is streaming into your application: does the result of your calculation have to be bounded in a certain window of time and discarded afterwards? Or do you need to keep this calculation current at all times?
In our case, the use case implied the latter.
A first approach for calculating an aggregation would be to collect the values for a given patient over time, store them all in the storage layer, then read them back for every aggregation. Unfortunately this approach is wrong in two ways: first, resources are wasted on sending the values to the store and retrieving them back; second, it does not scale when the data is too big to re-collect.

Instead, Spark Streaming has an API that offers a more elegant solution for stateful aggregations: updateStateByKey(). By applying a given function to the new values as well as the previous state of what you are calculating, the current calculation state is maintained. Also, the data from previous states is saved to disk to recover from potential failures, in a process called checkpointing. However, a downside comes with this: apparently every time it is called, the entire state dataset is retained in memory, which leads to scalability issues.
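
Before moving on, here is a minimal sketch of that older approach, assuming a hypothetical DStream[(String, Int)] called readingsByKey (patient/visit key to heartbeat value); the names are made up for illustration:

    // Stateful operations require a checkpoint directory (path here is hypothetical)
    ssc.checkpoint("file:///spark/checkpoint")

    // Keep a running (sum, count) per key; the mean is then sum / count
    val runningSumAndCount = readingsByKey.updateStateByKey[(Int, Int)] {
      (newValues: Seq[Int], state: Option[(Int, Int)]) =>
        val (prevSum, prevCount) = state.getOrElse((0, 0))
        Some((prevSum + newValues.sum, prevCount + newValues.size))
    }
    val means = runningSumAndCount.mapValues { case (sum, count) => sum.toDouble / count }
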
A more scalable solution, new with Spark 1.6, is the mapWithState() API, which only processes the delta (net-new data), which is great. If you look at the example, you'll also see that the new API lets you add an initial state to the RDD you are processing and has a mechanism for timeouts.
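
As a small sketch of those two extras (reusing this article's groupSummaryRecords2 and SummaryRecord; the empty initial RDD is just an assumption, it could be loaded from the data store instead):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Minutes, StateSpec}

    val initialSummaries: RDD[(String, SummaryRecord)] = sc.emptyRDD[(String, SummaryRecord)]
    val specWithExtras = StateSpec.function(groupSummaryRecords2 _)
      .initialState(initialSummaries)   // seed per-key state from previously known summaries
      .timeout(Minutes(30))             // evict keys that receive no new data for 30 minutes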


In our case, we are calculating the average per key, where the key represents a unique record: a combination of patient id and visit. Our value is a SummaryRecord, defined as below:


   case class SummaryRecord(trialId: String, patientId: String, visitName: String,
                            max: Int, min: Int, count: Int, sum: Int)   // mean is derived as sum / count

Note that we store our aggregation values in this SummaryRecord, keyed by PatientId_VisitName. There are a number of different ways to design these aggregations, but this way we end up with one clean (K,V) pair that we can store directly into our datastore.
So, first, we group the records we want to aggregate by our key:
    val summaryRecordDStream: DStream[(String, Option[SummaryRecord])] = trialRecordDStream.map( trialRecordTuple =>
      {
        // Generate the summary record pairs from the trial records DStream
        val trialRecord = trialRecordTuple._2
        // Seed the summary with the incoming value: max = min = sum = value, count = 1
        val summaryRecord = SummaryRecord(trialRecord.trialId, trialRecord.patientId, trialRecord.visitName,
          trialRecord.value, trialRecord.value, 1, trialRecord.value)
        val key: String = trialRecord.trialId + "_" + trialRecord.patientId + "_" + trialRecord.visitName + "_" //+ trialRecord.device
        println("Summary Key: " + key)
        (key, Some(summaryRecord))
      }
    )
    //Group the summary dstream by key for aggregation
    val summaryGroupedDStream: DStream[(String, Iterable[Option[SummaryRecord]])] = summaryRecordDStream.groupByKey()
   

Then we call mapWithState(), which in turn calls our groupSummaryRecords2 function:

    //Calling mapWithState to maintain the state of the summary object - only from the stream
    val stateSpec = StateSpec.function(groupSummaryRecords2 _)

    val summaryRecordUpdatedDStream = trialRecordDStream.mapWithState(stateSpec)
This returns a SummaryRecord instance with the running aggregates for the given key, from which the average is calculated.
Below is the gist of the function implementation that we pass to mapWithState().
The signature (this is Scala) looks a bit confusing at first: the batch time; the key (one unique patient/visit combination); the value, a TrialRecord for that patient; the state, which is the SummaryRecord maintained for that key over time; and the return value, an Option of a pair of the same key with the new SummaryRecord for that key.
In the function, we essentially add the new value to the previous sum, increment the count, and store everything into our intermediary summary record.
def groupSummaryRecords2(batchTime: Time, key: String, value1: Option[TrialRecord],
                         optionSummary: State[SummaryRecord]): Option[(String, SummaryRecord)] = {

        // Running sum: add the incoming value to the previously accumulated sum (both are Options)
        val sum = value1.map(_.value)
        val sum2: Option[Int] = optionSummary.getOption.map(_.sum)
        val sum3: Option[Int] = (sum :: sum2 :: Nil).flatten.reduceLeftOption(_ + _)

        if (optionSummary.exists)
        {
          // Carry the previous extremes forward unless the new value beats them
          val max = math.max(optionSummary.get.max, value1.get.value)
          val min = math.min(optionSummary.get.min, value1.get.value)
          val intermediaryOutput = SummaryRecord(value1.get.trialId,
                      value1.get.patientId,
                      value1.get.visitName,
                      max,
                      min,
                      optionSummary.get.count + 1,
                      sum3.getOrElse(0))
          val output = (key, intermediaryOutput)

          optionSummary.update(intermediaryOutput)
          println("updated output: " + key + " : " + intermediaryOutput.patientId + intermediaryOutput.sum)
          Some(output)
        }
        else
        {
          // First record seen for this key: seed the state with the incoming value
          val first = SummaryRecord(value1.get.trialId, value1.get.patientId, value1.get.visitName,
                      value1.get.value, value1.get.value, 1, value1.get.value)
          optionSummary.update(first)
          Some((key, first))
        }
}
Notice that in this code, we keep a running sum and count for every key, so the mean is effectively recomputed for every new batch of data that comes in.
Once we are done, we store the raw data (Trial data) and summary data into ES, and later on can visualize the results with Kibana.

This is just one example of a transformation that Spark Streaming offers. The complete code is at https://github.com/mlieber/sparkstreaming-es . There are a number of other helper functions that Spark Streaming has in store as part of its API.
As always, please let me know if you have any questions about this.