I believe so. If you run the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
 |  MapPartitionsRDD[33] at mapValues at <console>:23 []
 |  ShuffledRDD[32] at reduceByKey at <console>:23 []
 +-(8) MapPartitionsRDD[31] at map at <console>:23 []
    |  ParallelCollectionRDD[30] at parallelize at <console>:23 []

The difference is that Spark has no way to know that your map closure
doesn't change the key, so it has to drop the partitioner. If you use
mapValues instead, Spark knows the keys are unchanged and keeps the
partitioner, so the second reduceByKey doesn't need another shuffle.
Pretty cool that they optimized that :)
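
You can see it directly on the RDD's partitioner field in spark-shell
(the val name is mine, just for illustration):

 val reduced = sc.parallelize(List(1,2,3,4,5)).map((_,1)).reduceByKey(_+_)
 reduced.mapValues(_+1).partitioner              // Some(HashPartitioner) -- preserved
 reduced.map(x => (x._1, x._2 + 1)).partitioner  // None -- dropped, map may change keys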
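
And if you need the full power of map but know your function leaves the
keys untouched, mapPartitions does expose a preservesPartitioning flag
that map does not. A sketch against the r3 from the example below:

 val r4 = r3.mapPartitions(
   iter => iter.map(x => (x._1, x._2 + 1)),
   preservesPartitioning = true)
 val r5 = r4.reduceByKey(_+_)  // reuses r3's partitioner, so no second shuffle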

2015-03-26 17:44 GMT-04:00 Zhan Zhang <zzh...@hortonworks.com>:

> Hi Folks,
>
> Does anybody know the reason for not allowing preservesPartitioning in
> RDD.map? Am I missing something here?
>
> The following example involves two shuffles. I think if preservesPartitioning
> were allowed, we could avoid the second one, right?
>
>  val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>  val r2 = r1.map((_, 1))
>  val r3 = r2.reduceByKey(_+_)
>  val r4 = r3.map(x=>(x._1, x._2 + 1))
>  val r5 = r4.reduceByKey(_+_)
>  r5.collect.foreach(println)
>
> scala> r5.toDebugString
> res2: String =
> (8) ShuffledRDD[4] at reduceByKey at <console>:29 []
>  +-(8) MapPartitionsRDD[3] at map at <console>:27 []
>     |  ShuffledRDD[2] at reduceByKey at <console>:25 []
>     +-(8) MapPartitionsRDD[1] at map at <console>:23 []
>        |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
>
> Thanks.
>
> Zhan Zhang
>
