spark git commit: Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

2018-02-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fd66a3b7b -> a5a8a86e2


Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

This reverts commit f5f21e8c4261c0dfe8e3e788a30b38b188a18f67.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5a8a86e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5a8a86e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5a8a86e

Branch: refs/heads/branch-2.3
Commit: a5a8a86e213c34d6fb32f0ae52db24d8f1ef0905
Parents: fd66a3b
Author: gatorsmile 
Authored: Wed Feb 14 10:59:36 2018 -0800
Committer: gatorsmile 
Committed: Wed Feb 14 10:59:36 2018 -0800

--
 .../sql/execution/DataSourceScanExec.scala  | 29 ++--
 .../datasources/FileSourceStrategySuite.scala   | 15 ++
 2 files changed, 17 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5a8a86e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7732e2..aa66ee7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -445,29 +445,16 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    def addFile(file: PartitionedFile): Unit = {
-      currentFiles += file
-      currentSize += file.length + openCostInBytes
-    }
-
-    var frontIndex = 0
-    var backIndex = splitFiles.length - 1
-
-    while (frontIndex <= backIndex) {
-      addFile(splitFiles(frontIndex))
-      frontIndex += 1
-      while (frontIndex <= backIndex &&
-             currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(frontIndex))
-        frontIndex += 1
-      }
-      while (backIndex > frontIndex &&
-             currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(backIndex))
-        backIndex -= 1
+    // Assign files to partitions using "Next Fit Decreasing"
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
       }
-      closePartition()
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
     }
+    closePartition()
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }
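For context, the logic this revert restores is a single greedy pass over the sorted file list (the code comment calls it "Next Fit Decreasing" because splitFiles arrives sorted by length, largest first). Below is a minimal, self-contained sketch of that loop; NextFitPackingSketch, FileSlice, and pack are illustrative stand-ins for Spark's PartitionedFile and the surrounding FileSourceScanExec code, not actual Spark APIs.

import scala.collection.mutable.ArrayBuffer

object NextFitPackingSketch {
  // Illustrative stand-in for Spark's PartitionedFile: a name plus a byte length.
  case class FileSlice(name: String, length: Long)

  // Greedy "next fit" pass: whenever the next file would push the current
  // partition past maxSplitBytes, close the partition and start a new one.
  // Each file also charges openCostInBytes, so partitions of many tiny files
  // count as larger than their raw bytes.
  def pack(
      files: Seq[FileSlice],
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[FileSlice]] = {
    val partitions = ArrayBuffer.empty[Seq[FileSlice]]
    val currentFiles = ArrayBuffer.empty[FileSlice]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) partitions += currentFiles.toList
      currentFiles.clear()
      currentSize = 0
    }

    // "Decreasing" refers to the caller having sorted files by length descending.
    files.foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      currentSize += file.length + openCostInBytes
      currentFiles += file
    }
    closePartition()
    partitions.toList
  }

  def main(args: Array[String]): Unit = {
    // File sizes chosen to reproduce the layout the updated test asserts:
    // with maxSplitBytes = 4 and openCostInBytes = 1, six files of lengths
    // 2, 2, 1, 1, 1, 1 pack as [(file1), (file2, file3), (file4, file5), (file6)].
    val files = Seq(2L, 2L, 1L, 1L, 1L, 1L).zipWithIndex.map {
      case (len, i) => FileSlice(s"file${i + 1}", len)
    }
    pack(files, maxSplitBytes = 4, openCostInBytes = 1)
      .foreach(p => println(p.map(_.name).mkString("(", ", ", ")")))
  }
}

Running the sketch prints the four file groups that the test assertions below expect.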

http://git-wip-us.apache.org/repos/asf/spark/blob/a5a8a86e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index bfccc93..c1d61b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
       SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
-        assert(partitions.size == 3, "when checking partitions")
-        assert(partitions(0).files.size == 2, "when checking partition 1")
+        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+        assert(partitions.size == 4, "when checking partitions")
+        assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
+        assert(partitions(3).files.size == 1, "when checking partition 4")
 
-        // First partition reads (file1, file6)
+        // First partition reads (file1)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
-        assert(partitions(0).files(1).start == 0)
-        assert(partitions(0).files(1).length == 1)
 
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -164,6 +163,10 @@ class FileSourceStrategySuite 
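To see why the revert changes the expected layout, trace the restored greedy loop over the test's settings (maxSplitBytes = 4, openCostInBytes = 1), assuming file lengths of 2, 2, 1, 1, 1, 1 for file1 through file6, consistent with the lengths the assertions above check (the fixture code itself is not shown in this diff):

- file1 (length 2) starts the first partition; currentSize becomes 2 + 1 = 3.
- file2 (length 2): 3 + 2 > 4, so partition 1 closes as (file1); currentSize resets, then becomes 3.
- file3 (length 1): 3 + 1 <= 4, so it joins file2; currentSize becomes 5.
- file4 (length 1): 5 + 1 > 4, so partition 2 closes as (file2, file3); currentSize becomes 2.
- file5 (length 1): 2 + 1 <= 4, so it joins file4; currentSize becomes 4.
- file6 (length 1): 4 + 1 > 4, so partition 3 closes as (file4, file5), and the final closePartition() emits (file6) as partition 4.

The reverted two-pointer variant instead topped up partition 1 with file6 pulled from the back of the sorted list, which is exactly the (file1, file6) layout the old assertions encoded.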

spark git commit: Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

2018-02-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 140f87533 -> 400a1d9e2


Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

This reverts commit 8c21170decfb9ca4d3233e1ea13bd1b6e3199ed9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/400a1d9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/400a1d9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/400a1d9e

Branch: refs/heads/master
Commit: 400a1d9e25c1196f0be87323bd89fb3af0660166
Parents: 140f875
Author: gatorsmile 
Authored: Wed Feb 14 10:57:12 2018 -0800
Committer: gatorsmile 
Committed: Wed Feb 14 10:57:12 2018 -0800

--
 .../sql/execution/DataSourceScanExec.scala  | 29 ++--
 .../datasources/FileSourceStrategySuite.scala   | 15 ++
 2 files changed, 17 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index ba1157d..08ff33a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -444,29 +444,16 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    def addFile(file: PartitionedFile): Unit = {
-      currentFiles += file
-      currentSize += file.length + openCostInBytes
-    }
-
-    var frontIndex = 0
-    var backIndex = splitFiles.length - 1
-
-    while (frontIndex <= backIndex) {
-      addFile(splitFiles(frontIndex))
-      frontIndex += 1
-      while (frontIndex <= backIndex &&
-             currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(frontIndex))
-        frontIndex += 1
-      }
-      while (backIndex > frontIndex &&
-             currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(backIndex))
-        backIndex -= 1
+    // Assign files to partitions using "Next Fit Decreasing"
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
       }
-      closePartition()
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
     }
+    closePartition()
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index bfccc93..c1d61b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
       SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
-        assert(partitions.size == 3, "when checking partitions")
-        assert(partitions(0).files.size == 2, "when checking partition 1")
+        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+        assert(partitions.size == 4, "when checking partitions")
+        assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
+        assert(partitions(3).files.size == 1, "when checking partition 4")
 
-        // First partition reads (file1, file6)
+        // First partition reads (file1)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
-        assert(partitions(0).files(1).start == 0)
-        assert(partitions(0).files(1).length == 1)
 
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -164,6 +163,10 @@ class FileSourceStrategySuite extends