RDD.map does not allow preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Hi Folks,

Does anybody know the reason for not allowing preservesPartitioning in 
RDD.map? Am I missing something here?

The following example involves two shuffles. I think that if preservesPartitioning 
were allowed, we could avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_ + _)
 val r4 = r3.map(x => (x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_ + _)
 r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at <console>:29 []
 +-(8) MapPartitionsRDD[3] at map at <console>:27 []
    |  ShuffledRDD[2] at reduceByKey at <console>:25 []
    +-(8) MapPartitionsRDD[1] at map at <console>:23 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
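
For reference, the partitioners confirm where the extra shuffle comes from (a
quick check in the same shell session, using the vals above):

 // reduceByKey sets a HashPartitioner on its output.
 r3.partitioner   // Some(HashPartitioner)
 // A plain map cannot promise that the keys are unchanged, so the partitioner
 // is dropped, which is why the second reduceByKey shuffles again.
 r4.partitioner   // None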

Thanks.

Zhan Zhang




Re: RDD.map does not allow preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
I believe if you do the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
 |  MapPartitionsRDD[33] at mapValues at <console>:23 []
 |  ShuffledRDD[32] at reduceByKey at <console>:23 []
 +-(8) MapPartitionsRDD[31] at map at <console>:23 []
    |  ParallelCollectionRDD[30] at parallelize at <console>:23 []

The difference is that Spark has no way to know that your map closure
doesn't change the key; if you only use mapValues, it does know. Pretty cool
that they optimized that :)
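
A quick way to see this in the shell is to compare the partitioners of the two
variants (a minimal sketch, with the chain bound to two hypothetical vals):

 val viaMap = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
   .map((_, 1)).reduceByKey(_ + _).map { case (k, v) => (k, v + 1) }
 val viaMapValues = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
   .map((_, 1)).reduceByKey(_ + _).mapValues(_ + 1)

 viaMap.partitioner        // None: map might have changed the keys
 viaMapValues.partitioner  // Some(HashPartitioner): mapValues keeps the partitioner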




Re: RDD.map does not allow preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks Jonathan. You are right about rewriting the example.

I mean providing such an option to developers so that it is controllable. The 
example may seem silly, and I don't know all the use cases.

But, for example, if I also want to use both the key and the value to 
generate a new value while keeping the key untouched, then mapValues may not 
be able to do this.
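
To make that concrete, here is an illustrative sketch (not an operation from the
original mail) that mapValues cannot express, because its closure never sees the key:

 // The new value depends on both the key and the old value; the key is unchanged.
 val r4 = r3.map { case (k, v) => (k, k * v) }   // still drops the partitioner today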

Changing the code to allow this is trivial, but I don’t know whether there is 
some special reason behind this.

Thanks.

Zhan Zhang








Re: RDD.map does not allow preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
This is just a deficiency of the API, imo. I agree: mapValues could
definitely take a function (K, V) => V1. The option isn't set by the function;
it's on the RDD. So you could look at the code and do this:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

  def mapValues[U](f: V => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
      preservesPartitioning = true)
  }

What you want:

  def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case t @ (k, _) => (k, cleanF(t)) },
      preservesPartitioning = true)
  }

One of the nice things about Spark is that making such new operators is
very easy :)
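
For instance, such an operator can be added on the user side with an implicit
class over the public mapPartitions API; this is an untested sketch, and
mapValuesWithKey is a made-up name:

 import org.apache.spark.rdd.RDD

 implicit class KeyAwareOps[K, V](self: RDD[(K, V)]) {
   // A key-aware mapValues that keeps the upstream partitioner,
   // since the keys are left untouched.
   def mapValuesWithKey[U](f: (K, V) => U): RDD[(K, U)] =
     self.mapPartitions(
       iter => iter.map { case (k, v) => (k, f(k, v)) },
       preservesPartitioning = true)
 }

 // Usage with the thread's example: r4 keeps r3's partitioner,
 // so the second reduceByKey needs no shuffle.
 val r4 = r3.mapValuesWithKey((k, v) => v + 1)
 val r5 = r4.reduceByKey(_ + _)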







Re: RDD.map does not allow preservesPartitioning?

2015-03-26 Thread Patrick Wendell
I think we have a version of mapPartitions that allows you to tell
Spark the partitioning is preserved:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639

We could also add a map function that does the same. Or you can just write
your map using an iterator.
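
For the example in this thread, that would look roughly like this (a sketch,
reusing r3 from the original mail):

 // Declare that the keys are untouched, so r3's partitioner survives
 // and the second reduceByKey can skip the shuffle.
 val r4 = r3.mapPartitions(
   iter => iter.map { case (k, v) => (k, v + 1) },
   preservesPartitioning = true)
 val r5 = r4.reduceByKey(_ + _)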

- Patrick






Re: RDD.map does not allow preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks all for the quick response.

Thanks.

Zhan Zhang


