ryan-johnson-databricks commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1150634235
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
// For generated metadata columns, they are set as nullable when passed
to readers,
// as the values will be null when trying to read the missing column
from the file.
// They are then replaced by the actual values later in the process.
- // All metadata columns will be non-null in the returned output.
- // We then change the nullability to non-nullable in the metadata
projection node below.
- val constantMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
- val generatedMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
+ // We then restore the specified nullability in the metadata projection
node below.
+ // Also remember the attribute for each column name, so we can easily
map back to it.
+ val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+ val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+ val metadataColumnsByName = mutable.Map.empty[String, Attribute]
metadataStructOpt.foreach { metadataStruct =>
- metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
field =>
- field.name match {
- case FileFormat.ROW_INDEX =>
- if ((readDataColumns ++
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
- .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
- throw new
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
- " is a reserved column name that cannot be read in
combination with " +
- s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}
column.")
- }
- generatedMetadataColumns +=
- FileSourceGeneratedMetadataAttribute(
- FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType,
nullable = true)
- case _ =>
- constantMetadataColumns +=
- FileSourceConstantMetadataAttribute(field.name, field.dataType)
- }
+ val schemaColumns = (readDataColumns ++ partitionColumns)
+ .map(_.name.toLowerCase(Locale.ROOT))
+ .toSet
+
+ def createMetadataColumn(field: StructField) = field match {
Review Comment:
I'm not convinced that introducing this single-use helper improves
readability or maintainability. Compare:
```scala
def helper(field: StructField) = field match {
case ...
case ...
}
metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)
```
vs. just inlining the logic:
```scala
metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
case ...
case ...
}
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
// For generated metadata columns, they are set as nullable when passed
to readers,
// as the values will be null when trying to read the missing column
from the file.
// They are then replaced by the actual values later in the process.
- // All metadata columns will be non-null in the returned output.
- // We then change the nullability to non-nullable in the metadata
projection node below.
- val constantMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
- val generatedMetadataColumns: mutable.Buffer[Attribute] =
mutable.Buffer.empty
+ // We then restore the specified nullability in the metadata projection
node below.
+ // Also remember the attribute for each column name, so we can easily
map back to it.
+ val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+ val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+ val metadataColumnsByName = mutable.Map.empty[String, Attribute]
metadataStructOpt.foreach { metadataStruct =>
- metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
field =>
- field.name match {
- case FileFormat.ROW_INDEX =>
- if ((readDataColumns ++
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
- .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
- throw new
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
- " is a reserved column name that cannot be read in
combination with " +
- s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}
column.")
- }
- generatedMetadataColumns +=
- FileSourceGeneratedMetadataAttribute(
- FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType,
nullable = true)
- case _ =>
- constantMetadataColumns +=
- FileSourceConstantMetadataAttribute(field.name, field.dataType)
- }
+ val schemaColumns = (readDataColumns ++ partitionColumns)
+ .map(_.name.toLowerCase(Locale.ROOT))
+ .toSet
+
+ def createMetadataColumn(field: StructField) = field match {
+ case FileSourceGeneratedMetadataStructField(field, internalName) =>
+ if (schemaColumns.contains(internalName)) {
+ throw new AnalysisException(internalName +
+ s"${internalName} is a reserved column name that cannot be
read in combination " +
+ s"with ${FileFormat.METADATA_NAME}.${field.name} column.")
+ }
+
+ // NOTE: Readers require the internal column to be nullable
because it's not part of the
+ // file's public schema. The projection below will restore the
correct nullability for
+ // the column while constructing the final metadata struct.
+ val attr = field.copy(internalName, nullable = true).toAttribute
+ metadataColumnsByName.put(field.name, attr)
+ generatedMetadataColumns += attr
+
+ case FileSourceConstantMetadataStructField(field) =>
+ val attr = field.toAttribute
+ metadataColumnsByName.put(field.name, attr)
+ constantMetadataColumns += attr
+
+ case field => throw new AnalysisException(s"Unrecognized file
metadata field: $field")
}
- }
- val metadataColumns: Seq[Attribute] =
- constantMetadataColumns.toSeq ++ generatedMetadataColumns.toSeq
+
metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)
Review Comment:
Rather than casting the attribute's data type to `StructType` here, would it
make sense to pattern-match on the attribute directly?
```scala
metadataStructOpt.foreach { case AttributeReference(_, schema: StructType,
_, _) =>
...
schema.fields.foreach(...)
}
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -501,80 +506,93 @@ object FileSourceMetadataAttribute {
val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+ val METADATA: Metadata = new MetadataBuilder()
+ .withMetadata(MetadataAttribute.METADATA)
+ .putBoolean(METADATA_COL_ATTR_KEY, value = true)
Review Comment:
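Isn't `METADATA_COL_ATTR_KEY -> true` already part of `MetadataAttribute.METADATA`, which is merged in by `withMetadata` on the line above? If so, this `putBoolean` is redundant.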
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -282,29 +302,21 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
// all references will be bound to output attributes which are either
// [[FileSourceConstantMetadataAttribute]] or
[[FileSourceGeneratedMetadataAttribute]] after
// the flattening from the metadata struct.
- def rebindFileSourceMetadataAttributesInFilters(
- filters: Seq[Expression]): Seq[Expression] = {
- // The row index field attribute got renamed.
- def newFieldName(name: String) = name match {
- case FileFormat.ROW_INDEX =>
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
- case other => other
- }
-
+ def rebindFileSourceMetadataAttributesInFilters(filters:
Seq[Expression]): Seq[Expression] =
filters.map { filter =>
filter.transform {
// Replace references to the _metadata column. This will affect
references to the column
// itself but also where fields from the metadata struct are used.
case MetadataStructColumn(AttributeReference(_, fields @
StructType(_), _, _)) =>
Review Comment:
aside: `fields @ StructType(_)` is equivalent to `fields: StructType`, no?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]