Mohit, if you want to end up with (1 .. N), why not skip the logic for
finding missing values and generate it directly?
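Spelled out on a plain collection (a sketch only; on a real RDD you would use the RDD's own reduce and then sc.parallelize, as in the snippet below):

```scala
// Sketch: instead of finding the missing values, compute the max
// and regenerate the whole (1 .. N) range from scratch.
val myCollection = List(1, 2, 3, 5, 8, 11)
val max = myCollection.reduce(math.max)   // 11
val filled = (1 to max).toList
// filled == List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
```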
val max = myCollection.reduce(math.max)
sc.parallelize(1 to max)

In either case, you don't need to call cache, which will force it into
memory - you can do something like "count", which will not necessarily
store the RDD in memory. Additionally, instead of an Accumulable, you could
consider mapping that value directly:

rdd.mapPartitionsWithIndex { (index, partition) =>
  Iterator(index -> partition.reduce(math.max))
}.collectAsMap()

On Mon, May 19, 2014 at 9:50 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:

> Thanks Brian. This works. I used Accumulable to do the "collect" in step
> B. While doing that I found that Accumulable.value is not a Spark
> "action"; I need to call "cache" on the underlying RDD for "value" to
> work. Not sure if that is intentional or a bug.
> The "collect" of Step B can be done as a new RDD too.
>
>
> On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt <bgaw...@gmail.com> wrote:
>
>> I don't think there's a direct way of bleeding elements across
>> partitions. But you could write it yourself relatively succinctly:
>>
>> A) Sort the RDD.
>> B) Look at the sorted RDD's partitions with the
>> .mapPartitionsWithIndex() method. Map each partition to its partition ID
>> and its maximum element. Collect the (partID, maxElement) pairs in the
>> driver.
>> C) Broadcast the collection of (partID, partition's max element) tuples.
>> D) Look again at the sorted RDD's partitions with
>> mapPartitionsWithIndex(). For each partition K:
>>   D1) Find the immediately-preceding partition K-1 and its associated
>>   maximum value. Use that to decide how many values are missing between
>>   the last element of partition K-1 and the first element of partition K.
>>   D2) Step through partition K's elements and fill in the rest of the
>>   missing elements in that partition.
>>
>> This approach sidesteps worries you might have over the hack of using
>> .filter to remove the first element, as well as the zipping.
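Putting Brian's steps A-D together, a rough, untested Spark sketch might look like the following. The helper names are my own, `sc` is the usual SparkContext, and the `i - 1` lookup assumes no empty partitions sit between non-empty ones:

```scala
// A) Sort the RDD.
val sorted = rdd.sortBy(identity)

// B) Per-partition maximum, collected on the driver.
val partMax: Map[Int, Int] = sorted
  .mapPartitionsWithIndex { (i, it) =>
    if (it.hasNext) Iterator(i -> it.max) else Iterator.empty
  }
  .collect()
  .toMap

// C) Broadcast the (partition ID -> max element) map.
val bcMax = sc.broadcast(partMax)

// D) Fill the gaps inside each partition, plus the gap back to the
//    previous partition's maximum.
val filled = sorted.mapPartitionsWithIndex { (i, it) =>
  var last = bcMax.value.get(i - 1)   // None for partition 0
  it.flatMap { x =>
    val start = last.map(_ + 1).getOrElse(x)
    last = Some(x)
    start to x
  }
}
```

For partition 0 the first element is emitted as-is; for every later partition, the range from the previous partition's max + 1 up to the first local element covers the boundary gap that plain mapPartitions cannot see.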
>>
>> --Brian
>>
>>
>> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:
>>
>>> Hi,
>>> I am trying to find a way to fill in missing values in an RDD. The RDD
>>> is a sorted sequence.
>>> For example: (1, 2, 3, 5, 8, 11, ...)
>>> I need to fill in the missing numbers and get (1, 2, 3, 4, 5, 6, 7, 8,
>>> 9, 10, 11).
>>>
>>> One way to do this is to "slide and zip":
>>>
>>> rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>>> x = rdd1.first
>>> rdd2 = rdd1 filter (_ != x)
>>> rdd3 = rdd2 zip rdd1
>>> rdd4 = rdd3 flatMap { (x, y) => generate missing elements between x
>>> and y }
>>>
>>> Another method, which I think is more efficient, is to use
>>> mapPartitions() on rdd1 to be able to iterate over the elements of rdd1
>>> in each partition. However, that leaves the boundaries of the partitions
>>> "unfilled". Is there a way, within the function passed to
>>> mapPartitions, to read the first element in the next partition?
>>>
>>> The latter approach also appears to work for a general "sliding window"
>>> calculation on the RDD. The former technique requires a lot of "sliding
>>> and zipping", and I believe it is not efficient. If only I could read the
>>> next partition... I have tried passing a pointer to rdd1 into the function
>>> passed to mapPartitions, but the rdd1 pointer turns out to be NULL, I
>>> guess because Spark cannot deal with a mapper calling another mapper
>>> (since it happens on a worker, not the driver).
>>>
>>> Mohit.
>>>
>>>
>>
>

--
Austin Gibbons
Research | quantiFind <http://www.quantifind.com/> | 708 601 4894
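For reference, the "slide and zip" idea from the original question, written out as a self-contained sketch on a plain List (the RDD version would substitute sc.parallelize and the corresponding RDD operations, with the caveat that zipping two RDDs of different lengths is not directly supported):

```scala
// Self-contained sketch of "slide and zip" on a plain List.
val xs = List(1, 2, 3, 5, 8, 11)
val shifted = xs.tail                  // drop the first element: 2, 3, 5, 8, 11
val pairs = shifted zip xs             // (2,1), (3,2), (5,3), (8,5), (11,8)
val filled = xs.head :: pairs.flatMap { case (next, cur) => (cur + 1) to next }
// filled == List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
```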