Re: partitioner aware subtract
Thank you for the response. This does not work on the test case that I mentioned in the previous email:

val data1 = Seq((1 -> 2), (1 -> 5), (2 -> 3), (3 -> 20), (3 -> 16))
val data2 = Seq((1 -> 2), (3 -> 30), (3 -> 16), (5 -> 12))
val rdd1 = sc.parallelize(data1, 8)
val rdd2 = sc.parallelize(data2, 8)
val diff = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  leftItr.filter(p => !rightItr.contains(p))
}
diff.collect().foreach(println)

(1,5)
(2,3)
(3,20)
(3,16)

(3,16) shouldn't be in the diff. I guess this shows up because rightItr is an Iterator and gets consumed by contains(); once it is exhausted, every remaining element of leftItr passes the filter.

Anyway, we did the subtract in the following way: using mapPartitions, group the values in rdd2 by key into a set. Then do a left outer join of rdd1 with rdd2 and filter it. This preserves partitioning and also takes advantage of the fact that both RDDs are already hash partitioned.

Regards,
Raghava.

On Tue, May 10, 2016 at 11:44 AM, Rishi Mishra wrote:
> As you have the same partitioner and number of partitions, you can
> probably use zipPartitions and provide a user-defined function to
> subtract.

--
Regards,
Raghava
http://raghavam.github.io
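The group-then-join approach described above can be sketched with plain Scala collections standing in for the RDDs (the object and method names here are illustrative, not Spark API; in the real job, step 1 would run inside mapPartitions on rdd2 and step 2 would be a leftOuterJoin plus filter on rdd1):

```scala
object GroupThenJoinDiff {
  // Step 1 (mapPartitions on rdd2 in the real job): group rdd2's values
  // by key into a Set, so each key carries all of its right-hand values.
  def groupValues[K, V](right: Seq[(K, V)]): Seq[(K, Set[V])] =
    right.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).toSet }.toSeq

  // Step 2 (leftOuterJoin plus filter in the real job): keep a left pair
  // when its key is absent on the right, or its value is not in the
  // right-hand set for that key.
  def subtract[K, V](left: Seq[(K, V)], rightSets: Seq[(K, Set[V])]): Seq[(K, V)] = {
    val lookup = rightSets.toMap // stand-in for the co-partitioned join
    left.collect { case (k, v) if !lookup.get(k).exists(_.contains(v)) => (k, v) }
  }
}
```

Because grouping, joining, and filtering are all keyed on the original key, an existing HashPartitioner on both RDDs is reused and no extra shuffle on the whole (key, value) pair is needed.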
Re: partitioner aware subtract
As you have the same partitioner and number of partitions, you can probably use zipPartitions and provide a user-defined function to subtract. A very primitive example:

val data1 = Seq(1->1, 2->2, 3->3, 4->4, 5->5, 6->6, 7->7)
val data2 = Seq(1->1, 2->2, 3->3, 4->4, 5->5, 6->6)
val rdd1 = sc.parallelize(data1, 2)
val rdd2 = sc.parallelize(data2, 2)
val diff = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  leftItr.filter(p => !rightItr.contains(p))
}
diff.foreach(println)

Regards,
Rishitesh Mishra,
SnappyData (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju wrote:
> We tried that but couldn't figure out a way to efficiently filter it.
> Lets take two RDDs.
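A caveat with the sketch above: rightItr is an Iterator, and Iterator.contains consumes it, so after the first lookup the remaining left elements are compared against an exhausted iterator. Materializing the right side into a Set first fixes this and makes each membership test O(1). A minimal per-partition sketch in plain Scala (the object and method names are illustrative; in Spark this function would be passed to rdd1.zipPartitions(rdd2)):

```scala
object ZipPartitionsDiff {
  // The per-partition function for rdd1.zipPartitions(rdd2) { ... }.
  // rightItr is drained exactly once into a Set; leftItr is then filtered
  // lazily against that Set.
  def diffPartition[T](leftItr: Iterator[T], rightItr: Iterator[T]): Iterator[T] = {
    val rightSet = rightItr.toSet
    leftItr.filter(p => !rightSet.contains(p))
  }
}
```

With both RDDs hash partitioned by the same partitioner on the key, equal pairs land in the same partition index, so this per-partition set difference computes the whole subtract without any shuffle.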
Re: partitioner aware subtract
We tried that but couldn't figure out a way to efficiently filter it. Let's take two RDDs.

rdd1:

(1,2)
(1,5)
(2,3)
(3,20)
(3,16)

rdd2:

(1,2)
(3,30)
(3,16)
(5,12)

rdd1.leftOuterJoin(rdd2), from which we want to derive rdd1.subtract(rdd2), gives:

(1,(2,Some(2)))
(1,(5,Some(2)))
(2,(3,None))
(3,(20,Some(30)))
(3,(20,Some(16)))
(3,(16,Some(30)))
(3,(16,Some(16)))

With case (x, (y, z)): apart from keeping z == None and filtering out Some(y) == z, we also need to filter out (3,(16,Some(30))). How can we do that efficiently without resorting to a broadcast of any elements of rdd2?

Regards,
Raghava.

On Mon, May 9, 2016 at 6:27 AM, ayan guha wrote:
> How about outer join?

--
Regards,
Raghava
http://raghavam.github.io
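One way around the problem above, without broadcasting anything, is to compare each left value against the whole set of right-hand values for its key rather than against one joined value at a time, i.e. the shape that falls out of rdd1.cogroup(rdd2) followed by a flatMap. A collection-based sketch (illustrative names, not a Spark job):

```scala
object CogroupStyleDiff {
  // Simulates cogroup: for every key, all values from both sides are
  // visible at once, so (3,16) is rejected even though the joined row
  // (3,(16,Some(30))) looks harmless in isolation.
  def subtract[K, V](left: Seq[(K, V)], right: Seq[(K, V)]): Seq[(K, V)] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.flatMap { k =>
      val leftVals  = left.collect { case (`k`, v) => v }
      val rightVals = right.collect { case (`k`, v) => v }.toSet
      leftVals.filterNot(rightVals).map(k -> _) // keep left values absent on the right
    }
  }
}
```

Since cogroup is keyed on the original key, it can reuse an existing hash partitioning of both RDDs instead of reshuffling on the combined (key, value).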
Re: partitioner aware subtract
How about outer join?

On 9 May 2016 13:18, "Raghava Mutharaju" wrote:
> Hello All,
>
> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
> (the number of partitions is the same for both RDDs). We would like
> to subtract rdd2 from rdd1.
partitioner aware subtract
Hello All,

We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key (the number of partitions is the same for both RDDs). We would like to subtract rdd2 from rdd1.

The subtract code at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala seems to map the elements of both RDDs to (x, null), where x is an element of the RDD, and partition them. Then it makes use of subtractByKey(). This way, the RDDs have to be repartitioned on x (which, in our case, is the key and value combined). In our case, both RDDs are already hash partitioned on the key of x. Can we take advantage of this by having a PairRDD/HashPartitioner-aware subtract? Is there a way to use mapPartitions() for this?

We tried to broadcast rdd2 and use mapPartitions, but this turns out to be memory consuming and inefficient. We did a local set difference between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did use destroy() on the broadcast value, but it does not help.

The current subtract method is slow for us: rdd1 and rdd2 are around 700MB each and the subtract takes around 14 seconds.

Any ideas on this issue are highly appreciated.

Regards,
Raghava.
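The strategy described above can be mimicked with plain collections to see why the existing partitioning is lost (a sketch of the idea, not Spark's actual code; names are illustrative):

```scala
object SubtractSemantics {
  // Mirrors RDD.subtract: pair each whole element with null, then
  // subtract by key. The "key" here is the entire element x, so any
  // existing hash partitioning on a component of x (e.g. the pair's
  // key) is invisible to this operation, and a reshuffle on the full
  // (key, value) pair is required.
  def subtract[T](self: Seq[T], other: Seq[T]): Seq[T] = {
    val otherKeys: Set[T] = other.map(x => (x, null)).map(_._1).toSet
    self.map(x => (x, null)).collect { case (x, _) if !otherKeys(x) => x }
  }
}
```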