mengxr commented on a change in pull request #24387: [SPARK-27473][SQL][WIP] Support filter push down for status fields in binary file data source
URL: https://github.com/apache/spark/pull/24387#discussion_r276433539
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
##########
@@ -104,35 +171,43 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
val fileStatus = fs.getFileStatus(fsPath)
val length = fileStatus.getLen()
val modificationTime = fileStatus.getModificationTime()
- val stream = fs.open(fsPath)
-
- val content = try {
- ByteStreams.toByteArray(stream)
- } finally {
- Closeables.close(stream, true)
- }
-
- val fullOutput = dataSchema.map { f =>
- AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
- }
- val requiredOutput = fullOutput.filter { a =>
- requiredSchema.fieldNames.contains(a.name)
- }
- // TODO: Add column pruning
- // currently it still reads the file content even if the content column is not required.
- val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)
-
- val internalRow = InternalRow(
- content,
- InternalRow(
+ if (
Review comment:
Then this check becomes one line: `filterFuncs.forall(f => f(fileStatus))`
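
A minimal sketch of how the suggested one-liner might sit in context. The `filterFuncs` name comes from the comment above; the wrapping object, the `shouldRead` helper, and the example predicates are illustrative assumptions, not code from the PR:

```scala
import org.apache.hadoop.fs.FileStatus

object PushdownSketch {
  // Hypothetical predicates compiled from pushed-down filters on the status
  // fields; the real PR derives these from Spark SQL `Filter`s, and the
  // thresholds below are made up for illustration.
  val filterFuncs: Seq[FileStatus => Boolean] = Seq(
    _.getLen <= 1048576,           // e.g. a pushed-down `length <= 1MB`
    _.getModificationTime > 0L     // e.g. `modificationTime` after the epoch
  )

  // The suggested one-liner: read the file's content only when every
  // status predicate accepts the FileStatus.
  def shouldRead(fileStatus: FileStatus): Boolean =
    filterFuncs.forall(f => f(fileStatus))
}
```

With the predicate list factored out this way, the multi-line `if (` condition in the diff collapses to the single `forall` call, and adding support for a new status field only means appending another function to `filterFuncs`.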