Mohit, if you want to end up with (1 .. N), why don't you skip the logic
for finding the missing values and generate the full range directly?

val max = myCollection.reduce(math.max)   // N, the largest value present
sc.parallelize(1 to max)                  // the full range (1 .. N)

In either case, you don't need to call cache, which will force the RDD into
memory - any action such as "count" will run the computation without
necessarily storing the RDD in memory.
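
For instance, a minimal sketch (the accumulator and rdd names here are just
for illustration):

val acc = sc.accumulator(0)
rdd.foreach(_ => acc += 1)   // foreach is an action: it runs the job and applies the updates
acc.value                    // readable now; no cache() needed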

Additionally, instead of an accumulable, you could consider mapping that
value directly:

rdd.mapPartitionsWithIndex { (index, partition) =>
  if (partition.isEmpty) Iterator.empty          // guard against empty partitions
  else Iterator(index -> partition.reduce(math.max))
}.collectAsMap()
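
Giving that snippet a name, its result can then be broadcast back to the
executors (as in step C of Brian's outline below); a small sketch with
made-up names:

val maxByPartition = rdd.mapPartitionsWithIndex { (index, partition) =>
  if (partition.isEmpty) Iterator.empty
  else Iterator(index -> partition.reduce(math.max))
}.collectAsMap()
val maxByPartitionBc = sc.broadcast(maxByPartition)   // read on executors via maxByPartitionBc.value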


On Mon, May 19, 2014 at 9:50 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:

> Thanks Brian. This works. I used an Accumulable to do the "collect" in step
> B. While doing that I found that Accumulable.value is not a Spark "action",
> so I had to call "cache" on the underlying RDD for "value" to work. Not sure
> if that is intentional or a bug.
> The "collect" of step B can be done as a new RDD too.
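>
> A rough sketch of such a step-B Accumulable (the AccumulableParam and the
> names are illustrative, not the original code):
>
> import org.apache.spark.AccumulableParam
>
> // accumulates (partition index -> partition max) pairs into a single Map
> object MaxPerPartition extends AccumulableParam[Map[Int, Int], (Int, Int)] {
>   def addAccumulator(acc: Map[Int, Int], elem: (Int, Int)): Map[Int, Int] = acc + elem
>   def addInPlace(m1: Map[Int, Int], m2: Map[Int, Int]): Map[Int, Int] = m1 ++ m2
>   def zero(initial: Map[Int, Int]): Map[Int, Int] = Map.empty[Int, Int]
> }
>
> val maxes = sc.accumulable(Map.empty[Int, Int])(MaxPerPartition)
> sortedRdd.mapPartitionsWithIndex { (i, it) =>
>   if (it.nonEmpty) maxes += (i -> it.max)
>   Iterator(i)   // dummy output; the job is run only for its side effect
> }.count()       // an action is what makes maxes.value meaningful, not cache()
> maxes.value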
>
>
> On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt <bgaw...@gmail.com> wrote:
>
>> I don't think there's a direct way of bleeding elements across
>> partitions. But you could write it yourself relatively succinctly:
>>
>> A) Sort the RDD
>> B) Look at the sorted RDD's partitions with the .mapPartitionsWithIndex()
>> method. Map each partition to its partition ID and its maximum element.
>> Collect the (partID, maxElement) pairs in the driver.
>> C) Broadcast the collection of (partID, part's max element) tuples
>> D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex().
>> For each partition *K*:
>> D1) Find the immediately-preceding partition *K-1* and its associated
>> maximum value. Use that to decide how many values are missing between the
>> last element of part *K-1* and the first element of part *K*.
>> D2) Step through part *K*'s elements and find the rest of the missing
>> elements in that part.
>>
>> This approach sidesteps worries you might have over the hack of using
>> .filter to remove the first element, as well as the zipping.
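>>
>> For concreteness, a rough sketch of steps A-D (rdd1 is the input RDD of
>> Ints; the other names are made up, and empty preceding partitions are not
>> handled):
>>
>> val sortedRdd = rdd1.sortBy(identity)                       // step A
>> val maxPerPart = sortedRdd.mapPartitionsWithIndex { (i, it) =>
>>   if (it.isEmpty) Iterator.empty else Iterator(i -> it.max)
>> }.collectAsMap()                                            // step B
>> val bcMax = sc.broadcast(maxPerPart)                        // step C
>> val filled = sortedRdd.mapPartitionsWithIndex { (i, it) =>  // step D
>>   val elems = it.toList                 // materializes the partition
>>   if (elems.isEmpty) Iterator.empty
>>   else {
>>     // D1: fill the gap between partition i-1's max and this partition's head
>>     val start = bcMax.value.get(i - 1).map(_ + 1).getOrElse(elems.head)
>>     // D2: fill the gaps between consecutive elements inside this partition
>>     (start to elems.head).iterator ++ elems.sliding(2).flatMap {
>>       case Seq(a, b) => (a + 1) to b
>>       case _         => Seq.empty[Int]
>>     }
>>   }
>> }
>> // filled.collect() should now give the original values plus the missing ones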
>>
>> --Brian
>>
>>
>>
>> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:
>>
>>> Hi,
>>> I am trying to find a way to fill in missing values in an RDD. The RDD
>>> is a sorted sequence.
>>> For example, (1, 2, 3, 5, 8, 11, ...)
>>> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>>>
>>> One way to do this is to "slide and zip":
>>> rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>>> x = rdd1.first
>>> rdd2 = rdd1 filter (_ != x)
>>> rdd3 = rdd2 zip rdd1
>>> rdd4 = rdd3 flatMap { case (x, y) => /* generate missing elements between x
>>> and y */ }
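>>>
>>> (On a plain Scala collection, the gap-generation step would look roughly
>>> like this, just to illustrate the intent:)
>>>
>>> List(1, 2, 3, 5, 8, 11).sliding(2).flatMap { case Seq(a, b) => a until b }.toList :+ 11
>>> // => List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)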
>>>
>>> Another method, which I think is more efficient, is to use
>>> mapPartitions() on rdd1 to iterate over the elements of rdd1 within each
>>> partition. However, that leaves the boundaries of the partitions
>>> "unfilled". *Is there a way, within the function passed to
>>> mapPartitions, to read the first element of the next partition?*
>>>
>>> The latter approach also appears to work for a general "sliding window"
>>> calculation on the RDD. The former technique requires a lot of "sliding and
>>> zipping", and I believe it is not efficient. If only I could read the next
>>> partition... I have tried passing a pointer to rdd1 into the function passed
>>> to mapPartitions, but the rdd1 pointer turns out to be NULL, I guess because
>>> Spark cannot deal with a mapper calling another mapper (since it happens on
>>> a worker, not the driver).
>>>
>>> Mohit.
>>>
>>>
>>
>


-- 
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
Austin Gibbons
Research | quantiFind <http://www.quantifind.com/> | 708 601 4894
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
