jose-torres commented on a change in pull request #24768: [SPARK-27919][SQL]
Add v2 session catalog
URL: https://github.com/apache/spark/pull/24768#discussion_r299717662
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
##########
@@ -26,64 +26,73 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin,
Identifier, LookupCatalog
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CastSupport
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable,
CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable,
CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
CreateV2Table, DropTable, LogicalPlan}
import
org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement,
AlterTableSetLocationStatement, AlterTableSetPropertiesStatement,
AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement,
AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement,
CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand,
AlterTableSetLocationCommand, AlterTableSetPropertiesCommand,
AlterTableUnsetPropertiesCommand, DropTableCommand}
+import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2,
DataSourceV2Relation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType,
MetadataBuilder, StructField, StructType}
case class DataSourceResolution(
conf: SQLConf,
- findCatalog: String => CatalogPlugin)
- extends Rule[LogicalPlan] with CastSupport with LookupCatalog {
+ lookup: LookupCatalog)
+ extends Rule[LogicalPlan] with CastSupport {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+ import lookup._
- override protected def lookupCatalog(name: String): CatalogPlugin =
findCatalog(name)
-
- def defaultCatalog: Option[CatalogPlugin] =
conf.defaultV2Catalog.map(findCatalog)
+ lazy val v2SessionCatalog: CatalogPlugin = lookup.sessionCatalog
+ .getOrElse(throw new AnalysisException("No v2 session catalog
implementation is available"))
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTableStatement(
AsTableIdentifier(table), schema, partitionCols, bucketSpec,
properties,
V1WriteProvider(provider), options, location, comment, ifNotExists) =>
-
+ // the source is v1, the identifier has no catalog, and there is no
default v2 catalog
Review comment:
I agree with the design goals you're describing, and I agree that we shouldn't
revisit that decision here (even if I had a better solution in mind, which I
don't).
I'm still confused about the "there is no default v2 catalog" part, though.
How do we know that a CreateTableStatement matching this case can't exist when
there's a default v2 catalog?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]