ljfgem commented on code in PR #35636:
URL: https://github.com/apache/spark/pull/35636#discussion_r950735369


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -447,6 +454,74 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
   }
 
+  /**
+   * Substitute persisted views in parsed plans with parsed view sql text.
+   */
+  case class ViewSubstitution(sqlParser: ParserInterface) extends 
Rule[LogicalPlan] {
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case u @ UnresolvedRelation(nameParts, _, _) if 
v1SessionCatalog.isTempView(nameParts) =>
+        u
+      case u @ UnresolvedRelation(
+          parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) if 
!isSQLOnFile(parts) =>
+        CatalogV2Util.loadView(catalog, ident)
+            .map(createViewRelation(parts.quoted, _))
+            .getOrElse(u)
+    }
+
+    private def isSQLOnFile(parts: Seq[String]): Boolean = parts match {
+      case Seq(_, path) if path.contains("/") => true
+      case _ => false
+    }
+
+    private def createViewRelation(name: String, view: V2View): LogicalPlan = {
+      if (!catalogManager.isCatalogRegistered(view.currentCatalog)) {
+        throw new AnalysisException(
+          s"Invalid current catalog '${view.currentCatalog}' in view '$name'")
+      }
+
+      val child = parseViewText(name, view.sql)
+      val desc = V2ViewDescription(name, view)
+      val qualifiedChild = desc.viewCatalogAndNamespace match {
+        case Seq() =>
+          // Views from Spark 2.2 or prior do not store catalog or namespace,
+          // however its sql text should already be fully qualified.
+          child
+        case catalogAndNamespace =>
+          // Substitute CTEs within the view before qualifying table 
identifiers
+          qualifyTableIdentifiers(CTESubstitution.apply(child), 
catalogAndNamespace)
+      }
+
+      // The relation is a view, so we wrap the relation by:
+      // 1. Add a [[View]] operator over the relation to keep track of the 
view desc;
+      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name 
of the view.
+      SubqueryAlias(name, View(desc, false, qualifiedChild))
+    }
+
+    private def parseViewText(name: String, viewText: String): LogicalPlan = {
+      try {
+        sqlParser.parsePlan(viewText)
+      } catch {
+        case _: ParseException =>
+          throw QueryCompilationErrors.invalidViewText(viewText, name)
+      }
+    }
+
+    /**
+     * Qualify table identifiers with default catalog and namespace if 
necessary.
+     */
+    private def qualifyTableIdentifiers(
+        child: LogicalPlan,
+        catalogAndNamespace: Seq[String]): LogicalPlan =
+      child transform {
+        case u @ UnresolvedRelation(Seq(table), _, _) =>
+          u.copy(multipartIdentifier = catalogAndNamespace :+ table)
+        case u @ UnresolvedRelation(parts, _, _)
+            if !catalogManager.isCatalogRegistered(parts.head) =>
+          u.copy(multipartIdentifier = catalogAndNamespace.head +: parts)
+      }

Review Comment:
   Hi @jzhuge, I think this implementation assumes that the view and its base 
tables must share the same catalog? If so, a view catalog must also implement 
`TableCatalog` interface? 
   Could we add a field like `baseTableCatalogName` in the `View` interface to 
specify the base tables' catalog, so that the view can be based on tables in 
another catalog?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to