spark git commit: Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"
Repository: spark Updated Branches: refs/heads/branch-2.3 fd66a3b7b -> a5a8a86e2 Revert "[SPARK-23249][SQL] Improved block merging logic for partitions" This reverts commit f5f21e8c4261c0dfe8e3e788a30b38b188a18f67. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5a8a86e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5a8a86e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5a8a86e Branch: refs/heads/branch-2.3 Commit: a5a8a86e213c34d6fb32f0ae52db24d8f1ef0905 Parents: fd66a3b Author: gatorsmile Authored: Wed Feb 14 10:59:36 2018 -0800 Committer: gatorsmile Committed: Wed Feb 14 10:59:36 2018 -0800 -- .../sql/execution/DataSourceScanExec.scala | 29 ++-- .../datasources/FileSourceStrategySuite.scala | 15 ++ 2 files changed, 17 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5a8a86e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index f7732e2..aa66ee7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -445,29 +445,16 @@ case class FileSourceScanExec( currentSize = 0 } -def addFile(file: PartitionedFile): Unit = { -currentFiles += file -currentSize += file.length + openCostInBytes -} - -var frontIndex = 0 -var backIndex = splitFiles.length - 1 - -while (frontIndex <= backIndex) { - addFile(splitFiles(frontIndex)) - frontIndex += 1 - while (frontIndex <= backIndex && - currentSize + splitFiles(frontIndex).length <= maxSplitBytes) { -addFile(splitFiles(frontIndex)) -frontIndex += 1 - } - while (backIndex > frontIndex && - currentSize + splitFiles(backIndex).length <= maxSplitBytes) { 
-addFile(splitFiles(backIndex)) -backIndex -= 1 +// Assign files to partitions using "Next Fit Decreasing" +splitFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { +closePartition() } - closePartition() + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file } +closePartition() new FileScanRDD(fsRelation.sparkSession, readFile, partitions) } http://git-wip-us.apache.org/repos/asf/spark/blob/a5a8a86e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index bfccc93..c1d61b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4", SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") { checkScan(table.select('c1)) { partitions => -// Files should be laid out [(file1, file6), (file2, file3), (file4, file5)] -assert(partitions.size == 3, "when checking partitions") -assert(partitions(0).files.size == 2, "when checking partition 1") +// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)] +assert(partitions.size == 4, "when checking partitions") +assert(partitions(0).files.size == 1, "when checking partition 1") assert(partitions(1).files.size == 2, "when checking partition 2") assert(partitions(2).files.size == 2, "when checking partition 3") +assert(partitions(3).files.size == 1, "when checking partition 4") -// First partition reads (file1, file6) +// First partition reads (file1) 
assert(partitions(0).files(0).start == 0) assert(partitions(0).files(0).length == 2) -assert(partitions(0).files(1).start == 0) -assert(partitions(0).files(1).length == 1) // Second partition reads (file2, file3) assert(partitions(1).files(0).start == 0) @@ -164,6 +163,10 @@ class FileSourceStrategySuite
spark git commit: Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"
Repository: spark Updated Branches: refs/heads/master 140f87533 -> 400a1d9e2 Revert "[SPARK-23249][SQL] Improved block merging logic for partitions" This reverts commit 8c21170decfb9ca4d3233e1ea13bd1b6e3199ed9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/400a1d9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/400a1d9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/400a1d9e Branch: refs/heads/master Commit: 400a1d9e25c1196f0be87323bd89fb3af0660166 Parents: 140f875 Author: gatorsmile Authored: Wed Feb 14 10:57:12 2018 -0800 Committer: gatorsmile Committed: Wed Feb 14 10:57:12 2018 -0800 -- .../sql/execution/DataSourceScanExec.scala | 29 ++-- .../datasources/FileSourceStrategySuite.scala | 15 ++ 2 files changed, 17 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index ba1157d..08ff33a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -444,29 +444,16 @@ case class FileSourceScanExec( currentSize = 0 } -def addFile(file: PartitionedFile): Unit = { -currentFiles += file -currentSize += file.length + openCostInBytes -} - -var frontIndex = 0 -var backIndex = splitFiles.length - 1 - -while (frontIndex <= backIndex) { - addFile(splitFiles(frontIndex)) - frontIndex += 1 - while (frontIndex <= backIndex && - currentSize + splitFiles(frontIndex).length <= maxSplitBytes) { -addFile(splitFiles(frontIndex)) -frontIndex += 1 - } - while (backIndex > frontIndex && - currentSize + splitFiles(backIndex).length <= maxSplitBytes) { -addFile(splitFiles(backIndex)) 
-backIndex -= 1 +// Assign files to partitions using "Next Fit Decreasing" +splitFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { +closePartition() } - closePartition() + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file } +closePartition() new FileScanRDD(fsRelation.sparkSession, readFile, partitions) } http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index bfccc93..c1d61b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4", SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") { checkScan(table.select('c1)) { partitions => -// Files should be laid out [(file1, file6), (file2, file3), (file4, file5)] -assert(partitions.size == 3, "when checking partitions") -assert(partitions(0).files.size == 2, "when checking partition 1") +// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)] +assert(partitions.size == 4, "when checking partitions") +assert(partitions(0).files.size == 1, "when checking partition 1") assert(partitions(1).files.size == 2, "when checking partition 2") assert(partitions(2).files.size == 2, "when checking partition 3") +assert(partitions(3).files.size == 1, "when checking partition 4") -// First partition reads (file1, file6) +// First partition reads (file1) assert(partitions(0).files(0).start == 0) 
assert(partitions(0).files(0).length == 2) -assert(partitions(0).files(1).start == 0) -assert(partitions(0).files(1).length == 1) // Second partition reads (file2, file3) assert(partitions(1).files(0).start == 0) @@ -164,6 +163,10 @@ class FileSourceStrategySuite extends