Hello everyone,
I have a Spark application that processes data iteratively in an RDD until
.isEmpty() returns true. The loop looks roughly like this:
mainRDD = sc.parallelize(...); // initialize mainRDD
do {
    rdd1 = mainRDD.flatMapToPair(advanceState); // advance state of each element
    rdd2 = rdd1.filter().mapToPair();
    rdd3 = rdd1.filter().mapToPair();
    rdd4 = rdd1.filter().mapToPair();
    finalRDD = rdd3.leftOuterJoin(rdd4).map();
    mainRDD = rdd2.union(rdd4).cache();
    mainRDD.checkpoint();
} while (!mainRDD.isEmpty());
Eventually mainRDD will be empty and the computation will stop.
Functionally speaking, this loop works, but I'm seeing a problem with a
certain class of elements that mainRDD can contain.
These elements have an internal timer, a time constraint, which prevents
the element from advancing in state until it expires, so the loop runs
through several empty iterations.
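
To make the empty-iteration behaviour concrete, here is a minimal plain-Java
model of the loop. It does not use Spark at all; the class TimerLoopModel,
the Element type, its readyAt field and the "one logical tick per iteration"
clock are all hypothetical names and assumptions made up for illustration,
not part of the real application.

```java
import java.util.ArrayList;
import java.util.List;

public class TimerLoopModel {
    /** Hypothetical element: it cannot advance until the logical clock reaches readyAt. */
    public static final class Element {
        final int readyAt;
        public Element(int readyAt) { this.readyAt = readyAt; }
    }

    /**
     * Runs the do/while loop over a plain list and returns
     * {total iterations, iterations in which no element advanced}.
     */
    public static int[] run(List<Element> initial) {
        List<Element> main = new ArrayList<>(initial);
        int tick = 0;
        int iterations = 0;
        int emptyIterations = 0;
        do {
            List<Element> next = new ArrayList<>();
            boolean advanced = false;
            for (Element e : main) {
                if (tick >= e.readyAt) {
                    advanced = true; // timer expired: element finishes and leaves the set
                } else {
                    next.add(e);     // timer still running: element is carried over unchanged
                }
            }
            iterations++;
            if (!advanced) {
                emptyIterations++;
            }
            main = next;
            tick++; // assumption: one logical tick per loop iteration
        } while (!main.isEmpty());
        return new int[] { iterations, emptyIterations };
    }

    public static void main(String[] args) {
        List<Element> single = new ArrayList<>();
        single.add(new Element(5)); // a single element whose timer expires at tick 5
        int[] r = run(single);
        System.out.println("iterations=" + r[0] + ", empty=" + r[1]);
    }
}
```

With a single element whose timer expires at tick 5, the model runs six
iterations, five of which do no useful work. In the real job each of those
iterations also rebuilds the whole chain of transformations.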
The timers themselves work: once a timer expires, the iterations continue.
However, the last .isEmpty() action takes much longer than in the previous
iterations, so the total execution time ends up six times longer than the
value of the timer.
From the application logs I can see that this slowdown is caused by a
longer-than-normal isEmpty execution, in which more tasks than usual are
executed (in both local and cluster mode). It is particularly evident when
I test locally with a single element in mainRDD.
Would Spark Streaming be a better fit, given that I'm dealing with a kind
of time constraint?
Is there any way to optimize the flow of transformations that I should
look into?
Thank you,
Federico
--
Federico D'Ambrosio