This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f3e079b  [SPARK-36271][SQL] Unify the V1 insert field name check before preparing the writer
f3e079b is described below

commit f3e079b09b5877c97fc10864937e76d866935880
Author: Angerszhuuuu <angers....@gmail.com>
AuthorDate: Mon Aug 9 17:18:06 2021 +0800

    [SPARK-36271][SQL] Unify the V1 insert field name check before preparing the writer
    
    ### What changes were proposed in this pull request?
    Unify the DataSource V1 insert schema field name check so it runs before the writer is prepared.
    This PR also adds the check for the Avro V1 insert path.
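
    For example, with this change the following V1 Avro write fails at analysis
    time with an AnalysisException whose message contains
    Column name "(IF((ID = 1), 1, 0))" contains invalid character(s).
    The snippet mirrors the added test; it assumes a spark-shell session with
    the spark-avro module on the classpath, and the output path is illustrative:

        spark.range(1).createTempView("v")
        spark.sql("SELECT ID, IF(ID=1,1,0) FROM v")
          .write.mode("overwrite")
          .format("avro")
          .save("/tmp/spark-36271-demo")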
    
    ### Why are the changes needed?
    Unify the code and add the field name check for Avro V1 inserts as well.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #33566 from AngersZhuuuu/SPARK-36271.
    
    Authored-by: Angerszhuuuu <angers....@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 22 ++++++++++++++++++++++
 .../execution/datasources/DataSourceUtils.scala    |  1 +
 .../datasources/parquet/ParquetWriteSupport.scala  |  1 -
 .../sql/execution/datasources/v2/FileWrite.scala   |  1 +
 .../datasources/v2/parquet/ParquetWrite.scala      |  1 +
 .../spark/sql/FileBasedDataSourceSuite.scala       | 22 ++++++++++++++++++++++
 6 files changed, 47 insertions(+), 1 deletion(-)
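
The unified check validates every output field name against the target format
before the writer is prepared (DataSourceUtils on the V1 path, ParquetWrite on
the V2 Parquet path). As a rough standalone sketch of the kind of validation
involved (an illustration only, not the actual DataSourceUtils /
ParquetSchemaConverter implementation; the character set and exception type
here are placeholders):

    import org.apache.spark.sql.types.StructType

    // Hypothetical sketch: reject field names containing characters that a
    // Parquet-style format cannot store, recursing into nested structs (the
    // NAMED_STRUCT test cases below exercise the nested case).
    def checkFieldNames(schema: StructType): Unit = {
      val invalidChars: Set[Char] = " ,;{}()\n\t=".toSet
      schema.fields.foreach { field =>
        if (field.name.exists(invalidChars)) {
          throw new IllegalArgumentException(
            s"""Column name "${field.name}" contains invalid character(s).""")
        }
        field.dataType match {
          case nested: StructType => checkFieldNames(nested)
          case _ => // non-struct types carry no nested field names to check
        }
      }
    }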

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 43ba20f..510ddfc 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -2194,6 +2194,28 @@ class AvroV1Suite extends AvroSuite {
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "avro")
+
+  test("SPARK-36271: V1 insert should check schema field name too") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT ID, IF(ID=1,1,0) FROM v").write.mode(SaveMode.Overwrite)
+            .format("avro").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT NAMED_STRUCT('(IF((ID = 1), 1, 0))', IF(ID=1,ID,0)) AS 
col1 FROM v")
+            .write.mode(SaveMode.Overwrite)
+            .format("avro").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+    }
+  }
 }
 
 class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index b562d44..fcd95a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -84,6 +84,7 @@ object DataSourceUtils {
        throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(format.toString, field)
       }
     }
+    checkFieldNames(format, schema)
   }
 
   // SPARK-24626: Metadata files and temporary files should not be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 20f69e8..d0cd02f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -482,7 +482,6 @@ object ParquetWriteSupport {
   val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
 
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
-    ParquetSchemaConverter.checkFieldNames(schema)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
     configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
index 9c4bc78..7f66a09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
@@ -93,6 +93,7 @@ trait FileWrite extends Write {
       s"when inserting into $pathName", caseSensitiveAnalysis)
     DataSource.validateSchema(schema)
 
+    // TODO: [SPARK-36340] Unify the schema field name check for DataSource V2 inserts.
     schema.foreach { field =>
       if (!supportsDataType(field.dataType)) {
        throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala
index 0316d91..b2b6d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala
@@ -72,6 +72,7 @@ case class ParquetWrite(
 
     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
+    ParquetSchemaConverter.checkFieldNames(dataSchema)
     // This metadata is useful for keeping UDTs like Vector/Matrix.
     ParquetWriteSupport.setSchema(dataSchema, conf)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index c71f667..2436392 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -967,6 +967,28 @@ class FileBasedDataSourceSuite extends QueryTest
       checkAnswer(df, Row("v1", "v2"))
     }
   }
+
+  test("SPARK-36271: V1 insert should check schema field name too") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT ID, IF(ID=1,1,0) FROM v").write.mode(SaveMode.Overwrite)
+            .format("parquet").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT NAMED_STRUCT('(IF((ID = 1), 1, 0))', IF(ID=1,ID,0)) AS 
col1 FROM v")
+            .write.mode(SaveMode.Overwrite)
+            .format("parquet").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+    }
+  }
 }
 
 object TestingUDT {
