Github user glentakahashi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20372#discussion_r163427261
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex &&
+          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex &&
+          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
       closePartition()
-    }
-    // Add the given file to the current partition.
-    currentSize += file.length + openCostInBytes
--- End diff --
Oh, that's only guaranteed if the file `isSplittable`, so yes, this will fail for non-splittable files whose length exceeds `maxSplitBytes`. Will update the code.
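
For context, here is a minimal, self-contained sketch of the two-pointer packing loop from the diff, using plain `Long` lengths in place of Spark's `PartitionedFile`. The guard branch is a hypothetical illustration of the fix hinted at above, not the actual follow-up commit: without it, a non-splittable file longer than `maxSplitBytes` satisfies neither inner-loop condition, so the outer loop spins forever.

```scala
import scala.collection.mutable.ArrayBuffer

object PackingSketch {
  // Packs file lengths into partitions of at most maxSplitBytes, filling
  // from the front (largest files, assuming descending order) and topping
  // up from the back (smallest files), mirroring the loop in the diff.
  def pack(lengths: Array[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
    val partitions = ArrayBuffer.empty[Seq[Long]]
    val currentFiles = ArrayBuffer.empty[Long]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) partitions += currentFiles.toList
      currentFiles.clear()
      currentSize = 0L
    }

    def addFile(length: Long): Unit = {
      currentFiles += length
      currentSize += length + openCostInBytes
    }

    var frontIndex = 0
    var backIndex = lengths.length - 1
    while (frontIndex <= backIndex) {
      // Hypothetical guard: a file longer than maxSplitBytes can never pass
      // either inner-loop check, so give it a partition of its own instead
      // of letting the outer loop spin forever on it.
      if (lengths(frontIndex) > maxSplitBytes) {
        addFile(lengths(frontIndex))
        frontIndex += 1
      } else {
        while (frontIndex <= backIndex &&
            currentSize + lengths(frontIndex) <= maxSplitBytes) {
          addFile(lengths(frontIndex))
          frontIndex += 1
        }
        while (backIndex > frontIndex &&
            currentSize + lengths(backIndex) <= maxSplitBytes) {
          addFile(lengths(backIndex))
          backIndex -= 1
        }
      }
      closePartition()
    }
    partitions.toSeq
  }

  def main(args: Array[String]): Unit = {
    // 300 models a non-splittable file larger than maxSplitBytes = 128;
    // the guard puts it in its own partition instead of looping forever.
    // Prints: List(List(300), List(100), List(60, 40))
    println(pack(Array(300L, 100L, 60L, 40L), maxSplitBytes = 128, openCostInBytes = 4))
  }
}
```

The numbers (`maxSplitBytes = 128`, `openCostInBytes = 4`) are made up for illustration; in Spark these come from `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes`.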
---