johanl-db commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1150727764


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -501,80 +506,93 @@ object FileSourceMetadataAttribute {
 
   val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
 
+  val METADATA: Metadata = new MetadataBuilder()
+    .withMetadata(MetadataAttribute.METADATA)
+    .putBoolean(METADATA_COL_ATTR_KEY, value = true)
+    .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+    .build()
+
   /**
-   * Cleanup the internal metadata information of an attribute if it is
-   * a [[FileSourceConstantMetadataAttribute]] or 
[[FileSourceGeneratedMetadataAttribute]].
+   * Removes the internal field metadata.
    */
   def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute =
-    removeInternalMetadata(attr)
+    attr.withMetadata(removeInternalMetadata(attr.metadata))

Review Comment:
   
[FileFormatWriter](https://github.com/apache/spark/blob/b36d1484c1a090a33d9add056730128b9ba5729f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L111)
 still uses it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed 
to readers,
       //  as the values will be null when trying to read the missing column 
from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata 
projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection 
node below.
+      // Also remember the attribute for each column name, so we can easily 
map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { 
field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ 
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new 
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in 
combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} 
column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataAttribute(
-                  FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, 
nullable = true)
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataAttribute(field.name, field.dataType)
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        def createMetadataColumn(field: StructField) = field match {

Review Comment:
   I don't have a strong opinion on this, so I'm reverting to not using a helper 
method. The comment a few lines above should be enough to give context on what 
happens here.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -501,80 +506,93 @@ object FileSourceMetadataAttribute {
 
   val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
 
+  val METADATA: Metadata = new MetadataBuilder()
+    .withMetadata(MetadataAttribute.METADATA)
+    .putBoolean(METADATA_COL_ATTR_KEY, value = true)

Review Comment:
   Yes, removed



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed 
to readers,
       //  as the values will be null when trying to read the missing column 
from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata 
projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection 
node below.
+      // Also remember the attribute for each column name, so we can easily 
map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { 
field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ 
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new 
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in 
combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} 
column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataAttribute(
-                  FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, 
nullable = true)
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataAttribute(field.name, field.dataType)
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        def createMetadataColumn(field: StructField) = field match {
+          case FileSourceGeneratedMetadataStructField(field, internalName) =>
+            if (schemaColumns.contains(internalName)) {
+              throw new AnalysisException(internalName +
+                s"${internalName} is a reserved column name that cannot be 
read in combination " +
+                s"with ${FileFormat.METADATA_NAME}.${field.name} column.")
+            }
+
+            // NOTE: Readers require the internal column to be nullable 
because it's not part of the
+            // file's public schema. The projection below will restore the 
correct nullability for
+            // the column while constructing the final metadata struct.
+            val attr = field.copy(internalName, nullable = true).toAttribute
+            metadataColumnsByName.put(field.name, attr)
+            generatedMetadataColumns += attr
+
+          case FileSourceConstantMetadataStructField(field) =>
+            val attr = field.toAttribute
+            metadataColumnsByName.put(field.name, attr)
+            constantMetadataColumns += attr
+
+          case field => throw new AnalysisException(s"Unrecognized file 
metadata field: $field")
         }
-      }
 
-      val metadataColumns: Seq[Attribute] =
-        constantMetadataColumns.toSeq ++ generatedMetadataColumns.toSeq
+        
metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)

Review Comment:
   We know that `schema` is a `StructType` here because we matched on it in 
`MetadataStructColumn` above, but the type checker thinks this can be any 
`DataType`, so we have a partial match. The cast looks like the most readable 
option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to