rdblue commented on a change in pull request #24768: [SPARK-27919][SQL] Add v2
session catalog
URL: https://github.com/apache/spark/pull/24768#discussion_r300768695
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
##########
@@ -17,54 +17,127 @@
package org.apache.spark.sql.catalog.v2
+import scala.util.control.NonFatal
+
import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
/**
* A trait to encapsulate catalog lookup function and helpful extractors.
*/
@Experimental
-trait LookupCatalog {
+trait LookupCatalog extends Logging {
+
+ import LookupCatalog._
+ protected def defaultCatalogName: Option[String] = None
protected def lookupCatalog(name: String): CatalogPlugin
- type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+ /**
+ * Returns the default catalog. When set, this catalog is used for all
identifiers that do not
+ * set a specific catalog. When this is None, the session catalog is
responsible for the
+ * identifier.
+ *
+ * If this is None and a table's provider (source) is a v2 provider, the v2
session catalog will
+ * be used.
+ */
+ def defaultCatalog: Option[CatalogPlugin] = {
+ try {
+ defaultCatalogName.map(lookupCatalog)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Cannot load default v2 catalog: ${defaultCatalogName.get}",
e)
+ None
+ }
+ }
/**
- * Extract catalog plugin and identifier from a multi-part identifier.
+ * This catalog is a v2 catalog that delegates to the v1 session catalog. It
is used when the
+ * session catalog is responsible for an identifier, but the source requires
the v2 catalog API.
+ * This happens when the source implementation extends the v2 TableProvider
API and is not listed
+ * in the fallback configuration, spark.sql.sources.write.useV1SourceList
*/
- object CatalogObjectIdentifier {
- def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts
match {
- case Seq(name) =>
- Some((None, Identifier.of(Array.empty, name)))
+ def sessionCatalog: Option[CatalogPlugin] = {
+ try {
+ Some(lookupCatalog(SESSION_CATALOG_NAME))
+ } catch {
+ case NonFatal(e) =>
+ logError("Cannot load v2 session catalog", e)
+ None
+ }
+ }
+
+ /**
+ * Extract catalog plugin and remaining identifier names.
+ *
+ * This does not substitute the default catalog if no catalog is set in the
identifier.
+ */
+ private object CatalogAndIdentifier {
+ def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin],
Seq[String])] = parts match {
+ case Seq(_) =>
+ Some((None, parts))
case Seq(catalogName, tail @ _*) =>
try {
- Some((Some(lookupCatalog(catalogName)),
Identifier.of(tail.init.toArray, tail.last)))
+ Some((Some(lookupCatalog(catalogName)), tail))
} catch {
case _: CatalogNotFoundException =>
- Some((None, Identifier.of(parts.init.toArray, parts.last)))
+ Some((None, parts))
}
}
}
+ type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+
+ /**
+ * Extract catalog and identifier from a multi-part identifier with the
default catalog if needed.
+ */
+ object CatalogObjectIdentifier {
+ def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts
match {
+ case CatalogAndIdentifier(maybeCatalog, nameParts) =>
+ Some((
+ maybeCatalog.orElse(defaultCatalog),
+ Identifier.of(nameParts.init.toArray, nameParts.last)
+ ))
+ }
+ }
+
/**
* Extract legacy table identifier from a multi-part identifier.
*
* For legacy support only. Please use [[CatalogObjectIdentifier]] instead
on DSv2 code paths.
*/
object AsTableIdentifier {
def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
- case CatalogObjectIdentifier(None, ident) =>
- ident.namespace match {
- case Array() =>
- Some(TableIdentifier(ident.name))
- case Array(database) =>
- Some(TableIdentifier(ident.name, Some(database)))
+ case CatalogAndIdentifier(None, names) if defaultCatalog.isEmpty =>
+ names match {
+ case Seq(name) =>
+ Some(TableIdentifier(name))
+ case Seq(database, name) =>
+ Some(TableIdentifier(name, Some(database)))
case _ =>
None
}
case _ =>
None
}
}
+
+ /**
+ * Extract a legacy table identifier from a multi-part identifier if it has
no catalog.
+ */
+ object AsTemporaryTableIdentifier {
+ def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
+ case CatalogAndIdentifier(None, Seq(table)) =>
+ Some(TableIdentifier(table))
+ case CatalogAndIdentifier(None, Seq(database, table)) =>
+ Some(TableIdentifier(table, Some(database)))
+ case _ =>
+ None
+ }
+ }
+}
+
+object LookupCatalog {
+ val SESSION_CATALOG_NAME: String = "session"
Review comment:
I added a default config for this that uses the V2SessionCatalog. We may as
well add it now, but we do need to be able to override it until we have a
built-in source that passes all SQL tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]