gengliangwang commented on code in PR #46267:
URL: https://github.com/apache/spark/pull/46267#discussion_r1596122256


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -945,54 +945,73 @@ class SessionCatalog(
           throw QueryCompilationErrors.invalidViewText(viewText, 
metadata.qualifiedName)
       }
     }
-    val projectList = if (!isHiveCreatedView(metadata)) {
-      val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
-        // For view created before Spark 2.2.0, the view text is already fully 
qualified, the plan
-        // output is the same with the view output.
-        metadata.schema.fieldNames.toImmutableArraySeq
-      } else {
-        assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
-        metadata.viewQueryColumnNames
-      }
+    val schemaMode = metadata.viewSchemaMode
+    if (schemaMode == SchemaEvolution) {
+      View(desc = metadata, isTempView = isTempView, child = parsedPlan)
+    } else {
+      val projectList = if (!isHiveCreatedView(metadata)) {
+        val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+          // For view created before Spark 2.2.0, the view text is already 
fully qualified, the plan
+          // output is the same with the view output.
+          metadata.schema.fieldNames.toImmutableArraySeq
+        } else {
+          assert(metadata.viewQueryColumnNames.length == 
metadata.schema.length)
+          metadata.viewQueryColumnNames
+        }
 
-      // For view queries like `SELECT * FROM t`, the schema of the referenced 
table/view may
-      // change after the view has been created. We need to add an extra 
SELECT to pick the columns
-      // according to the recorded column names (to get the correct view 
column ordering and omit
-      // the extra columns that we don't require), with UpCast (to make sure 
the type change is
-      // safe) and Alias (to respect user-specified view column names) 
according to the view schema
-      // in the catalog.
-      // Note that, the column names may have duplication, e.g. `CREATE VIEW 
v(x, y) AS
-      // SELECT 1 col, 2 col`. We need to make sure that the matching 
attributes have the same
-      // number of duplications, and pick the corresponding attribute by 
ordinal.
-      val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
-      val normalizeColName: String => String = if 
(viewConf.caseSensitiveAnalysis) {
-        identity
+        // For view queries like `SELECT * FROM t`, the schema of the 
referenced table/view may
+        // change after the view has been created. We need to add an extra 
SELECT to pick the
+        // columns according to the recorded column names (to get the correct 
view column ordering
+        // and omit the extra columns that we don't require), with UpCast (to 
make sure the type
+        // change is safe) and Alias (to respect user-specified view column 
names) according to the
+        // view schema in the catalog.
+        // Note that, the column names may have duplication, e.g. `CREATE VIEW 
v(x, y) AS
+        // SELECT 1 col, 2 col`. We need to make sure that the matching 
attributes have the same
+        // number of duplications, and pick the corresponding attribute by 
ordinal.
+        val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, 
isTempView)
+        val normalizeColName: String => String = if 
(viewConf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = 
viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length)
+        val nameToCurrentOrdinal = 
scala.collection.mutable.HashMap.empty[String, Int]
+        val viewDDL = buildViewDDL(metadata, isTempView)
+
+        viewColumnNames.zip(metadata.schema).map { case (name, field) =>
+          val normalizedName = normalizeColName(name)
+          val count = nameToCounts(normalizedName)
+          val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
+          nameToCurrentOrdinal(normalizedName) = ordinal + 1
+          val col = GetViewColumnByNameAndOrdinal(
+            metadata.identifier.toString, name, ordinal, count, viewDDL)
+          val cast = schemaMode match {
+            /*
+            ** For schema binding, we cast the column to the expected type 
using safe cast only.
+            ** For legacy behavior, we cast the column to the expected type 
using safe cast only.
+            ** For schema compensation, we cast the column to the expected 
type using any cast
+            *  in ansi mode.
+            ** For schema (type) evolution, we take teh column as is.
+            */
+            case SchemaBinding => UpCast(col, field.dataType)
+            case SchemaUnsupported => UpCast(col, field.dataType)
+            case SchemaCompensation => Cast(col, field.dataType, ansiEnabled = 
true)
+            case SchemaTypeEvolution => col
+            case SchemaEvolution => col

Review Comment:
   schemaMode can't be `SchemaEvolution` here, right?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -945,54 +945,73 @@ class SessionCatalog(
           throw QueryCompilationErrors.invalidViewText(viewText, 
metadata.qualifiedName)
       }
     }
