I want to share my recent experience with a Spark Streaming implementation. My (poor) attempt at a funny post title is meant to convey that there are a lot of nuances in building a streaming application that need to be thought through, and I want to describe them with this example.
My streaming use case comes from a medical device company that produces implantable mobile cardiac telemetry wearables. These devices generate sensor data that is continuously streaming and needs to be collected, processed, analyzed and stored in near real time. During the screening and monitoring periods, various body vitals, like heartbeat, are obtained for each patient being monitored. An average heartbeat value needs to be calculated for each patient for each stage (a fixed period of time). The overall data needs to be persisted appropriately into a data store, but it also needs to feed a front-end layer that gives business users search and visualization access. Finally, we need to use the proper Spark Streaming API to compute these aggregations. We will review these components one by one.
Storage component
Choice of architecture
Firstly, an interesting tidbit: the sensor data from patients is sent to a central NAS server, where it gets stored into fixed-size files every hour. This is a legacy setup that should probably be replaced by a distributed messaging system like Kafka in the future.
We first considered a batch script, running for example on Hadoop, to process these data files from the NAS. However, after discussing the future of this application with our customer, it became clear that the hourly file rate was arbitrary: as more patients ramp up, the system will need to keep up with the data volume and generate files more frequently. So instead of a batch data processing system, we chose Spark Streaming for this architecture. I believe this is typical of the current trend of blending batch (bounded) data processing with unbounded data processing, essentially treating the existing hourly batching requirement as a special case of a streaming architecture.
Spark makes it easy to get data files from a folder:

val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchSize))
val lines = ssc.textFileStream("file://spark/test/data/")
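The trialRecordDStream used in the snippets that follow comes from parsing these raw lines into trial records keyed by trial, patient and visit. Here is a minimal sketch of that step, assuming a hypothetical comma-separated layout; the real parsing lives in the repository linked at the end of this post.

import org.apache.spark.streaming.dstream.DStream

// Hypothetical parsing step: the comma-separated field layout is an assumption.
case class TrialRecord(trialId: String, patientId: String, visitName: String, value: Int)

val trialRecordDStream: DStream[(String, TrialRecord)] = lines.map { line =>
  val fields = line.split(",")
  val record = TrialRecord(fields(0), fields(1), fields(2), fields(3).trim.toInt)
  // Key each record by trialId_patientId_visitName
  (record.trialId + "_" + record.patientId + "_" + record.visitName, record)
}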
Search layer
The next piece is to store the data in near real time. A search/indexing system is a good choice for this. Elasticsearch is a great component here, and the integration point with Spark is well documented. So, once Elasticsearch is configured within Spark, our index-saving code looks like the following:
trialRecordDStream.foreachRDD(lineRDD => {
  EsSpark.saveToEsWithMeta(lineRDD, "trials/trialdata")
})
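For completeness, the Elasticsearch configuration mentioned above boils down to a few properties on the SparkConf. A minimal sketch, where the host, port and application name are placeholders rather than values from the actual project:

import org.apache.spark.SparkConf
import org.elasticsearch.spark.rdd.EsSpark

// Minimal sketch of the Elasticsearch-related Spark configuration (placeholder values)
val sparkConf = new SparkConf()
  .setAppName("PatientTelemetryStreaming")   // placeholder application name
  .set("es.nodes", "localhost")              // Elasticsearch node(s)
  .set("es.port", "9200")                    // Elasticsearch REST port
  .set("es.index.auto.create", "true")       // create the index on first write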
Visualization
And finally, we can put Kibana as a visualization layer on top of Elasticsearch to present the results. The overall architecture is shown below.
Spark Streaming specifics
The next piece was to design the various computations to be calculated on these biometrics in near real time, like the maximum/minimum and mean for a given patient. Maximum/minimum is pretty trivial (you just need to keep the biggest/smallest value seen so far), but calculating the average requires an aggregation (you need to maintain a running sum and count over all the values, not just the last one).
This requires some thought as to how the data streams into your application: does the result of your calculation have to be bounded to a certain window of time and discarded afterwards, or do you need to keep this calculation current at all times?
In our
case, the use case implied the latter.
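For contrast, had a bounded window been sufficient, Spark Streaming's windowed operations would have covered it. A hypothetical sketch, where keyedValues stands for a DStream[(String, Int)] of per-patient readings and the durations are made up:

// Hypothetical bounded-window alternative (not what we needed): a per-key sum
// over the last 10 minutes, recomputed every minute.
val windowedSums = keyedValues.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(10), Minutes(1))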
A first approach for calculating an aggregation would be to collect the values for a certain patient over time, store them all into the storage layer, then read them back every time for aggregation. Unfortunately this approach is wrong in two ways: first, resources are wasted on sending the values to the store and retrieving them back; second, it does not scale once the data is too big to re-collect.
Instead, Spark Streaming has an API that offers a more elegant solution for calculating stateful aggregations: updateStateByKey(). By applying a given function to each new value as well as the previous state of what you are calculating, the current calculation state is maintained. The data from previous states is also saved to disk to recover from potential failures, in a process called checkpointing. However, a downside comes with this API: every time it is called, the entire state is processed and retained in memory, which leads to scalability issues.
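A minimal sketch of that older approach, computing a running (sum, count) per key; keyedValues and the checkpoint directory are stand-ins, not names from the project:

// Checkpointing must be enabled for any stateful operation
ssc.checkpoint("/tmp/streaming-checkpoint")   // placeholder directory

// Running (sum, count) per key with updateStateByKey()
val runningTotals = keyedValues.updateStateByKey[(Int, Int)] { (newValues: Seq[Int], state: Option[(Int, Int)]) =>
  val (prevSum, prevCount) = state.getOrElse((0, 0))
  Some((prevSum + newValues.sum, prevCount + newValues.size))
}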
A more scalable solution, introduced in Spark 1.6, is the new mapWithState() API, which only processes the delta (the net-new data) in each batch. If you look at the example, you will also see that the new API lets you seed the processing with an initial state RDD and provides a mechanism for timeout.
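Both of those options hang off the StateSpec. An illustrative sketch, where initialSummaryRDD (an RDD[(String, SummaryRecord)]) and the 30-minute timeout are made up, and groupSummaryRecords2 is the update function shown further below:

// Illustrative only: seed the state with an initial RDD and expire idle keys
val specWithOptions = StateSpec.function(groupSummaryRecords2 _)
  .initialState(initialSummaryRDD)   // hypothetical RDD[(String, SummaryRecord)]
  .timeout(Minutes(30))              // drop state for keys idle longer than 30 minutes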
In our case, we are calculating the average per key, where the key identifies a unique record: in our case a combination of patient id and visit. Our value is a SummaryRecord, which carries the running aggregates (the mean is derived from the running sum and count) and is defined as below:
case class SummaryRecord(trialId: String, patientId: String, visitName: String,
                         max: Int = 0, min: Int = Integer.MAX_VALUE, count: Int = 0, sum: Int = 0) {
  def mean: Int = if (count == 0) 0 else sum / count   // derived from the running sum and count
}
Note that we store our aggregation values in this SummaryRecord for each PatientId_VisitName, which is our key. There are a number of different ways to design these aggregations, but this way we end up with one clean (K, V) pair that we can store directly into our datastore.
So, first, we group the records we want to aggregate by our key:
var summaryRecordDStream: DStream[(String, Option[SummaryRecord])] = trialRecordDStream.map(trialRecordTuple => {
  // Generate the summary record pairs from the study trial records DStream
  val trialRecord = trialRecordTuple._2
  val summaryRecord = SummaryRecord(trialRecord.trialId, trialRecord.patientId, trialRecord.visitName,
    max = trialRecord.value, min = trialRecord.value, count = 1, sum = trialRecord.value)
  val key: String = trialRecord.trialId + "_" + trialRecord.patientId + "_" + trialRecord.visitName //+ "_" + trialRecord.device
  println("Summary Key: " + key)
  (key, Some(summaryRecord))
})
// Group the summary DStream by key for aggregation
val summaryGroupedDStream: DStream[(String, Iterable[Option[SummaryRecord]])] =
  summaryRecordDStream.groupByKey()
Then we call mapWithState(), which in turn calls the groupSummaryRecords2 function:
// Calling mapWithState to maintain the state of the summary object, using only the stream
var stateSpec = StateSpec.function(groupSummaryRecords2 _)
var summaryRecordUpdatedDStream = trialRecordDStream.mapWithState(stateSpec)
This returns a DStream in which the aggregates (including the average) are kept current for each key. Below is the gist of the function implementation that we pass to mapWithState(). The signature (this is Scala) looks a bit confusing at first: the batch time, the key (identifying one unique patient/visit entity), the value (a trial record for that patient), and the state (the SummaryRecord maintained for that key over time); the function returns an Option pairing the same key with the updated SummaryRecord. In the function, we essentially add the new value to the previous sum, increment the count, and store everything into our intermediary summary record.
def groupSummaryRecords2(batchTime: Time, key: String, value1: Option[TrialRecord],
    optionSummary: State[SummaryRecord]): Option[(String, SummaryRecord)] = {
  // Running sum: add the new value (if any) to the sum carried in the state
  val sum = value1.map(_.value)
  val sum2: Option[Int] = optionSummary.getOption.map(_.sum)
  val sum3: Option[Int] = (sum :: sum2 :: Nil).flatten.reduceLeftOption(_ + _)

  if (optionSummary.exists) {
    // Update max/min against the values seen so far
    val max = math.max(optionSummary.get.max, value1.get.value)
    val min = math.min(optionSummary.get.min, value1.get.value)
    val intermediaryOutput = SummaryRecord(value1.get.trialId, value1.get.patientId, value1.get.visitName,
      max, min, optionSummary.get.count + 1, sum3.getOrElse(0))
    optionSummary.update(intermediaryOutput)
    println("updated output: " + key + " : " + intermediaryOutput.patientId + " " + intermediaryOutput.sum)
    Some((key, intermediaryOutput))
  } else {
    // First record seen for this key: initialize the state from it
    val initial = SummaryRecord(value1.get.trialId, value1.get.patientId, value1.get.visitName,
      value1.get.value, value1.get.value, 1, value1.get.value)
    optionSummary.update(initial)
    Some((key, initial))
  }
}
Notice that in this code, the running sum and count (and therefore the mean) are updated for every new batch of data that comes in.
Once we are done, we store both the raw trial data and the summary data into Elasticsearch, and can later visualize the results with Kibana.
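Persisting the summaries can reuse the same pattern as before. A rough sketch, where the "trials/summarydata" index name is an assumption and the key doubles as the Elasticsearch document id so each patient/visit summary is updated in place:

summaryRecordUpdatedDStream.foreachRDD { rdd =>
  // Keep only the keys that produced an updated summary in this batch,
  // then write them using the key as the document id
  EsSpark.saveToEsWithMeta(rdd.flatMap(_.toList), "trials/summarydata")
}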
This is just one example of a transformation that Spark Streaming offers; there are a number of other helper functions in its API. The complete code is at https://github.com/mlieber/sparkstreaming-es.
As always, please let me know if you have any questions about this.