ljfgem commented on code in PR #35636:
URL: https://github.com/apache/spark/pull/35636#discussion_r952796978
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -447,6 +454,74 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
+ /**
+ * Substitute persisted views in parsed plans with parsed view sql text.
+ */
+ case class ViewSubstitution(sqlParser: ParserInterface) extends
Rule[LogicalPlan] {
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+ case u @ UnresolvedRelation(nameParts, _, _) if
v1SessionCatalog.isTempView(nameParts) =>
+ u
+ case u @ UnresolvedRelation(
+ parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) if
!isSQLOnFile(parts) =>
+ CatalogV2Util.loadView(catalog, ident)
+ .map(createViewRelation(parts.quoted, _))
+ .getOrElse(u)
+ }
+
+ private def isSQLOnFile(parts: Seq[String]): Boolean = parts match {
+ case Seq(_, path) if path.contains("/") => true
+ case _ => false
+ }
+
+ private def createViewRelation(name: String, view: V2View): LogicalPlan = {
+ if (!catalogManager.isCatalogRegistered(view.currentCatalog)) {
+ throw new AnalysisException(
+ s"Invalid current catalog '${view.currentCatalog}' in view '$name'")
+ }
+
+ val child = parseViewText(name, view.sql)
+ val desc = V2ViewDescription(name, view)
+ val qualifiedChild = desc.viewCatalogAndNamespace match {
+ case Seq() =>
+ // Views from Spark 2.2 or prior do not store catalog or namespace,
+ // however its sql text should already be fully qualified.
+ child
+ case catalogAndNamespace =>
+ // Substitute CTEs within the view before qualifying table
identifiers
+ qualifyTableIdentifiers(CTESubstitution.apply(child),
catalogAndNamespace)
+ }
+
+ // The relation is a view, so we wrap the relation by:
+ // 1. Add a [[View]] operator over the relation to keep track of the
view desc;
+ // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name
of the view.
+ SubqueryAlias(name, View(desc, false, qualifiedChild))
+ }
+
+ private def parseViewText(name: String, viewText: String): LogicalPlan = {
+ try {
+ sqlParser.parsePlan(viewText)
+ } catch {
+ case _: ParseException =>
+ throw QueryCompilationErrors.invalidViewText(viewText, name)
+ }
+ }
+
+ /**
+ * Qualify table identifiers with default catalog and namespace if
necessary.
+ */
+ private def qualifyTableIdentifiers(
+ child: LogicalPlan,
+ catalogAndNamespace: Seq[String]): LogicalPlan =
+ child transform {
+ case u @ UnresolvedRelation(Seq(table), _, _) =>
+ u.copy(multipartIdentifier = catalogAndNamespace :+ table)
+ case u @ UnresolvedRelation(parts, _, _)
+ if !catalogManager.isCatalogRegistered(parts.head) =>
+ u.copy(multipartIdentifier = catalogAndNamespace.head +: parts)
Review Comment:
Hi @jzhuge, I think this part of code doesn't consider the scenario that
view is based on the tables from the session catalog, in this case, the table
identifier shouldn’t have the catalog name, but view’s catalog name will be
wrongly added for the base table by this method, which is not expected.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]