imback82 commented on a change in pull request #31811:
URL: https://github.com/apache/spark/pull/31811#discussion_r594646085



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
##########
@@ -57,15 +60,43 @@ object EliminateView extends Rule[LogicalPlan] with 
CastSupport {
     // The child has the different output attributes with the View operator. 
Adds a Project over
     // the child of the view.
     case v @ View(desc, _, output, child) if child.resolved && 
!v.sameOutput(child) =>
+      // Use the stored view query output column names to find the matching 
attributes. The column
+      // names may have duplication, e.g. `CREATE VIEW v AS SELECT 1 col, 2 
col`. We need to make
+      // sure the that matching attributes have the same number of 
duplications, and pick the
+      // corresponding attribute by ordinal.
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
-        // Find the attribute that has the expected attribute name from an 
attribute list, the names
-        // are compared using conf.resolver.
-        // `CheckAnalysis` already guarantees the expected attribute can be 
found for sure.
-        desc.viewQueryColumnNames.map { colName =>
-          child.output.find(attr => resolver(attr.name, colName)).get
+        val normalizeColName: String => String = if 
(conf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+        val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String, 
Seq[Attribute]]
+
+        val outputAttrs = queryColumnNames.map { colName =>
+          val normalized = normalizeColName(colName)
+          val count = nameToCounts.getOrElse(normalized, 0)
+          val matchedCols = nameToMatchedCols.getOrElseUpdate(
+            normalized, child.output.filter(attr => resolver(attr.name, 
colName)))
+          if (matchedCols.length - 1 < count) {
+            throw new AnalysisException(s"The SQL query of view 
${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be 
resolved.")

Review comment:
       nit: Should we be more specific about the "schema change" (e.g., one fewer 
duplicated column)?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
##########
@@ -57,15 +60,43 @@ object EliminateView extends Rule[LogicalPlan] with 
CastSupport {
     // The child has the different output attributes with the View operator. 
Adds a Project over
     // the child of the view.
     case v @ View(desc, _, output, child) if child.resolved && 
!v.sameOutput(child) =>
+      // Use the stored view query output column names to find the matching 
attributes. The column
+      // names may have duplication, e.g. `CREATE VIEW v AS SELECT 1 col, 2 
col`. We need to make
+      // sure the that matching attributes have the same number of 
duplications, and pick the
+      // corresponding attribute by ordinal.
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
-        // Find the attribute that has the expected attribute name from an 
attribute list, the names
-        // are compared using conf.resolver.
-        // `CheckAnalysis` already guarantees the expected attribute can be 
found for sure.
-        desc.viewQueryColumnNames.map { colName =>
-          child.output.find(attr => resolver(attr.name, colName)).get
+        val normalizeColName: String => String = if 
(conf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+        val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String, 
Seq[Attribute]]
+
+        val outputAttrs = queryColumnNames.map { colName =>
+          val normalized = normalizeColName(colName)
+          val count = nameToCounts.getOrElse(normalized, 0)
+          val matchedCols = nameToMatchedCols.getOrElseUpdate(
+            normalized, child.output.filter(attr => resolver(attr.name, 
colName)))
+          if (matchedCols.length - 1 < count) {
+            throw new AnalysisException(s"The SQL query of view 
${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be 
resolved.")
+          }
+          val col = matchedCols(count)
+          nameToCounts(normalized) = count + 1
+          col

Review comment:
       nit: I think you can simplify this to:
   ```scala
   nameToCounts(normalized) = count + 1
   matchedCols(count)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to