ryan-johnson-databricks commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1150634235


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed 
to readers,
       //  as the values will be null when trying to read the missing column 
from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata 
projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection 
node below.
+      // Also remember the attribute for each column name, so we can easily 
map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { 
field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ 
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new 
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in 
combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} 
column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataAttribute(
-                  FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, 
nullable = true)
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataAttribute(field.name, field.dataType)
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        def createMetadataColumn(field: StructField) = field match {

Review Comment:
   I'm not convinced that introducing this single-use helper improves 
readability or maintainability?
   ```scala
   def helper(field: StructField) = field match {
     case ...
     case ...
   }
   
metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)
   ```
   vs. just inlining the logic:
   ```scala
   metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
     case ...
     case ...
   }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed 
to readers,
       //  as the values will be null when trying to read the missing column 
from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata 
projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection 
node below.
+      // Also remember the attribute for each column name, so we can easily 
map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { 
field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ 
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new 
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in 
combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} 
column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataAttribute(
-                  FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, 
nullable = true)
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataAttribute(field.name, field.dataType)
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        def createMetadataColumn(field: StructField) = field match {
+          case FileSourceGeneratedMetadataStructField(field, internalName) =>
+            if (schemaColumns.contains(internalName)) {
+              throw new AnalysisException(internalName +
+                s"${internalName} is a reserved column name that cannot be 
read in combination " +
+                s"with ${FileFormat.METADATA_NAME}.${field.name} column.")
+            }
+
+            // NOTE: Readers require the internal column to be nullable 
because it's not part of the
+            // file's public schema. The projection below will restore the 
correct nullability for
+            // the column while constructing the final metadata struct.
+            val attr = field.copy(internalName, nullable = true).toAttribute
+            metadataColumnsByName.put(field.name, attr)
+            generatedMetadataColumns += attr
+
+          case FileSourceConstantMetadataStructField(field) =>
+            val attr = field.toAttribute
+            metadataColumnsByName.put(field.name, attr)
+            constantMetadataColumns += attr
+
+          case field => throw new AnalysisException(s"Unrecognized file 
metadata field: $field")
         }
-      }
 
-      val metadataColumns: Seq[Attribute] =
-        constantMetadataColumns.toSeq ++ generatedMetadataColumns.toSeq
+        
metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)

Review Comment:
   Rather than cast the struct here, would it make sense to directly match on 
it?
   ```scala
   metadataStructOpt.foreach { case AttributeReference(_, schema: StructType, 
_, _) =>
       ...
     schema.fields.foreach(...)
   }
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -501,80 +506,93 @@ object FileSourceMetadataAttribute {
 
   val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
 
+  val METADATA: Metadata = new MetadataBuilder()
+    .withMetadata(MetadataAttribute.METADATA)
+    .putBoolean(METADATA_COL_ATTR_KEY, value = true)

Review Comment:
   Isn't this already part of `MetadataAttribute.METADATA`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -282,29 +302,21 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // all references will be bound to output attributes which are either
       // [[FileSourceConstantMetadataAttribute]] or 
[[FileSourceGeneratedMetadataAttribute]] after
       // the flattening from the metadata struct.
-      def rebindFileSourceMetadataAttributesInFilters(
-          filters: Seq[Expression]): Seq[Expression] = {
-        // The row index field attribute got renamed.
-        def newFieldName(name: String) = name match {
-          case FileFormat.ROW_INDEX => 
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
-          case other => other
-        }
-
+      def rebindFileSourceMetadataAttributesInFilters(filters: 
Seq[Expression]): Seq[Expression] =
         filters.map { filter =>
           filter.transform {
             // Replace references to the _metadata column. This will affect 
references to the column
             // itself but also where fields from the metadata struct are used.
             case MetadataStructColumn(AttributeReference(_, fields @ 
StructType(_), _, _)) =>

Review Comment:
   aside: `fields @ StructType(_)` is equialent to `fields: StructType`, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to