[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22961

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22961#discussion_r232906123

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -214,13 +214,22 @@ object ShuffleExchangeExec {
               override def getPartition(key: Any): Int = key.asInstanceOf[Int]
             }
           case RangePartitioning(sortingExpressions, numPartitions) =>
    -        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    -        // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +        // Extract only fields used for sorting to avoid collecting large fields that does not
    +        // affect sorting result when deciding partition bounds in RangePartitioner
             val rddForSampling = rdd.mapPartitionsInternal { iter =>
    +          val projection =
    +            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
               val mutablePair = new MutablePair[InternalRow, Null]()
    -          iter.map(row => mutablePair.update(row.copy(), null))
    +          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    +          // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +          iter.map(row => mutablePair.update(projection(row).copy(), null))
             }
    -        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
    +        // Construct ordering on extracted sort key.
    +        val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
    +          ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
    +        }
    +        implicit val ordering: Ordering[InternalRow] =
    +          new LazilyGeneratedOrdering(orderingAttributes)
    --- End diff --

    yea, let's follow the previous style: https://github.com/apache/spark/pull/22961/files#diff-3ceee31a3da1b7c7132f666126fbL223
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user mu5358271 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22961#discussion_r232888324

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -214,13 +214,22 @@ object ShuffleExchangeExec {
               override def getPartition(key: Any): Int = key.asInstanceOf[Int]
             }
           case RangePartitioning(sortingExpressions, numPartitions) =>
    -        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    -        // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +        // Extract only fields used for sorting to avoid collecting large fields that does not
    +        // affect sorting result when deciding partition bounds in RangePartitioner
             val rddForSampling = rdd.mapPartitionsInternal { iter =>
    +          val projection =
    +            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
               val mutablePair = new MutablePair[InternalRow, Null]()
    -          iter.map(row => mutablePair.update(row.copy(), null))
    +          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    +          // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +          iter.map(row => mutablePair.update(projection(row).copy(), null))
             }
    -        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
    +        // Construct ordering on extracted sort key.
    +        val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
    +          ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
    +        }
    +        implicit val ordering: Ordering[InternalRow] =
    +          new LazilyGeneratedOrdering(orderingAttributes)
    --- End diff --

    this line would actually exceed the 100 character per line limit by 2 characters if I keep the ": Ordering[InternalRow]" type info for the implicit value. I can remove the type info though. Is that what you are suggesting?
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22961#discussion_r232564430

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -214,13 +214,22 @@ object ShuffleExchangeExec {
               override def getPartition(key: Any): Int = key.asInstanceOf[Int]
             }
           case RangePartitioning(sortingExpressions, numPartitions) =>
    -        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    -        // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +        // Extract only fields used for sorting to avoid collecting large fields that does not
    +        // affect sorting result when deciding partition bounds in RangePartitioner
             val rddForSampling = rdd.mapPartitionsInternal { iter =>
    +          val projection =
    +            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
               val mutablePair = new MutablePair[InternalRow, Null]()
    -          iter.map(row => mutablePair.update(row.copy(), null))
    +          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    +          // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +          iter.map(row => mutablePair.update(projection(row).copy(), null))
             }
    -        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
    +        // Construct ordering on extracted sort key.
    +        val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
    +          ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
    +        }
    +        implicit val ordering: Ordering[InternalRow] =
    +          new LazilyGeneratedOrdering(orderingAttributes)
    --- End diff --

    style nit: this can be merged to the previous line
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user mu5358271 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22961#discussion_r232070793

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -214,13 +214,24 @@ object ShuffleExchangeExec {
               override def getPartition(key: Any): Int = key.asInstanceOf[Int]
             }
           case RangePartitioning(sortingExpressions, numPartitions) =>
    -        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    -        // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +        // Extract only fields used for sorting to avoid collecting large fields that does not
    +        // affect sorting result when deciding partition bounds in RangePartitioner
             val rddForSampling = rdd.mapPartitionsInternal { iter =>
    +          val projection =
    +            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
               val mutablePair = new MutablePair[InternalRow, Null]()
    -          iter.map(row => mutablePair.update(row.copy(), null))
    +          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    +          // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +          iter.map(row => mutablePair.update(projection(row).copy(), null))
             }
    -        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
    +        // Construct ordering on extracted sort key.
    +        val orderingAttributes =
    --- End diff --

    you are right. this is much better. changed
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22961#discussion_r232061457

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -214,13 +214,24 @@ object ShuffleExchangeExec {
               override def getPartition(key: Any): Int = key.asInstanceOf[Int]
             }
           case RangePartitioning(sortingExpressions, numPartitions) =>
    -        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    -        // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +        // Extract only fields used for sorting to avoid collecting large fields that does not
    +        // affect sorting result when deciding partition bounds in RangePartitioner
             val rddForSampling = rdd.mapPartitionsInternal { iter =>
    +          val projection =
    +            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
               val mutablePair = new MutablePair[InternalRow, Null]()
    -          iter.map(row => mutablePair.update(row.copy(), null))
    +          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
    +          // partition bounds. To get accurate samples, we need to copy the mutable keys.
    +          iter.map(row => mutablePair.update(projection(row).copy(), null))
             }
    -        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
    +        // Construct ordering on extracted sort key.
    +        val orderingAttributes =
    --- End diff --

    This is a bit clunky IMO. Can we do this instead:
    ```scala
    val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
      ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
    }
    ```
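For readers following the thread, the idea behind rebinding each sort expression to its ordinal can be shown without Spark. The sketch below is plain Scala and only an illustration: `Row`, `SortKey`, `project`, `rebind` and `ordering` are hypothetical stand-ins invented here for `InternalRow`, `SortOrder`, `UnsafeProjection`, the `BoundReference` rewrite and `LazilyGeneratedOrdering`, not Spark APIs. It assumes that projection emits the sort columns in sort-key order, so the i-th key ends up at position i of the projected row.

```scala
object BoundOrderingSketch {
  // Hypothetical stand-in for a row: a flat sequence of column values.
  type Row = Vector[Any]

  // Hypothetical stand-in for SortOrder: which column to read and in which direction.
  final case class SortKey(column: Int, ascending: Boolean)

  // Keep only the columns named by the sort keys, in sort-key order.
  def project(keys: Seq[SortKey])(row: Row): Row =
    keys.map(k => row(k.column)).toVector

  // After projection the i-th key lives at position i, so rebind each key to its index.
  def rebind(keys: Seq[SortKey]): Seq[SortKey] =
    keys.zipWithIndex.map { case (k, i) => k.copy(column = i) }

  // Lexicographic ordering over the given keys; only Int and String columns are handled.
  def ordering(keys: Seq[SortKey]): Ordering[Row] = new Ordering[Row] {
    override def compare(a: Row, b: Row): Int = {
      val it = keys.iterator
      var result = 0
      while (result == 0 && it.hasNext) {
        val k = it.next()
        val c = (a(k.column), b(k.column)) match {
          case (x: Int, y: Int)       => Integer.compare(x, y)
          case (x: String, y: String) => x.compareTo(y)
          case (x, y) =>
            throw new IllegalArgumentException(s"unsupported column values: $x, $y")
        }
        result = if (k.ascending) c else -c
      }
      result
    }
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq(SortKey(2, ascending = true), SortKey(0, ascending = false))
    val rows: Seq[Row] =
      Seq(Vector(1, "big payload", 7), Vector(2, "big payload", 7), Vector(3, "x", 5))
    // Sorting full rows and sorting projected rows with rebound keys agree on key order.
    val viaFullRows = rows.sorted(ordering(keys)).map(project(keys))
    val viaProjection = rows.map(project(keys)).sorted(ordering(rebind(keys)))
    println(viaFullRows == viaProjection)
  }
}
```

The point of the rebinding step is that the ordering no longer needs the original schema: it only ever sees the narrow projected row, which is what keeps the sampled data small.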
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
GitHub user mu5358271 opened a pull request:

    https://github.com/apache/spark/pull/22961

    [SPARK-25947][SQL] Reduce memory usage in ShuffleExchangeExec by selecting only the sort columns

    ## What changes were proposed in this pull request?

    When sorting rows, ShuffleExchangeExec uses the entire row instead of just the columns referenced in SortOrder to create the RangePartitioner. This causes the RangePartitioner to sample entire rows to create rangeBounds and can cause OOM issues on the driver when rows contain large fields.

    This change creates a projection and only uses the columns involved in the SortOrder for the RangePartitioner.

    ## How was this patch tested?

    Started a local spark-shell with a small spark.driver.maxResultSize:

    ```
    spark-shell --master 'local[16]' --conf spark.driver.maxResultSize=128M --driver-memory 4g
    ```

    and ran the following script:

    ```
    import com.google.common.io.Files
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    import scala.util.Random

    @transient val sc = SparkContext.getOrCreate()
    @transient val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val path = Files.createTempDir().toString
    // this creates a dataset with 1024 entries, each 1MB in size, across 16 partitions
    sc.parallelize(0 until (1 << 10), sc.defaultParallelism).
      map(_ => Array.fill(1 << 18)(Random.nextInt)).
      toDS.
      write.mode("overwrite").parquet(path)
    spark.read.parquet(path).
      orderBy('value(0)).
      write.mode("overwrite").parquet(s"$path-sorted")
    spark.read.parquet(s"$path-sorted").show
    ```

    Without this change, execution fails when initializing the RangePartitioner. With this change, execution succeeds and generates a correctly sorted dataset.
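To make the memory argument in the description concrete, here is a small plain-Scala sketch with no Spark dependency. Everything in it is an assumption made for illustration: `Row`, `sample`, `sortKeyOnly`, `approxBytes` and `demoRows` are hypothetical names, sampling is replaced by a deterministic "every `step`-th row" rule, and sizes count only 4 bytes per `Int` while ignoring object overhead. It compares how much data a sample holds when whole rows are kept versus when only the sort column is kept.

```scala
object SampleSizeSketch {
  // Hypothetical row with one wide array column, sorted by its first element
  // (mirroring the shape of the data in the test script).
  final case class Row(value: Array[Int])

  // Rough payload size in bytes: 4 bytes per Int, object overhead ignored.
  def approxBytes(row: Row): Long = row.value.length.toLong * 4L

  // Deterministic stand-in for sampling: keep every `step`-th row.
  def sample(rows: Iterator[Row], step: Int): Vector[Row] =
    rows.zipWithIndex.collect { case (r, i) if i % step == 0 => r }.toVector

  // Projection that keeps only the sort column (element 0 of `value`).
  def sortKeyOnly(row: Row): Row = Row(Array(row.value(0)))

  // 64 rows of 1024 Ints each; row i is filled with the value 63 - i.
  def demoRows: Iterator[Row] =
    Iterator.tabulate(64)(i => Row(Array.fill(1 << 10)(63 - i)))

  def main(args: Array[String]): Unit = {
    val full = sample(demoRows, step = 4)
    val projected = sample(demoRows.map(sortKeyOnly), step = 4)
    println(s"sampling full rows:      ${full.map(approxBytes).sum} bytes")
    println(s"sampling sort keys only: ${projected.map(approxBytes).sum} bytes")
    // Both samples carry the same sort keys, so bounds derived from them would match.
    assert(full.map(_.value(0)) == projected.map(_.value(0)))
  }
}
```

In this toy setup the two samples contain identical sort keys, while the projected sample is smaller by the ratio of row width to key width. That ratio is what grows large when rows carry big fields.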
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mu5358271/spark sort-improvement

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22961.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22961

commit 61288d40475a4145561ea4be566bc63b78c25b5a
Author: shuhengd
Date: 2018-11-06T04:23:18Z

    [SPARK-25947][SQL] Reduce memory usage in ShuffleExchangeExec by selecting only the sort columns