GitHub user holdenk commented on the issue:
https://github.com/apache/spark/pull/22010
I did a quick micro-benchmark on this and got:
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
>
> import scala.collection.{mutable, Map}
> def removeDuplicatesInPartition(itr: Iterator[Int]): Iterator[Int] = {
>   // keep an element only the first time HashSet.add returns true
>   val set = new mutable.HashSet[Int]()
>   itr.filter(set.add(_))
> }
>
> def time[R](block: => R): (Long, R) = {
>   val t0 = System.nanoTime()
>   val result = block // call-by-name
>   val t1 = System.nanoTime()
>   println("Elapsed time: " + (t1 - t0) + "ns")
>   (t1, result) // returns the raw end timestamp, not the delta; the elapsed time is the printed value
> }
>
> val count = 1000000
> val inputData = sc.parallelize(1.to(count)).cache()
> inputData.count()
>
> val o1 = time(inputData.distinct().count())
> val n1 = time(inputData.mapPartitions(removeDuplicatesInPartition).count())
> val n2 = time(inputData.mapPartitions(removeDuplicatesInPartition).count())
> val o2 = time(inputData.distinct().count())
> val n3 = time(inputData.mapPartitions(removeDuplicatesInPartition).count())
>
>
> // Exiting paste mode, now interpreting.
>
> Elapsed time: 2464151504ns
> Elapsed time: 219130154ns
> Elapsed time: 133545428ns
> Elapsed time: 927133584ns
> Elapsed time: 242432642ns
> import scala.collection.{mutable, Map}
> removeDuplicatesInPartition: (itr: Iterator[Int])Iterator[Int]
> time: [R](block: => R)(Long, R)
> count: Int = 1000000
> inputData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:47
> o1: (Long, Long) = (437102431151279,1000000)
> n1: (Long, Long) = (437102654798968,1000000)
> n2: (Long, Long) = (437102792389328,1000000)
> o2: (Long, Long) = (437103724196085,1000000)
> n3: (Long, Long) = (437103971061275,1000000)
>
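So the per-partition dedup comes out roughly 4-11x faster than `distinct()` in this run (~2.46s vs ~0.22s cold, ~0.93s vs ~0.24s warm). That lines up with `distinct()` being implemented as a map into `(x, null)` pairs plus `reduceByKey`, i.e. a full shuffle, while the `mapPartitions` version is a narrow, shuffle-free transformation. A minimal sketch of the two shapes (the standalone method names below are mine for illustration, not part of Spark's API):

```scala
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Roughly the shape of RDD.distinct(): key every element, reduce by
// key (which forces a shuffle), then drop the dummy values.
def distinctViaShuffle[T: ClassTag](rdd: RDD[T]): RDD[T] =
  rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)

// Shuffle-free variant: dedup within each partition only. This is
// equivalent to distinct() only when duplicates are guaranteed to
// share a partition, e.g. under a known partitioner on the element.
def distinctInPartition[T: ClassTag](rdd: RDD[T]): RDD[T] =
  rdd.mapPartitions { itr =>
    val seen = new mutable.HashSet[T]()
    itr.filter(seen.add)
  }
```

Note that `inputData` here has no duplicates at all, so both paths return the same 1000000 rows and the delta is essentially the shuffle machinery.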