-    val projectList = if (!isHiveCreatedView(metadata)) {
-      val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
-        // For view created before Spark 2.2.0, the view text is already fully 
qualified, the plan
-        // output is the same with the view output.
-        metadata.schema.fieldNames.toImmutableArraySeq
-      } else {
-        assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
-        metadata.viewQueryColumnNames
-      }
+    val schemaMode = metadata.viewSchemaMode
+    if (schemaMode == SchemaEvolution) {
+      View(desc = metadata, isTempView = isTempView, child = parsedPlan)
+    } else {
+      val projectList = if (!isHiveCreatedView(metadata)) {
+        val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+          // For view created before Spark 2.2.0, the view text is already 
fully qualified, the plan
+          // output is the same with the view output.
+          metadata.schema.fieldNames.toImmutableArraySeq
+        } else {
+          assert(metadata.viewQueryColumnNames.length == 
metadata.schema.length)
+          metadata.viewQueryColumnNames
+        }
 
-      // For view queries like `SELECT * FROM t`, the schema of the referenced 
table/view may
-      // change after the view has been created. We need to add an extra 
SELECT to pick the columns
-      // according to the recorded column names (to get the correct view 
column ordering and omit
-      // the extra columns that we don't require), with UpCast (to make sure 
the type change is
-      // safe) and Alias (to respect user-specified view column names) 
according to the view schema
-      // in the catalog.
-      // Note that, the column names may have duplication, e.g. `CREATE VIEW 
v(x, y) AS
-      // SELECT 1 col, 2 col`. We need to make sure that the matching 
attributes have the same
-      // number of duplications, and pick the corresponding attribute by 
ordinal.
-      val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
-      val normalizeColName: String => String = if 
(viewConf.caseSensitiveAnalysis) {
-        identity
+        // For view queries like `SELECT * FROM t`, the schema of the 
referenced table/view may
+        // change after the view has been created. We need to add an extra 
SELECT to pick the
+        // columns according to the recorded column names (to get the correct 
view column ordering
+        // and omit the extra columns that we don't require), with UpCast (to 
make sure the type
+        // change is safe) and Alias (to respect user-specified view column 
names) according to the
+        // view schema in the catalog.
+        // Note that, the column names may have duplication, e.g. `CREATE VIEW 
v(x, y) AS
+        // SELECT 1 col, 2 col`. We need to make sure that the matching 
attributes have the same
+        // number of duplications, and pick the corresponding attribute by 
ordinal.
+        val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, 
isTempView)
+        val normalizeColName: String => String = if 
(viewConf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = 
viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length)
+        val nameToCurrentOrdinal = 
scala.collection.mutable.HashMap.empty[String, Int]
+        val viewDDL = buildViewDDL(metadata, isTempView)
+
+        viewColumnNames.zip(metadata.schema).map { case (name, field) =>
+          val normalizedName = normalizeColName(name)
+          val count = nameToCounts(normalizedName)
+          val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
+          nameToCurrentOrdinal(normalizedName) = ordinal + 1
+          val col = GetViewColumnByNameAndOrdinal(
+            metadata.identifier.toString, name, ordinal, count, viewDDL)
+          val cast = schemaMode match {
+            /*
+            ** For schema binding, we cast the column to the expected type 
using safe cast only.
+            ** For legacy behavior, we cast the column to the expected type 
using safe cast only.
+            ** For schema compensation, we cast the column to the expected 
type using any cast
+            *  in ansi mode.
+            ** For schema (type) evolution, we take teh column as is.
+            */
+            case SchemaBinding => UpCast(col, field.dataType)
+            case SchemaUnsupported => UpCast(col, field.dataType)
+            case SchemaCompensation => Cast(col, field.dataType, ansiEnabled = 
true)
+            case SchemaTypeEvolution => col
+            case SchemaEvolution => col
+          }
+          Alias(cast, field.name)(explicitMetadata = Some(field.metadata))
+        }
       } else {
-        _.toLowerCase(Locale.ROOT)
-      }
-      val nameToCounts = 
viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length)
-      val nameToCurrentOrdinal = 
scala.collection.mutable.HashMap.empty[String, Int]
-      val viewDDL = buildViewDDL(metadata, isTempView)
-
-      viewColumnNames.zip(metadata.schema).map { case (name, field) =>
-        val normalizedName = normalizeColName(name)
-        val count = nameToCounts(normalizedName)
-        val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
-        nameToCurrentOrdinal(normalizedName) = ordinal + 1
-        val col = GetViewColumnByNameAndOrdinal(
-          metadata.identifier.toString, name, ordinal, count, viewDDL)
-        Alias(UpCast(col, field.dataType), field.name)(explicitMetadata = 
Some(field.metadata))
-      }
-    } else {
-      // For view created by hive, the parsed view plan may have different 
output columns with
-      // the schema stored in metadata. For example: `CREATE VIEW v AS SELECT 
1 FROM t`
-      // the schema in metadata will be `_c0` while the parsed view plan has 
column named `1`
-      metadata.schema.zipWithIndex.map { case (field, index) =>
-        val col = GetColumnByOrdinal(index, field.dataType)
-        Alias(UpCast(col, field.dataType), field.name)(explicitMetadata = 
Some(field.metadata))
+        // For view created by hive, the parsed view plan may have different 
output columns with

Review Comment:
   why the schema mode doesn't affect the hive created view? Shall we add some 
comment about it?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -620,3 +621,65 @@ object CollationCheck extends (LogicalPlan => Unit) {
   private def isCollationExpression(expression: Expression): Boolean =
     expression.isInstanceOf[Collation] || expression.isInstanceOf[Collate]
 }
+
+
+/**
+ * This rule checks for references to views WITH SCHEMA [TYPE] EVOLUTION and 
synchronizes the
+ * catalog if evolution was detected.
+ * It does so by walking the resolved plan looking for View operators for 
persisted views.
+ */
+object SyncViewsCheck extends (LogicalPlan => Unit) {
+    def apply(plan: LogicalPlan): Unit = {

Review Comment:
   ```suggestion
     def apply(plan: LogicalPlan): Unit = {
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala:
##########
@@ -224,6 +224,7 @@ abstract class BaseSessionStateBuilder(
         TableCapabilityCheck +:
         CommandCheck +:
         CollationCheck +:
+        SyncViewsCheck +:

Review Comment:
   The check rules should not come with side effects. Ideally it should check 
the whole plan and throw exception if something is wrong. However, it seems 
that there is side effect in this new rule:
   ```
   session.sessionState.catalog.alterTable(updatedViewMeta)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to