[GitHub] spark issue #20372: [SPARK-23249] [SQL] Improved block merging logic for partitions

2018-02-14 Thread glentakahashi
Github user glentakahashi commented on the issue:

https://github.com/apache/spark/pull/20372
  
No worries. Can you shed some more light on the performance regressions? 
Are the benchmark code and results public for me to peruse? If not, could you 
post a high-level summary? I'd love to know, for our own benefit, which cases 
and metrics show regressions.

Regardless, I'll submit a new PR to make it configurable.
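
A minimal sketch of what opting out could look like from the user side, 
assuming the follow-up exposes a boolean flag; the name 
"spark.sql.files.binPackingEnabled" is hypothetical, not an actual Spark 
config:

    import org.apache.spark.sql.SparkSession

    object BinPackingFlagSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("bin-packing-flag-sketch")
          // Hypothetical flag (illustrative name): restore the previous
          // sequential "next fit" assignment when set to false.
          .config("spark.sql.files.binPackingEnabled", "false")
          .getOrCreate()

        // Inside DataSourceScanExec the flag would be read with a default
        // of "true" so existing jobs keep the new packing behavior.
        val enabled =
          spark.conf.get("spark.sql.files.binPackingEnabled", "true").toBoolean
        println(s"bin packing enabled: $enabled")
        spark.stop()
      }
    }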


---




[GitHub] spark issue #20372: [SPARK-23249] [SQL] Improved block merging logic for partitions

2018-02-14 Thread glentakahashi
Github user glentakahashi commented on the issue:

https://github.com/apache/spark/pull/20372
  
@gatorsmile can you link the ticket about the perf regression? I imagine 
you would see regressions in cases where the partition count is lower than 
total cluster capacity, since tighter packing can reduce parallelism there: 
for example, if packing drops a scan from 4 partitions to 3 on a cluster with 
4 free cores, one core now sits idle.

Also, is there a specific suite of perf-testing benchmarks, used when 
testing releases, that I should be running here?


---




[GitHub] spark issue #20372: [SPARK-23249] [SQL] Improved block merging logic for partitions

2018-01-31 Thread glentakahashi
Github user glentakahashi commented on the issue:

https://github.com/apache/spark/pull/20372
  
What are the remaining steps to get this merged? Just checking that I don't 
need to do anything else from my end.


---




[GitHub] spark issue #20372: [SPARK-23249] Improved block merging logic for partitions

2018-01-27 Thread glentakahashi
Github user glentakahashi commented on the issue:

https://github.com/apache/spark/pull/20372
  
Created https://issues.apache.org/jira/browse/SPARK-23249


---




[GitHub] spark issue #20372: Improved block merging logic for partitions

2018-01-24 Thread glentakahashi
Github user glentakahashi commented on the issue:

https://github.com/apache/spark/pull/20372
  
The large non-splittable files case is actually already covered by 
https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346
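
For anyone who wants to observe the non-splittable behavior outside the 
suite, a minimal standalone sketch, assuming a local SparkSession; the data 
volume is chosen so each gzip file exceeds the partition byte limit:

    // Standalone sketch (not from the PR's tests): gzip output is
    // non-splittable, so each large .csv.gz file must stay in one read
    // split regardless of spark.sql.files.maxPartitionBytes.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("non-splittable-check")
      .config("spark.sql.files.maxPartitionBytes", (4 * 1024 * 1024).toString)
      .getOrCreate()

    val path = "/tmp/non-splittable-check"
    spark.range(0, 5000000L).selectExpr("id", "id * 2 AS doubled")
      .repartition(2) // two gzip files, each intended to exceed 4MB
      .write.mode("overwrite").option("compression", "gzip").csv(path)

    // Each oversized gzip file should land in its own partition: expect 2.
    println(spark.read.csv(path).rdd.getNumPartitions)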


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread glentakahashi
Github user glentakahashi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163427261
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex &&
+          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex &&
+          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
       closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

Oh, only if the file `isSplittable`, so yes, this will fail for 
non-splittable files. Will update the code.
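
To spell out the failure mode: a non-splittable file longer than 
maxSplitBytes never satisfies either inner-loop condition, so the outer loop 
would close empty partitions forever. One possible guard, shown only as a 
sketch against the diff above (the actual follow-up commit may differ): seed 
every new partition unconditionally with the front file, so an oversized 
non-splittable file simply becomes its own partition:

    // Sketch of a guard, not necessarily the committed fix; reuses addFile
    // and closePartition from the diff above. The first add is
    // unconditional, so the loop always makes progress even when
    // splitFiles(frontIndex).length alone exceeds maxSplitBytes.
    while (frontIndex <= backIndex) {
      addFile(splitFiles(frontIndex)) // may exceed maxSplitBytes on its own
      frontIndex += 1
      while (frontIndex <= backIndex &&
          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
        addFile(splitFiles(frontIndex))
        frontIndex += 1
      }
      while (backIndex > frontIndex &&
          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
        addFile(splitFiles(backIndex))
        backIndex -= 1
      }
      closePartition()
    }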


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread glentakahashi
Github user glentakahashi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163426554
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex &&
+          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex &&
+          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
       closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

This test still passes: 
https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L101, 
and splitFiles should already have been split by `filesMaxPartitionBytes`, so 
no single splittable entry can exceed the limit.

I originally had similar functionality in, but I /think/ it's redundant.
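
For context, a simplified model of the upstream splitting that makes the 
extra check redundant for splittable files; FileChunk is a stand-in for 
Spark's PartitionedFile, and this is a sketch, not the actual code in 
DataSourceScanExec:

    // Simplified model (stand-in types): splittable files are cut into
    // chunks of at most maxSplitBytes before packing, so only a
    // non-splittable file can produce an entry larger than the limit.
    case class FileChunk(path: String, offset: Long, length: Long)

    def toChunks(path: String, fileLen: Long, splittable: Boolean,
                 maxSplitBytes: Long): Seq[FileChunk] = {
      if (splittable) {
        (0L until fileLen by maxSplitBytes).map { offset =>
          FileChunk(path, offset, math.min(maxSplitBytes, fileLen - offset))
        }
      } else {
        Seq(FileChunk(path, 0L, fileLen)) // one whole-file chunk, any size
      }
    }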


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread glentakahashi
GitHub user glentakahashi opened a pull request:

https://github.com/apache/spark/pull/20372

Improved block merging logic for partitions

## What changes were proposed in this pull request?

Change DataSourceScanExec so that, when grouping blocks together into 
partitions, it also checks the end of the size-sorted list of splits to fill 
out partitions more efficiently, as sketched below.
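
As a standalone illustration, here is a minimal runnable sketch of the 
two-pointer packing over chunk sizes, ignoring openCostInBytes for brevity; 
the names are illustrative, and the real logic lives in DataSourceScanExec:

    // Minimal sketch of the packing idea (sizes only, openCostInBytes
    // ignored): seed each partition from the front of the descending-sorted
    // list, then top it up with small chunks from the back.
    def pack(sizes: Seq[Long], maxSplitBytes: Long): Seq[Seq[Long]] = {
      val sorted = sizes.sortBy(-_)
      val partitions = Seq.newBuilder[Seq[Long]]
      var current = Vector.empty[Long]
      var currentSize = 0L
      var front = 0
      var back = sorted.length - 1

      def add(i: Int): Unit = { current :+= sorted(i); currentSize += sorted(i) }
      def close(): Unit = {
        if (current.nonEmpty) partitions += current
        current = Vector.empty
        currentSize = 0L
      }

      while (front <= back) {
        add(front); front += 1 // largest remaining chunk seeds the partition
        while (front <= back && currentSize + sorted(front) <= maxSplitBytes) {
          add(front); front += 1
        }
        while (back > front && currentSize + sorted(back) <= maxSplitBytes) {
          add(back); back -= 1
        }
        close()
      }
      partitions.result()
    }

    // pack(Seq(10L, 9, 9, 2, 2, 2), maxSplitBytes = 12)
    //   => Seq(Seq(10, 2), Seq(9, 2), Seq(9, 2))    (3 partitions)
    // The previous sequential "next fit" pass over the same sizes yields
    //    Seq(10), Seq(9), Seq(9, 2), Seq(2, 2)      (4 partitions)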

## How was this patch tested?

Updated the old test to reflect the new logic, which causes the number of 
partitions in that test to drop from 4 to 3.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/glentakahashi/spark feature/improved-block-merging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20372


commit c575977a5952bf50b605be8079c9be1e30f3bd36
Author: Glen Takahashi <gtakahashi@...>
Date:   2018-01-23T23:22:34Z

Improved block merging logic for partitions




---
