jose-torres commented on a change in pull request #24768: [SPARK-27919][SQL] Add v2 session catalog
URL: https://github.com/apache/spark/pull/24768#discussion_r299673770
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
##########
@@ -26,64 +26,73 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CastSupport
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
+import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}

 case class DataSourceResolution(
     conf: SQLConf,
-    findCatalog: String => CatalogPlugin)
-  extends Rule[LogicalPlan] with CastSupport with LookupCatalog {
+    lookup: LookupCatalog)
+  extends Rule[LogicalPlan] with CastSupport {

   import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+  import lookup._

-  override protected def lookupCatalog(name: String): CatalogPlugin = findCatalog(name)
-
-  def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog)
+  lazy val v2SessionCatalog: CatalogPlugin = lookup.sessionCatalog
+    .getOrElse(throw new AnalysisException("No v2 session catalog implementation is available"))

   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTableStatement(
         AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
         V1WriteProvider(provider), options, location, comment, ifNotExists) =>
-
+      // the source is v1, the identifier has no catalog, and there is no default v2 catalog
Review comment:
I don't know if there's anything to do about it in this PR, and maybe I'm
misunderstanding, but I'm very concerned if we're inferring which catalogs do
or don't exist based only on the shape of the query plan. Ideally we should be
able to interpret query plans without needing to think about which catalogs
exist, and directly check in the rare situations where it does matter.
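
To make the "directly check" idea concrete, here is a rough toy sketch. This is plain Scala with made-up stand-ins (`Lookup`, `catalogFor`, `Catalog`, `Ident`), not the PR's actual classes; the only point is that the catalog decision becomes one explicit lookup inside the rule, rather than something implied by which pattern-match arm fired.

```scala
// Toy model only -- none of these types are the PR's real classes.
object CatalogCheckSketch {
  final case class Catalog(name: String)
  final case class Ident(namespace: Seq[String], table: String)

  // Stand-in for the lookup side: "does any v2 catalog (explicit or default)
  // claim this multipart identifier?"
  trait Lookup {
    def catalogFor(nameParts: Seq[String]): Option[(Catalog, Ident)]
  }

  sealed trait Resolved
  final case class V2Create(catalog: Catalog, ident: Ident) extends Resolved
  final case class V1Create(ident: Ident) extends Resolved

  // The catalog decision is a single explicit check, not a side effect of
  // which extractor happened to match the plan shape.
  def resolveCreate(nameParts: Seq[String], lookup: Lookup): Resolved =
    lookup.catalogFor(nameParts) match {
      case Some((catalog, ident)) => V2Create(catalog, ident)
      case None => V1Create(Ident(nameParts.dropRight(1), nameParts.last))
    }

  def main(args: Array[String]): Unit = {
    val lookup = new Lookup {
      def catalogFor(nameParts: Seq[String]): Option[(Catalog, Ident)] =
        nameParts match {
          case Seq("testcat", rest @ _*) if rest.nonEmpty =>
            Some((Catalog("testcat"), Ident(rest.dropRight(1).toSeq, rest.last)))
          case _ => None
        }
    }
    println(resolveCreate(Seq("testcat", "db", "t"), lookup)) // catalog claims it
    println(resolveCreate(Seq("db", "t"), lookup))            // v1 fallback
  }
}
```

The extractors could still exist for convenience, but the existence check would then live in one obvious place instead of being spread across the match arms.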