darabos closed pull request #22673: [SPARK-20144] Allow reading files in order with spark.sql.files.allowReordering=false
URL: https://github.com/apache/spark/pull/22673
As this is a pull request from a fork, GitHub does not show the original diff once the PR is closed, so it is reproduced below for the sake of provenance:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b699707d85235..101d077aa8d50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -835,6 +835,13 @@ object SQLConf {
     .longConf
     .createWithDefault(4 * 1024 * 1024)
 
+  val ALLOW_REORDERING_FILES = buildConf("spark.sql.files.allowReordering")
+    .internal()
+    .doc("If enabled, Spark is free to change the order of input files when trying to organize" +
+      " them into even-sized partitions for performance.")
+    .booleanConf
+    .createWithDefault(true)
+
   val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
       "encountering corrupted files and the contents that have been read will still be returned.")
@@ -1639,6 +1646,8 @@ class SQLConf extends Serializable with Logging {
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
+  def allowReorderingFiles: Boolean = getConf(ALLOW_REORDERING_FILES)
+
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
   def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
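
For anyone wanting to try the proposed knob, here is a minimal usage sketch, assuming this patch is applied. Only the config key comes from the diff above; the session setup and input path are placeholders:

    // Hypothetical usage sketch; assumes the patch above is applied.
    // Only "spark.sql.files.allowReordering" comes from the diff; the rest is made up.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("ordered-read").getOrCreate()

    // Keep input files in path order instead of letting Spark reorder them by size.
    spark.conf.set("spark.sql.files.allowReordering", "false")
    val df = spark.read.csv("/tmp/some-csv-dir")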
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 738c0666bc3fd..b9dc7b055b160 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -428,6 +428,7 @@ case class FileSourceScanExec(
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+    val allowReordering = fsRelation.sparkSession.sessionState.conf.allowReorderingFiles
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
@@ -453,7 +454,7 @@ case class FileSourceScanExec(
             partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
         }
       }
-    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }.toArray
 
     val partitions = new ArrayBuffer[FilePartition]
     val currentFiles = new ArrayBuffer[PartitionedFile]
@@ -472,8 +473,9 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
+    // Assign files to partitions using "Next Fit Decreasing" or just "Next Fit".
+    (if (allowReordering) splitFiles.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+     else splitFiles.sortBy(_.filePath)).foreach { file =>
       if (currentSize + file.length > maxSplitBytes) {
         closePartition()
       }
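
The core of the change is the packing strategy: with reordering allowed, splits are sorted by length descending before being packed greedily into partitions ("Next Fit Decreasing"); with reordering disabled they are packed in file-path order ("Next Fit"). A standalone sketch of the two strategies follows; the FileSlice class, the pack helper, and the sizes are made up for illustration and are not the actual Spark implementation (it also accounts for an open cost per file, which this sketch omits):

    // Illustrative only: a simplified greedy packer, not the Spark code above.
    case class FileSlice(path: String, length: Long)

    def pack(slices: Seq[FileSlice], maxSplitBytes: Long): Seq[Seq[FileSlice]] = {
      val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[FileSlice]]
      val current = scala.collection.mutable.ArrayBuffer.empty[FileSlice]
      var currentSize = 0L
      def closePartition(): Unit = {
        if (current.nonEmpty) partitions += current.toList
        current.clear()
        currentSize = 0L
      }
      slices.foreach { s =>
        // Start a new partition once the next slice would overflow the size budget.
        if (currentSize + s.length > maxSplitBytes) closePartition()
        current += s
        currentSize += s.length
      }
      closePartition()
      partitions.toList
    }

    val slices = Seq(FileSlice("a", 10L), FileSlice("b", 90L), FileSlice("c", 40L))
    // "Next Fit Decreasing": sort by size, largest first, for more even partitions.
    pack(slices.sortBy(_.length)(Ordering[Long].reverse), maxSplitBytes = 100L)
    // -> List(List(b), List(c, a)): partition sizes 90 and 50.
    // "Next Fit": keep path order, so rows come back in the order the files sort by path.
    pack(slices.sortBy(_.path), maxSplitBytes = 100L)
    // -> List(List(a, b), List(c)): partition sizes 100 and 40.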
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f70df0bcecde7..5e951133c4e33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1368,6 +1368,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     })
   }
 
+  test("SPARK-20144: Keep partition order if allowReordering=false.") {
+    withTempPath { path =>
+      val data = List("one", "2", "three", "four")
+      data.toDF.write.csv(path.getAbsolutePath)
+      withSQLConf("spark.sql.files.allowReordering" -> "false") {
+        // The original order. Useful for reading data that has been sorted before writing,
+        // or where the order matters for some other reason.
+        val readback = spark.read.csv(path.getCanonicalPath).collect.map(_.getString(0)).toList
+        assert(readback == data)
+      }
+      withSQLConf("spark.sql.files.allowReordering" -> "true") {
+        // Ordered by file size. Better performance for cases where the order
+        // does not matter.
+        val readback = spark.read.csv(path.getCanonicalPath).collect.map(_.getString(0)).toList
+        assert(readback == List("three", "four", "one", "2"))
+      }
+    }
+  }
+
   test("SPARK-23846: usage of samplingRatio while parsing a dataset of strings") {
     val ds = sampledTestData.coalesce(1)
     val readback = spark.read
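
For readers checking the expected order in the allowReordering=true branch of the test: the written part files that contain "three" and "four" are larger in bytes than the ones containing "one" and "2", so size-descending packing reads them first, whatever the exact number of output files turns out to be. A rough check of that ordering, under the simplifying assumption of one row per part file (an assumption for illustration, not something the diff states):

    // Rough size-order check; assumes (hypothetically) one row per output CSV file.
    val rows = List("one", "2", "three", "four")
    val bySizeDesc = rows.sortBy(r => (r + "\n").length)(Ordering[Int].reverse)
    // bySizeDesc == List("three", "four", "one", "2"), matching the assertion in the test above.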