Hello everyone,

I have a Spark application that processes data iteratively within an RDD until
.isEmpty() returns true. The loop looks roughly like the following:

mainRDD = sc.parallelize(...)  // initialize mainRDD

do {
    rdd1 = mainRDD.flatMapToPair(advanceState)  // advance the state of each element

    rdd2 = rdd1.filter().mapToPair()
    rdd3 = rdd1.filter().mapToPair()
    rdd4 = rdd1.filter().mapToPair()

    finalRDD = rdd3.leftOuterJoin(rdd4).map()

    mainRDD = rdd2.union(rdd4).cache()
    mainRDD.checkpoint()

} while (!mainRDD.isEmpty())

Eventually mainRDD becomes empty and the computation stops.
Functionally speaking, this loop works, but I'm running into an issue with a
certain class of elements that can appear in mainRDD. These elements carry an
internal timer, a time constraint, which prevents them from advancing in
state, so the loop runs through more than a few effectively empty iterations.
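To make that behaviour concrete, here is a minimal, hypothetical sketch of the per-element state advance (the `Element` class, the `advance` method, and the millisecond deadline are illustrative names of mine, not my actual code): an element whose timer has not yet expired is emitted unchanged, so the loop keeps iterating without any state change until the deadline passes.

```java
// Hypothetical sketch: an element that only advances once its timer expires.
class Element {
    final String id;
    final int state;
    final long timerDeadlineMillis; // the time constraint (illustrative)

    Element(String id, int state, long timerDeadlineMillis) {
        this.id = id;
        this.state = state;
        this.timerDeadlineMillis = timerDeadlineMillis;
    }

    // Called from advanceState: while the timer is still running the
    // element is returned as-is, producing an "empty" iteration.
    Element advance(long nowMillis) {
        if (nowMillis < timerDeadlineMillis) {
            return this; // timer not yet expired: no state change
        }
        return new Element(id, state + 1, timerDeadlineMillis);
    }
}
```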
The timers themselves work correctly, in the sense that once a timer expires
the iterations resume. However, the last .isEmpty() action takes considerably
longer than in the previous iterations, making the total execution time about
six times the value of the timer.
From the application logs I can see that this slowdown is indeed caused by a
longer-than-normal isEmpty execution, in which a greater-than-normal number of
tasks are executed (in both local and cluster mode). It is particularly
evident when I test locally with a single element in mainRDD.
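For reference, this is roughly how I measure it: a minimal, hypothetical timing wrapper around the action call (the `Timed` helper below is illustrative, not my actual code), used as `Timed.time(() -> mainRDD.isEmpty())` at the bottom of each iteration.

```java
import java.util.function.Supplier;

// Hypothetical instrumentation sketch: run an action (e.g. isEmpty())
// and report how long the call took, so per-iteration durations can be
// compared in the logs.
class Timed {
    // Returns { action result, elapsed milliseconds }.
    static <T> Object[] time(Supplier<T> action) {
        long t0 = System.nanoTime();
        T result = action.get();
        long elapsedMs = (System.nanoTime() - t0) / 1_000_000;
        return new Object[] { result, elapsedMs };
    }
}
```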
Could Spark Streaming be a better fit here, given that I'm dealing with some
sort of time constraint?
Is there any way to optimize the transformation flow that I should look into?

Thank you,
Federico
-- 
Federico D'Ambrosio
