This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new ea547e5681a  [HUDI-6219] Ensure consistency between Spark catalog schema and Hudi schema (#8725)
ea547e5681a is described below

commit ea547e5681a007e546b8ca8cb1399da0a4cd5012
Author: Wechar Yu <yuwq1...@gmail.com>
AuthorDate: Wed May 24 21:53:11 2023 +0800

    [HUDI-6219] Ensure consistency between Spark catalog schema and Hudi schema (#8725)
---
 .../apache/spark/sql/avro/SchemaConverters.scala   |  2 +-
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  | 34 ++++++++++++----------
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 23 ++++++++++++++-
 .../hudi/TestNestedSchemaPruningOptimization.scala |  4 +--
 4 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index c178d1b8491..b4e09f6d1f6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -168,7 +168,7 @@ private[sql] object SchemaConverters {
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case StringType => builder.stringType()
+      case StringType | CharType(_) | VarcharType(_) => builder.stringType()
       case NullType => builder.nullType()
       case d: DecimalType =>
         val avroType = LogicalTypes.decimal(d.precision, d.scale)
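For context, the hunk above maps Spark's CHAR(n) and VARCHAR(n) types to the same Avro string type as StringType, since both are string-backed in Spark. A self-contained sketch of that rule; the toAvroString helper is illustrative and not part of the commit, while CharType, VarcharType, and Avro's SchemaBuilder are the real APIs:

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.spark.sql.types._

    // Illustrative sketch, not the commit's code: CHAR(n) and VARCHAR(n) are
    // string-backed in Spark, so they convert to the same Avro string type.
    def toAvroString(dt: DataType): Schema = dt match {
      case StringType | CharType(_) | VarcharType(_) =>
        SchemaBuilder.builder().stringType()
      case other =>
        throw new IllegalArgumentException(s"No Avro string mapping for: $other")
    }

    assert(toAvroString(VarcharType(10)).getType == Schema.Type.STRING)
    assert(toAvroString(CharType(5)).getType == Schema.Type.STRING)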
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index b8d93fa51e1..a329a943969 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -130,9 +130,20 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
   lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()

   /**
-   * Table schema
+   * First try to load the table schema from the meta directory on the filesystem.
+   * If that fails, fall back to retrieving it from the Spark catalog.
    */
-  lazy val tableSchema: StructType = table.schema
+  lazy val tableSchema: StructType = {
+    val schemaFromMetaOpt = loadTableSchemaByMetaClient()
+    if (schemaFromMetaOpt.nonEmpty) {
+      schemaFromMetaOpt.get
+    } else if (table.schema.nonEmpty) {
+      addMetaFields(table.schema)
+    } else {
+      throw new AnalysisException(
+        s"$catalogTableName does not contain schema fields.")
+    }
+  }

   /**
    * The schema without hoodie meta fields
@@ -237,16 +248,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
         val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
           mapSqlOptionsToTableConfigs(sqlOptions) ++ currentTableConfig

-        val schemaFromMetaOpt = loadTableSchemaByMetaClient()
-        val schema = if (schemaFromMetaOpt.nonEmpty) {
-          schemaFromMetaOpt.get
-        } else if (table.schema.nonEmpty) {
-          addMetaFields(table.schema)
-        } else {
-          throw new AnalysisException(
-            s"Missing schema fields when applying CREATE TABLE clause for ${catalogTableName}")
-        }
-        (schema, options)
+        (tableSchema, options)

       case (_, false) =>
         checkArgument(table.schema.nonEmpty,
@@ -314,7 +316,9 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     getTableSqlSchema(metaClient, includeMetadataFields = true).map(originSchema => {
       // Load table schema from meta on filesystem, and fill in 'comment'
       // information from Spark catalog.
-      val fields = originSchema.fields.map { f =>
+      // Newly added Hoodie columns are positioned after the partition columns,
+      // so the fields need to be reordered.
+      val (partFields, dataFields) = originSchema.fields.map { f =>
         val nullableField: StructField = f.copy(nullable = true)
         val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
         if (catalogField.isDefined) {
@@ -322,8 +326,8 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
         } else {
           nullableField
         }
-      }
-      StructType(fields)
+      }.partition(f => partitionFields.contains(f.name))
+      StructType(dataFields ++ partFields)
     })
   }
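To illustrate the reordering the last hunk introduces: Hudi's meta schema can end up with newly added columns after the partition columns, while Spark expects data columns first and partition columns last. A standalone sketch, with made-up column names and a hypothetical partitionFields set:

    import org.apache.spark.sql.types._

    // Hypothetical example: 'dt' is the partition column and sits mid-schema
    // after a column was added; the partition step moves it to the tail.
    val partitionFields = Set("dt")
    val metaSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("dt", StringType),
      StructField("new_col", LongType)))

    val (partFields, dataFields) =
      metaSchema.fields.partition(f => partitionFields.contains(f.name))
    val reordered = StructType(dataFields ++ partFields)

    println(reordered.fieldNames.mkString(", ")) // id, new_col, dt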
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 371ac5d97a1..ff474e7872c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -27,7 +27,7 @@ import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
@@ -1082,4 +1082,25 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
     assertTrue(exception.getMessage.contains(s"""$tableName is not a Hudi table"""))
   }
+
+  test("Test hoodie table schema consistency for non-Avro data types") {
+    val tableName = generateTableName
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |   id tinyint,
+         |   name varchar(10),
+         |   price double,
+         |   ts long
+         | ) using hudi
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   preCombineField = 'ts'
+         | )
+       """.stripMargin)
+    val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    val hoodieCatalogTable = new HoodieCatalogTable(spark, table)
+    val hoodieSchema = HoodieSqlCommonUtils.getTableSqlSchema(hoodieCatalogTable.metaClient, true)
+    assertResult(hoodieSchema.get)(table.schema)
+  }
 }

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
index 4f758a9e4f7..f8fe24b2174 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
@@ -53,8 +53,8 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp
       val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName")

       val expectedSchema = StructType(Seq(
-        StructField("id", IntegerType, nullable = false),
-        StructField("item" , StructType(Seq(StructField("name", StringType, nullable = false))), nullable = false)
+        StructField("id", IntegerType, nullable = true),
+        StructField("item" , StructType(Seq(StructField("name", StringType, nullable = false))), nullable = true)
       ))

       val expectedReadSchemaClause = "ReadSchema: struct<id:int,item:struct<name:string>>"
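One consequence visible in the last hunk: schemas resolved through the Hudi meta client mark every top-level field nullable (the f.copy(nullable = true) in HoodieCatalogTable above), which is why the pruning test's expected fields flip from nullable = false to nullable = true. A minimal sketch of that normalization, not the commit's code:

    import org.apache.spark.sql.types._

    // A catalog field declared NOT NULL...
    val declared = StructField("id", IntegerType, nullable = false)
    // ...is normalized to nullable when resolved via the Hudi meta schema.
    val normalized = declared.copy(nullable = true)

    assert(normalized.nullable) // the catalog's NOT NULL is not preserved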