Yaohua628 commented on code in PR #37228:
URL: https://github.com/apache/spark/pull/37228#discussion_r928356617
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -223,8 +216,25 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
}.toSeq
}.getOrElse(Seq.empty)
+ val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+ metadataColumns.map(_.name).flatMap {
+ case FileFormat.ROW_INDEX =>
+ Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())
Review Comment:
A quick question: what if the user schema contains a column named
`_tmp_metadata_row_index`? Would it be overridden?
Maybe you can create an `AttributeReference` with `__metadata_col` set in its
metadata field and pattern match on that marker afterward, just to be safe?
See the usage of `__metadata_col`:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala#L188
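For illustration, a rough sketch of what that could look like (`METADATA_COL_ATTR_KEY` is the `__metadata_col` key from the linked `package.scala`; the exact construction is my assumption, not code from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.types.{LongType, MetadataBuilder}

// Tag the temporary attribute so it cannot be confused with a user column
// that happens to be named _tmp_metadata_row_index.
val rowIndexCol = AttributeReference(
  FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
  LongType,
  nullable = false,
  new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build())()

// Afterward, match on the marker rather than on the raw column name.
def isGeneratedMetadataCol(attr: AttributeReference): Boolean =
  attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
    attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
```

That way a user column with the same name would simply lack the marker and resolve as ordinary data.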
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala:
##########
@@ -78,7 +78,7 @@ case class LogicalRelation(
// filter out the metadata struct column if it has the name conflicting with output columns.
// if the file has a column "_metadata",
// then the data column should be returned not the metadata struct column
- Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+ Seq(relation.fileFormat.createFileMetadataCol).filterNot(isOutputColumn)
Review Comment:
nit: we could also do `FileFormat.createFileMetadataCol(relation.fileFormat)`.
If we put everything about creating metadata columns in `FileFormat`, it might
be easier to see all the supported metadata for each file format. WDYT?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -223,8 +216,25 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
}.toSeq
}.getOrElse(Seq.empty)
+ val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+ metadataColumns.map(_.name).flatMap {
Review Comment:
Casing may not be an issue here, but could you add tests around different
casings of `row_index` (e.g. `ROW_INDEX`, `row_INdeX`)? Thanks!
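Something along these lines, as a hypothetical sketch (the test name, `withTempPath`, and reading `row_index` through `_metadata` are assumptions based on this PR, not existing code):

```scala
test("row_index metadata field resolves under different casings") {
  withTempPath { dir =>
    spark.range(10).write.parquet(dir.getAbsolutePath)
    Seq("row_index", "ROW_INDEX", "row_INdeX").foreach { name =>
      // With Spark's default case-insensitive resolution, every casing
      // should resolve to the same _metadata field.
      val df = spark.read.parquet(dir.getAbsolutePath)
        .select(col(s"_metadata.$name"))
      assert(df.count() == 10)
    }
  }
}
```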
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -182,18 +188,25 @@ object FileFormat {
val FILE_MODIFICATION_TIME = "file_modification_time"
+ val ROW_INDEX = "row_index"
+
+ // A name for a temporary column that holds row indexes computed by the file format reader
+ // until they can be placed in the _metadata struct.
+ val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
val METADATA_NAME = "_metadata"
- // supported metadata struct fields for hadoop fs relation
- val METADATA_STRUCT: StructType = new StructType()
- .add(StructField(FILE_PATH, StringType))
- .add(StructField(FILE_NAME, StringType))
- .add(StructField(FILE_SIZE, LongType))
- .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+ /** Schema of metadata struct that can be produced by every file format. */
+ def getBaseFileMetadataCol: StructType = new StructType()
Review Comment:
Do we really need to change this to a `def`?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -182,18 +188,25 @@ object FileFormat {
val FILE_MODIFICATION_TIME = "file_modification_time"
+ val ROW_INDEX = "row_index"
+
+ // A name for a temporary column that holds row indexes computed by the file format reader
+ // until they can be placed in the _metadata struct.
+ val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
val METADATA_NAME = "_metadata"
- // supported metadata struct fields for hadoop fs relation
- val METADATA_STRUCT: StructType = new StructType()
- .add(StructField(FILE_PATH, StringType))
- .add(StructField(FILE_NAME, StringType))
- .add(StructField(FILE_SIZE, LongType))
- .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+ /** Schema of metadata struct that can be produced by every file format. */
+ def getBaseFileMetadataCol: StructType = new StructType()
+ .add(StructField(FileFormat.FILE_PATH, StringType))
+ .add(StructField(FileFormat.FILE_NAME, StringType))
+ .add(StructField(FileFormat.FILE_SIZE, LongType))
+ .add(StructField(FileFormat.FILE_MODIFICATION_TIME, TimestampType))
- // create a file metadata struct col
- def createFileMetadataCol: AttributeReference =
- FileSourceMetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+ /** Create a file metadata struct column containing fields supported by every format. */
+ def createBaseFileMetadataCol: AttributeReference = {
Review Comment:
Another option: we could take the `fileFormat` as a parameter when creating the
file metadata column: `def createFileMetadataCol(fileFormat: FileFormat):...`
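A hypothetical sketch of that shape (the Parquet dispatch and the field list are assumptions for illustration, not this PR's code):

```scala
/** Create the metadata struct column, letting the format add its own fields. */
def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
  // Fields every file format can produce.
  val baseFields = getBaseFileMetadataCol.fields
  // Formats with reader support (e.g. Parquet) could append row_index here.
  val allFields = fileFormat match {
    case _: ParquetFileFormat => baseFields :+ StructField(ROW_INDEX, LongType)
    case _ => baseFields
  }
  FileSourceMetadataAttribute(METADATA_NAME, StructType(allFields))
}
```

Keeping the dispatch inside `FileFormat` would make it one place to scan for what each format supports.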
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -223,8 +216,25 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
}.toSeq
}.getOrElse(Seq.empty)
+ val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+ metadataColumns.map(_.name).flatMap {
+ case FileFormat.ROW_INDEX =>
+ Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())
+ case _ => None
+ }
+
+ val fileConstantMetadataColumns: Seq[Attribute] =
+ metadataColumns.filter(_.name != FileFormat.ROW_INDEX)
+
+ val readDataColumns = dataColumns
+ .filter(requiredAttributes.contains)
+ .filterNot(partitionColumns.contains)
+
+ val outputSchema = (readDataColumns ++ fileFormatReaderGeneratedMetadataColumns).toStructType
+
// outputAttributes should also include the metadata columns at the very end
- val outputAttributes = readDataColumns ++ partitionColumns ++ metadataColumns
+ val outputAttributes = readDataColumns ++ fileFormatReaderGeneratedMetadataColumns ++
Review Comment:
nit: could you add/update the doc to explain why we need to put
`fileFormatReaderGeneratedMetadataColumns` before `partitionColumns`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]