rdblue commented on a change in pull request #26684: [WIP][SPARK-30001][SQL] 
ResolveRelations should handle both V1 and V2 tables.
URL: https://github.com/apache/spark/pull/26684#discussion_r351403605
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##########
 @@ -777,36 +772,84 @@ class Analyzer(
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = 
ResolveTables(plan).resolveOperatorsUp {
-      case i @ InsertIntoStatement(u @ 
UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
-          if child.resolved =>
-        EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match {
+      case i @ InsertIntoStatement(
+          u @ UnresolvedRelation(CatalogObjectIdentifier(catalog, ident)), _, 
_, _, _)
+            if i.query.resolved && CatalogV2Util.isSessionCatalog(catalog) =>
+        val relation = ResolveTempViews(u) match {
+          case unresolved: UnresolvedRelation =>
+            lookupRelation(catalog, ident, recurse = 
false).getOrElse(unresolved)
+          case tempView => tempView
+        }
+
+        EliminateSubqueryAliases(relation) match {
           case v: View =>
             u.failAnalysis(s"Inserting into a view is not allowed. View: 
${v.desc.identifier}.")
           case other => i.copy(table = other)
         }
+
       case u: UnresolvedRelation => resolveRelation(u)
     }
 
-    // Look up the table with the given name from catalog. The database we 
used is decided by the
-    // precedence:
-    // 1. Use the database part of the table identifier, if it is defined;
-    // 2. Use defaultDatabase, if it is defined(In this case, no temporary 
objects can be used,
-    //    and the default database is only used to look up a view);
-    // 3. Use the currentDb of the SessionCatalog.
-    private def lookupTableFromCatalog(
-        tableIdentifier: TableIdentifier,
-        u: UnresolvedRelation,
-        defaultDatabase: Option[String] = None): LogicalPlan = {
-      val tableIdentWithDb = tableIdentifier.copy(
-        database = tableIdentifier.database.orElse(defaultDatabase))
-      try {
-        v1SessionCatalog.lookupRelation(tableIdentWithDb)
-      } catch {
-        case _: NoSuchTableException | _: NoSuchDatabaseException =>
-          u
+    // Look up a relation from a given session catalog with the following 
logic:
+    // 1) If a relation is not found in the catalog, return None.
+    // 2) If a relation is found and is not a v1 table, create a v2 relation.
+    // 3) If a relation is found and is a v1 table,
+    //   a) If it has a v2 provider, create a v2 relation.
+    //   b) If it doesn't have a v2 provider and is not running on files, 
create v1 relation.
+    //      Relation that runs directly on files will be
+    //   c) Otherwise, return None.
+    // If recurse is set to true, it will call `resolveRelation` recursively 
to resolve
+    // relations with the correct AnalysisContext.defaultDatabase scope.
+    private def lookupRelation(
+        catalog: CatalogPlugin,
+        ident: Identifier,
+        recurse: Boolean): Option[LogicalPlan] = {
+      val newIdent = withNewNamespace(ident)
+      require(newIdent.namespace.size == 1)
+
+      CatalogV2Util.loadTable(catalog, newIdent) match {
+        case Some(v1Table: V1Table) =>
+          val tableIdent = TableIdentifier(newIdent.name, 
newIdent.namespace.headOption)
+          if (isV2Provider(v1Table.v1Table.provider)) {
+            Some(DataSourceV2Relation.create(v1Table))
+          } else if (!isRunningDirectlyOnFiles(tableIdent)) {
+            val relation = v1SessionCatalog.lookupRelation(tableIdent, 
v1Table.v1Table)
+            if (recurse) {
+              Some(resolveRelation(relation))
+            } else {
+              Some(relation)
+            }
+          } else {
+            None
+          }
+        case Some(table) =>
+          Some(DataSourceV2Relation.create(table))
+        case None => None
       }
     }
 
+    // The namespace used for lookup is decided by the following precedence:
+    // 1. Use the existing namespace if it is defined.
+    // 2. Use defaultDatabase fom AnalysisContext, if it is defined. In this 
case, no temporary
+    //    objects can be used, and the default database is only used to look 
up a view.
+    // 3. Use the current namespace. Note that the catalog will be a session 
catalog since
+    //    this function gets called only when the catalog is resolved to a 
session catalog.
+    private def withNewNamespace(ident: Identifier): Identifier = {
+      if (ident.namespace.nonEmpty) {
+        ident
+      } else {
+        val defaultNamespace = AnalysisContext.get.defaultDatabase match {
+          case Some(db) => Array(db)
+          case None => catalogManager.currentNamespace
+        }
+        Identifier.of(defaultNamespace, ident.name)
+      }
+    }
+
+    private def isV2Provider(provider: Option[String]): Boolean = {
+      return false
 
 Review comment:
   This shouldn't happen in a Catalyst rule. The purpose of V1Table is to carry 
all of the information needed to convert from V2 back to V1. That conversion 
should happen independently, so that table resolution only needs to look up the 
table and return it.
   
   Whether to return a V1 table is up to the session catalog implementation. 
That's where the TableProvider check should happen, so the session catalog can 
return either a V1Table for a v1 provider or a v2 table for a v2 provider.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to