[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22010 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220674969 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -19,7 +19,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag -import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.{Partition, Partitioner, TaskContext} --- End diff -- Thanks! I'll fix that.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220674846 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} -import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils} +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, + Utils => collectionUtils} --- End diff -- Yeah, but we generally break the line anyway, based on the rest of the code base.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220674552 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(!deserial.toString().isEmpty()) } + test("distinct with known partitioner preserves partitioning") { +val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey() +val initialPartitioner = rdd.partitioner +val distinctRdd = rdd.distinct() +val resultingPartitioner = distinctRdd.partitioner +assert(initialPartitioner === resultingPartitioner) +val distinctRddDifferent = rdd.distinct(5) +val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner +assert(initialPartitioner != distinctRddDifferentPartitioner) +assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted) --- End diff -- We could, but we don't need to.
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r22067 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = { + // Create an instance of external append only map which ignores values. + val map = new ExternalAppendOnlyMap[T, Null, Null]( +createCombiner = value => null, +mergeValue = (a, b) => a, +mergeCombiners = (a, b) => a) --- End diff -- scratch that - does not matter
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220672896 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(!deserial.toString().isEmpty()) } + test("distinct with known partitioner preserves partitioning") { +val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey() +val initialPartitioner = rdd.partitioner +val distinctRdd = rdd.distinct() +val resultingPartitioner = distinctRdd.partitioner +assert(initialPartitioner === resultingPartitioner) +val distinctRddDifferent = rdd.distinct(5) +val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner +assert(initialPartitioner != distinctRddDifferentPartitioner) +assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted) --- End diff -- We could also check if the number of stages is what we expect.
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220672579 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = { + // Create an instance of external append only map which ignores values. + val map = new ExternalAppendOnlyMap[T, Null, Null]( +createCombiner = value => null, +mergeValue = (a, b) => a, +mergeCombiners = (a, b) => a) --- End diff -- nit: clean them?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220399074 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -19,7 +19,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag -import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.{Partition, Partitioner, TaskContext} --- End diff -- nit: unnecessary change
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220399123 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} -import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils} +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, + Utils => collectionUtils} --- End diff -- nit: AFAIK we don't have a line-length limit for imports
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r218946996 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext} * @param isOrderSensitive whether or not the function is order-sensitive. If it's order * sensitive, it may return totally different result when the input order * is changed. Mostly stateful functions are order-sensitive. + * @param knownPartitioner If the result has a known partitioner. */ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, -isOrderSensitive: Boolean = false) +isOrderSensitive: Boolean = false, +knownPartitioner: Option[Partitioner] = None) extends RDD[U](prev) { - override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None + override val partitioner = { +if (preservesPartitioning) { + firstParent[T].partitioner +} else { + knownPartitioner +} + } --- End diff -- yes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r218946895 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +partitioner match { + case Some(p) if numPartitions == partitions.length => +def key(x: T): (T, Null) = (x, null) +val cleanKey = sc.clean(key _) +val keyed = new MapPartitionsRDD[(T, Null), T]( + this, + (context, pid, iter) => iter.map(cleanKey), + knownPartitioner = Some(new WrappedPartitioner(p))) +val duplicatesRemoved = keyed.reduceByKey((x, y) => x) --- End diff -- Yes, it would use the right partitioner in this case.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r218918701 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +partitioner match { + case Some(p) if numPartitions == partitions.length => +def key(x: T): (T, Null) = (x, null) +val cleanKey = sc.clean(key _) +val keyed = new MapPartitionsRDD[(T, Null), T]( + this, + (context, pid, iter) => iter.map(cleanKey), + knownPartitioner = Some(new WrappedPartitioner(p))) +val duplicatesRemoved = keyed.reduceByKey((x, y) => x) --- End diff -- So I _think_ it uses the partitioner of the input RDD if that partitioner is known, and otherwise a hash partitioner with the default parallelism. Yes?
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r218917483 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext} * @param isOrderSensitive whether or not the function is order-sensitive. If it's order * sensitive, it may return totally different result when the input order * is changed. Mostly stateful functions are order-sensitive. + * @param knownPartitioner If the result has a known partitioner. */ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, -isOrderSensitive: Boolean = false) +isOrderSensitive: Boolean = false, +knownPartitioner: Option[Partitioner] = None) extends RDD[U](prev) { - override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None + override val partitioner = { +if (preservesPartitioning) { + firstParent[T].partitioner +} else { + knownPartitioner +} + } --- End diff -- I mean, yes, we can subclass just as easily. Is that what you mean?
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217901185 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +partitioner match { + case Some(p) if numPartitions == partitions.length => +def key(x: T): (T, Null) = (x, null) +val cleanKey = sc.clean(key _) +val keyed = new MapPartitionsRDD[(T, Null), T]( + this, + (context, pid, iter) => iter.map(cleanKey), + knownPartitioner = Some(new WrappedPartitioner(p))) +val duplicatesRemoved = keyed.reduceByKey((x, y) => x) --- End diff -- Ah yes, no partitioner specified => use parent's partitioner.
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217901179 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext} * @param isOrderSensitive whether or not the function is order-sensitive. If it's order * sensitive, it may return totally different result when the input order * is changed. Mostly stateful functions are order-sensitive. + * @param knownPartitioner If the result has a known partitioner. */ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, -isOrderSensitive: Boolean = false) +isOrderSensitive: Boolean = false, +knownPartitioner: Option[Partitioner] = None) extends RDD[U](prev) { - override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None + override val partitioner = { +if (preservesPartitioning) { + firstParent[T].partitioner +} else { + knownPartitioner +} + } --- End diff -- Since we are already creating a `MapPartitionsRDD` in distinct, overriding `partitioner` should be trivial.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217900574 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext} * @param isOrderSensitive whether or not the function is order-sensitive. If it's order * sensitive, it may return totally different result when the input order * is changed. Mostly stateful functions are order-sensitive. + * @param knownPartitioner If the result has a known partitioner. */ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, -isOrderSensitive: Boolean = false) +isOrderSensitive: Boolean = false, +knownPartitioner: Option[Partitioner] = None) extends RDD[U](prev) { - override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None + override val partitioner = { +if (preservesPartitioning) { + firstParent[T].partitioner +} else { + knownPartitioner +} + } --- End diff -- `MapPartitionsRDD` is already private. But yes, the other option is subclassing.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217900563 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +partitioner match { + case Some(p) if numPartitions == partitions.length => +def key(x: T): (T, Null) = (x, null) +val cleanKey = sc.clean(key _) +val keyed = new MapPartitionsRDD[(T, Null), T]( + this, + (context, pid, iter) => iter.map(cleanKey), + knownPartitioner = Some(new WrappedPartitioner(p))) +val duplicatesRemoved = keyed.reduceByKey((x, y) => x) --- End diff -- No, reduceByKey on a known partitioner is fine.
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217876215 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +partitioner match { + case Some(p) if numPartitions == partitions.length => +def key(x: T): (T, Null) = (x, null) +val cleanKey = sc.clean(key _) +val keyed = new MapPartitionsRDD[(T, Null), T]( + this, + (context, pid, iter) => iter.map(cleanKey), + knownPartitioner = Some(new WrappedPartitioner(p))) +val duplicatesRemoved = keyed.reduceByKey((x, y) => x) --- End diff -- Don't you need to specify the partitioner here?
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217876184 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext} * @param isOrderSensitive whether or not the function is order-sensitive. If it's order * sensitive, it may return totally different result when the input order * is changed. Mostly stateful functions are order-sensitive. + * @param knownPartitioner If the result has a known partitioner. */ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, -isOrderSensitive: Boolean = false) +isOrderSensitive: Boolean = false, +knownPartitioner: Option[Partitioner] = None) extends RDD[U](prev) { - override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None + override val partitioner = { +if (preservesPartitioning) { + firstParent[T].partitioner +} else { + knownPartitioner +} + } --- End diff -- We don't need to modify the public API to add support for this. Create a subclass of MapPartitionsRDD that overrides the partitioner method to specify what you need. Did I miss something here?
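Editor's sketch of the subclassing alternative raised in the comment above, assuming nothing beyond plain Scala. `SketchRDD`, `SketchMapPartitionsRDD`, `PartitionerTag`, and `withKnownPartitioner` are hypothetical stand-ins, not Spark's classes; the point is only that an anonymous subclass can override `partitioner` without adding a constructor parameter.

```scala
// Hypothetical, simplified stand-ins; these are NOT Spark's classes.
final case class PartitionerTag(name: String)

abstract class SketchRDD[T] {
  def partitioner: Option[PartitionerTag] = None
}

class SketchMapPartitionsRDD[U, T](
    prev: SketchRDD[T],
    preservesPartitioning: Boolean = false) extends SketchRDD[U] {
  // Mirrors the pre-change rule: inherit the parent's partitioner or report none.
  override def partitioner: Option[PartitionerTag] =
    if (preservesPartitioning) prev.partitioner else None
}

object SubclassSketch {
  // The anonymous subclass reports a known partitioner; the constructor
  // signature of SketchMapPartitionsRDD stays unchanged.
  def withKnownPartitioner[U, T](prev: SketchRDD[T], known: PartitionerTag): SketchRDD[U] =
    new SketchMapPartitionsRDD[U, T](prev) {
      override def partitioner: Option[PartitionerTag] = Some(known)
    }

  def main(args: Array[String]): Unit = {
    val parent = new SketchRDD[Int] {}
    val child = withKnownPartitioner[(Int, Null), Int](parent, PartitionerTag("wrapped"))
    println(child.partitioner) // Some(PartitionerTag(wrapped))
  }
}
```

In the real code `partitioner` is a `val`, so an actual subclass would use `override val`; the sketch uses `def` only to stay short.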
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r216170355 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +partitioner match { --- End diff -- No, we can't. Since the original partitioner function takes in the key of the RDD and the key is now changing, we can't guarantee that the previous partitioner preserves partitioning when we do (`x => (x, null)`). In fact, with the hash-based partitioner this is not the case (if you want, I explored this in my live stream, https://youtu.be/NDGM501uUrE?t=19m17s, and you can see the challenge with that approach).
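Editor's sketch of the point made in the comment above, in plain Scala without Spark. `KeyChangeSketch` and `hashPartition` are hypothetical names; `hashPartition` is assumed to mirror how a hash-based partitioner picks a partition (non-negative `hashCode` modulo the partition count). It counts how many elements would be assigned a different partition once they are re-keyed as `(x, null)`.

```scala
object KeyChangeSketch {
  // Assumed hash-partitioner arithmetic: hashCode modulo the partition count,
  // shifted into the range [0, numPartitions). A null key goes to partition 0.
  def hashPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 10
    // The tuple (x, null) has its own hashCode, unrelated to x.hashCode,
    // so the partition computed for the new key need not match the old one.
    val moved = (1 to 100).count { x =>
      hashPartition(x, numPartitions) != hashPartition((x, null), numPartitions)
    }
    println(s"$moved of 100 elements would be assigned a different partition")
  }
}
```

Any non-zero count means the parent's partitioner cannot simply be reported as preserved after the key changes.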
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r216145892 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +partitioner match { --- End diff -- you can just create a new MapPartitionsRDD with preservesPartitioning set to true, can't you?
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r214575455 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- So to reuse `reduceByKey` I'd write a custom partitioner which uses the existing partitioner as its base but takes in the combined key type as input and drops it down to the original key. Sound right?
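Editor's sketch of the wrapping idea described in the comment above, in plain Scala without Spark. `SimplePartitioner`, `ModPartitioner`, and `UnwrappingPartitioner` are hypothetical stand-ins; the body of the pull request's own wrapper class is not shown in this thread, so this is a guess at the shape, not its implementation.

```scala
// Hypothetical stand-in for a partitioner interface; not Spark's class.
trait SimplePartitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Assumed hash-style base partitioner: non-negative hashCode modulo the count.
class ModPartitioner(val numPartitions: Int) extends SimplePartitioner {
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }
}

// Accepts the combined (element, null) key and delegates on the element alone,
// so a re-keyed record maps to the partition it already occupies.
class UnwrappingPartitioner(base: SimplePartitioner) extends SimplePartitioner {
  def numPartitions: Int = base.numPartitions
  def getPartition(key: Any): Int = key match {
    case (original, _) => base.getPartition(original)
    case other => base.getPartition(other)
  }
}

object WrappedSketch {
  def main(args: Array[String]): Unit = {
    val base = new ModPartitioner(10)
    val wrapped = new UnwrappingPartitioner(base)
    val unchanged = (1 to 100).forall { x =>
      wrapped.getPartition((x, null)) == base.getPartition(x)
    }
    println(unchanged) // true
  }
}
```

Because the wrapper assigns every `(x, null)` to the partition `x` already lives in, a keyed reduction using it has no reason to move data.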
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r214103667 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- This is a bad implementation and could OOM. You should reuse reduceByKey.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r214103223 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- This is a bad implementation and could OOM. You should reuse reduceByKey.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r214057591 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- Yes, the hash has to be computed anyway. This is just a nit, and I am not sure the perf gain would even be noticeable; that is why I already gave my LGTM despite this comment.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r214046905 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- But we need to compute the hash of the key anyways to check if it exists.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209450199 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- Yes, it is not a big deal, but if you check the implementation in the Scala library you can see that the hash and the index for the key are computed even when they are not needed (since `addElem` is called anyway). It probably doesn't change much, but we could save this computation...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209440037 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- So, a HashSet can only contain one instance of each element, so we don't need to worry about adding multiple copies of the elements.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209351478 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partitioned with a known partitioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() + itr.filter(set.add(_)) --- End diff -- Not a big deal, but although this is really compact and elegant, it also tries to add to the set the elements which are already there, which is not needed. We can probably check whether the key is there and add it only if it is not; that is probably a bit faster.
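Editor's sketch of the one-liner under review, in plain Scala without Spark. `DedupSketch` is a hypothetical wrapper object; the method body follows the diff quoted above. It relies on `mutable.HashSet.add` returning `true` only when the element was not already in the set, so the filter keeps the first occurrence of each element and drops the rest.

```scala
import scala.collection.mutable

object DedupSketch {
  // Lazily drops duplicates from one partition's iterator.
  // The set retains every distinct element seen so far, so memory grows
  // with the number of distinct values in the partition.
  def removeDuplicatesInPartition[T](itr: Iterator[T]): Iterator[T] = {
    val seen = new mutable.HashSet[T]()
    itr.filter(seen.add(_))
  }

  def main(args: Array[String]): Unit = {
    val deduped = removeDuplicatesInPartition(Iterator(1, 2, 2, 3, 1)).toList
    println(deduped) // List(1, 2, 3)
  }
}
```

A separate `contains` check before `add` would hash the element twice on a miss, which is the trade-off debated in the surrounding comments.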
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209340344 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(!deserial.toString().isEmpty()) } + test("distinct with known partioner does not cause shuffle") { --- End diff -- Thanks!
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209340321 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partioned with a known partioner we can work locally. --- End diff -- Thanks!
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209338809 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partioned with a known partioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() ++= itr --- End diff -- I like this suggestion, thanks!
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209230417 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(!deserial.toString().isEmpty()) } + test("distinct with known partioner does not cause shuffle") { --- End diff -- nit: partioner -> partitioner
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r209230438 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partioned with a known partioner we can work locally. --- End diff -- nit: partioned -> partitioned, partioner -> partitioner
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r208180371 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +// If the data is already approriately partioned with a known partioner we can work locally. +def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = { + val set = new mutable.HashSet[T]() ++= itr --- End diff -- I think here we could return a new iterator which wraps `itr` and uses this set as state to filter out the records we have already seen. That way we would make only one pass over the data, instead of the two passes of the current solution. What do you think?
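The single-pass suggestion above can be sketched outside of Spark. This is an illustration under stated assumptions, not the code from the PR: `DedupSketch`, `twoPass`, and `onePass` are hypothetical names, and a plain `Iterator` stands in for a partition's iterator.

```scala
import scala.collection.mutable

// Hypothetical helper object, for illustration only (not part of Spark).
object DedupSketch {
  // Two passes: drain the whole input into a set, then iterate over the set.
  // Nothing is emitted until the entire input has been consumed.
  def twoPass[T](itr: Iterator[T]): Iterator[T] = {
    val set = new mutable.HashSet[T]()
    set ++= itr
    set.iterator
  }

  // One pass: wrap the input iterator and use the set as filter state.
  // add returns true only for elements not seen before, so each element
  // is emitted the first time it appears, and the input is consumed lazily.
  def onePass[T](itr: Iterator[T]): Iterator[T] = {
    val set = new mutable.HashSet[T]()
    itr.filter(set.add(_))
  }
}
```

Both versions return the same distinct elements. The one-pass version emits them in order of first occurrence and does not touch the input until the result is consumed, while the two-pass version exhausts the input up front and returns the elements in the set's iteration order.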