Hello, I'm trying to get a function to run on pairs of successive elements in an RDD. For example, if I have an RDD of Ints [5, 4, 3, 2, 1], and I run (_ - _) over pairs of successive elements, I want to get back an RDD [1, 1, 1, 1]. Basically it's a scan with window size 2, but run in parallel.
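
To make the target behavior concrete, the sequential version of what I'm after is just a size-2 sliding window (plain Scala collections here, nothing Spark-specific):

    val xs = List(5, 4, 3, 2, 1)
    // pair up successive elements and subtract
    val diffs = xs.sliding(2).map { case Seq(a, b) => a - b }.toList
    // diffs == List(1, 1, 1, 1)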
My first instinct was to do this with zip: copy the RDD, union a bogus element onto the front of one copy to shift its offset, and zip the original with the offset copy to get pairs of successive values. However, RDD zip doesn't support this: I end up trying to zip RDDs whose partitions differ in number and in element counts, and zip silently drops values. The docs seem to recommend zipPartitions when the corresponding partitions have differing numbers of elements, but that literally just hands you iterators over corresponding partitions, with no way to peek into the next partition if one RDD's partition runs out before the other's does. So I can't use zipPartitions to build a more traditional element-by-element zip. Presumably a traditional zip isn't provided because it can't be implemented efficiently: in the general case, later partitions can't match up their elements because they don't know how many elements were in the prior partitions of each RDD.

The only other way I can think of to construct my scan operation is to count the whole input RDD, make an RDD of that many sequential integers (exactly matching the input RDD's partitioning), zip those up, make a copy re-keyed by the index of the previous value, and join the two keyed RDDs (rough sketch at the end of this message). That seems like a huge amount of work for something that should be really simple: just scan through each partition individually, with a little logic to send each partition's first value to wherever the previous partition is being processed.

Is there some obvious way to implement a parallel scan operation that I'm missing? Or some reason it isn't as easy as I think it should be? Is this already in Spark somewhere and I just haven't found it? Is there a better way to implement it than the count-and-zip-and-join method?
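
For reference, here is roughly the count-and-zip-and-join version I have in mind, assuming a SparkContext available as sc. I'm using zipWithIndex to stand in for the count-then-build-an-index-RDD step; it still pays an extra pass to count the elements in each partition, which is part of what I'd like to avoid:

    import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on older Spark releases)
    import org.apache.spark.rdd.RDD

    val input: RDD[Int] = sc.parallelize(Seq(5, 4, 3, 2, 1))
    val indexed = input.zipWithIndex()                        // (value, index)
    val byIndex = indexed.map { case (v, i) => (i, v) }       // keyed by own index
    val byPrev  = indexed.map { case (v, i) => (i - 1, v) }   // keyed by the previous index
    val pairs   = byIndex.join(byPrev).values                 // (x_i, x_{i+1}) pairs
    val diffs   = pairs.map { case (a, b) => a - b }          // contains 1, 1, 1, 1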

Thanks,
-Adam