RDD.map does not allow preservesPartitioning?
Hi Folks,

Does anybody know the reason for not allowing preservesPartitioning in RDD.map? Am I missing something here? The following example involves two shuffles. I think that if preservesPartitioning were allowed, we could avoid the second one, right?

val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
val r2 = r1.map((_, 1))
val r3 = r2.reduceByKey(_+_)
val r4 = r3.map(x => (x._1, x._2 + 1))
val r5 = r4.reduceByKey(_+_)
r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at <console>:29 []
 +-(8) MapPartitionsRDD[3] at map at <console>:27 []
    |  ShuffledRDD[2] at reduceByKey at <console>:25 []
    +-(8) MapPartitionsRDD[1] at map at <console>:23 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

Thanks.

Zhan Zhang

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD.map does not allow preservesPartitioning?
I believe if you do the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_, 1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
 |  MapPartitionsRDD[33] at mapValues at <console>:23 []
 |  ShuffledRDD[32] at reduceByKey at <console>:23 []
 +-(8) MapPartitionsRDD[31] at map at <console>:23 []
    |  ParallelCollectionRDD[30] at parallelize at <console>:23 []

The difference is that Spark has no way to know that your map closure doesn't change the key; if you use mapValues, it does. Pretty cool that they optimized that :)

2015-03-26 17:44 GMT-04:00 Zhan Zhang zzh...@hortonworks.com:
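One quick way to see what mapValues buys you is to inspect each RDD's `partitioner` in spark-shell. This is a session sketch, not a standalone program; it assumes a running shell with a SparkContext bound to `sc`:

```scala
scala> val counts = sc.parallelize(List(1, 2, 2, 3)).map((_, 1)).reduceByKey(_ + _)

scala> counts.partitioner                             // Some(HashPartitioner) after the shuffle
scala> counts.mapValues(_ + 1).partitioner            // still Some(...): mapValues preserves it
scala> counts.map(x => (x._1, x._2 + 1)).partitioner  // None: map drops it
```

Once the partitioner is None, the next reduceByKey has no choice but to shuffle again.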
Re: RDD.map does not allow preservesPartitioning?
Thanks Jonathan. You are right about rewriting the example; what I mean is providing such an option to the developer so that it is controllable. The example may seem silly, and I don't know all the use cases. But suppose, for example, I want to operate on both the key and the value parts to generate some new value while keeping the key part untouched. Then mapValues may not be able to do this. Changing the code to allow it is trivial, but I don't know whether there is some special reason behind the current behavior.

Thanks.

Zhan Zhang

On Mar 26, 2015, at 2:49 PM, Jonathan Coveney jcove...@gmail.com wrote:
Re: RDD.map does not allow preservesPartitioning?
This is just a deficiency of the API, imo. I agree: mapValues could definitely take a function (K, V) => V1. The preservesPartitioning option isn't set by the function, it's on the RDD. So you could look at the code and do this:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

def mapValues[U](f: V => U): RDD[(K, U)] = {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}

What you want:

def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case t @ (k, _) => (k, cleanF(t)) },
    preservesPartitioning = true)
}

One of the nice things about Spark is that making such new operators is very easy :)

2015-03-26 17:54 GMT-04:00 Zhan Zhang zzh...@hortonworks.com:
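The iterator logic of that proposed variant can be exercised without Spark at all. Below, `mapValuesWithKey` is a hypothetical name for a standalone sketch; the pattern `case t @ (k, _) => (k, f(t))` is exactly why the key provably survives: f sees the whole pair, but the original key is re-attached unconditionally.

```scala
// Hypothetical standalone helper mirroring the proposed mapValues variant.
// f receives the whole (key, value) pair, but the original key is always
// re-attached, so the key can never change and partitioning would stay valid.
object MapValuesWithKey {
  def mapValuesWithKey[K, V, U](pairs: Iterator[(K, V)])(f: ((K, V)) => U): Iterator[(K, U)] =
    pairs.map { case t @ (k, _) => (k, f(t)) }

  def main(args: Array[String]): Unit = {
    val out = mapValuesWithKey(Iterator(("a", 1), ("b", 2))) { case (k, v) => v + k.length }
    println(out.toList) // List((a,2), (b,3))
  }
}
```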
Re: RDD.map does not allow preservesPartitioning?
I think we have a version of mapPartitions that allows you to tell Spark the partitioning is preserved:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639

We could also add a map function that does the same. Or you can just write your map using an iterator.

- Patrick

On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney jcove...@gmail.com wrote:
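The iterator route can be sketched without a Spark dependency: the function below has the shape you would pass as the first argument to `rdd.mapPartitions(f, preservesPartitioning = true)`. `incrementCounts` is a made-up name for this thread's `x._2 + 1` step:

```scala
// The per-partition function you would hand to
// rdd.mapPartitions(incrementCounts, preservesPartitioning = true).
// It only rewrites values and leaves keys untouched, so declaring the
// partitioning preserved is safe, and the second reduceByKey can skip its shuffle.
object PartitionMap {
  def incrementCounts(partition: Iterator[(Int, Int)]): Iterator[(Int, Int)] =
    partition.map { case (k, v) => (k, v + 1) }

  def main(args: Array[String]): Unit =
    println(incrementCounts(Iterator((1, 2), (3, 4))).toList) // List((1,3), (3,5))
}
```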
Re: RDD.map does not allow preservesPartitioning?
Thanks all for the quick response.

Thanks.

Zhan Zhang

On Mar 26, 2015, at 3:14 PM, Patrick Wendell pwend...@gmail.com wrote: