rdblue commented on a change in pull request #24768: [SPARK-27919][SQL] Add v2 
session catalog
URL: https://github.com/apache/spark/pull/24768#discussion_r300534039
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
 ##########
 @@ -17,54 +17,127 @@
 
 package org.apache.spark.sql.catalog.v2
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
 
 /**
  * A trait to encapsulate catalog lookup function and helpful extractors.
  */
 @Experimental
-trait LookupCatalog {
+trait LookupCatalog extends Logging {
+
+  import LookupCatalog._
 
+  protected def defaultCatalogName: Option[String] = None
   protected def lookupCatalog(name: String): CatalogPlugin
 
-  type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+  /**
+   * Returns the default catalog. When set, this catalog is used for all 
identifiers that do not
+   * set a specific catalog. When this is None, the session catalog is 
responsible for the
+   * identifier.
+   *
+   * If this is None and a table's provider (source) is a v2 provider, the v2 
session catalog will
+   * be used.
+   */
+  def defaultCatalog: Option[CatalogPlugin] = {
+    try {
+      defaultCatalogName.map(lookupCatalog)
+    } catch {
+      case NonFatal(e) =>
+        logError(s"Cannot load default v2 catalog: ${defaultCatalogName.get}", 
e)
+        None
+    }
+  }
 
   /**
-   * Extract catalog plugin and identifier from a multi-part identifier.
+   * This catalog is a v2 catalog that delegates to the v1 session catalog. It 
is used when the
+   * session catalog is responsible for an identifier, but the source requires 
the v2 catalog API.
+   * This happens when the source implementation extends the v2 TableProvider 
API and is not listed
+   * in the fallback configuration, spark.sql.sources.write.useV1SourceList
    */
-  object CatalogObjectIdentifier {
-    def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts 
match {
-      case Seq(name) =>
-        Some((None, Identifier.of(Array.empty, name)))
+  def sessionCatalog: Option[CatalogPlugin] = {
+    try {
+      Some(lookupCatalog(SESSION_CATALOG_NAME))
+    } catch {
+      case NonFatal(e) =>
+        logError("Cannot load v2 session catalog", e)
+        None
+    }
+  }
+
+  /**
+   * Extract catalog plugin and remaining identifier names.
+   *
+   * This does not substitute the default catalog if no catalog is set in the 
identifier.
+   */
+  private object CatalogAndIdentifier {
+    def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], 
Seq[String])] = parts match {
+      case Seq(_) =>
+        Some((None, parts))
       case Seq(catalogName, tail @ _*) =>
         try {
-          Some((Some(lookupCatalog(catalogName)), 
Identifier.of(tail.init.toArray, tail.last)))
+          Some((Some(lookupCatalog(catalogName)), tail))
         } catch {
           case _: CatalogNotFoundException =>
-            Some((None, Identifier.of(parts.init.toArray, parts.last)))
+            Some((None, parts))
         }
     }
   }
 
+  type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+
+  /**
+   * Extract catalog and identifier from a multi-part identifier with the 
default catalog if needed.
+   */
+  object CatalogObjectIdentifier {
+    def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts 
match {
+      case CatalogAndIdentifier(maybeCatalog, nameParts) =>
+        Some((
+            maybeCatalog.orElse(defaultCatalog),
+            Identifier.of(nameParts.init.toArray, nameParts.last)
+        ))
+    }
+  }
+
   /**
    * Extract legacy table identifier from a multi-part identifier.
    *
    * For legacy support only. Please use [[CatalogObjectIdentifier]] instead 
on DSv2 code paths.
    */
   object AsTableIdentifier {
     def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
-      case CatalogObjectIdentifier(None, ident) =>
-        ident.namespace match {
-          case Array() =>
-            Some(TableIdentifier(ident.name))
-          case Array(database) =>
-            Some(TableIdentifier(ident.name, Some(database)))
+      case CatalogAndIdentifier(None, names) if defaultCatalog.isEmpty =>
+        names match {
+          case Seq(name) =>
+            Some(TableIdentifier(name))
+          case Seq(database, name) =>
+            Some(TableIdentifier(name, Some(database)))
           case _ =>
             None
         }
       case _ =>
         None
     }
   }
+
+  /**
+   * Extract a legacy table identifier from a multi-part identifier if it has 
no catalog.
+   */
+  object AsTemporaryTableIdentifier {
+    def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
+      case CatalogAndIdentifier(None, Seq(table)) =>
+        Some(TableIdentifier(table))
+      case CatalogAndIdentifier(None, Seq(database, table)) =>
+        Some(TableIdentifier(table, Some(database)))
+      case _ =>
+        None
+    }
+  }
+}
+
+object LookupCatalog {
+  val SESSION_CATALOG_NAME: String = "session"
 
 Review comment:
   The v2 session catalog is configured by adding a catalog named "session" to 
the SQL config. That entry still needs to be added to the SQLConf defaults, but 
I wasn't sure whether it should happen in this PR or in a follow-up. I can add 
it in this PR if you'd prefer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to