imback82 commented on a change in pull request #27391: [SPARK-30612][SQL] 
Resolve qualified column name with v2 tables
URL: https://github.com/apache/spark/pull/27391#discussion_r373636189
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 ##########
 @@ -128,90 +127,65 @@ package object expressions  {
       m.mapValues(_.distinct).map(identity)
     }
 
-    /** Map to use for direct case insensitive attribute lookups. */
-    @transient private lazy val direct: Map[String, Seq[Attribute]] = {
+    /** Attribute name to attributes */
+    @transient private val attrsMap: Map[String, Seq[Attribute]] = {
       unique(attrs.groupBy(_.name.toLowerCase(Locale.ROOT)))
     }
 
-    /** Map to use for qualified case insensitive attribute lookups with 2 
part key */
-    @transient private lazy val qualified: Map[(String, String), 
Seq[Attribute]] = {
-      // key is 2 part: table/alias and name
-      val grouped = attrs.filter(_.qualifier.nonEmpty).groupBy {
-        a => (a.qualifier.last.toLowerCase(Locale.ROOT), 
a.name.toLowerCase(Locale.ROOT))
-      }
-      unique(grouped)
-    }
-
-    /** Map to use for qualified case insensitive attribute lookups with 3 
part key */
-    @transient private val qualified3Part: Map[(String, String, String), 
Seq[Attribute]] = {
-      // key is 3 part: database name, table name and name
-      val grouped = attrs.filter(_.qualifier.length == 2).groupBy { a =>
-        (a.qualifier.head.toLowerCase(Locale.ROOT),
-          a.qualifier.last.toLowerCase(Locale.ROOT),
-          a.name.toLowerCase(Locale.ROOT))
-      }
-      unique(grouped)
-    }
-
     /** Perform attribute resolution given a name and a resolver. */
     def resolve(nameParts: Seq[String], resolver: Resolver): 
Option[NamedExpression] = {
-      // Collect matching attributes given a name and a lookup.
-      def collectMatches(name: String, candidates: Option[Seq[Attribute]]): 
Seq[Attribute] = {
-        candidates.toSeq.flatMap(_.collect {
-          case a if resolver(a.name, name) => a.withName(name)
-        })
+      // Returns true if the `short` qualifier is a subset of the last 
elements of
+      // `long` qualifier. For example, Seq("a", "b") is a subset of Seq("a", 
"a", "b"),
+      // but not a subset of Seq("a", "b", "b").
+      def matchQualifier(short: Seq[String], long: Seq[String]): Boolean = {
+        (long.length >= short.length) &&
+          long.takeRight(short.length)
+            .zip(short)
+            .forall(x => resolver(x._1, x._2))
       }
 
-      // Find matches for the given name assuming that the 1st two parts are 
qualifier
-      // (i.e. database name and table name) and the 3rd part is the actual 
column name.
-      //
-      // For example, consider an example where "db1" is the database name, 
"a" is the table name
-      // and "b" is the column name and "c" is the struct field name.
-      // If the name parts is db1.a.b.c, then Attribute will match
-      // Attribute(b, qualifier("db1,"a")) and List("c") will be the second 
element
-      var matches: (Seq[Attribute], Seq[String]) = nameParts match {
-        case dbPart +: tblPart +: name +: nestedFields =>
-          val key = (dbPart.toLowerCase(Locale.ROOT),
-            tblPart.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT))
-          val attributes = collectMatches(name, 
qualified3Part.get(key)).filter {
-            a => (resolver(dbPart, a.qualifier.head) && resolver(tblPart, 
a.qualifier.last))
-          }
-          (attributes, nestedFields)
-        case _ =>
-          (Seq.empty, Seq.empty)
+      // Collect attributes that match the given name and qualifier.
+      // A match occurs if
+      //   1) the given name matches the attribute's name according to the 
resolver.
+      //   2) the given qualifier is a subset of the attribute's qualifier.
+      def collectMatches(
+          name: String,
+          qualifier: Seq[String],
+          candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
+        candidates.toSeq.flatMap(_.collect {
+          case a if resolver(name, a.name) && matchQualifier(qualifier, 
a.qualifier) =>
+            a.withName(name)
+        })
       }
 
-      // If there are no matches, then find matches for the given name 
assuming that
-      // the 1st part is a qualifier (i.e. table name, alias, or subquery 
alias) and the
-      // 2nd part is the actual name. This returns a tuple of
-      // matched attributes and a list of parts that are to be resolved.
-      //
-      // For example, consider an example where "a" is the table name, "b" is 
the column name,
-      // and "c" is the struct field name, i.e. "a.b.c". In this case, 
Attribute will be "a.b",
-      // and the second element will be List("c").
-      if (matches._1.isEmpty) {
-        matches = nameParts match {
-          case qualifier +: name +: nestedFields =>
-            val key = (qualifier.toLowerCase(Locale.ROOT), 
name.toLowerCase(Locale.ROOT))
-            val attributes = collectMatches(name, qualified.get(key)).filter { 
a =>
-              resolver(qualifier, a.qualifier.last)
-            }
-            (attributes, nestedFields)
-          case _ =>
-            (Seq.empty[Attribute], Seq.empty[String])
+      // Iterate each string in `nameParts` in a reverse order and try to 
match the attributes
+      // considering the current string as the attribute name. For example, if 
`nameParts` is
+      // Seq("a", "b", "c"), the match will be performed in the following 
order:
+      // 1) name = "c", qualifier = Seq("a", "b")
+      // 2) name = "b", qualifier = Seq("a")
+      // 3) name = "a", qualifier = Seq()
+      // Note that the match is performed in the reverse order in order to 
match the longest
+      // qualifier as possible. If a match is found, the remaining portion of 
`nameParts`
+      // is also returned as nested fields.
+      val matches = nameParts.zipWithIndex.reverseIterator.flatMap { case 
(name, index) =>
 
 Review comment:
   The type of `matches` here is `Iterator[(Seq[Attribute], Seq[String])]`.  
And since I am doing `val (candidates, nestedFields) = matches.next` below, it 
will find only the first match.
   
   If this was not clear, I can make this to an explicit loop. Please let me 
know, thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to