I wrote a Spark Streaming application in Java. It reads stock trades from a 
data feed receiver, converts them to Tick objects, and uses a microbatch 
interval, window interval, and sliding interval of 10 seconds. A 
JavaPairDStream<String, Iterable<Tick>> is created, where the key is the stock 
symbol.
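
For concreteness, the setup corresponds to something like the following sketch. FeedReceiver and Tick are placeholders for my own classes; only the Spark Streaming API calls are real:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

SparkConf conf = new SparkConf().setAppName("TickAnalytics");
// 10-second microbatch interval
JavaStreamingContext jssc =
        new JavaStreamingContext(conf, Durations.seconds(10));
jssc.checkpoint("hdfs:///checkpoints/ticks");    // required by mapWithState

JavaDStream<Tick> ticks = jssc
        .receiverStream(new FeedReceiver())       // custom data feed receiver
        .map(trade -> new Tick(trade));

// Window and slide intervals of 10 seconds as well, grouped by stock symbol
JavaPairDStream<String, Iterable<Tick>> bySymbol = ticks
        .window(Durations.seconds(10), Durations.seconds(10))
        .mapToPair(t -> new Tuple2<>(t.getSymbol(), t))
        .groupByKey();
```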

The Tick objects are then stored in a JavaMapWithStateDStream via 
mapWithState, and analytics calculations are performed in the mapWithState 
callback function using the Ticks as input. Everything worked fine until I 
modified the program to also call Rserve inside the mapWithState callback in 
order to perform additional analytics calculations in R.
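
The stateful step, with the Rserve call added, looks roughly like this. SymbolState and priceArray are placeholders for my own state class, and sd(prices) stands in for the actual R analytics; RConnection is the Rserve Java client (org.rosuda.REngine.Rserve):

```java
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.rosuda.REngine.Rserve.RConnection;
import scala.Tuple2;

StateSpec<String, Iterable<Tick>, SymbolState, Tuple2<String, Double>> spec =
    StateSpec.function(
        (String symbol, Optional<Iterable<Tick>> ticks, State<SymbolState> state) -> {
            SymbolState s = state.exists() ? state.get() : new SymbolState();
            s.addTicks(ticks.orElse(java.util.Collections.emptyList()));

            // Java-side analytics happen here; then the additional R
            // analytics: one Rserve round trip per key per batch.
            RConnection rc = new RConnection();   // defaults to localhost:6311
            double rResult;
            try {
                rc.assign("prices", s.priceArray());
                rResult = rc.eval("sd(prices)").asDouble();
            } finally {
                rc.close();
            }

            state.update(s);
            return new Tuple2<>(symbol, rResult);
        });

JavaMapWithStateDStream<String, Iterable<Tick>, SymbolState, Tuple2<String, Double>>
        stateStream = bySymbol.mapWithState(spec);
```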

Once I started calling Rserve, every 10th window took a long time to 
process; this is the window that also writes to the checkpoint files (I am 
using Hadoop). Each checkpoint window takes longer than the previous one 
(window 30 takes longer than window 20, which takes longer than window 10). 
All of the non-checkpoint windows finish well within 10 seconds, but the 
checkpoint windows can eventually take minutes to complete, and the other 
windows queue up behind them.

I then tried setting the checkpoint interval on the JavaMapWithStateDStream to 
24 hours in order to effectively disable checkpointing 
(mapWithStateStream.checkpoint(Durations.minutes(1440))). I gave the workers 
on the 3-server cluster enough memory that they would survive the growing 
memory usage this would cause.
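
The change amounted to one call on the stateful stream (stateStream here names the JavaMapWithStateDStream from the earlier step):

```java
// Stretch the checkpoint interval on the stateful stream to 24 hours
// (1440 minutes), effectively disabling periodic state checkpointing.
// Note: mapWithState still requires jssc.checkpoint(dir) to be set.
stateStream.checkpoint(Durations.minutes(1440));
```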

The results I logged were unexpected. Previously the 
JavaPairDStream<String, Iterable<Tick>> was populated with 5000 keys, and it 
still is. But where 5000 keys used to be passed to the mapWithState callback 
function, now only 200 keys reach it, and I see many skipped stages on the 
Spark Streaming web UI page. When I run the same program in single-process 
mode on my MS Windows machine, all 5000 keys are still passed to the 
mapWithState callback function.

Does anyone have any idea why calling Rserve would cause such a huge 
increase in checkpointing time, or why calling 
checkpoint(Durations.minutes(1440)) on the JavaMapWithStateDStream would cause 
Spark to stop passing most of the tuples in the JavaPairDStream<String, 
Iterable<Tick>> to the mapWithState callback function?
