Github user holdenk commented on the issue:

    https://github.com/apache/spark/pull/22010
  
    Did another quick micro benchmark on a small cluster:
    
    ```scala
    import org.apache.spark.util.collection.ExternalAppendOnlyMap
    
    def removeDuplicatesInPartition(partition: Iterator[(Int, Int)]): Iterator[(Int, Int)] = {
      // Create an instance of external append only map which ignores values.
      val map = new ExternalAppendOnlyMap[(Int, Int), Null, Null](
        createCombiner = value => null,
        mergeValue = (a, b) => a,
        mergeCombiners = (a, b) => a)
      map.insertAll(partition.map(_ -> null))
      map.iterator.map(_._1)
    }
    
    
    def time[R](block: => R): (Long, R) = {
      val t0 = System.nanoTime()
      val result = block // call-by-name
      val t1 = System.nanoTime()
      println("Elapsed time: " + (t1 - t0) + "ns")
      // Returns the end timestamp and the result; the elapsed time is what gets printed above.
      (t1, result)
    }
    
    val count = 10000000
    val inputData = sc.parallelize(1.to(count))
    val keyed = inputData.map(x => (x % 100, x))
    val shuffled = keyed.repartition(50).cache()
    shuffled.count()
    
    val o1 = time(shuffled.distinct().count())
    val n1 = time(shuffled.mapPartitions(removeDuplicatesInPartition).count())
    val n2 = time(shuffled.mapPartitions(removeDuplicatesInPartition).count())
    val o2 = time(shuffled.distinct().count())
    val n3 = time(shuffled.mapPartitions(removeDuplicatesInPartition).count())
    ```
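
    As an aside (not part of the benchmark above), here's a rough sketch of how the same helper could stand in for `distinct()` on a pair RDD: hash-partitioning by key first means equal pairs always land in the same partition, so the per-partition dedup would be globally correct. The `HashPartitioner(50)` here is just an illustrative choice.

    ```scala
    import org.apache.spark.HashPartitioner

    // Hypothetical usage sketch: equal (k, v) pairs always share the same key,
    // so hash-partitioning by key collocates duplicates before the
    // per-partition dedup runs. The partition count of 50 is arbitrary.
    val deduped = keyed
      .partitionBy(new HashPartitioner(50))
      .mapPartitions(removeDuplicatesInPartition)
    ```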
    
    Running the benchmark gives:
    
    > Elapsed time: 1790932239ns
    > Elapsed time: 381450402ns
    > Elapsed time: 340449179ns
    > Elapsed time: 1524955492ns
    > Elapsed time: 291948041ns
    > import org.apache.spark.util.collection.ExternalAppendOnlyMap
    > removeDuplicatesInPartition: (partition: Iterator[(Int, Int)])Iterator[(Int, Int)]
    > time: [R](block: => R)(Long, R)
    > count: Int = 10000000
    > inputData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:52
    > keyed: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[11] at map at <console>:53
    > shuffled: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at repartition at <console>:54
    > o1: (Long, Long) = (2943493642271881,10000000)
    > n1: (Long, Long) = (2943494027399482,10000000)
    > n2: (Long, Long) = (2943494371228656,10000000)
    > o2: (Long, Long) = (2943495899580372,10000000)
    > n3: (Long, Long) = (2943496195569891,10000000)
    
    Increasing `count` by a factor of 10, we get:
    > Elapsed time: 21679193176ns
    > Elapsed time: 3114223737ns
    > Elapsed time: 3348141004ns
    > Elapsed time: 51267597984ns
    > Elapsed time: 3931899963ns
    > import org.apache.spark.util.collection.ExternalAppendOnlyMap
    > removeDuplicatesInPartition: (partition: Iterator[(Int, Int)])Iterator[(Int, Int)]
    > time: [R](block: => R)(Long, R)
    > count: Int = 100000000
    > inputData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:56
    > keyed: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[26] at map at <console>:57
    > shuffled: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[30] at repartition at <console>:58
    > o1: (Long, Long) = (2943648438919959,100000000)
    > n1: (Long, Long) = (2943651557292201,100000000)
    > n2: (Long, Long) = (2943654909392808,100000000)
    > o2: (Long, Long) = (2943706180722021,100000000)
    > n3: (Long, Long) = (2943710116461734,100000000)
    So that looks like roughly a 5x improvement on the smaller run, and close to an order of magnitude on the larger one.
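
    For reference, a quick back-of-envelope check of that claim using the elapsed times printed above (mean `distinct()` time over mean `mapPartitions` time for each run):

    ```scala
    // Speedup estimate from the timings above (nanoseconds copied verbatim).
    val oldSmall = Seq(1790932239L, 1524955492L)              // distinct(), 10M rows
    val newSmall = Seq(381450402L, 340449179L, 291948041L)    // mapPartitions, 10M rows
    val oldLarge = Seq(21679193176L, 51267597984L)            // distinct(), 100M rows
    val newLarge = Seq(3114223737L, 3348141004L, 3931899963L) // mapPartitions, 100M rows

    def mean(xs: Seq[Long]): Double = xs.sum.toDouble / xs.size

    mean(oldSmall) / mean(newSmall) // ~4.9x on the 10M-row run
    mean(oldLarge) / mean(newLarge) // ~10.5x on the 100M-row run
    ```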


