ljfgem commented on code in PR #35636:
URL: https://github.com/apache/spark/pull/35636#discussion_r952796978


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -447,6 +454,74 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
   }
 
+  /**
+   * Substitute persisted views in parsed plans with parsed view sql text.
+   */
+  case class ViewSubstitution(sqlParser: ParserInterface) extends 
Rule[LogicalPlan] {
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case u @ UnresolvedRelation(nameParts, _, _) if 
v1SessionCatalog.isTempView(nameParts) =>
+        u
+      case u @ UnresolvedRelation(
+          parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) if 
!isSQLOnFile(parts) =>
+        CatalogV2Util.loadView(catalog, ident)
+            .map(createViewRelation(parts.quoted, _))
+            .getOrElse(u)
+    }
+
+    private def isSQLOnFile(parts: Seq[String]): Boolean = parts match {
+      case Seq(_, path) if path.contains("/") => true
+      case _ => false
+    }
+
+    private def createViewRelation(name: String, view: V2View): LogicalPlan = {
+      if (!catalogManager.isCatalogRegistered(view.currentCatalog)) {
+        throw new AnalysisException(
+          s"Invalid current catalog '${view.currentCatalog}' in view '$name'")
+      }
+
+      val child = parseViewText(name, view.sql)
+      val desc = V2ViewDescription(name, view)
+      val qualifiedChild = desc.viewCatalogAndNamespace match {
+        case Seq() =>
+          // Views from Spark 2.2 or prior do not store catalog or namespace,
+          // however its sql text should already be fully qualified.
+          child
+        case catalogAndNamespace =>
+          // Substitute CTEs within the view before qualifying table 
identifiers
+          qualifyTableIdentifiers(CTESubstitution.apply(child), 
catalogAndNamespace)
+      }
+
+      // The relation is a view, so we wrap the relation by:
+      // 1. Add a [[View]] operator over the relation to keep track of the 
view desc;
+      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name 
of the view.
+      SubqueryAlias(name, View(desc, false, qualifiedChild))
+    }
+
+    private def parseViewText(name: String, viewText: String): LogicalPlan = {
+      try {
+        sqlParser.parsePlan(viewText)
+      } catch {
+        case _: ParseException =>
+          throw QueryCompilationErrors.invalidViewText(viewText, name)
+      }
+    }
+
+    /**
+     * Qualify table identifiers with default catalog and namespace if 
necessary.
+     */
+    private def qualifyTableIdentifiers(
+        child: LogicalPlan,
+        catalogAndNamespace: Seq[String]): LogicalPlan =
+      child transform {
+        case u @ UnresolvedRelation(Seq(table), _, _) =>
+          u.copy(multipartIdentifier = catalogAndNamespace :+ table)
+        case u @ UnresolvedRelation(parts, _, _)
+            if !catalogManager.isCatalogRegistered(parts.head) =>
+          u.copy(multipartIdentifier = catalogAndNamespace.head +: parts)

Review Comment:
   Hi @jzhuge, I think this part of the code doesn't handle the case where a 
view is based on tables from the session catalog. In that case the base table 
identifier should not carry a catalog name, but this method will wrongly 
prepend the view's catalog name to it, which is not the expected behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to