Re: partitioner aware subtract

2016-05-10 Thread Raghava Mutharaju
Thank you for the response.

This does not work on the test case that I mentioned in the previous email.

val data1 = Seq((1 -> 2), (1 -> 5), (2 -> 3), (3 -> 20), (3 -> 16))
val data2 = Seq((1 -> 2), (3 -> 30), (3 -> 16), (5 -> 12))
val rdd1 = sc.parallelize(data1, 8)
val rdd2 = sc.parallelize(data2, 8)
val diff = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  leftItr.filter(p => !rightItr.contains(p))
}
diff.collect().foreach(println)
(1,5)
(2,3)
(3,20)
(3,16)

(3, 16) shouldn't be in the diff. I guess this shows up because contains()
consumes rightItr: since rdd2 is smaller than rdd1, its iterator (rightItr)
gets exhausted by the earlier checks, and every remaining element of leftItr
then passes the filter?
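For what it's worth, the wrong result goes away if the right-hand iterator is materialized once per partition instead of being consumed by contains(). A rough sketch (the setup is mine; it assumes both RDDs are partitioned by the same HashPartitioner, so zipPartitions lines up matching keys):

```scala
import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}

object PartitionerAwareSubtract {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("subtract").setMaster("local[2]"))

    // Hash-partition both RDDs the same way so matching keys land in
    // the same partition index.
    val part = new HashPartitioner(8)
    val rdd1 = sc.parallelize(
      Seq(1 -> 2, 1 -> 5, 2 -> 3, 3 -> 20, 3 -> 16)).partitionBy(part)
    val rdd2 = sc.parallelize(
      Seq(1 -> 2, 3 -> 30, 3 -> 16, 5 -> 12)).partitionBy(part)

    val diff = rdd1.zipPartitions(rdd2) { (leftItr, rightItr) =>
      // Materialize the right iterator ONCE. Calling contains() directly
      // on an Iterator consumes it, which is what produced the spurious
      // (3,16) above.
      val rightSet = rightItr.toSet
      leftItr.filter(p => !rightSet.contains(p))
    }
    diff.collect().foreach(println)  // (1,5), (2,3), (3,20)
    sc.stop()
  }
}
```

This keeps the whole subtract shuffle-free, at the cost of holding one partition of rdd2 in memory at a time.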

Anyway, we did the subtract in the following way:

using mapPartitions, group the values by key into a set in rdd2. Then do a
left outer join of rdd1 with rdd2 and filter it. This preserves
partitioning and also takes advantage of the fact that both RDDs are
already hash partitioned.
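In code, that approach might look roughly like this (a local-mode sketch, not the exact code we ran; the names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}

object SubtractViaJoin {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("subtract-join").setMaster("local[2]"))

    val part = new HashPartitioner(8)
    val rdd1 = sc.parallelize(
      Seq(1 -> 2, 1 -> 5, 2 -> 3, 3 -> 20, 3 -> 16)).partitionBy(part)
    val rdd2 = sc.parallelize(
      Seq(1 -> 2, 3 -> 30, 3 -> 16, 5 -> 12)).partitionBy(part)

    // Collapse rdd2 to one Set of values per key, partition-locally
    // (no shuffle, since rdd2 is already hash partitioned on the key).
    val grouped = rdd2.mapPartitions(
      iter => iter.toSeq.groupBy(_._1)
        .map { case (k, kvs) => (k, kvs.map(_._2).toSet) }
        .iterator,
      preservesPartitioning = true)

    // leftOuterJoin reuses rdd1's partitioner, so this is shuffle-free
    // too. Keep (k, v) unless v occurs in rdd2's value set for k.
    val diff = rdd1.leftOuterJoin(grouped).collect {
      case (k, (v, vs)) if !vs.exists(_.contains(v)) => (k, v)
    }
    diff.collect().foreach(println)  // (1,5), (2,3), (3,20)
    sc.stop()
  }
}
```

Because grouped has exactly one entry per key, the join cannot produce the cross-product duplicates (such as (3,(16,Some(30)))) that a plain leftOuterJoin of rdd1 with rdd2 gives.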

Regards,
Raghava.


On Tue, May 10, 2016 at 11:44 AM, Rishi Mishra 
wrote:

> As you have the same partitioner and number of partitions, you can
> probably use zipPartitions and provide a user-defined function to
> subtract.
>
> A very primitive example:
>
> val data1 = Seq(1->1,2->2,3->3,4->4,5->5,6->6,7->7)
> val data2 = Seq(1->1,2->2,3->3,4->4,5->5,6->6)
> val rdd1 = sc.parallelize(data1, 2)
> val rdd2 = sc.parallelize(data2, 2)
> val sum = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
>   leftItr.filter(p => !rightItr.contains(p))
> }
> sum.foreach(println)
>
>
>
> Regards,
> Rishitesh Mishra,
> SnappyData (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> We tried that but couldn't figure out a way to filter it efficiently.
>> Let's take two RDDs.
>>
>> rdd1:
>>
>> (1,2)
>> (1,5)
>> (2,3)
>> (3,20)
>> (3,16)
>>
>> rdd2:
>>
>> (1,2)
>> (3,30)
>> (3,16)
>> (5,12)
>>
>> We did rdd1.leftOuterJoin(rdd2), intending to filter it down to
>> rdd1.subtract(rdd2); it gives:
>>
>> (1,(2,Some(2)))
>> (1,(5,Some(2)))
>> (2,(3,None))
>> (3,(20,Some(30)))
>> (3,(20,Some(16)))
>> (3,(16,Some(30)))
>> (3,(16,Some(16)))
>>
>> Matching each element as case (x, (y, z)): apart from keeping the z ==
>> None entries and dropping those with y == z, we also need to filter out
>> (3, (16, Some(30))). How can we do that efficiently without resorting to
>> broadcasting any elements of rdd2?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, May 9, 2016 at 6:27 AM, ayan guha  wrote:
>>
>>> How about outer join?
>>> On 9 May 2016 13:18, "Raghava Mutharaju" 
>>> wrote:
>>>
 Hello All,

 We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
 (number of partitions are same for both the RDDs). We would like to
 subtract rdd2 from rdd1.

 The subtract code at
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
 seems to group the elements of both the RDDs using (x, null) where x is the
 element of the RDD and partition them. Then it makes use of
 subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
 case, is both key and value combined). In our case, both the RDDs are
 already hash partitioned on the key of x. Can we take advantage of this by
 having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
 mapPartitions() for this?

 We tried to broadcast rdd2 and use mapPartitions. But this turns out to
 be memory consuming and inefficient. We tried to do a local set difference
 between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
 use destroy() on the broadcasted value, but it does not help.

 The current subtract method is slow for us. rdd1 and rdd2 are around
 700MB each and the subtract takes around 14 seconds.

 Any ideas on this issue are highly appreciated.

 Regards,
 Raghava.

>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: partitioner aware subtract

2016-05-10 Thread Rishi Mishra
As you have the same partitioner and number of partitions, you can probably
use zipPartitions and provide a user-defined function to subtract.

A very primitive example:

val data1 = Seq(1->1,2->2,3->3,4->4,5->5,6->6,7->7)
val data2 = Seq(1->1,2->2,3->3,4->4,5->5,6->6)
val rdd1 = sc.parallelize(data1, 2)
val rdd2 = sc.parallelize(data2, 2)
val sum = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  leftItr.filter(p => !rightItr.contains(p))
}
sum.foreach(println)



Regards,
Rishitesh Mishra,
SnappyData (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju  wrote:

> We tried that but couldn't figure out a way to filter it efficiently.
> Let's take two RDDs.
>
> rdd1:
>
> (1,2)
> (1,5)
> (2,3)
> (3,20)
> (3,16)
>
> rdd2:
>
> (1,2)
> (3,30)
> (3,16)
> (5,12)
>
> We did rdd1.leftOuterJoin(rdd2), intending to filter it down to
> rdd1.subtract(rdd2); it gives:
>
> (1,(2,Some(2)))
> (1,(5,Some(2)))
> (2,(3,None))
> (3,(20,Some(30)))
> (3,(20,Some(16)))
> (3,(16,Some(30)))
> (3,(16,Some(16)))
>
> Matching each element as case (x, (y, z)): apart from keeping the z ==
> None entries and dropping those with y == z, we also need to filter out
> (3, (16, Some(30))). How can we do that efficiently without resorting to
> broadcasting any elements of rdd2?
>
> Regards,
> Raghava.
>
>
> On Mon, May 9, 2016 at 6:27 AM, ayan guha  wrote:
>
>> How about outer join?
>> On 9 May 2016 13:18, "Raghava Mutharaju" 
>> wrote:
>>
>>> Hello All,
>>>
>>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
>>> (number of partitions are same for both the RDDs). We would like to
>>> subtract rdd2 from rdd1.
>>>
>>> The subtract code at
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>> seems to group the elements of both the RDDs using (x, null) where x is the
>>> element of the RDD and partition them. Then it makes use of
>>> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
>>> case, is both key and value combined). In our case, both the RDDs are
>>> already hash partitioned on the key of x. Can we take advantage of this by
>>> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>>> mapPartitions() for this?
>>>
>>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>>> be memory consuming and inefficient. We tried to do a local set difference
>>> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
>>> use destroy() on the broadcasted value, but it does not help.
>>>
>>> The current subtract method is slow for us. rdd1 and rdd2 are around
>>> 700MB each and the subtract takes around 14 seconds.
>>>
>>> Any ideas on this issue are highly appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


Re: partitioner aware subtract

2016-05-09 Thread Raghava Mutharaju
We tried that but couldn't figure out a way to filter it efficiently. Let's
take two RDDs.

rdd1:

(1,2)
(1,5)
(2,3)
(3,20)
(3,16)

rdd2:

(1,2)
(3,30)
(3,16)
(5,12)

We did rdd1.leftOuterJoin(rdd2), intending to filter it down to
rdd1.subtract(rdd2); it gives:

(1,(2,Some(2)))
(1,(5,Some(2)))
(2,(3,None))
(3,(20,Some(30)))
(3,(20,Some(16)))
(3,(16,Some(30)))
(3,(16,Some(16)))

Matching each element as case (x, (y, z)): apart from keeping the z == None
entries and dropping those with y == z, we also need to filter out
(3, (16, Some(30))). How can we do that efficiently without resorting to
broadcasting any elements of rdd2?

Regards,
Raghava.


On Mon, May 9, 2016 at 6:27 AM, ayan guha  wrote:

> How about outer join?
> On 9 May 2016 13:18, "Raghava Mutharaju" 
> wrote:
>
>> Hello All,
>>
>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
>> (number of partitions are same for both the RDDs). We would like to
>> subtract rdd2 from rdd1.
>>
>> The subtract code at
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>> seems to group the elements of both the RDDs using (x, null) where x is the
>> element of the RDD and partition them. Then it makes use of
>> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
>> case, is both key and value combined). In our case, both the RDDs are
>> already hash partitioned on the key of x. Can we take advantage of this by
>> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>> mapPartitions() for this?
>>
>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>> be memory consuming and inefficient. We tried to do a local set difference
>> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
>> use destroy() on the broadcasted value, but it does not help.
>>
>> The current subtract method is slow for us. rdd1 and rdd2 are around
>> 700MB each and the subtract takes around 14 seconds.
>>
>> Any ideas on this issue are highly appreciated.
>>
>> Regards,
>> Raghava.
>>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: partitioner aware subtract

2016-05-09 Thread ayan guha
How about outer join?
On 9 May 2016 13:18, "Raghava Mutharaju"  wrote:

> Hello All,
>
> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
> (number of partitions are same for both the RDDs). We would like to
> subtract rdd2 from rdd1.
>
> The subtract code at
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
> seems to group the elements of both the RDDs using (x, null) where x is the
> element of the RDD and partition them. Then it makes use of
> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
> case, is both key and value combined). In our case, both the RDDs are
> already hash partitioned on the key of x. Can we take advantage of this by
> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
> mapPartitions() for this?
>
> We tried to broadcast rdd2 and use mapPartitions. But this turns out to be
> memory consuming and inefficient. We tried to do a local set difference
> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
> use destroy() on the broadcasted value, but it does not help.
>
> The current subtract method is slow for us. rdd1 and rdd2 are around 700MB
> each and the subtract takes around 14 seconds.
>
> Any ideas on this issue are highly appreciated.
>
> Regards,
> Raghava.
>


partitioner aware subtract

2016-05-08 Thread Raghava Mutharaju
Hello All,

We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key (number
of partitions are same for both the RDDs). We would like to subtract rdd2
from rdd1.

The subtract code at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
seems to group the elements of both the RDDs using (x, null) where x is the
element of the RDD and partition them. Then it makes use of
subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
case, is both key and value combined). In our case, both the RDDs are
already hash partitioned on the key of x. Can we take advantage of this by
having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
mapPartitions() for this?

We tried to broadcast rdd2 and use mapPartitions. But this turns out to be
memory consuming and inefficient. We tried to do a local set difference
between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
use destroy() on the broadcasted value, but it does not help.

The current subtract method is slow for us. rdd1 and rdd2 are around 700MB
each and the subtract takes around 14 seconds.

Any ideas on this issue are highly appreciated.

Regards,
Raghava.