allisonwang-db commented on code in PR #43949:
URL: https://github.com/apache/spark/pull/43949#discussion_r1408150483


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,8 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = 
partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, 
conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = 
DataSourceV2Utils.getTableProvider(provider, conf) match {
+      case Some(_: SupportsCatalogOptions) =>
+        throw new SparkUnsupportedOperationException(
+          errorClass = 
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED",
+          messageParameters = Map("provider" -> provider))
+
+      case Some(p) if !p.supportsExternalMetadata() =>
+        // Partitions cannot be specified when schema is empty.

Review Comment:
   Add assert here



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,8 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = 
partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, 
conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = 
DataSourceV2Utils.getTableProvider(provider, conf) match {
+      case Some(_: SupportsCatalogOptions) =>
+        throw new SparkUnsupportedOperationException(
+          errorClass = 
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED",
+          messageParameters = Map("provider" -> provider))

Review Comment:
   We can pass in the catalogManager and skip this check. And user cannot 
provide schema here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala:
##########
@@ -115,8 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = 
partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, 
conf.defaultDataSourceName)
+
+    val (newSchema, newPartitions) = 
DataSourceV2Utils.getTableProvider(provider, conf) match {
+      case Some(_: SupportsCatalogOptions) =>
+        throw new SparkUnsupportedOperationException(
+          errorClass = 
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED",
+          messageParameters = Map("provider" -> provider))
+
+      case Some(p) if !p.supportsExternalMetadata() =>
+        // Partitions cannot be specified when schema is empty.
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = 
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("provider" -> provider))
+        }
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        if (schema.isEmpty) {
+          // Infer the schema and partitions and store them in the catalog.
+          val dsOptions = new CaseInsensitiveStringMap(properties)
+          (tableProvider.inferSchema(dsOptions), 
tableProvider.inferPartitioning(dsOptions))
+        } else {
+          // TODO: when schema is defined but partitioning is empty, should we 
infer it?

Review Comment:
   We should infer partitions here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to