cloud-fan commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1411877904


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,9 +164,44 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
-      partitions.toImmutableArraySeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, 
conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = 
DataSourceV2Utils.getTableProvider(provider, conf) match {
+      // If the provider does not support external metadata, users should not 
be allowed to
+      // specify custom schema when creating the data source table, since the 
schema will not
+      // be used when loading the table.
+      case Some(p) if !p.supportsExternalMetadata() =>
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = 
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("provider" -> provider))
+        }
+        // V2CreateTablePlan does not allow non-empty partitions when schema 
is empty. This
+        // is checked in `PreProcessTableCreation` rule.
+        assert(partitions.isEmpty,
+          s"Partitions should be empty when the schema is empty: 
${partitions.mkString(", ")}")
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        lazy val dsOptions = new CaseInsensitiveStringMap(properties)
+        if (schema.isEmpty) {
+          assert(partitions.isEmpty,
+            s"Partitions should be empty when the schema is empty: 
${partitions.mkString(", ")}")
+          // Infer the schema and partitions and store them in the catalog.
+          (tableProvider.inferSchema(dsOptions), 
tableProvider.inferPartitioning(dsOptions))
+        } else if (partitions.isEmpty) {
+          (schema, tableProvider.inferPartitioning(dsOptions))
+        } else {
+          (schema, partitions)
+        }
+
+      case _ =>
+        (schema, partitions)

Review Comment:
   maybe we can do it later. It's the current behavior that allows any table 
provider.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to