Github user glentakahashi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20372#discussion_r163426554
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }

-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex &&
+          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex &&
+          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
       closePartition()
-    }
-    // Add the given file to the current partition.
-    currentSize += file.length + openCostInBytes
--- End diff ---
This test still passes:
https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L101
and `splitFiles` should already have been split by `filesMaxPartitionBytes`, so no single entry can exceed `maxSplitBytes` on its own. I originally had similar functionality in place, but I /think/ it's redundant.
---