ljfgem commented on code in PR #35636:
URL: https://github.com/apache/spark/pull/35636#discussion_r950735369
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -447,6 +454,74 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
+ /**
+ * Substitute persisted views in parsed plans with parsed view sql text.
+ */
+ case class ViewSubstitution(sqlParser: ParserInterface) extends
Rule[LogicalPlan] {
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+ case u @ UnresolvedRelation(nameParts, _, _) if
v1SessionCatalog.isTempView(nameParts) =>
+ u
+ case u @ UnresolvedRelation(
+ parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) if
!isSQLOnFile(parts) =>
+ CatalogV2Util.loadView(catalog, ident)
+ .map(createViewRelation(parts.quoted, _))
+ .getOrElse(u)
+ }
+
+ private def isSQLOnFile(parts: Seq[String]): Boolean = parts match {
+ case Seq(_, path) if path.contains("/") => true
+ case _ => false
+ }
+
+ private def createViewRelation(name: String, view: V2View): LogicalPlan = {
+ if (!catalogManager.isCatalogRegistered(view.currentCatalog)) {
+ throw new AnalysisException(
+ s"Invalid current catalog '${view.currentCatalog}' in view '$name'")
+ }
+
+ val child = parseViewText(name, view.sql)
+ val desc = V2ViewDescription(name, view)
+ val qualifiedChild = desc.viewCatalogAndNamespace match {
+ case Seq() =>
+ // Views from Spark 2.2 or prior do not store catalog or namespace,
+ // however its sql text should already be fully qualified.
+ child
+ case catalogAndNamespace =>
+ // Substitute CTEs within the view before qualifying table
identifiers
+ qualifyTableIdentifiers(CTESubstitution.apply(child),
catalogAndNamespace)
+ }
+
+ // The relation is a view, so we wrap the relation by:
+ // 1. Add a [[View]] operator over the relation to keep track of the
view desc;
+ // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name
of the view.
+ SubqueryAlias(name, View(desc, false, qualifiedChild))
+ }
+
+ private def parseViewText(name: String, viewText: String): LogicalPlan = {
+ try {
+ sqlParser.parsePlan(viewText)
+ } catch {
+ case _: ParseException =>
+ throw QueryCompilationErrors.invalidViewText(viewText, name)
+ }
+ }
+
+ /**
+ * Qualify table identifiers with default catalog and namespace if
necessary.
+ */
+ private def qualifyTableIdentifiers(
+ child: LogicalPlan,
+ catalogAndNamespace: Seq[String]): LogicalPlan =
+ child transform {
+ case u @ UnresolvedRelation(Seq(table), _, _) =>
+ u.copy(multipartIdentifier = catalogAndNamespace :+ table)
+ case u @ UnresolvedRelation(parts, _, _)
+ if !catalogManager.isCatalogRegistered(parts.head) =>
+ u.copy(multipartIdentifier = catalogAndNamespace.head +: parts)
+ }
Review Comment:
Hi @jzhuge, I think this implementation assumes that the view and its base
tables must share the same catalog? If so, a view catalog must also implement
`TableCatalog` interface?
Could we add a field like `baseTableCatalogName` in the `View` interface to
specify the base tables' catalog, so that the view can be based on tables in
another catalog?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]