mihailotim-db commented on code in PR #53508:
URL: https://github.com/apache/spark/pull/53508#discussion_r2633818190
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -695,19 +695,43 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan =>
Unit) {
(field.dataType != planField.dataType ||
field.nullable != planField.nullable ||
(viewSchemaMode == SchemaEvolution && (
- field.getComment() != planField.getComment() ||
- field.name != planField.name)))
+ field.name != planField.name ||
+ // Only trigger redo on comment changes if preserve flag is
disabled.
+
(!session.sessionState.conf.viewSchemaEvolutionPreserveUserComments &&
+ field.getComment() != planField.getComment()))))
}
+ lazy val viewFieldsByName = viewFields.map(f => f.name -> f).toMap
+
if (redo) {
val newSchema = if (viewSchemaMode == SchemaTypeEvolution) {
val newFields = viewQuery.schema.map {
case StructField(name, dataType, nullable, _) =>
StructField(name, dataType, nullable,
- viewFields.find(_.name == name).get.metadata)
+ viewFieldsByName(name).metadata)
+ }
+ StructType(newFields)
+ } else if
(session.sessionState.conf.viewSchemaEvolutionPreserveUserComments) {
Review Comment:
If you make `ViewSyncSchemaToMetaStore` extend `SQLConfHelper`, then you can
just use `conf.viewSchemaEvolutionPreserveUserComments` here and elsewhere.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -695,19 +695,43 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan =>
Unit) {
(field.dataType != planField.dataType ||
field.nullable != planField.nullable ||
(viewSchemaMode == SchemaEvolution && (
- field.getComment() != planField.getComment() ||
- field.name != planField.name)))
+ field.name != planField.name ||
+ // Only trigger redo on comment changes if preserve flag is
disabled.
+
(!session.sessionState.conf.viewSchemaEvolutionPreserveUserComments &&
+ field.getComment() != planField.getComment()))))
}
+ lazy val viewFieldsByName = viewFields.map(f => f.name -> f).toMap
+
if (redo) {
val newSchema = if (viewSchemaMode == SchemaTypeEvolution) {
val newFields = viewQuery.schema.map {
case StructField(name, dataType, nullable, _) =>
StructField(name, dataType, nullable,
- viewFields.find(_.name == name).get.metadata)
+ viewFieldsByName(name).metadata)
+ }
+ StructType(newFields)
+ } else if
(session.sessionState.conf.viewSchemaEvolutionPreserveUserComments) {
+ // Adopt types/nullable/names from query, but preserve view
comments.
+ val newFields = viewQuery.schema.map { planField =>
+ val newMetadata = viewFieldsByName.get(planField.name) match {
+ case Some(viewField) =>
+ // Use table metadata but override with view comment
+ val builder = new
MetadataBuilder().withMetadata(planField.metadata)
+ viewField.getComment() match {
+ case Some(comment) => builder.putString("comment", comment)
Review Comment:
Do we have a metadata key for comments instead of using raw strings here?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2183,6 +2183,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS =
Review Comment:
I think the flag should be added to `RETAINED_ANALYSIS_FLAGS` in
`Analyzer.scala`; otherwise the session value of this flag won't propagate to
nested views. Let's double check this.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -695,19 +695,43 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan =>
Unit) {
(field.dataType != planField.dataType ||
field.nullable != planField.nullable ||
(viewSchemaMode == SchemaEvolution && (
- field.getComment() != planField.getComment() ||
- field.name != planField.name)))
+ field.name != planField.name ||
+ // Only trigger redo on comment changes if preserve flag is
disabled.
+
(!session.sessionState.conf.viewSchemaEvolutionPreserveUserComments &&
+ field.getComment() != planField.getComment()))))
}
+ lazy val viewFieldsByName = viewFields.map(f => f.name -> f).toMap
+
if (redo) {
val newSchema = if (viewSchemaMode == SchemaTypeEvolution) {
val newFields = viewQuery.schema.map {
case StructField(name, dataType, nullable, _) =>
StructField(name, dataType, nullable,
- viewFields.find(_.name == name).get.metadata)
+ viewFieldsByName(name).metadata)
+ }
+ StructType(newFields)
+ } else if
(session.sessionState.conf.viewSchemaEvolutionPreserveUserComments) {
+ // Adopt types/nullable/names from query, but preserve view
comments.
+ val newFields = viewQuery.schema.map { planField =>
+ val newMetadata = viewFieldsByName.get(planField.name) match {
+ case Some(viewField) =>
+ // Use table metadata but override with view comment
+ val builder = new
MetadataBuilder().withMetadata(planField.metadata)
+ viewField.getComment() match {
+ case Some(comment) => builder.putString("comment", comment)
+ case None => builder.remove("comment")
Review Comment:
By doing this, do we lose table comments where a view body is a simple
`select * from table`, if the table has comments? Is that the intended behavior?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -695,19 +695,43 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan =>
Unit) {
(field.dataType != planField.dataType ||
field.nullable != planField.nullable ||
(viewSchemaMode == SchemaEvolution && (
- field.getComment() != planField.getComment() ||
- field.name != planField.name)))
+ field.name != planField.name ||
+ // Only trigger redo on comment changes if preserve flag is
disabled.
+
(!session.sessionState.conf.viewSchemaEvolutionPreserveUserComments &&
+ field.getComment() != planField.getComment()))))
Review Comment:
Can we make this a separate method?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -695,19 +695,43 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan =>
Unit) {
(field.dataType != planField.dataType ||
field.nullable != planField.nullable ||
(viewSchemaMode == SchemaEvolution && (
- field.getComment() != planField.getComment() ||
- field.name != planField.name)))
+ field.name != planField.name ||
+ // Only trigger redo on comment changes if preserve flag is
disabled.
+
(!session.sessionState.conf.viewSchemaEvolutionPreserveUserComments &&
+ field.getComment() != planField.getComment()))))
}
+ lazy val viewFieldsByName = viewFields.map(f => f.name -> f).toMap
Review Comment:
`lazy` is pointless here since we will evaluate it anyway; it just adds mutex
overhead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]