darabos closed pull request #22673: [SPARK-20144] Allow reading files in order with spark.sql.files.allowReordering=false
URL: https://github.com/apache/spark/pull/22673
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b699707d85235..101d077aa8d50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -835,6 +835,13 @@ object SQLConf {
     .longConf
     .createWithDefault(4 * 1024 * 1024)
 
+  val ALLOW_REORDERING_FILES = buildConf("spark.sql.files.allowReordering")
+    .internal()
+    .doc("If enabled, Spark is free to change the order of input files when 
trying to organize" +
+      " them into even-sized partitions for performance.")
+    .booleanConf
+    .createWithDefault(true)
+
   val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will 
continue to run when " +
       "encountering corrupted files and the contents that have been read will 
still be returned.")
@@ -1639,6 +1646,8 @@ class SQLConf extends Serializable with Logging {
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
+  def allowReorderingFiles: Boolean = getConf(ALLOW_REORDERING_FILES)
+
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
   def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
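
For illustration only (not part of the patch), a minimal sketch of toggling the new flag from application code; the session, app name and input path below are made up:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("ordered-read").getOrCreate()

    // Keep files in path order so rows come back in the order the files appear on disk.
    spark.conf.set("spark.sql.files.allowReordering", "false")
    val ordered = spark.read.parquet("/data/presorted")  // illustrative path

    // Restore the default: files may be reordered by size for more even partitions.
    spark.conf.set("spark.sql.files.allowReordering", "true")
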
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 738c0666bc3fd..b9dc7b055b160 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -428,6 +428,7 @@ case class FileSourceScanExec(
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+    val allowReordering = fsRelation.sparkSession.sessionState.conf.allowReorderingFiles
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
 
@@ -453,7 +454,7 @@ case class FileSourceScanExec(
             partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
         }
       }
-    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }.toArray
 
     val partitions = new ArrayBuffer[FilePartition]
     val currentFiles = new ArrayBuffer[PartitionedFile]
@@ -472,8 +473,9 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
+    // Assign files to partitions using "Next Fit Decreasing" or just "Next Fit".
+    (if (allowReordering) splitFiles.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+      else splitFiles.sortBy(_.filePath)).foreach { file =>
       if (currentSize + file.length > maxSplitBytes) {
         closePartition()
       }
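
To make the difference between the two strategies concrete, here is a rough standalone sketch of the packing loop over plain (path, length) pairs; FileSlice, pack and the sizes are hypothetical stand-ins for PartitionedFile and maxSplitBytes:

    import scala.collection.mutable.ArrayBuffer

    case class FileSlice(path: String, length: Long)

    // Greedy "Next Fit": walk the files in the given order and start a new
    // partition whenever the current one would overflow maxSplitBytes.
    def pack(files: Seq[FileSlice], maxSplitBytes: Long): Seq[Seq[FileSlice]] = {
      val partitions = ArrayBuffer.empty[Seq[FileSlice]]
      var current = ArrayBuffer.empty[FileSlice]
      var currentSize = 0L
      files.foreach { f =>
        if (currentSize + f.length > maxSplitBytes && current.nonEmpty) {
          partitions += current.toSeq
          current = ArrayBuffer.empty[FileSlice]
          currentSize = 0L
        }
        current += f
        currentSize += f.length
      }
      if (current.nonEmpty) partitions += current.toSeq
      partitions.toSeq
    }

    val files = Seq(FileSlice("part-00000", 10L), FileSlice("part-00001", 90L), FileSlice("part-00002", 40L))

    // allowReordering=true: "Next Fit Decreasing" -- sort largest-first, giving more even partitions.
    val bySize = pack(files.sortBy(-_.length), maxSplitBytes = 100L)

    // allowReordering=false: plain "Next Fit" in path order -- rows keep the on-disk file order.
    val byPath = pack(files.sortBy(_.path), maxSplitBytes = 100L)
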
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f70df0bcecde7..5e951133c4e33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1368,6 +1368,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     })
   }
 
+  test("SPARK-20144: Keep partition order if allowReordering=false.") {
+    withTempPath { path =>
+      val data = List("one", "2", "three", "four")
+      data.toDF.write.csv(path.getAbsolutePath)
+      withSQLConf("spark.sql.files.allowReordering" -> "false") {
+        // The original order. Useful for reading data that has been sorted before writing, or where
+        // the order matters for some other reason.
+        val readback = spark.read.csv(path.getCanonicalPath).collect.map(_.getString(0)).toList
+        assert(readback == data)
+      }
+      withSQLConf("spark.sql.files.allowReordering" -> "true") {
+        // Ordered by file size. Better performance for cases where the order does not matter.
+        val readback = spark.read.csv(path.getCanonicalPath).collect.map(_.getString(0)).toList
+        assert(readback == List("three", "four", "one", "2"))
+      }
+    }
+  }
+
   test("SPARK-23846: usage of samplingRatio while parsing a dataset of 
strings") {
     val ds = sampledTestData.coalesce(1)
     val readback = spark.read
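
Outside the test harness, the same behaviour can be exercised from a spark-shell session with plain SQL; a rough sketch (the directory is made up, and the exact row order under the default setting is not guaranteed):

    // spark and the implicits are provided by spark-shell; the path is illustrative.
    val dir = "/tmp/allow-reordering-demo"
    Seq("one", "2", "three", "four").toDF.write.mode("overwrite").csv(dir)

    spark.sql("SET spark.sql.files.allowReordering=false")
    spark.read.csv(dir).show()  // files scanned in path order

    spark.sql("SET spark.sql.files.allowReordering=true")
    spark.read.csv(dir).show()  // files may be scanned largest-first
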


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
