[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22961


---




[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22961#discussion_r232906123
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -214,13 +214,22 @@ object ShuffleExchangeExec {
           override def getPartition(key: Any): Int = key.asInstanceOf[Int]
         }
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
-        // partition bounds. To get accurate samples, we need to copy the mutable keys.
+        // Extract only fields used for sorting to avoid collecting large fields that does not
+        // affect sorting result when deciding partition bounds in RangePartitioner
         val rddForSampling = rdd.mapPartitionsInternal { iter =>
+          val projection =
+            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
           val mutablePair = new MutablePair[InternalRow, Null]()
-          iter.map(row => mutablePair.update(row.copy(), null))
+          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+          // partition bounds. To get accurate samples, we need to copy the mutable keys.
+          iter.map(row => mutablePair.update(projection(row).copy(), null))
         }
-        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
+        // Construct ordering on extracted sort key.
+        val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
+          ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
+        }
+        implicit val ordering: Ordering[InternalRow] =
+          new LazilyGeneratedOrdering(orderingAttributes)
--- End diff --

yea, let's follow the previous style: https://github.com/apache/spark/pull/22961/files#diff-3ceee31a3da1b7c7132f666126fbL223
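
For reference, the previous style at that link is the single-line form without the explicit type annotation, so the new code would presumably end up as (a sketch based on the diff above, not the final committed line):

```scala
implicit val ordering = new LazilyGeneratedOrdering(orderingAttributes)
```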


---




[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-12 Thread mu5358271
Github user mu5358271 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22961#discussion_r232888324
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -214,13 +214,22 @@ object ShuffleExchangeExec {
           override def getPartition(key: Any): Int = key.asInstanceOf[Int]
         }
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
-        // partition bounds. To get accurate samples, we need to copy the mutable keys.
+        // Extract only fields used for sorting to avoid collecting large fields that does not
+        // affect sorting result when deciding partition bounds in RangePartitioner
         val rddForSampling = rdd.mapPartitionsInternal { iter =>
+          val projection =
+            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
           val mutablePair = new MutablePair[InternalRow, Null]()
-          iter.map(row => mutablePair.update(row.copy(), null))
+          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+          // partition bounds. To get accurate samples, we need to copy the mutable keys.
+          iter.map(row => mutablePair.update(projection(row).copy(), null))
         }
-        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
+        // Construct ordering on extracted sort key.
+        val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
+          ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
+        }
+        implicit val ordering: Ordering[InternalRow] =
+          new LazilyGeneratedOrdering(orderingAttributes)
--- End diff --

this line would actually exceed the 100-character line limit by 2 characters if I keep the ": Ordering[InternalRow]" type annotation on the implicit value. I can remove the type info, though. Is that what you are suggesting?


---




[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22961#discussion_r232564430
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -214,13 +214,22 @@ object ShuffleExchangeExec {
           override def getPartition(key: Any): Int = key.asInstanceOf[Int]
         }
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
-        // partition bounds. To get accurate samples, we need to copy the mutable keys.
+        // Extract only fields used for sorting to avoid collecting large fields that does not
+        // affect sorting result when deciding partition bounds in RangePartitioner
         val rddForSampling = rdd.mapPartitionsInternal { iter =>
+          val projection =
+            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
           val mutablePair = new MutablePair[InternalRow, Null]()
-          iter.map(row => mutablePair.update(row.copy(), null))
+          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+          // partition bounds. To get accurate samples, we need to copy the mutable keys.
+          iter.map(row => mutablePair.update(projection(row).copy(), null))
         }
-        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
+        // Construct ordering on extracted sort key.
+        val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
+          ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
+        }
+        implicit val ordering: Ordering[InternalRow] =
+          new LazilyGeneratedOrdering(orderingAttributes)
--- End diff --

style nit: this can be merged into the previous line


---




[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-08 Thread mu5358271
Github user mu5358271 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22961#discussion_r232070793
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -214,13 +214,24 @@ object ShuffleExchangeExec {
           override def getPartition(key: Any): Int = key.asInstanceOf[Int]
         }
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
-        // partition bounds. To get accurate samples, we need to copy the mutable keys.
+        // Extract only fields used for sorting to avoid collecting large fields that does not
+        // affect sorting result when deciding partition bounds in RangePartitioner
         val rddForSampling = rdd.mapPartitionsInternal { iter =>
+          val projection =
+            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
           val mutablePair = new MutablePair[InternalRow, Null]()
-          iter.map(row => mutablePair.update(row.copy(), null))
+          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+          // partition bounds. To get accurate samples, we need to copy the mutable keys.
+          iter.map(row => mutablePair.update(projection(row).copy(), null))
         }
-        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
+        // Construct ordering on extracted sort key.
+        val orderingAttributes =
--- End diff --

you are right, this is much better. Changed.


---




[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22961#discussion_r232061457
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -214,13 +214,24 @@ object ShuffleExchangeExec {
           override def getPartition(key: Any): Int = key.asInstanceOf[Int]
         }
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
-        // partition bounds. To get accurate samples, we need to copy the mutable keys.
+        // Extract only fields used for sorting to avoid collecting large fields that does not
+        // affect sorting result when deciding partition bounds in RangePartitioner
         val rddForSampling = rdd.mapPartitionsInternal { iter =>
+          val projection =
+            UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
           val mutablePair = new MutablePair[InternalRow, Null]()
-          iter.map(row => mutablePair.update(row.copy(), null))
+          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+          // partition bounds. To get accurate samples, we need to copy the mutable keys.
+          iter.map(row => mutablePair.update(projection(row).copy(), null))
         }
-        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
+        // Construct ordering on extracted sort key.
+        val orderingAttributes =
--- End diff --

This is a bit clunky IMO. Can we do this instead:
```scala
val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
  ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
}
```


---




[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-06 Thread mu5358271
GitHub user mu5358271 opened a pull request:

https://github.com/apache/spark/pull/22961

[SPARK-25947][SQL] Reduce memory usage in ShuffleExchangeExec by selecting only the sort columns

## What changes were proposed in this pull request?

When sorting rows, ShuffleExchangeExec uses the entire row, instead of just the columns referenced in the SortOrder, to create the RangePartitioner. This causes the RangePartitioner to sample entire rows when computing rangeBounds, which can cause OOM issues on the driver when rows contain large fields.

This change creates a projection and uses only the columns involved in the SortOrder for the RangePartitioner.
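
In condensed form (a sketch assembled from the diff in this thread, reusing the names from ShuffleExchangeExec; not the exact committed code), the sampling RDD changes from copying whole rows to copying only the projected sort keys:

```scala
// Project out only the columns the SortOrder actually references, so that
// RangePartitioner samples small key-only rows instead of full rows.
val projection =
  UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
val rddForSampling = rdd.mapPartitionsInternal { iter =>
  val mutablePair = new MutablePair[InternalRow, Null]()
  // Copy each projected key: RangePartitioner retains sampled rows, and the
  // underlying row buffer is mutable and reused between iterator elements.
  iter.map(row => mutablePair.update(projection(row).copy(), null))
}
```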

## How was this patch tested?

Started a local spark-shell with a small spark.driver.maxResultSize:

```
spark-shell --master 'local[16]' --conf spark.driver.maxResultSize=128M --driver-memory 4g
```

and ran the following script:

```
import com.google.common.io.Files
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

import scala.util.Random

@transient val sc = SparkContext.getOrCreate()
@transient val spark = SparkSession.builder().getOrCreate()

import spark.implicits._

val path = Files.createTempDir().toString

// this creates a dataset with 1024 entries, each 1MB in size, across 16 partitions
sc.parallelize(0 until (1 << 10), sc.defaultParallelism).
  map(_ => Array.fill(1 << 18)(Random.nextInt)).
  toDS.
  write.mode("overwrite").parquet(path)

spark.read.parquet(path).
  orderBy('value (0)).
  write.mode("overwrite").parquet(s"$path-sorted")

spark.read.parquet(s"$path-sorted").show
```
Without this change, execution fails when initializing the RangePartitioner; with this change, execution succeeds and generates a correctly sorted dataset.

Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mu5358271/spark sort-improvement

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22961.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22961


commit 61288d40475a4145561ea4be566bc63b78c25b5a
Author: shuhengd 
Date:   2018-11-06T04:23:18Z

[SPARK-25947][SQL] Reduce memory usage in ShuffleExchangeExec by selecting only the sort columns




---
