johanl-db commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1150727764
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -501,80 +506,93 @@ object FileSourceMetadataAttribute {
val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+ val METADATA: Metadata = new MetadataBuilder()
+ .withMetadata(MetadataAttribute.METADATA)
+ .putBoolean(METADATA_COL_ATTR_KEY, value = true)
+ .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+ .build()
+
/**
- * Cleanup the internal metadata information of an attribute if it is
- * a [[FileSourceConstantMetadataAttribute]] or
[[FileSourceGeneratedMetadataAttribute]].
+ * Removes the internal field metadata.
*/
def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute =
- removeInternalMetadata(attr)
+ attr.withMetadata(removeInternalMetadata(attr.metadata))
Review Comment:
[FileFormatWriter](https://github.com/apache/spark/blob/b36d1484c1a090a33d9add056730128b9ba5729f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L111)
still uses it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
// For generated metadata columns, they are set as nullable when passed
to readers,
// as the values will be null when trying to read the missing column
from the file.
// They are then replaced by the actual values later in the process.
- // All metadata columns will be non-null in the returned output.
- // We then change the nullability to non-nullable in the metadata
projection node below.
- val constantMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
- val generatedMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
+ // We then restore the specified nullability in the metadata projection
node below.
+ // Also remember the attribute for each column name, so we can easily
map back to it.
+ val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+ val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+ val metadataColumnsByName = mutable.Map.empty[String, Attribute]
metadataStructOpt.foreach { metadataStruct =>
- metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
field =>
- field.name match {
- case FileFormat.ROW_INDEX =>
- if ((readDataColumns ++
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
- .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
- throw new
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
- " is a reserved column name that cannot be read in
combination with " +
- s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}
column.")
- }
- generatedMetadataColumns +=
- FileSourceGeneratedMetadataAttribute(
- FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType,
nullable = true)
- case _ =>
- constantMetadataColumns +=
- FileSourceConstantMetadataAttribute(field.name, field.dataType)
- }
+ val schemaColumns = (readDataColumns ++ partitionColumns)
+ .map(_.name.toLowerCase(Locale.ROOT))
+ .toSet
+
+ def createMetadataColumn(field: StructField) = field match {
Review Comment:
I don't have a strong opinion on this; reverting to not using a helper
method. The comment a few lines above should be enough to get context on what
happens here.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -501,80 +506,93 @@ object FileSourceMetadataAttribute {
val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+ val METADATA: Metadata = new MetadataBuilder()
+ .withMetadata(MetadataAttribute.METADATA)
+ .putBoolean(METADATA_COL_ATTR_KEY, value = true)
Review Comment:
Yes, removed
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
// For generated metadata columns, they are set as nullable when passed
to readers,
// as the values will be null when trying to read the missing column
from the file.
// They are then replaced by the actual values later in the process.
- // All metadata columns will be non-null in the returned output.
- // We then change the nullability to non-nullable in the metadata
projection node below.
- val constantMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
- val generatedMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
+ // We then restore the specified nullability in the metadata projection
node below.
+ // Also remember the attribute for each column name, so we can easily
map back to it.
+ val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+ val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+ val metadataColumnsByName = mutable.Map.empty[String, Attribute]
metadataStructOpt.foreach { metadataStruct =>
- metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
field =>
- field.name match {
- case FileFormat.ROW_INDEX =>
- if ((readDataColumns ++
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
- .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
- throw new
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
- " is a reserved column name that cannot be read in
combination with " +
- s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}
column.")
- }
- generatedMetadataColumns +=
- FileSourceGeneratedMetadataAttribute(
- FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType,
nullable = true)
- case _ =>
- constantMetadataColumns +=
- FileSourceConstantMetadataAttribute(field.name, field.dataType)
- }
+ val schemaColumns = (readDataColumns ++ partitionColumns)
+ .map(_.name.toLowerCase(Locale.ROOT))
+ .toSet
+
+ def createMetadataColumn(field: StructField) = field match {
+ case FileSourceGeneratedMetadataStructField(field, internalName) =>
+ if (schemaColumns.contains(internalName)) {
+ throw new AnalysisException(internalName +
+ s"${internalName} is a reserved column name that cannot be
read in combination " +
+ s"with ${FileFormat.METADATA_NAME}.${field.name} column.")
+ }
+
+ // NOTE: Readers require the internal column to be nullable
because it's not part of the
+ // file's public schema. The projection below will restore the
correct nullability for
+ // the column while constructing the final metadata struct.
+ val attr = field.copy(internalName, nullable = true).toAttribute
+ metadataColumnsByName.put(field.name, attr)
+ generatedMetadataColumns += attr
+
+ case FileSourceConstantMetadataStructField(field) =>
+ val attr = field.toAttribute
+ metadataColumnsByName.put(field.name, attr)
+ constantMetadataColumns += attr
+
+ case field => throw new AnalysisException(s"Unrecognized file
metadata field: $field")
}
- }
- val metadataColumns: Seq[Attribute] =
- constantMetadataColumns.toSeq ++ generatedMetadataColumns.toSeq
+
metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)
Review Comment:
We know that `schema` is a `StructType` here because we matched on it in
`MetadataStructColumn` above, but the type checker thinks it can be any
`DataType`, so we have a partial match. The cast looks like the most readable
option.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]