Yaohua628 commented on code in PR #37228:
URL: https://github.com/apache/spark/pull/37228#discussion_r928356617


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -223,8 +216,25 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
         }.toSeq
       }.getOrElse(Seq.empty)
 
+      val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+        metadataColumns.map(_.name).flatMap {
+          case FileFormat.ROW_INDEX =>
+            
Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())

Review Comment:
   a qq: what if the user schema contains a column `_tmp_metadata_row_index`? 
would it be overridden?
   
   maybe you can create an `AttributeReference` with a `__metadata_col` in its 
metadata field, see the usage of `__metadata_col`: 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala#L188
 and pattern match on it afterward (just be safe)?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala:
##########
@@ -78,7 +78,7 @@ case class LogicalRelation(
       // filter out the metadata struct column if it has the name conflicting 
with output columns.
       // if the file has a column "_metadata",
       // then the data column should be returned not the metadata struct column
-      Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+      Seq(relation.fileFormat.createFileMetadataCol).filterNot(isOutputColumn)

Review Comment:
   nit: also we could do: 
`FileFormat.createFileMetadataCol(relation.fileFormat)`. 
   If we put everything about creating metadata in `FileFormat`, maybe it is 
easier to see all supported metadata for every different file format, WDYT?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -223,8 +216,25 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
         }.toSeq
       }.getOrElse(Seq.empty)
 
+      val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+        metadataColumns.map(_.name).flatMap {

Review Comment:
   maybe casing is not an issue here, but could you add tests around different 
row_index cases (ROW_INDEX, row_INdeX)? thanks!



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -182,18 +188,25 @@ object FileFormat {
 
   val FILE_MODIFICATION_TIME = "file_modification_time"
 
+  val ROW_INDEX = "row_index"
+
+  // A name for a temporary column that holds row indexes computed by the file 
format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
   val METADATA_NAME = "_metadata"
 
-  // supported metadata struct fields for hadoop fs relation
-  val METADATA_STRUCT: StructType = new StructType()
-    .add(StructField(FILE_PATH, StringType))
-    .add(StructField(FILE_NAME, StringType))
-    .add(StructField(FILE_SIZE, LongType))
-    .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+  /** Schema of metadata struct that can be produced by every file format. */
+  def getBaseFileMetadataCol: StructType = new StructType()

Review Comment:
   Do we need/have to change this to `def`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -182,18 +188,25 @@ object FileFormat {
 
   val FILE_MODIFICATION_TIME = "file_modification_time"
 
+  val ROW_INDEX = "row_index"
+
+  // A name for a temporary column that holds row indexes computed by the file 
format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
   val METADATA_NAME = "_metadata"
 
-  // supported metadata struct fields for hadoop fs relation
-  val METADATA_STRUCT: StructType = new StructType()
-    .add(StructField(FILE_PATH, StringType))
-    .add(StructField(FILE_NAME, StringType))
-    .add(StructField(FILE_SIZE, LongType))
-    .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+  /** Schema of metadata struct that can be produced by every file format. */
+  def getBaseFileMetadataCol: StructType = new StructType()
+    .add(StructField(FileFormat.FILE_PATH, StringType))
+    .add(StructField(FileFormat.FILE_NAME, StringType))
+    .add(StructField(FileFormat.FILE_SIZE, LongType))
+    .add(StructField(FileFormat.FILE_MODIFICATION_TIME, TimestampType))
 
-  // create a file metadata struct col
-  def createFileMetadataCol: AttributeReference =
-    FileSourceMetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+  /** Create a file metadata struct column containing fields supported by 
every format. */
+  def createBaseFileMetadataCol: AttributeReference = {

Review Comment:
   Another option: we can take the `fileFormat` when creating 
`FileMetadataCol`: `def createFileMetadataCol(fileFormat: FileFormat):...`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -223,8 +216,25 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
         }.toSeq
       }.getOrElse(Seq.empty)
 
+      val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+        metadataColumns.map(_.name).flatMap {
+          case FileFormat.ROW_INDEX =>
+            
Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())
+          case _ => None
+        }
+
+      val fileConstantMetadataColumns: Seq[Attribute] =
+        metadataColumns.filter(_.name != FileFormat.ROW_INDEX)
+
+      val readDataColumns = dataColumns
+          .filter(requiredAttributes.contains)
+          .filterNot(partitionColumns.contains)
+
+      val outputSchema = (readDataColumns ++ 
fileFormatReaderGeneratedMetadataColumns).toStructType
+
       // outputAttributes should also include the metadata columns at the very 
end
-      val outputAttributes = readDataColumns ++ partitionColumns ++ 
metadataColumns
+      val outputAttributes = readDataColumns ++ 
fileFormatReaderGeneratedMetadataColumns ++

Review Comment:
   nit: could you add/update the doc, why we need to put 
`fileFormatReaderGeneratedMetadataColumns` before `partitionColumns`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to