This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new aa5c72f  [SPARK-35359][SQL] Insert data with char/varchar datatype will fail when data length exceeds length limitation
aa5c72f is described below

commit aa5c72f8caa5d63a92dcd28fcb263682a3f0e250
Author: fhygh <283452...@qq.com>
AuthorDate: Tue May 18 00:13:40 2021 +0800

    [SPARK-35359][SQL] Insert data with char/varchar datatype will fail when data length exceeds length limitation
    
    ### What changes were proposed in this pull request?
    This PR is used to fix this bug:
    
    ```
    set spark.sql.legacy.charVarcharAsString=true;
    create table chartb01(a char(3));
    insert into chartb01 select 'aaaaa';
    ```
    
    Here we expect the insert to succeed, since the legacy config makes char(3) behave as a plain string type (so no write-side length check should apply), but instead it fails at runtime.
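
    A sketch of the expected behavior after this fix; the USING PARQUET
    clause is an assumption (the original repro does not name a format), and
    the stored value follows the new test added in this PR:

    ```
    set spark.sql.legacy.charVarcharAsString=true;
    create table chartb01(a char(3)) using parquet;
    insert into chartb01 select 'aaaaa';
    -- With the legacy config the column behaves as a plain string, so the
    -- insert succeeds and the value is stored unmodified:
    select * from chartb01;   -- 'aaaaa'
    ```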
    
    ### Why are the changes needed?
    Improve backward compatibility
    
    ```
    spark-sql>
             > create table tchar01(col char(2)) using parquet;
    Time taken: 0.767 seconds
    spark-sql>
             > insert into tchar01 select 'aaa';
    ERROR | Executor task launch worker for task 0.0 in stage 0.0 (TID 0) | Aborting task | org.apache.spark.util.Utils.logError(Logging.scala:94)
    java.lang.RuntimeException: Exceeds char/varchar type length limitation: 2
            at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:31)
            at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:44)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
            at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:279)
            at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1500)
            at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:288)
            at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:212)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:131)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1466)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
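
    For context, the check that throws above behaves roughly like the
    following simplified sketch (the real implementation lives in
    CharVarcharCodegenUtils.charTypeWriteSideCheck; this is illustrative
    only, not the exact source):

    ```
    // Simplified sketch of the write-side CHAR check, not the exact source.
    def charTypeWriteSideCheck(str: String, limit: Int): String = {
      // CHAR semantics ignore trailing spaces when checking the length.
      val trimmed = str.replaceAll(" +$", "")
      if (trimmed.length > limit) {
        throw new RuntimeException(
          s"Exceeds char/varchar type length limitation: $limit")
      }
      // Pad back out to the declared CHAR length on the write side.
      trimmed + " " * (limit - trimmed.length)
    }
    ```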
    
    ### Does this PR introduce _any_ user-facing change?
    No (the legacy config is false by default).
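
    For comparison, with the config at its default the write-side length
    check still applies, so over-length inserts keep failing. A sketch
    (table name tchar02 is hypothetical):

    ```
    set spark.sql.legacy.charVarcharAsString=false; -- the default
    create table tchar02(col char(2)) using parquet;
    insert into tchar02 select 'aaa';
    -- java.lang.RuntimeException: Exceeds char/varchar type length limitation: 2
    ```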
    
    ### How was this patch tested?
    Added unit tests.
    
    Closes #32501 from fhygh/master.
    
    Authored-by: fhygh <283452...@qq.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 3a3f8ca6f421b9bc51e0059c954262489aa41f5d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/analysis/TableOutputResolver.scala    |  6 +++-
 .../apache/spark/sql/util/PartitioningUtils.scala  | 36 ++++++++++++----------
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 12 ++++++++
 3 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index d5c407b..32bdb82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -100,7 +100,11 @@ object TableOutputResolver {
         case _ =>
          Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
       }
-      val exprWithStrLenCheck = CharVarcharUtils.stringLengthCheck(casted, tableAttr)
+      val exprWithStrLenCheck = if (conf.charVarcharAsString) {
+        casted
+      } else {
+        CharVarcharUtils.stringLengthCheck(casted, tableAttr)
+      }
       // Renaming is needed for handling the following cases like
      // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
       // 2) Target tables have column metadata
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 0047254..cf30c71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{CharType, StructType, VarcharType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -44,23 +45,24 @@ private[sql] object PartitioningUtils {
        throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
       }
 
-      val normalizedVal = normalizedFiled.dataType match {
-        case CharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
-          val v = value match {
-            case Some(str: String) => Some(charTypeWriteSideCheck(str, len))
-            case str: String => charTypeWriteSideCheck(str, len)
-            case other => other
-          }
-          v.asInstanceOf[T]
-        case VarcharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
-          val v = value match {
-            case Some(str: String) => Some(varcharTypeWriteSideCheck(str, len))
-            case str: String => varcharTypeWriteSideCheck(str, len)
-            case other => other
-          }
-          v.asInstanceOf[T]
-        case _ => value
-      }
+      val normalizedVal =
+        if (SQLConf.get.charVarcharAsString) value else normalizedFiled.dataType match {
+          case CharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
+            val v = value match {
+              case Some(str: String) => Some(charTypeWriteSideCheck(str, len))
+              case str: String => charTypeWriteSideCheck(str, len)
+              case other => other
+            }
+            v.asInstanceOf[T]
+          case VarcharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
+            val v = value match {
+              case Some(str: String) => Some(varcharTypeWriteSideCheck(str, len))
+              case str: String => varcharTypeWriteSideCheck(str, len)
+              case other => other
+            }
+            v.asInstanceOf[T]
+          case _ => value
+        }
       normalizedFiled.name -> normalizedVal
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 76f7f42..70e6984 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -806,6 +806,18 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
     }
   }
 
+  test("SPARK-35359: create table and insert data over length values") {
+    Seq("char", "varchar").foreach { typ =>
+      withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
+        withTable("t") {
+          sql(s"CREATE TABLE t (col $typ(2)) using $format")
+          sql("INSERT INTO t SELECT 'aaa'")
+          checkAnswer(sql("select * from t"), Row("aaa"))
+        }
+      }
+    }
+  }
+
   test("alter table set location w/ fit length values") {
     Seq("char", "varchar").foreach { typ =>
       withTempPath { dir =>
