[GitHub] spark pull request #14207: [SPARK-16552] [SQL] Store the Inferred Schemas in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73969863

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -521,31 +521,29 @@ object DDLUtils {
         table.partitionColumns.nonEmpty ||
           table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
       }
     
    -  // A persisted data source table may not store its schema in the catalog. In this case, its schema
    -  // will be inferred at runtime when the table is referenced.
    -  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
    +  // A persisted data source table always store its schema in the catalog.
    +  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
         require(isDatasourceTable(metadata))
    +    val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
         val props = metadata.properties
    -    if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
    +    props.get(DATASOURCE_SCHEMA).map { schema =>
           // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
           // After SPARK-6024, we removed this flag.
           // Although we are not using spark.sql.sources.schema any more, we need to still support.
    -      props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
    -    } else {
    -      metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
    +      DataType.fromJson(schema).asInstanceOf[StructType]
    +    } getOrElse {
    +      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
             val parts = (0 until numParts.toInt).map { index =>
               val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
               if (part == null) {
    -            throw new AnalysisException(
    -              "Could not read schema from the metastore because it is corrupted " +
    -                s"(missing part $index of the schema, $numParts parts are expected).")
    +            throw new AnalysisException(msgSchemaCorrupted +
    +              s" (missing part $index of the schema, $numParts parts are expected).")
               }
    -
               part
             }
             // Stick all parts back to a single schema string.
             DataType.fromJson(parts.mkString).asInstanceOf[StructType]
    -      }
    +      } getOrElse(throw new AnalysisException(msgSchemaCorrupted))
    --- End diff --

    : )

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73968228

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
         }
     
         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: BaseRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath).resolveRelation(checkPathExist = false)
    +
    +    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
    +      userSpecifiedPartitionColumns
    +    } else {
    +      val res = dataSource match {
    +        case r: HadoopFsRelation => r.partitionSchema.fieldNames
    +        case _ => Array.empty[String]
    +      }
    +      if (userSpecifiedPartitionColumns.length > 0) {
    +        // The table does not have a specified schema, which means that the schema will be inferred
    +        // when we load the table. So, we are not expecting partition columns and we will discover
    +        // partitions when we load the table. However, if there are specified partition columns,
    +        // we simply ignore them and provide a warning message.
    +        logWarning(
    +          s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
    +            s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
    +            s"Schema: ${dataSource.schema.simpleString}; " +
    +            s"Partition columns: ${res.mkString("(", ", ", ")")}")
    +      }
    +      res
    +    }
     
         CreateDataSourceTableUtils.createDataSourceTable(
           sparkSession = sparkSession,
           tableIdent = tableIdent,
    -      userSpecifiedSchema = userSpecifiedSchema,
    +      schema = dataSource.schema,
    --- End diff --

    Here, `dataSource.schema` could be inferred. Previously, we did not store the inferred schema; after this PR, we do, and thus we use `dataSource.schema`. Actually, after re-checking the code, I found that the schema might be adjusted slightly even if users specify one. For example, the nullability could be changed: https://github.com/apache/spark/blob/64529b186a1c33740067cc7639d630bc5b9ae6e8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L407 I think we should make such a change, but maybe we should test and log it?
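The nullability adjustment mentioned above can be illustrated without Spark. Below is a minimal sketch: `Field` and `Schema` are simplified stand-ins for Spark's `StructField`/`StructType` (not the PR's code), and `asNullable` mirrors only the idea of the linked behavior, where every field is relaxed to nullable before the schema is persisted.

```scala
// Simplified stand-ins for StructField / StructType (illustrative only).
case class Field(name: String, dataType: String, nullable: Boolean)

case class Schema(fields: Seq[Field]) {
  // Counterpart of StructType.asNullable in spirit: relax every field to nullable.
  def asNullable: Schema = Schema(fields.map(_.copy(nullable = true)))
}

object NullabilityDemo {
  def main(args: Array[String]): Unit = {
    val userSpecified = Schema(Seq(
      Field("num", "int", nullable = false),
      Field("str", "string", nullable = true)))

    // The schema actually persisted differs from what the user specified.
    val persisted = userSpecified.asNullable
    println(persisted.fields.map(f => s"${f.name}:${f.nullable}").mkString(", "))
    // prints: num:true, str:true
  }
}
```

This is why comparing the stored schema against the user-specified one byte-for-byte can fail even when the user did supply a schema.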
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73965912

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
         }
     
         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: BaseRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath).resolveRelation(checkPathExist = false)
    +
    +    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
    +      userSpecifiedPartitionColumns
    +    } else {
    +      val res = dataSource match {
    +        case r: HadoopFsRelation => r.partitionSchema.fieldNames
    +        case _ => Array.empty[String]
    +      }
    +      if (userSpecifiedPartitionColumns.length > 0) {
    --- End diff --

    Here, I just keep the existing behavior. To be honest, I think we should throw an exception whenever it makes sense; it sounds like the job log is not read by most users. Will submit a follow-up PR to make this change. Thanks!
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73941472

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
         }
     
         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: BaseRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath).resolveRelation(checkPathExist = false)
    +
    +    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
    +      userSpecifiedPartitionColumns
    +    } else {
    +      val res = dataSource match {
    +        case r: HadoopFsRelation => r.partitionSchema.fieldNames
    +        case _ => Array.empty[String]
    +      }
    +      if (userSpecifiedPartitionColumns.length > 0) {
    +        // The table does not have a specified schema, which means that the schema will be inferred
    +        // when we load the table. So, we are not expecting partition columns and we will discover
    +        // partitions when we load the table. However, if there are specified partition columns,
    +        // we simply ignore them and provide a warning message.
    +        logWarning(
    +          s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
    +            s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
    +            s"Schema: ${dataSource.schema.simpleString}; " +
    +            s"Partition columns: ${res.mkString("(", ", ", ")")}")
    +      }
    +      res
    +    }
     
         CreateDataSourceTableUtils.createDataSourceTable(
           sparkSession = sparkSession,
           tableIdent = tableIdent,
    -      userSpecifiedSchema = userSpecifiedSchema,
    +      schema = dataSource.schema,
    --- End diff --

    I think from the code, it is not very clear that `dataSource.schema` will be `userSpecifiedSchema`?
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73940895

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -521,31 +521,29 @@ object DDLUtils {
         table.partitionColumns.nonEmpty ||
           table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
       }
     
    -  // A persisted data source table may not store its schema in the catalog. In this case, its schema
    -  // will be inferred at runtime when the table is referenced.
    -  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
    +  // A persisted data source table always store its schema in the catalog.
    +  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
         require(isDatasourceTable(metadata))
    +    val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
         val props = metadata.properties
    -    if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
    +    props.get(DATASOURCE_SCHEMA).map { schema =>
           // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
           // After SPARK-6024, we removed this flag.
           // Although we are not using spark.sql.sources.schema any more, we need to still support.
    -      props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
    -    } else {
    -      metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
    +      DataType.fromJson(schema).asInstanceOf[StructType]
    +    } getOrElse {
    --- End diff --

    I am not sure if `getOrElse` makes the code easier to follow.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73940848

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -521,31 +521,29 @@ object DDLUtils {
         table.partitionColumns.nonEmpty ||
           table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
       }
     
    -  // A persisted data source table may not store its schema in the catalog. In this case, its schema
    -  // will be inferred at runtime when the table is referenced.
    -  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
    +  // A persisted data source table always store its schema in the catalog.
    +  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
         require(isDatasourceTable(metadata))
    +    val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
         val props = metadata.properties
    -    if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
    +    props.get(DATASOURCE_SCHEMA).map { schema =>
           // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
           // After SPARK-6024, we removed this flag.
           // Although we are not using spark.sql.sources.schema any more, we need to still support.
    -      props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
    -    } else {
    -      metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
    +      DataType.fromJson(schema).asInstanceOf[StructType]
    +    } getOrElse {
    +      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
             val parts = (0 until numParts.toInt).map { index =>
               val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
               if (part == null) {
    -            throw new AnalysisException(
    -              "Could not read schema from the metastore because it is corrupted " +
    -                s"(missing part $index of the schema, $numParts parts are expected).")
    +            throw new AnalysisException(msgSchemaCorrupted +
    +              s" (missing part $index of the schema, $numParts parts are expected).")
               }
    -
               part
             }
             // Stick all parts back to a single schema string.
             DataType.fromJson(parts.mkString).asInstanceOf[StructType]
    -      }
    +      } getOrElse(throw new AnalysisException(msgSchemaCorrupted))
    --- End diff --

    ah, this `getOrElse` is too far from the `get(DATASOURCE_SCHEMA)`... Actually, I prefer the `if/else`.
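The readability trade-off being debated can be reduced to a Spark-free sketch. Everything below is illustrative: `parse` stands in for `DataType.fromJson(...).asInstanceOf[StructType]`, the key names are plain strings, and the exception type is a generic one rather than `AnalysisException`.

```scala
object SchemaLookupStyles {
  val SchemaKey = "schema"            // stand-in for DATASOURCE_SCHEMA
  val NumPartsKey = "schema.numParts" // stand-in for DATASOURCE_SCHEMA_NUMPARTS

  // Stand-in for DataType.fromJson(...).asInstanceOf[StructType].
  private def parse(json: String): String = s"parsed($json)"

  // Style 1: Option chaining. Compact, but once the blocks grow, the trailing
  // getOrElse ends up far from the props.get it pairs with -- the objection above.
  def viaGetOrElse(props: Map[String, String]): String =
    props.get(SchemaKey).map { schema =>
      parse(schema)
    } getOrElse {
      props.get(NumPartsKey)
        .map(n => parse(s"<$n parts stitched together>"))
        .getOrElse(throw new IllegalStateException("schema corrupted"))
    }

  // Style 2: explicit if/else. More verbose, but every branch is local and the
  // corruption failure mode is spelled out at the end.
  def viaIfElse(props: Map[String, String]): String =
    if (props.isDefinedAt(SchemaKey)) {
      parse(props(SchemaKey))
    } else if (props.isDefinedAt(NumPartsKey)) {
      parse(s"<${props(NumPartsKey)} parts stitched together>")
    } else {
      throw new IllegalStateException("schema corrupted")
    }
}
```

Both styles are equivalent in behavior; the choice is purely about how far apart the lookup and its fallback may drift as the branches grow.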
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73940468

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
         }
     
         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: BaseRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath).resolveRelation(checkPathExist = false)
    +
    +    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
    +      userSpecifiedPartitionColumns
    +    } else {
    +      val res = dataSource match {
    +        case r: HadoopFsRelation => r.partitionSchema.fieldNames
    +        case _ => Array.empty[String]
    +      }
    +      if (userSpecifiedPartitionColumns.length > 0) {
    +        // The table does not have a specified schema, which means that the schema will be inferred
    +        // when we load the table. So, we are not expecting partition columns and we will discover
    +        // partitions when we load the table. However, if there are specified partition columns,
    +        // we simply ignore them and provide a warning message.
    +        logWarning(
    +          s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
    +            s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
    +            s"Schema: ${dataSource.schema.simpleString}; " +
    +            s"Partition columns: ${res.mkString("(", ", ", ")")}")
    +      }
    +      res
    +    }
     
         CreateDataSourceTableUtils.createDataSourceTable(
           sparkSession = sparkSession,
           tableIdent = tableIdent,
    -      userSpecifiedSchema = userSpecifiedSchema,
    +      schema = dataSource.schema,
    --- End diff --

    seems we should still use the user-specified schema, right?
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r73940350

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
         }
     
         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: BaseRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath).resolveRelation(checkPathExist = false)
    +
    +    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
    +      userSpecifiedPartitionColumns
    +    } else {
    +      val res = dataSource match {
    +        case r: HadoopFsRelation => r.partitionSchema.fieldNames
    +        case _ => Array.empty[String]
    +      }
    +      if (userSpecifiedPartitionColumns.length > 0) {
    --- End diff --

    Should we throw an exception for this case?
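The fail-fast alternative being proposed here can be sketched in plain Scala. This is not the PR's code: the function name and the use of `IllegalArgumentException` (in place of Spark's `AnalysisException`) are illustrative, and the inferred columns are passed in directly instead of being pattern-matched out of a relation.

```scala
object PartitionColumnCheck {
  // Decide the partition columns for a CREATE TABLE, failing fast instead of
  // logging a warning when the user supplies partition columns without a schema.
  def resolvePartitionColumns(
      userSpecifiedSchema: Option[String],
      userSpecifiedPartitionCols: Seq[String],
      inferredPartitionCols: Seq[String]): Seq[String] = {
    if (userSpecifiedSchema.nonEmpty) {
      // With an explicit schema, the user's partition columns are authoritative.
      userSpecifiedPartitionCols
    } else if (userSpecifiedPartitionCols.nonEmpty) {
      // The schema will be inferred, so partitions are discovered from the data;
      // surfacing the conflict beats silently ignoring the user's input.
      throw new IllegalArgumentException(
        s"Cannot specify partition columns (${userSpecifiedPartitionCols.mkString(", ")}) " +
          "when the schema is inferred; partition columns are discovered from the data.")
    } else {
      inferredPartitionCols
    }
  }
}
```

The argument for throwing is exactly the one made above: a warning buried in the job log is easy to miss, while an exception makes the ignored input impossible to overlook.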
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/14207
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r72385549

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,209 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
     
    +  private def createDataSourceTable(
    +      path: File,
    +      userSpecifiedSchema: Option[String],
    +      userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = {
    --- End diff --

    Sure, will do. Thanks!
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r72378623

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,209 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
     
    +  private def createDataSourceTable(
    +      path: File,
    +      userSpecifiedSchema: Option[String],
    +      userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = {
    --- End diff --

    how about we pass in the expected schema and partCols, and do the check in this method?
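The shape of the refactor suggested here, sketched with simplified types (not the PR's code): instead of the helper returning `(tableSchema, partCols)` for every caller to assert on, it takes the expectations and does the checks itself. `Schema` below is a hypothetical stand-in for Spark's `StructType`.

```scala
object DDLTestHelper {
  // (column name, data type) pairs, standing in for StructType.
  type Schema = Seq[(String, String)]

  // Accept the expected values and fail inside the helper with a useful
  // message, so each test body shrinks to a single call.
  def checkCreateDataSourceTable(
      actualSchema: Schema,
      actualPartCols: Seq[String],
      expectedSchema: Schema,
      expectedPartCols: Seq[String]): Unit = {
    assert(actualSchema == expectedSchema,
      s"schema mismatch: got $actualSchema, expected $expectedSchema")
    assert(actualPartCols == expectedPartCols,
      s"partition columns mismatch: got $actualPartCols, expected $expectedPartCols")
  }
}
```

The payoff is that the four near-identical tests in this suite no longer repeat the same `assert(tableSchema == StructType(...))` boilerplate.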
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71633897

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,222 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
     
    +  private def createDataSourceTable(
    +      path: File,
    +      userSpecifiedSchema: Option[String],
    +      userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = {
    +    var tableSchema = StructType(Nil)
    +    var partCols = Seq.empty[String]
    +
    +    val tabName = "tab1"
    +    withTable(tabName) {
    +      val partitionClause =
    +        userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
    +      val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
    +      sql(
    +        s"""
    +           |CREATE TABLE $tabName $schemaClause
    +           |USING parquet
    +           |OPTIONS (
    +           |  path '$path'
    +           |)
    +           |$partitionClause
    +         """.stripMargin)
    +      val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
    +
    +      tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
    +      partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
    +    }
    +    (tableSchema, partCols)
    +  }
    +
    +  test("Create partitioned data source table without user specified schema") {
    +    import testImplicits._
    +    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +
    +    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    +    // Case 2: without schema and partitioning columns: None
    +    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
    +      withTempPath { pathToPartitionedTable =>
    +        df.write.format("parquet").partitionBy("num")
    +          .save(pathToPartitionedTable.getCanonicalPath)
    +        val (tableSchema, partCols) =
    +          createDataSourceTable(
    +            pathToPartitionedTable,
    +            userSpecifiedSchema = None,
    +            userSpecifiedPartitionCols = partitionCols)
    +        assert(tableSchema ==
    +          StructType(StructField("str", StringType, nullable = true) ::
    +            StructField("num", IntegerType, nullable = true) :: Nil))
    +        assert(partCols == Seq("num"))
    +      }
    +    }
    +  }
    +
    +  test("Create partitioned data source table with user specified schema") {
    +    import testImplicits._
    +    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +
    +    // Case 1: with partitioning columns but no schema: Option("num")
    +    // Case 2: without schema and partitioning columns: None
    +    Seq(Option("num"), None).foreach { partitionCols =>
    +      withTempPath { pathToPartitionedTable =>
    +        df.write.format("parquet").partitionBy("num")
    +          .save(pathToPartitionedTable.getCanonicalPath)
    +        val (tableSchema, partCols) =
    +          createDataSourceTable(
    +            pathToPartitionedTable,
    +            userSpecifiedSchema = Option("num int, str string"),
    +            userSpecifiedPartitionCols = partitionCols)
    +        assert(tableSchema ==
    +          StructType(StructField("num", IntegerType, nullable = true) ::
    +            StructField("str", StringType, nullable = true) :: Nil))
    +        assert(partCols.mkString(", ") == partitionCols.getOrElse(""))
    +      }
    +    }
    +  }
    +
    +  test("Create non-partitioned data source table without user specified schema") {
    +    import testImplicits._
    +    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +
    +    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    +    // Case 2: without schema and partitioning columns: None
    +    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
    +      withTempPath { pathToNonPartitionedTable =>
    +        df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
    +        val (tableSchema, partCols) =
    +          createDataSourceTable(
    +            pathToNonPartitionedTable,
    +            userSpecifiedSchema = None,
    +            userSpecifiedPartitionCols = partitionCols)
    +        assert(tableSchema ==
    +          StructType(StructField("num", IntegerType, nullable = true) ::
    +            StructField("str", StringType, nullable = true) :: Nil))
    +        assert(partCols.isEmpty)
    +      }
    +    }
    +  }
    +
    +  test("Create non-partitioned data source table with user specified schema") {
    +    import testImplicits._
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71633776

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,222 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
     
    +  private def createDataSourceTable(
    +      path: File,
    +      userSpecifiedSchema: Option[String],
    +      userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = {
    +    var tableSchema = StructType(Nil)
    +    var partCols = Seq.empty[String]
    +
    +    val tabName = "tab1"
    +    withTable(tabName) {
    +      val partitionClause =
    +        userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
    +      val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
    +      sql(
    +        s"""
    +           |CREATE TABLE $tabName $schemaClause
    +           |USING parquet
    +           |OPTIONS (
    +           |  path '$path'
    +           |)
    +           |$partitionClause
    +         """.stripMargin)
    +      val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
    +
    +      tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
    +      partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
    +    }
    +    (tableSchema, partCols)
    +  }
    +
    +  test("Create partitioned data source table without user specified schema") {
    +    import testImplicits._
    +    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +
    +    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    +    // Case 2: without schema and partitioning columns: None
    +    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
    +      withTempPath { pathToPartitionedTable =>
    +        df.write.format("parquet").partitionBy("num")
    +          .save(pathToPartitionedTable.getCanonicalPath)
    +        val (tableSchema, partCols) =
    +          createDataSourceTable(
    +            pathToPartitionedTable,
    +            userSpecifiedSchema = None,
    +            userSpecifiedPartitionCols = partitionCols)
    +        assert(tableSchema ==
    +          StructType(StructField("str", StringType, nullable = true) ::
    --- End diff --

    Sure, will do.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71633259

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,222 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
     
    +  private def createDataSourceTable(
    +      path: File,
    +      userSpecifiedSchema: Option[String],
    +      userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = {
    +    var tableSchema = StructType(Nil)
    +    var partCols = Seq.empty[String]
    +
    +    val tabName = "tab1"
    +    withTable(tabName) {
    +      val partitionClause =
    +        userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
    +      val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
    +      sql(
    +        s"""
    +           |CREATE TABLE $tabName $schemaClause
    +           |USING parquet
    +           |OPTIONS (
    +           |  path '$path'
    +           |)
    +           |$partitionClause
    +         """.stripMargin)
    +      val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
    +
    +      tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
    +      partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
    +    }
    +    (tableSchema, partCols)
    +  }
    +
    +  test("Create partitioned data source table without user specified schema") {
    +    import testImplicits._
    +    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +
    +    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    +    // Case 2: without schema and partitioning columns: None
    +    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
    +      withTempPath { pathToPartitionedTable =>
    +        df.write.format("parquet").partitionBy("num")
    +          .save(pathToPartitionedTable.getCanonicalPath)
    +        val (tableSchema, partCols) =
    +          createDataSourceTable(
    +            pathToPartitionedTable,
    +            userSpecifiedSchema = None,
    +            userSpecifiedPartitionCols = partitionCols)
    +        assert(tableSchema ==
    +          StructType(StructField("str", StringType, nullable = true) ::
    +            StructField("num", IntegerType, nullable = true) :: Nil))
    +        assert(partCols == Seq("num"))
    +      }
    +    }
    +  }
    +
    +  test("Create partitioned data source table with user specified schema") {
    +    import testImplicits._
    +    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +
    +    // Case 1: with partitioning columns but no schema: Option("num")
    +    // Case 2: without schema and partitioning columns: None
    +    Seq(Option("num"), None).foreach { partitionCols =>
    +      withTempPath { pathToPartitionedTable =>
    +        df.write.format("parquet").partitionBy("num")
    +          .save(pathToPartitionedTable.getCanonicalPath)
    +        val (tableSchema, partCols) =
    +          createDataSourceTable(
    +            pathToPartitionedTable,
    +            userSpecifiedSchema = Option("num int, str string"),
    +            userSpecifiedPartitionCols = partitionCols)
    +        assert(tableSchema ==
    +          StructType(StructField("num", IntegerType, nullable = true) ::
    +            StructField("str", StringType, nullable = true) :: Nil))
    +        assert(partCols.mkString(", ") == partitionCols.getOrElse(""))
    +      }
    +    }
    +  }
    +
    +  test("Create non-partitioned data source table without user specified schema") {
    +    import testImplicits._
    +    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +
    +    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    +    // Case 2: without schema and partitioning columns: None
    +    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
    +      withTempPath { pathToNonPartitionedTable =>
    +        df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
    +        val (tableSchema, partCols) =
    +          createDataSourceTable(
    +            pathToNonPartitionedTable,
    +            userSpecifiedSchema = None,
    +            userSpecifiedPartitionCols = partitionCols)
    +        assert(tableSchema ==
    +          StructType(StructField("num", IntegerType, nullable = true) ::
    +            StructField("str", StringType, nullable = true) :: Nil))
    +        assert(partCols.isEmpty)
    +      }
    +    }
    +  }
    +
    +  test("Create non-partitioned data source table with user specified schema") {
    +    import testImplicits._
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] Store the Inferred Schemas in...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71632699

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,222 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
    ...
    +        assert(tableSchema ==
    +          StructType(StructField("str", StringType, nullable = true) ::
    --- End diff --

    nit: `new StructType().add...`

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71632333

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,222 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
    +
    +  private def createDataSourceTable(
    +      path: File,
    +      userSpecifiedSchema: Option[String],
    +      userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = {
    --- End diff --

    nit: rename it to `userSpecifiedPartitionCol`, or declare it as `Seq[String]`
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71474538

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -316,27 +340,25 @@ object CreateDataSourceTableUtils extends Logging {
         tableProperties.put(DATASOURCE_PROVIDER, provider)

         // Saves optional user specified schema. Serialized JSON schema string may be too long to be
    --- End diff --

    Yeah, will correct it. Thanks!
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71474127

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -316,27 +340,25 @@ object CreateDataSourceTableUtils extends Logging {
         tableProperties.put(DATASOURCE_PROVIDER, provider)

         // Saves optional user specified schema. Serialized JSON schema string may be too long to be
    --- End diff --

    I think this comment is not correct anymore?
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71472473

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,41 @@ case class CreateDataSourceTableCommand(
         }

         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: BaseRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath).resolveRelation(checkPathExist = false)
    +
    +    val partitionColumns =
    --- End diff --

    Sure, will do it. Thanks!
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71471942

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,165 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
    +
    +  test("Create partitioned data source table with partitioning columns but no schema") {
    +    import testImplicits._
    +
    +    withTempPath { dir =>
    +      val pathToPartitionedTable = new File(dir, "partitioned")
    +      val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +      df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath)
    +      val tabName = "tab1"
    +      withTable(tabName) {
    +        spark.sql(
    +          s"""
    +             |CREATE TABLE $tabName
    +             |USING parquet
    +             |OPTIONS (
    +             |  path '$pathToPartitionedTable'
    +             |)
    +             |PARTITIONED BY (inexistentColumns)
    +           """.stripMargin)
    +        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
    --- End diff --

    Sure, will do
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71471136

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,41 @@ case class CreateDataSourceTableCommand(
    ...
    +
    +    val partitionColumns =
    --- End diff --

    IIUC, the logic should be: if a schema is specified, use the given partition columns; else, infer them. Maybe it's clearer to write:

    ```
    val partitionColumns = if (userSpecifiedSchema.isEmpty) {
      if (userSpecifiedPartitionColumns.length > 0) {
        ...
      }
      dataSource match {
        ...
      }
    } else {
      userSpecifiedPartitionColumns
    }
    ```
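The control flow cloud-fan suggests can be sketched in plain Scala, outside of Spark. Here `choosePartitionColumns`, its parameters, and the warning message are illustrative stand-ins, not Spark's actual API; the inferred columns are passed in directly rather than extracted from a resolved relation.

```scala
// Sketch: trust the user's partition columns only when a schema was also
// user-specified; otherwise fall back to the inferred partition columns.
def choosePartitionColumns(
    userSpecifiedSchema: Option[String],
    userSpecifiedPartitionColumns: Seq[String],
    inferredPartitionColumns: Seq[String]): Seq[String] = {
  if (userSpecifiedSchema.isEmpty) {
    if (userSpecifiedPartitionColumns.nonEmpty) {
      // The real command would warn or fail here: partition columns were
      // given without a schema, so the inferred layout wins.
      Console.err.println("Partition columns are ignored when the schema is inferred.")
    }
    inferredPartitionColumns
  } else {
    userSpecifiedPartitionColumns
  }
}
```

Nesting the "else" branch last keeps the common user-specified case easy to read, which seems to be the point of the suggestion.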
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71470908

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -522,31 +522,31 @@ object DDLUtils {
         table.partitionColumns.nonEmpty ||
           table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
       }

    -  // A persisted data source table may not store its schema in the catalog. In this case, its schema
    -  // will be inferred at runtime when the table is referenced.
    -  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
    +  // A persisted data source table always store its schema in the catalog.
    +  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
         require(isDatasourceTable(metadata))
    +    val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
         val props = metadata.properties
         if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
    --- End diff --

    Sure, let me change it. Thanks!
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71470616

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,165 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
    ...
    +        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
    --- End diff --

    we can abstract common logic into some methods, to remove duplicated code a bit.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71470370

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -522,31 +522,31 @@ object DDLUtils {
    ...
         val props = metadata.properties
         if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
    --- End diff --

    how about

    ```
    props.get(DATASOURCE_SCHEMA).map { schema =>
      // DataType.fromJson(schema).asInstanceOf[StructType]
    }.getOrElse {
      props.get(DATASOURCE_SCHEMA_NUMPARTS).map {
      }.getOrElse(throw ...)
    }
    ```
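The `map`/`getOrElse` chaining proposed here can be exercised with an ordinary `Map[String, String]` standing in for the catalog table properties. The property keys and `schemaFromProperties` below are illustrative, a plain `String` replaces the parsed `StructType`, and an `IllegalStateException` stands in for Spark's `AnalysisException`.

```scala
// Property keys modeled after Spark's DATASOURCE_SCHEMA* names (illustrative).
val Schema = "spark.sql.sources.schema"
val NumParts = s"$Schema.numParts"
def partKey(i: Int): String = s"$Schema.part.$i"

// Prefer the legacy single-property schema; otherwise stitch the schema back
// together from its numbered parts, failing loudly if anything is missing.
def schemaFromProperties(props: Map[String, String]): String = {
  val corrupted = "Could not read schema from the metastore because it is corrupted."
  props.get(Schema).getOrElse {
    props.get(NumParts).map { numParts =>
      val parts = (0 until numParts.toInt).map { i =>
        props.getOrElse(partKey(i), throw new IllegalStateException(
          corrupted + s" (missing part $i of the schema, $numParts parts are expected)."))
      }
      parts.mkString  // reassemble the serialized schema string
    }.getOrElse(throw new IllegalStateException(corrupted))
  }
}
```

Because `getOrElse` takes its default by name, the `throw` expressions only fire when the corresponding property really is absent, which is exactly the "always return a schema or throw" contract discussed above.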
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71460325

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -518,6 +510,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
         }
       }

    +  private def describeSchema(
    +      tableDesc: CatalogTable,
    +      buffer: ArrayBuffer[Row]): Unit = {
    +    if (DDLUtils.isDatasourceTable(tableDesc)) {
    +      DDLUtils.getSchemaFromTableProperties(tableDesc) match {
    --- End diff --

    Sure, will do.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71460333

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,115 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }
    +
    +  test("Create data source table with partitioning columns but no schema") {
    +    import testImplicits._
    +
    +    val tabName = "tab1"
    +    withTempPath { dir =>
    +      val pathToPartitionedTable = new File(dir, "partitioned")
    +      val pathToNonPartitionedTable = new File(dir, "nonPartitioned")
    +      val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +      df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
    +      df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath)
    +
    +      Seq(pathToPartitionedTable, pathToNonPartitionedTable).foreach { path =>
    +        withTable(tabName) {
    +          spark.sql(
    +            s"""
    +               |CREATE TABLE $tabName
    +               |USING parquet
    +               |OPTIONS (
    +               |  path '$path'
    +               |)
    +               |PARTITIONED BY (inexistentColumns)
    +             """.stripMargin)
    +          val catalog = spark.sessionState.catalog
    +          val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
    +
    +          val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
    +          assert(tableSchema.nonEmpty, "the schema of data source tables are always recorded")
    +          val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
    +
    +          if (tableMetadata.storage.serdeProperties.get("path") ==
    --- End diff --

    Ok, no problem
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71458430

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -518,6 +510,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
    ...
    +    if (DDLUtils.isDatasourceTable(tableDesc)) {
    +      DDLUtils.getSchemaFromTableProperties(tableDesc) match {
    --- End diff --

    Can we make `DDLUtils.getSchemaFromTableProperties` always return a schema and throw an exception if it's corrupted? I think it's more consistent with the previous behaviour, i.e. throw an exception if the expected schema properties don't exist.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71457330

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -518,6 +510,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
    ...
    +    if (DDLUtils.isDatasourceTable(tableDesc)) {
    +      DDLUtils.getSchemaFromTableProperties(tableDesc) match {
    --- End diff --

    Now, the message is changed to `"# Schema of this table is corrupted"`
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71457323

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,115 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
    ...
    +          if (tableMetadata.storage.serdeProperties.get("path") ==
    --- End diff --

    hmmm, can we separate it into 2 cases instead of doing `Seq(...).foreach`?
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71456601

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -518,6 +510,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
    ...
    +    if (DDLUtils.isDatasourceTable(tableDesc)) {
    +      DDLUtils.getSchemaFromTableProperties(tableDesc) match {
    --- End diff --

    For all types of data source tables, we store the schema in the table properties. Thus, we should not return None unless the table properties have been modified by users using the `Alter Table` command. Sorry, forgot to update the message.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71456428

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,38 @@ case class CreateDataSourceTableCommand(
         }

         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: HadoopFsRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath)
    +        .resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation]
    --- End diff --

    Sure, will do it.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71456320

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,115 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
    ...
    +          if (tableMetadata.storage.serdeProperties.get("path") ==
    --- End diff --

    This test case verifies two scenarios. One is the path to a partitioned table (i.e., `pathToPartitionedTable`); the other is the path to a non-partitioned table (i.e., `pathToNonPartitionedTable`). This condition is just to check which path is being used. If the path points to `pathToNonPartitionedTable`, it will return false.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71453615

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,115 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
    ...
    +          if (tableMetadata.storage.serdeProperties.get("path") ==
    --- End diff --

    how could this condition be false?
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] Store the Inferred Schemas in...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71453539

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -252,6 +252,115 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         }
       }

    +  test("Create data source table with partitioning columns but no schema") {
    +    import testImplicits._
    +
    +    val tabName = "tab1"
    +    withTempPath { dir =>
    +      val pathToPartitionedTable = new File(dir, "partitioned")
    +      val pathToNonPartitionedTable = new File(dir, "nonPartitioned")
    +      val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    +      df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
    +      df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath)
    +
    +      Seq(pathToPartitionedTable, pathToNonPartitionedTable).foreach { path =>
    +        withTable(tabName) {
    +          spark.sql(
    +            s"""
    +               |CREATE TABLE $tabName
    +               |USING parquet
    +               |OPTIONS (
    +               |  path '$path'
    +               |)
    +               |PARTITIONED BY (inexistentColumns)
    --- End diff --

    this doesn't fail?
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] Store the Inferred Schemas in...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71453356

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,38 @@ case class CreateDataSourceTableCommand(
         }

         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: HadoopFsRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath)
    +      .resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation]
    --- End diff --

    I think a safer way is to do a pattern match here: if it's `HadoopFsRelation`, get its partition columns; otherwise, no partition columns.
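A minimal Scala sketch of the pattern-match alternative the reviewer proposes. The relation types below are simplified stand-ins, not Spark's real `BaseRelation` hierarchy:

```scala
// Simplified stand-ins for Spark's relation hierarchy (hypothetical names,
// for illustration only).
sealed trait BaseRelation
case class HadoopFsRelation(partitionColumns: Seq[String]) extends BaseRelation
case object JdbcRelation extends BaseRelation

// Pattern matching degrades gracefully: file-based relations expose their
// partition columns, any other relation yields none, and no
// ClassCastException can occur at runtime (unlike an unconditional
// asInstanceOf cast).
def partitionColumnsOf(relation: BaseRelation): Seq[String] = relation match {
  case fs: HadoopFsRelation => fs.partitionColumns
  case _                    => Seq.empty
}
```

With this shape, `partitionColumnsOf(JdbcRelation)` simply returns `Seq.empty` rather than failing with a `ClassCastException`, which is the safety the comment is asking about.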
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] Store the Inferred Schemas in...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71453222

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -95,17 +95,38 @@ case class CreateDataSourceTableCommand(
         }

         // Create the relation to validate the arguments before writing the metadata to the metastore.
    -    DataSource(
    -      sparkSession = sparkSession,
    -      userSpecifiedSchema = userSpecifiedSchema,
    -      className = provider,
    -      bucketSpec = None,
    -      options = optionsWithPath).resolveRelation(checkPathExist = false)
    +    val dataSource: HadoopFsRelation =
    +      DataSource(
    +        sparkSession = sparkSession,
    +        userSpecifiedSchema = userSpecifiedSchema,
    +        className = provider,
    +        bucketSpec = None,
    +        options = optionsWithPath)
    +      .resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation]
    --- End diff --

    is it safe to cast it to `HadoopFsRelation`?
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] Store the Inferred Schemas in...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71453136

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -518,6 +510,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
         }
       }

    +  private def describeSchema(
    +      tableDesc: CatalogTable,
    +      buffer: ArrayBuffer[Row]): Unit = {
    +    if (DDLUtils.isDatasourceTable(tableDesc)) {
    +      DDLUtils.getSchemaFromTableProperties(tableDesc) match {
    --- End diff --

    Now `getSchemaFromTableProperties` should never return `None`?
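The contract change behind this question can be modeled in a few lines of Scala. This is a simplified model of the new behavior, not the actual `DDLUtils` code: since every persisted data source table now stores its schema, the lookup returns the schema directly and throws on corruption instead of returning an `Option`:

```scala
// Simplified stand-in for Spark's AnalysisException.
case class AnalysisException(message: String) extends Exception(message)

// Model of the non-Option contract: always return a schema, or throw.
// "spark.sql.sources.schema" is the property key shown in the PR diff;
// the schema is kept as a raw string here for simplicity.
def getSchemaFromProps(props: Map[String, String]): String =
  props.getOrElse(
    "spark.sql.sources.schema",
    throw new AnalysisException(
      "Could not read schema from the metastore because it is corrupted."))
```

Callers such as `describeSchema` then no longer need a `match` on `Some`/`None`; a missing schema surfaces as an exception instead of a silent `None`.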
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] Store the Inferred Schemas in...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14207#discussion_r71452979

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
         userSpecifiedSchema: Option[StructType],
         provider: String,
         options: Map[String, String],
    -    partitionColumns: Array[String],
    +    userSpecifiedPartitionColumns: Array[String],
    --- End diff --

    Use `Option[Array[String]]`? Or we should make `userSpecifiedSchema` a plain `StructType` and use `length == 0` to indicate that there is no user-specified schema.
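The trade-off between the two signatures can be sketched directly. The function names below are hypothetical; only the parameter name `userSpecifiedPartitionColumns` comes from the diff:

```scala
// Empty-array sentinel (as in the diff): "not specified" and "specified as
// empty" collapse into the same value and cannot be told apart.
def wasSpecifiedSentinel(userSpecifiedPartitionColumns: Array[String]): Boolean =
  userSpecifiedPartitionColumns.nonEmpty

// Option-based (the reviewer's suggestion): None means the user gave no
// partition columns at all, while Some(Array.empty) is still a distinct,
// representable state.
def wasSpecifiedOption(userSpecifiedPartitionColumns: Option[Array[String]]): Boolean =
  userSpecifiedPartitionColumns.isDefined
```

The `Option` version makes the "nothing was specified" case explicit in the type, which is the same convention `userSpecifiedSchema: Option[StructType]` already follows in this case class.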