Thursday, February 25, 2016

50 shades of Spark Streaming


           I want to share my recent experience with a Spark Streaming implementation. My (poor) attempt at a funny post title is meant to convey that there are a lot of nuances in working with a streaming application that need to be thought through, and I want to walk through them with an example.
My streaming use case comes from a medical device company that produces implantable mobile cardiac telemetry wearables. These devices generate sensor data, which is collected, processed and stored. The data streams in continuously and needs to be analyzed and persisted in near real time. During the screening and monitoring periods, various body vitals, like heart rate, are obtained for each patient being monitored, and an average heart rate needs to be calculated for each patient for each stage (a fixed period of time).
The data needs to be appropriately persisted into a data store, coexisting with a front-end layer that gives business users search and visualization access. We also need to use the proper Spark Streaming API to compute these aggregations correctly. We will review these components one by one.

Storage component

Choice of architecture


         Firstly, an interesting tidbit: the sensor data from patients is sent to a central NAS server, where it is written into fixed-size files every hour. This is a legacy setup that should probably be replaced by a distributed messaging system like Kafka in the future.
We first considered a batch script, for example a Hadoop job, to process these data files from the NAS. However, in discussing the future of this application with our customer, it became clear that the hourly file frequency was arbitrary: as more patients ramp up, the system will need to keep up with the data volume and generate files more often. So instead of a batch data processing system, we chose Spark Streaming for this architecture. I believe this is typical of the current trend of blending batch (bounded) data processing with unbounded data processing, essentially treating the existing hourly batching requirement as a special case of a streaming architecture.
Spark makes it easy to get data files from a folder:


    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchSize))
    val lines = ssc.textFileStream("file:///spark/test/data/")  // watch the directory for newly arriving files
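
For completeness, once the DStream transformations and output operations (shown in the following sections) are wired up, the streaming context has to be started explicitly. A minimal sketch:

    // Register all transformations and output operations first, then start the streaming computation
    ssc.start()
    ssc.awaitTermination()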

Search layer


The next piece is to store the data in near real time. A search/indexing system is a good choice for this.
Elasticsearch is a great component here, and its integration point with Spark is well documented. Once Elasticsearch is configured within Spark (a sample configuration is sketched below), our index-saving code looks like the following:

   import org.elasticsearch.spark.rdd.EsSpark

   trialRecordDStream.foreachRDD { lineRDD =>
     // Index each micro-batch into the "trials/trialdata" index; the pair key is used as document metadata
     EsSpark.saveToEsWithMeta(lineRDD, "trials/trialdata")
   }
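
For reference, the connector settings go into the Spark configuration. Here is a minimal sketch, where the application name, host and port are placeholders for your own environment:

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
      .setAppName("trial-streaming")        // hypothetical application name
      .set("es.nodes", "localhost")         // Elasticsearch node(s) to connect to
      .set("es.port", "9200")               // Elasticsearch REST port
      .set("es.index.auto.create", "true")  // create the index automatically if it does not exist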


Visualization


And finally we can put Kibana as a visualization layer on top of Elasticsearch to display the results. The overall architecture looks like the diagram below.


Spark streaming specifics


         The next piece was to design the various computations to be performed on the data in near real time, such as the maximum, minimum and mean of these biometrics for a given patient. Maximum and minimum are pretty trivial (you just need to keep the largest/smallest value seen so far), but calculating the average requires an aggregation (you need to retain all of the values, or at least a running sum and count, not just the last one).
This requires some thought as to how the data streams into your application: does the result of your calculation have to be bounded to a certain window of time and discarded afterwards, or do you need to keep the calculation current at all times?
In our case, the use case implied the latter.
A first approach to calculating an aggregation would be to collect the values for a given patient over time, store them all in the storage layer, then read them all back every time an aggregation is needed. Unfortunately this approach is wrong in two ways: first, resources are wasted on sending the values to the store and retrieving them back; second, it does not scale once the data becomes too big to re-read.

Instead, Spark Streaming has an API that offers a more elegant solution for stateful aggregations: updateStateByKey(). By applying a given function to the new values as well as the previous state of your calculation, the current state is maintained across batches. The state is also saved to disk to recover from potential failures, in a process called checkpointing. However, there is a downside: on every batch the entire state, for all keys whether or not they received new data, is scanned and held in memory, which leads to scalability issues.
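
For illustration, here is a minimal sketch of updateStateByKey() maintaining a running sum and count, assuming a hypothetical readings DStream of (patientKey, value) pairs (this is not the code we ended up using):

    // Hypothetical: readings is a DStream[(String, Int)] of (patientKey, heart rate) pairs
    def updateSumAndCount(newValues: Seq[Int], state: Option[(Int, Int)]): Option[(Int, Int)] = {
      val (prevSum, prevCount) = state.getOrElse((0, 0))
      Some((prevSum + newValues.sum, prevCount + newValues.size))  // new state for this key
    }
    val runningTotals = readings.updateStateByKey(updateSumAndCount _)  // DStream[(String, (Int, Int))]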
A more scalable solution, new in Spark 1.6, is the mapWithState() API, which only processes the keys that receive new data in each batch (the delta/net-new data), which is great. If you look at the example, you'll also see that the new API lets you seed the computation with an initial state RDD and provides a timeout mechanism, as sketched below.
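
A minimal sketch of those two options, where initialSummaries is a hypothetical RDD[(String, SummaryRecord)] preloaded from the data store:

    import org.apache.spark.streaming.Minutes

    val stateSpecWithOptions = StateSpec.function(groupSummaryRecords2 _)  // our mapping function, shown below
      .initialState(initialSummaries)  // hypothetical RDD of previously computed summaries
      .timeout(Minutes(30))            // state for keys idle longer than 30 minutes is removed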


In our case, we are calculating the average per unique key, which for us is the combination of a patient id and visit. Our value is a SummaryRecord, defined as below:


   case class SummaryRecord(trialId: String, patientId: String, visitName: String,
                            max: Int, min: Int, count: Int, sum: Int)

Note that we store our aggregation values (max, min, count and running sum) in this SummaryRecord, keyed by trialId_patientId_visitName. There are a number of different ways to design these aggregations, but this way we end up with one clean (K, V) pair that we can store directly into our datastore.
So, first, we map the records we want to aggregate to (key, SummaryRecord) pairs and group them by key:
    // Generate (key, SummaryRecord) pairs from the trial records DStream
    val summaryRecordDStream: DStream[(String, Option[SummaryRecord])] = trialRecordDStream.map { trialRecordTuple =>
      val trialRecord = trialRecordTuple._2
      // A single record seeds the aggregation: its value is the max, min and sum, with a count of 1
      val summaryRecord = SummaryRecord(trialRecord.trialId, trialRecord.patientId, trialRecord.visitName,
                                        trialRecord.value, trialRecord.value, 1, trialRecord.value)
      val key: String = trialRecord.trialId + "_" + trialRecord.patientId + "_" + trialRecord.visitName
      println("Summary Key: " + key)
      (key, Some(summaryRecord))
    }

    // Group the summary DStream by key for aggregation
    val summaryGroupedDStream: DStream[(String, Iterable[Option[SummaryRecord]])] = summaryRecordDStream.groupByKey()
   

Then we call mapWithState(), passing it a StateSpec built from our groupSummaryRecords2 function:

    // Call mapWithState to maintain the state of the summary object, driven only by the stream
    val stateSpec = StateSpec.function(groupSummaryRecords2 _)

    val summaryRecordUpdatedDStream = trialRecordDStream.mapWithState(stateSpec)
This returns a DStream of SummaryRecord instances, in which the running aggregates are maintained for each key; the average is simply the sum divided by the count.
Below is the gist of the function implementation that we pass to mapWithState().
The Scala signature looks a bit confusing at first: it takes the batch time, the key (identifying a unique patient visit), the value (a TrialRecord for that patient), and the state (the SummaryRecord maintained for that key over time), and it returns an optional pair of the same key with the new SummaryRecord for that key.
In the function, we essentially add the new value to the previous sum, increment the count, and store everything into our intermediary summary record.
    def groupSummaryRecords2(batchTime: Time, key: String, value1: Option[TrialRecord],
                             optionSummary: State[SummaryRecord]): Option[(String, SummaryRecord)] = {

      // Sum the new value with the previous running sum (both are Options)
      val sum = value1.map(_.value)
      val sum2: Option[Int] = optionSummary.getOption.map(_.sum)
      val sum3: Int = (sum :: sum2 :: Nil).flatten.sum

      val record = value1.get
      val intermediaryOutput =
        if (optionSummary.exists) {
          // Merge the new value into the existing state for this key
          val previous = optionSummary.get
          SummaryRecord(record.trialId, record.patientId, record.visitName,
                        math.max(record.value, previous.max),
                        math.min(record.value, previous.min),
                        previous.count + 1,
                        sum3)
        } else {
          // First record seen for this key: its value is both the max and the min
          SummaryRecord(record.trialId, record.patientId, record.visitName,
                        record.value, record.value, 1, sum3)
        }

      optionSummary.update(intermediaryOutput)
      println("updated output: " + key + " : " + intermediaryOutput.patientId + " " + intermediaryOutput.sum)
      Some((key, intermediaryOutput))
    }
Notice that in this code, the running aggregates are updated for every new batch of data that comes in, so the mean can be derived at any time as the current sum divided by the count.
Once we are done, we store the raw trial data and the summary data into Elasticsearch, and can later visualize the results with Kibana.
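
A minimal sketch of persisting the summary state, assuming a hypothetical "trials/summarydata" index; stateSnapshots() returns the full (key, SummaryRecord) state after each batch:

    summaryRecordUpdatedDStream.stateSnapshots().foreachRDD { summaryRDD =>
      // "trials/summarydata" is an assumed index name; the key is used as document metadata
      EsSpark.saveToEsWithMeta(summaryRDD, "trials/summarydata")
    }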

This is just one example of the transformations that Spark Streaming offers. The complete code is at https://github.com/mlieber/sparkstreaming-es . There are a number of other helper functions in the Spark Streaming API.
As always, please let me know if you have any questions about this.