[GitHub] spark issue #20372: [SPARK-23249] [SQL] Improved block merging logic for par...
Github user glentakahashi commented on the issue: https://github.com/apache/spark/pull/20372

No worries. Can you shed some more light on the performance regressions? Are the benchmark code and results public for me to peruse? If not, could you post a high-level summary? I'd love to know, for our own benefit, which cases and metrics you see regressions on. Regardless, I'll submit a new PR to make it configurable.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20372: [SPARK-23249] [SQL] Improved block merging logic for par...
Github user glentakahashi commented on the issue: https://github.com/apache/spark/pull/20372

@gatorsmile can you link the ticket about the perf regression? I imagine you would see regressions in cases where the partition count is less than total cluster capacity, since this change can reduce parallelism there. Also, is there a specific suite of performance benchmarks, used when testing releases, that I should be running here?
[GitHub] spark issue #20372: [SPARK-23249] [SQL] Improved block merging logic for par...
Github user glentakahashi commented on the issue: https://github.com/apache/spark/pull/20372

What are the remaining steps to get this merged? Just checking that I don't need to do anything else on my end.
[GitHub] spark issue #20372: [SPARK-23249] Improved block merging logic for partition...
Github user glentakahashi commented on the issue: https://github.com/apache/spark/pull/20372

Created https://issues.apache.org/jira/browse/SPARK-23249
[GitHub] spark issue #20372: Improved block merging logic for partitions
Github user glentakahashi commented on the issue: https://github.com/apache/spark/pull/20372

The large non-splittable-files case is actually already tested by https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user glentakahashi commented on a diff in the pull request: https://github.com/apache/spark/pull/20372#discussion_r163427261

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
         currentSize = 0
       }

-      // Assign files to partitions using "Next Fit Decreasing"
-      splitFiles.foreach { file =>
-        if (currentSize + file.length > maxSplitBytes) {
+      def addFile(file: PartitionedFile): Unit = {
+        currentFiles += file
+        currentSize += file.length + openCostInBytes
+      }
+
+      var frontIndex = 0
+      var backIndex = splitFiles.length - 1
+
+      while (frontIndex <= backIndex) {
+        while (frontIndex <= backIndex &&
+            currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+          addFile(splitFiles(frontIndex))
+          frontIndex += 1
+        }
+        while (backIndex > frontIndex &&
+            currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+          addFile(splitFiles(backIndex))
+          backIndex -= 1
+        }
         closePartition()
-        }
-        // Add the given file to the current partition.
-        currentSize += file.length + openCostInBytes
--- End diff --

Oh, only if the file `isSplittable`, so yeah, it will fail for non-splittable files. Will update the code.
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user glentakahashi commented on a diff in the pull request: https://github.com/apache/spark/pull/20372#discussion_r163426554

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
         currentSize = 0
       }

-      // Assign files to partitions using "Next Fit Decreasing"
-      splitFiles.foreach { file =>
-        if (currentSize + file.length > maxSplitBytes) {
+      def addFile(file: PartitionedFile): Unit = {
+        currentFiles += file
+        currentSize += file.length + openCostInBytes
+      }
+
+      var frontIndex = 0
+      var backIndex = splitFiles.length - 1
+
+      while (frontIndex <= backIndex) {
+        while (frontIndex <= backIndex &&
+            currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+          addFile(splitFiles(frontIndex))
+          frontIndex += 1
+        }
+        while (backIndex > frontIndex &&
+            currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+          addFile(splitFiles(backIndex))
+          backIndex -= 1
+        }
         closePartition()
-        }
-        // Add the given file to the current partition.
-        currentSize += file.length + openCostInBytes
--- End diff --

This test still passes: https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L101, and the splitFiles should have already been split by `filesMaxPartitionBytes`. I originally had similar functionality in, but I /think/ it's redundant.
[GitHub] spark pull request #20372: Improved block merging logic for partitions
GitHub user glentakahashi opened a pull request: https://github.com/apache/spark/pull/20372

Improved block merging logic for partitions

## What changes were proposed in this pull request?

Change DataSourceScanExec so that, when grouping blocks together into partitions, it also checks the end of the sorted list of splits to fill out partitions more efficiently.

## How was this patch tested?

Updated the old test to reflect the new logic, which causes the number of partitions to drop from 4 to 3.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/glentakahashi/spark feature/improved-block-merging

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20372.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20372

commit c575977a5952bf50b605be8079c9be1e30f3bd36
Author: Glen Takahashi <gtakahashi@...>
Date: 2018-01-23T23:22:34Z

    Improved block merging logic for partitions
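To make the proposed strategy concrete, here is a minimal, standalone Scala sketch of the idea in the diff: files are sorted by length descending, each partition is filled greedily with large files from the front of the list, then topped up with small files from the back before being closed. This is an illustration, not the actual Spark code: `mergeFiles`, its parameters, and the guard for a single oversized file are simplifications I introduced for the sketch (the real code works on `PartitionedFile` objects and handles splittable files separately); only the name `openCostInBytes` and the front/back-index loop structure mirror the PR.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the two-pointer block-merging strategy from the PR.
// All names and types here are illustrative, not Spark's real API.
object BlockMergingSketch {
  def mergeFiles(
      fileLengths: Seq[Long],
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[Long]] = {
    // Sort descending, as the real code does before packing.
    val sorted = fileLengths.sortBy(-_).toIndexedSeq
    val partitions = ArrayBuffer[Seq[Long]]()
    val current = ArrayBuffer[Long]()
    var currentSize = 0L

    def addFile(len: Long): Unit = {
      current += len
      currentSize += len + openCostInBytes
    }

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toSeq
      current.clear()
      currentSize = 0L
    }

    var frontIndex = 0
    var backIndex = sorted.length - 1
    while (frontIndex <= backIndex) {
      // Take large files from the front while they fit.
      while (frontIndex <= backIndex &&
             currentSize + sorted(frontIndex) <= maxSplitBytes) {
        addFile(sorted(frontIndex))
        frontIndex += 1
      }
      // Top up the remaining space with small files from the back.
      while (backIndex > frontIndex &&
             currentSize + sorted(backIndex) <= maxSplitBytes) {
        addFile(sorted(backIndex))
        backIndex -= 1
      }
      // Guard added for this sketch: a single file larger than
      // maxSplitBytes still gets its own partition, avoiding a spin.
      if (current.isEmpty && frontIndex <= backIndex) {
        addFile(sorted(frontIndex))
        frontIndex += 1
      }
      closePartition()
    }
    partitions.toSeq
  }
}
```

With files of sizes 10, 60, 70, 20, 30 and `maxSplitBytes = 100` (zero open cost), the old next-fit pass over the sorted list would close a partition after 70 alone could not take 60; the two-pointer version instead packs [70, 10, 20] and [60, 30], two full partitions instead of three partly-empty ones, which is the effect the PR's updated test (4 -> 3 partitions) is checking.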