I wrote a Spark Streaming application in Java. It reads stock trades from a
data feed receiver, converts them to Tick objects, and uses a micro-batch
interval, window interval, and sliding interval of 10 seconds. A
JavaPairDStream<String, Iterable<Tick>> keyed by stock is created from them.
The Tick objects are then stored in a JavaMapWithStateDStream using
mapWithState, and analytics calculations are performed in the mapWithState
callback function using the Ticks as input. Everything worked fine until I
modified my program to also call Rserve inside the mapWithState callback
function in order to perform additional analytics calculations in R.
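Because the input stream is already grouped into one Iterable<Tick> per key, the mapWithState callback should run once per key in each batch. The plain-Java model below (not Spark code) illustrates that expectation by grouping ticks by symbol and invoking a stateful callback per key. The Tick record, its fields, and the tick-count state are hypothetical stand-ins, since the real Tick class and analytics are not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapWithStateModel {
    // Hypothetical stand-in for the real Tick class.
    record Tick(String symbol, double price) {}

    // Hypothetical per-key state: the running number of ticks seen for that symbol.
    static final Map<String, Long> state = new HashMap<>();
    static int callbackInvocations = 0;

    // Models the mapWithState callback: receives one key and all of its ticks for the batch.
    static long onKey(String key, List<Tick> ticks) {
        callbackInvocations++;
        long updated = state.getOrDefault(key, 0L) + ticks.size();
        state.put(key, updated);
        return updated;
    }

    // Models one micro-batch: group ticks by symbol, then call the callback once per key.
    static void processBatch(List<Tick> batch) {
        Map<String, List<Tick>> grouped = new HashMap<>();
        for (Tick t : batch) {
            grouped.computeIfAbsent(t.symbol(), k -> new ArrayList<>()).add(t);
        }
        for (Map.Entry<String, List<Tick>> e : grouped.entrySet()) {
            onKey(e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        List<Tick> batch = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            batch.add(new Tick("SYM" + i, 100.0));
            batch.add(new Tick("SYM" + i, 101.0));
        }
        processBatch(batch);
        // 5000 keys in the batch -> 5000 callback invocations, 5000 state entries.
        System.out.println(callbackInvocations + " " + state.size() + " " + state.get("SYM0"));
    }
}
```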
When I started calling Rserve, every 10th window began taking a long time to
process; this is the window that also writes to the checkpoint file (I am using
Hadoop). Each checkpoint window takes longer to process than the previous one
(window 30 takes longer than window 20, which takes longer than window 10).
All of the non-checkpoint windows finish well within 10 seconds, but the
checkpoint windows can eventually take minutes to complete, and the other
windows queue up behind them.
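The every-10th-window pattern is consistent with Spark's default checkpointing behaviour for mapWithState, assuming the defaults are in effect: the internal state stream uses a checkpoint interval of 10 times the batch interval, and a batch's RDD is checkpointed when the time elapsed since the stream started is an exact multiple of that interval. The plain-Java sketch below (not Spark code; the class and method names are hypothetical) lists which batches that rule selects for a 10-second batch interval.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointCadence {
    // Returns the 1-based batch numbers, among the first `batches`, whose elapsed time
    // since the stream started is an exact multiple of the checkpoint interval.
    static List<Integer> checkpointBatches(long batchMs, long checkpointMs, int batches) {
        List<Integer> result = new ArrayList<>();
        for (int n = 1; n <= batches; n++) {
            long elapsedMs = n * batchMs;
            if (elapsedMs % checkpointMs == 0) {
                result.add(n);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long batchMs = 10_000L;                  // 10-second batch interval from the question
        long defaultCheckpointMs = 10 * batchMs; // assumed default: 10 x batch interval
        System.out.println(checkpointBatches(batchMs, defaultCheckpointMs, 30)); // [10, 20, 30]
    }
}
```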
I then tried setting the checkpoint interval on the JavaMapWithStateDStream to
24 hours to effectively disable checkpointing
(mapWithStateStream.checkpoint(Durations.minutes(1440))). I gave the workers
on the 3-server cluster enough memory to survive the growing memory usage that
this would cause.
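A 24-hour interval is accepted because it is an exact multiple of the 10-second slide interval; Spark validates at start-up that a DStream's checkpoint interval is a multiple of its slide duration. The plain-Java sketch below (hypothetical names, not Spark code) mirrors that check and computes how many 10-second batches pass before the first checkpoint with Durations.minutes(1440): 86,400,000 ms / 10,000 ms = 8640 batches.

```java
import java.util.concurrent.TimeUnit;

public class CheckpointInterval {
    // Returns how many batches elapse before the first checkpoint. Rejects intervals that
    // are not an exact multiple of the slide interval, mirroring Spark's start-up validation.
    static long batchesUntilFirstCheckpoint(long slideMs, long checkpointMs) {
        if (slideMs <= 0 || checkpointMs % slideMs != 0) {
            throw new IllegalArgumentException("checkpoint interval " + checkpointMs
                + " ms is not a multiple of slide interval " + slideMs + " ms");
        }
        return checkpointMs / slideMs;
    }

    public static void main(String[] args) {
        long slideMs = TimeUnit.SECONDS.toMillis(10);        // 10-second slide interval
        long checkpointMs = TimeUnit.MINUTES.toMillis(1440); // equivalent of Durations.minutes(1440)
        System.out.println(batchesUntilFirstCheckpoint(slideMs, checkpointMs)); // 8640
    }
}
```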
The results I logged were unexpected. The JavaPairDStream<String,
Iterable<Tick>> was still being populated with 5000 keys, as before. However,
where previously 5000 keys were passed to the mapWithState callback function,
now only 200 keys were passed to it, and I see many skipped stages in the
Spark Streaming web UI. When I run this in single-process mode on my MS Windows
machine, 5000 keys are still passed to the mapWithState callback function.
Does anyone have any idea why calling Rserve would cause such a huge
increase in checkpointing time, or why calling
checkpoint(Durations.minutes(1440)) on the JavaMapWithStateDStream would cause
Spark not to pass most of the tuples in the JavaPairDStream<String,
Iterable<Tick>> to the mapWithState callback function?
Question is also posted on