viirya commented on a change in pull request #31811:
URL: https://github.com/apache/spark/pull/31811#discussion_r594630141
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
##########
@@ -57,15 +60,43 @@ object EliminateView extends Rule[LogicalPlan] with
CastSupport {
// The child has the different output attributes with the View operator.
Adds a Project over
// the child of the view.
case v @ View(desc, _, output, child) if child.resolved &&
!v.sameOutput(child) =>
+ // Use the stored view query output column names to find the matching
attributes. The column
+ // names may have duplication, e.g. `CREATE VIEW v AS SELECT 1 col, 2
col`. We need to make
+ // sure that the matching attributes have the same number of
duplications, and pick the
+ // corresponding attribute by ordinal.
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
- // Find the attribute that has the expected attribute name from an
attribute list, the names
- // are compared using conf.resolver.
- // `CheckAnalysis` already guarantees the expected attribute can be
found for sure.
- desc.viewQueryColumnNames.map { colName =>
- child.output.find(attr => resolver(attr.name, colName)).get
+ val normalizeColName: String => String = if
(conf.caseSensitiveAnalysis) {
+ identity
+ } else {
+ _.toLowerCase(Locale.ROOT)
+ }
+ val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+ val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String,
Seq[Attribute]]
+
+ val outputAttrs = queryColumnNames.map { colName =>
+ val normalized = normalizeColName(colName)
+ val count = nameToCounts.getOrElse(normalized, 0)
+ val matchedCols = nameToMatchedCols.getOrElseUpdate(
+ normalized, child.output.filter(attr => resolver(attr.name,
colName)))
+ if (matchedCols.length - 1 < count) {
+ throw new AnalysisException(s"The SQL query of view
${desc.identifier} has an " +
+ s"incompatible schema change and column $colName cannot be
resolved.")
+ }
+ val col = matchedCols(count)
+ nameToCounts(normalized) = count + 1
+ col
}
+
+ nameToCounts.foreach { case (colName, count) =>
+ if (count > 1 && nameToMatchedCols(colName).length != count) {
+ throw new AnalysisException(s"The SQL query of view
${desc.identifier} has an " +
+ s"incompatible schema change and column $colName cannot be
resolved.")
+ }
Review comment:
Does this mean that if the table adds a column with the same name, we don't
want to implicitly ignore it?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]