Github user holdenk commented on the issue:
https://github.com/apache/spark/pull/22010
Did another quick micro benchmark on a small cluster:
```scala
import org.apache.spark.util.collection.ExternalAppendOnlyMap

def removeDuplicatesInPartition(partition: Iterator[(Int, Int)]): Iterator[(Int, Int)] = {
  // Create an instance of external append only map which ignores values.
  val map = new ExternalAppendOnlyMap[(Int, Int), Null, Null](
    createCombiner = value => null,
    mergeValue = (a, b) => a,
    mergeCombiners = (a, b) => a)
  map.insertAll(partition.map(_ -> null))
  // The distinct records are the keys of the map.
  map.iterator.map(_._1)
}

def time[R](block: => R): (Long, R) = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
  (t1, result) // note: first element is the end timestamp, not the elapsed time
}

val count = 10000000
val inputData = sc.parallelize(1.to(count))
val keyed = inputData.map(x => (x % 100, x))
val shuffled = keyed.repartition(50).cache()
shuffled.count() // materialize the cache before timing

// Interleave the old (distinct) and new (mapPartitions) approaches.
val o1 = time(shuffled.distinct().count())
val n1 = time(shuffled.mapPartitions(removeDuplicatesInPartition).count())
val n2 = time(shuffled.mapPartitions(removeDuplicatesInPartition).count())
val o2 = time(shuffled.distinct().count())
val n3 = time(shuffled.mapPartitions(removeDuplicatesInPartition).count())
```
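For context, the `distinct()` baseline being measured goes through `reduceByKey`, so it re-shuffles the full dataset on every call, while the `mapPartitions` path only pays the per-partition map insertion cost. A rough sketch of the baseline (from memory, not the exact source; `distinctSketch` is my own name for it):
```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Approximately what RDD.distinct does today: a reduceByKey over
// (record, null) pairs, which always incurs a shuffle.
def distinctSketch[T: ClassTag](rdd: RDD[T]): RDD[T] =
  rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
```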
And the results are:
> Elapsed time: 1790932239ns
> Elapsed time: 381450402ns
> Elapsed time: 340449179ns
> Elapsed time: 1524955492ns
> Elapsed time: 291948041ns
> import org.apache.spark.util.collection.ExternalAppendOnlyMap
> removeDuplicatesInPartition: (partition: Iterator[(Int, Int)])Iterator[(Int, Int)]
> time: [R](block: => R)(Long, R)
> count: Int = 10000000
> inputData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:52
> keyed: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[11] at map at <console>:53
> shuffled: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at repartition at <console>:54
> o1: (Long, Long) = (2943493642271881,10000000)
> n1: (Long, Long) = (2943494027399482,10000000)
> n2: (Long, Long) = (2943494371228656,10000000)
> o2: (Long, Long) = (2943495899580372,10000000)
> n3: (Long, Long) = (2943496195569891,10000000)
Increasing the count by a factor of 10, we get:
> Elapsed time: 21679193176ns
> Elapsed time: 3114223737ns
> Elapsed time: 3348141004ns
> Elapsed time: 51267597984ns
> Elapsed time: 3931899963ns
> import org.apache.spark.util.collection.ExternalAppendOnlyMap
> removeDuplicatesInPartition: (partition: Iterator[(Int, Int)])Iterator[(Int, Int)]
> time: [R](block: => R)(Long, R)
> count: Int = 100000000
> inputData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:56
> keyed: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[26] at map at <console>:57
> shuffled: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[30] at repartition at <console>:58
> o1: (Long, Long) = (2943648438919959,100000000)
> n1: (Long, Long) = (2943651557292201,100000000)
> n2: (Long, Long) = (2943654909392808,100000000)
> o2: (Long, Long) = (2943706180722021,100000000)
> n3: (Long, Long) = (2943710116461734,100000000)
So the new approach is roughly 5x faster at 10M rows (~1.8s/~1.5s for `distinct()` vs ~0.29-0.38s) and 7-13x faster at 100M rows (~21.7s/~51.3s vs ~3.1-3.9s), which looks like close to an order of magnitude improvement.
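One caveat worth making explicit (my own note, not part of the benchmark): `mapPartitions(removeDuplicatesInPartition)` only removes duplicates *within* a partition, so a global distinct requires identical records to be co-located first, e.g. by hash-partitioning on the record:
```scala
import org.apache.spark.HashPartitioner

// Hypothetical usage: co-locate identical records before the per-partition
// dedup so the result is a true global distinct.
val globallyDistinct = shuffled
  .map(x => (x, null))                    // key by the full record
  .partitionBy(new HashPartitioner(50))   // identical records land together
  .keys
  .mapPartitions(removeDuplicatesInPartition)
```
In the benchmark above this doesn't matter, since every `(key, value)` pair is already unique (the counts confirm no records were dropped).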