This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ea547e5681a [HUDI-6219] Ensure consistency between Spark catalog schema and Hudi schema (#8725)
ea547e5681a is described below

commit ea547e5681a007e546b8ca8cb1399da0a4cd5012
Author: Wechar Yu <yuwq1...@gmail.com>
AuthorDate: Wed May 24 21:53:11 2023 +0800

    [HUDI-6219] Ensure consistency between Spark catalog schema and Hudi schema (#8725)
---
 .../apache/spark/sql/avro/SchemaConverters.scala   |  2 +-
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  | 34 ++++++++++++----------
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 23 ++++++++++++++-
 .../hudi/TestNestedSchemaPruningOptimization.scala |  4 +--
 4 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index c178d1b8491..b4e09f6d1f6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -168,7 +168,7 @@ private[sql] object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case StringType => builder.stringType()
+      case StringType | CharType(_) | VarcharType(_) => builder.stringType()
       case NullType => builder.nullType()
       case d: DecimalType =>
         val avroType = LogicalTypes.decimal(d.precision, d.scale)
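
The hunk above widens the Spark-to-Avro type mapping so that CHAR and VARCHAR
columns, which carry a length but have no Avro counterpart, serialize as plain
Avro strings. A minimal standalone sketch of the same pattern-match idiom,
assuming Spark 3.1+ (where CharType/VarcharType exist); the object name and
the catch-all branch are illustrative, not part of this commit:

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.spark.sql.types._

    object AvroTypeMapping {
      // CharType and VarcharType have no Avro equivalent, so both collapse
      // to the unbounded Avro string type, exactly like StringType.
      def toAvro(dt: DataType): Schema = dt match {
        case StringType | CharType(_) | VarcharType(_) =>
          SchemaBuilder.builder().stringType()
        case FloatType  => SchemaBuilder.builder().floatType()
        case DoubleType => SchemaBuilder.builder().doubleType()
        case other =>
          throw new IllegalArgumentException(s"unsupported type: $other")
      }
    }
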
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index b8d93fa51e1..a329a943969 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -130,9 +130,20 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
   lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()
 
   /**
-   * Table schema
+   * First try to load the table schema from the meta directory on the
+   * filesystem; if that fails, fall back to the schema in the Spark catalog.
    */
-  lazy val tableSchema: StructType = table.schema
+  lazy val tableSchema: StructType = {
+    val schemaFromMetaOpt = loadTableSchemaByMetaClient()
+    if (schemaFromMetaOpt.nonEmpty) {
+      schemaFromMetaOpt.get
+    } else if (table.schema.nonEmpty) {
+      addMetaFields(table.schema)
+    } else {
+      throw new AnalysisException(
+        s"$catalogTableName does not contains schema fields.")
+    }
+  }
 
   /**
    * The schema without hoodie meta fields
@@ -237,16 +248,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
        val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
          mapSqlOptionsToTableConfigs(sqlOptions) ++ currentTableConfig
 
-        val schemaFromMetaOpt = loadTableSchemaByMetaClient()
-        val schema = if (schemaFromMetaOpt.nonEmpty) {
-          schemaFromMetaOpt.get
-        } else if (table.schema.nonEmpty) {
-          addMetaFields(table.schema)
-        } else {
-          throw new AnalysisException(
-            s"Missing schema fields when applying CREATE TABLE clause for 
${catalogTableName}")
-        }
-        (schema, options)
+        (tableSchema, options)
 
       case (_, false) =>
         checkArgument(table.schema.nonEmpty,
@@ -314,7 +316,9 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     getTableSqlSchema(metaClient, includeMetadataFields = true).map(originSchema => {
       // Load table schema from meta on filesystem, and fill in 'comment'
       // information from Spark catalog.
-      val fields = originSchema.fields.map { f =>
+      // Newly added Hoodie columns are positioned after the partition
+      // columns, so the fields need to be reordered here.
+      val (partFields, dataFields) = originSchema.fields.map { f =>
         val nullableField: StructField = f.copy(nullable = true)
        val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
         if (catalogField.isDefined) {
@@ -322,8 +326,8 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
         } else {
           nullableField
         }
-      }
-      StructType(fields)
+      }.partition(f => partitionFields.contains(f.name))
+      StructType(dataFields ++ partFields)
     })
   }
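
Two things change in the hunks above: tableSchema now prefers the schema
resolved from the Hudi meta client, and loadTableSchemaByMetaClient reorders
fields so that partition columns come last, matching the Spark catalog layout.
The reorder itself is a plain Array.partition on the field name; a
self-contained sketch with invented method and parameter names:

    import org.apache.spark.sql.types._

    object SchemaReorder {
      // Split the fields into partition columns and data columns, then put
      // the data columns first, since the Spark catalog lists partition
      // columns last.
      def reorder(schema: StructType, partitionFields: Set[String]): StructType = {
        val (partFields, dataFields) = schema.fields
          .map(_.copy(nullable = true)) // the meta-client path forces nullability
          .partition(f => partitionFields.contains(f.name))
        StructType(dataFields ++ partFields)
      }
    }

For a schema listing (dt, id, new_col) with partition column dt, the result
is (id, new_col, dt).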
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 371ac5d97a1..ff474e7872c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -27,7 +27,7 @@ import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
@@ -1082,4 +1082,25 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
     assertTrue(exception.getMessage.contains(s"""$tableName is not a Hudi table"""))
   }
+
+  test("Test hoodie table schema consistency for non-Avro data types") {
+    val tableName = generateTableName
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id tinyint,
+         |  name varchar(10),
+         |  price double,
+         |  ts long
+         | ) using hudi
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   preCombineField = 'ts'
+         | )
+       """.stripMargin)
+    val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    val hoodieCatalogTable = new HoodieCatalogTable(spark, table)
+    val hoodieSchema = HoodieSqlCommonUtils.getTableSqlSchema(hoodieCatalogTable.metaClient, true)
+    assertResult(hoodieSchema.get)(table.schema)
+  }
 }
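
The new test covers types whose catalog form differs from their Avro storage
form (tinyint, varchar(10), long) and asserts that the schema read back
through the meta client equals the catalog schema. A rough spark-shell
equivalent, assuming a Hudi-enabled session named spark; the table name is
illustrative:

    // These column types have no one-to-one Avro mapping; the assertion in
    // the test above guards against the reloaded schema drifting from the
    // catalog schema.
    spark.sql(
      """create table schema_check (id tinyint, name varchar(10), ts long)
        |using hudi
        |tblproperties (primaryKey = 'id', preCombineField = 'ts')
        |""".stripMargin)

    import org.apache.spark.sql.catalyst.TableIdentifier
    val catalogSchema = spark.sessionState.catalog
      .getTableMetadata(TableIdentifier("schema_check")).schema
    println(catalogSchema.treeString) // should agree with what Hudi reports
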
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
index 4f758a9e4f7..f8fe24b2174 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
@@ -53,8 +53,8 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp
           val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName")
 
           val expectedSchema = StructType(Seq(
-            StructField("id", IntegerType, nullable = false),
-            StructField("item" , StructType(Seq(StructField("name", 
StringType, nullable = false))), nullable = false)
+            StructField("id", IntegerType, nullable = true),
+            StructField("item" , StructType(Seq(StructField("name", 
StringType, nullable = false))), nullable = true)
           ))
 
          val expectedReadSchemaClause = "ReadSchema: struct<id:int,item:struct<name:string>>"
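
The flipped nullable flags follow from the new tableSchema resolution: fields
loaded through the meta client are copied with nullable = true (the
f.copy(nullable = true) in HoodieCatalogTable above), so top-level columns now
read back as nullable. In miniature:

    import org.apache.spark.sql.types._

    val fromDdl  = StructField("id", IntegerType, nullable = false)
    val fromMeta = fromDdl.copy(nullable = true) // what the meta-client path yields
    assert(fromMeta.nullable) // hence nullable = true in the expected schema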
