gengliangwang commented on code in PR #46267:
URL: https://github.com/apache/spark/pull/46267#discussion_r1596122256
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -945,54 +945,73 @@ class SessionCatalog(
throw QueryCompilationErrors.invalidViewText(viewText,
metadata.qualifiedName)
}
}
- val projectList = if (!isHiveCreatedView(metadata)) {
- val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
- // For view created before Spark 2.2.0, the view text is already fully
qualified, the plan
- // output is the same with the view output.
- metadata.schema.fieldNames.toImmutableArraySeq
- } else {
- assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
- metadata.viewQueryColumnNames
- }
+ val schemaMode = metadata.viewSchemaMode
+ if (schemaMode == SchemaEvolution) {
+ View(desc = metadata, isTempView = isTempView, child = parsedPlan)
+ } else {
+ val projectList = if (!isHiveCreatedView(metadata)) {
+ val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+ // For view created before Spark 2.2.0, the view text is already
fully qualified, the plan
+ // output is the same with the view output.
+ metadata.schema.fieldNames.toImmutableArraySeq
+ } else {
+ assert(metadata.viewQueryColumnNames.length ==
metadata.schema.length)
+ metadata.viewQueryColumnNames
+ }
- // For view queries like `SELECT * FROM t`, the schema of the referenced
table/view may
- // change after the view has been created. We need to add an extra
SELECT to pick the columns
- // according to the recorded column names (to get the correct view
column ordering and omit
- // the extra columns that we don't require), with UpCast (to make sure
the type change is
- // safe) and Alias (to respect user-specified view column names)
according to the view schema
- // in the catalog.
- // Note that, the column names may have duplication, e.g. `CREATE VIEW
v(x, y) AS
- // SELECT 1 col, 2 col`. We need to make sure that the matching
attributes have the same
- // number of duplications, and pick the corresponding attribute by
ordinal.
- val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
- val normalizeColName: String => String = if
(viewConf.caseSensitiveAnalysis) {
- identity
+ // For view queries like `SELECT * FROM t`, the schema of the
referenced table/view may
+ // change after the view has been created. We need to add an extra
SELECT to pick the
+ // columns according to the recorded column names (to get the correct
view column ordering
+ // and omit the extra columns that we don't require), with UpCast (to
make sure the type
+ // change is safe) and Alias (to respect user-specified view column
names) according to the
+ // view schema in the catalog.
+ // Note that, the column names may have duplication, e.g. `CREATE VIEW
v(x, y) AS
+ // SELECT 1 col, 2 col`. We need to make sure that the matching
attributes have the same
+ // number of duplications, and pick the corresponding attribute by
ordinal.
+ val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs,
isTempView)
+ val normalizeColName: String => String = if
(viewConf.caseSensitiveAnalysis) {
+ identity
+ } else {
+ _.toLowerCase(Locale.ROOT)
+ }
+ val nameToCounts =
viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length)
+ val nameToCurrentOrdinal =
scala.collection.mutable.HashMap.empty[String, Int]
+ val viewDDL = buildViewDDL(metadata, isTempView)
+
+ viewColumnNames.zip(metadata.schema).map { case (name, field) =>
+ val normalizedName = normalizeColName(name)
+ val count = nameToCounts(normalizedName)
+ val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
+ nameToCurrentOrdinal(normalizedName) = ordinal + 1
+ val col = GetViewColumnByNameAndOrdinal(
+ metadata.identifier.toString, name, ordinal, count, viewDDL)
+ val cast = schemaMode match {
+ /*
+ ** For schema binding, we cast the column to the expected type
using safe cast only.
+ ** For legacy behavior, we cast the column to the expected type
using safe cast only.
+ ** For schema compensation, we cast the column to the expected
type using any cast
+ * in ansi mode.
+ ** For schema (type) evolution, we take teh column as is.
+ */
+ case SchemaBinding => UpCast(col, field.dataType)
+ case SchemaUnsupported => UpCast(col, field.dataType)
+ case SchemaCompensation => Cast(col, field.dataType, ansiEnabled =
true)
+ case SchemaTypeEvolution => col
+ case SchemaEvolution => col
Review Comment:
`schemaMode` can't be `SchemaEvolution` here, since that case is already handled by the `if` branch above, right?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -945,54 +945,73 @@ class SessionCatalog(
throw QueryCompilationErrors.invalidViewText(viewText,
metadata.qualifiedName)
}
}
- val projectList = if (!isHiveCreatedView(metadata)) {
- val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
- // For view created before Spark 2.2.0, the view text is already fully
qualified, the plan
- // output is the same with the view output.
- metadata.schema.fieldNames.toImmutableArraySeq
- } else {
- assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
- metadata.viewQueryColumnNames
- }
+ val schemaMode = metadata.viewSchemaMode
+ if (schemaMode == SchemaEvolution) {
+ View(desc = metadata, isTempView = isTempView, child = parsedPlan)
+ } else {
+ val projectList = if (!isHiveCreatedView(metadata)) {
+ val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+ // For view created before Spark 2.2.0, the view text is already
fully qualified, the plan
+ // output is the same with the view output.
+ metadata.schema.fieldNames.toImmutableArraySeq
+ } else {
+ assert(metadata.viewQueryColumnNames.length ==
metadata.schema.length)
+ metadata.viewQueryColumnNames
+ }
- // For view queries like `SELECT * FROM t`, the schema of the referenced
table/view may
- // change after the view has been created. We need to add an extra
SELECT to pick the columns
- // according to the recorded column names (to get the correct view
column ordering and omit
- // the extra columns that we don't require), with UpCast (to make sure
the type change is
- // safe) and Alias (to respect user-specified view column names)
according to the view schema
- // in the catalog.
- // Note that, the column names may have duplication, e.g. `CREATE VIEW
v(x, y) AS
- // SELECT 1 col, 2 col`. We need to make sure that the matching
attributes have the same
- // number of duplications, and pick the corresponding attribute by
ordinal.
- val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
- val normalizeColName: String => String = if
(viewConf.caseSensitiveAnalysis) {
- identity
+ // For view queries like `SELECT * FROM t`, the schema of the
referenced table/view may
+ // change after the view has been created. We need to add an extra
SELECT to pick the
+ // columns according to the recorded column names (to get the correct
view column ordering
+ // and omit the extra columns that we don't require), with UpCast (to
make sure the type
+ // change is safe) and Alias (to respect user-specified view column
names) according to the
+ // view schema in the catalog.
+ // Note that, the column names may have duplication, e.g. `CREATE VIEW
v(x, y) AS
+ // SELECT 1 col, 2 col`. We need to make sure that the matching
attributes have the same
+ // number of duplications, and pick the corresponding attribute by
ordinal.
+ val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs,
isTempView)
+ val normalizeColName: String => String = if
(viewConf.caseSensitiveAnalysis) {
+ identity
+ } else {
+ _.toLowerCase(Locale.ROOT)
+ }
+ val nameToCounts =
viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length)
+ val nameToCurrentOrdinal =
scala.collection.mutable.HashMap.empty[String, Int]
+ val viewDDL = buildViewDDL(metadata, isTempView)
+
+ viewColumnNames.zip(metadata.schema).map { case (name, field) =>
+ val normalizedName = normalizeColName(name)
+ val count = nameToCounts(normalizedName)
+ val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
+ nameToCurrentOrdinal(normalizedName) = ordinal + 1
+ val col = GetViewColumnByNameAndOrdinal(
+ metadata.identifier.toString, name, ordinal, count, viewDDL)
+ val cast = schemaMode match {
+ /*
+ ** For schema binding, we cast the column to the expected type
using safe cast only.
+ ** For legacy behavior, we cast the column to the expected type
using safe cast only.
+ ** For schema compensation, we cast the column to the expected
type using any cast
+ * in ansi mode.
+ ** For schema (type) evolution, we take teh column as is.
+ */
+ case SchemaBinding => UpCast(col, field.dataType)
+ case SchemaUnsupported => UpCast(col, field.dataType)
+ case SchemaCompensation => Cast(col, field.dataType, ansiEnabled =
true)
+ case SchemaTypeEvolution => col
+ case SchemaEvolution => col
+ }
+ Alias(cast, field.name)(explicitMetadata = Some(field.metadata))
+ }
} else {
- _.toLowerCase(Locale.ROOT)
- }
- val nameToCounts =
viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length)
- val nameToCurrentOrdinal =
scala.collection.mutable.HashMap.empty[String, Int]
- val viewDDL = buildViewDDL(metadata, isTempView)
-
- viewColumnNames.zip(metadata.schema).map { case (name, field) =>
- val normalizedName = normalizeColName(name)
- val count = nameToCounts(normalizedName)
- val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
- nameToCurrentOrdinal(normalizedName) = ordinal + 1
- val col = GetViewColumnByNameAndOrdinal(
- metadata.identifier.toString, name, ordinal, count, viewDDL)
- Alias(UpCast(col, field.dataType), field.name)(explicitMetadata =
Some(field.metadata))
- }
- } else {
- // For view created by hive, the parsed view plan may have different
output columns with
- // the schema stored in metadata. For example: `CREATE VIEW v AS SELECT
1 FROM t`
- // the schema in metadata will be `_c0` while the parsed view plan has
column named `1`
- metadata.schema.zipWithIndex.map { case (field, index) =>
- val col = GetColumnByOrdinal(index, field.dataType)
- Alias(UpCast(col, field.dataType), field.name)(explicitMetadata =
Some(field.metadata))
+ // For view created by hive, the parsed view plan may have different
output columns with
Review Comment:
Why doesn't the schema mode affect Hive-created views? Shall we add a
comment explaining it?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -620,3 +621,65 @@ object CollationCheck extends (LogicalPlan => Unit) {
private def isCollationExpression(expression: Expression): Boolean =
expression.isInstanceOf[Collation] || expression.isInstanceOf[Collate]
}
+
+
+/**
+ * This rule checks for references to views WITH SCHEMA [TYPE] EVOLUTION and
synchronizes the
+ * catalog if evolution was detected.
+ * It does so by walking the resolved plan looking for View operators for
persisted views.
+ */
+object SyncViewsCheck extends (LogicalPlan => Unit) {
+ def apply(plan: LogicalPlan): Unit = {
Review Comment:
```suggestion
def apply(plan: LogicalPlan): Unit = {
```
##########
sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala:
##########
@@ -224,6 +224,7 @@ abstract class BaseSessionStateBuilder(
TableCapabilityCheck +:
CommandCheck +:
CollationCheck +:
+ SyncViewsCheck +:
Review Comment:
Check rules should not have side effects. Ideally, a check rule inspects
the whole plan and throws an exception if something is wrong. However, this
new rule appears to have a side effect:
```
session.sessionState.catalog.alterTable(updatedViewMeta)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]