This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new aa5c72f  [SPARK-35359][SQL] Insert data with char/varchar datatype will fail when data length exceed length limitation
aa5c72f is described below

commit aa5c72f8caa5d63a92dcd28fcb263682a3f0e250
Author: fhygh <283452...@qq.com>
AuthorDate: Tue May 18 00:13:40 2021 +0800

    [SPARK-35359][SQL] Insert data with char/varchar datatype will fail when data length exceed length limitation

    ### What changes were proposed in this pull request?

    This PR fixes the following bug:
    ```
    set spark.sql.legacy.charVarcharAsString=true;
    create table chartb01(a char(3));
    insert into chartb01 select 'aaaaa';
    ```
    Here we expect the data of table chartb01 to be 'aaaaa' (with the legacy
    config set, char is treated as a plain string), but the insert fails.

    ### Why are the changes needed?

    Improve backward compatibility:
    ```
    spark-sql>
             > create table tchar01(col char(2)) using parquet;
    Time taken: 0.767 seconds
    spark-sql>
             > insert into tchar01 select 'aaa';
    ERROR | Executor task launch worker for task 0.0 in stage 0.0 (TID 0) | Aborting task | org.apache.spark.util.Utils.logError(Logging.scala:94)
    java.lang.RuntimeException: Exceeds char/varchar type length limitation: 2
      at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:31)
      at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:44)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:279)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1500)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:288)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:212)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:131)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1466)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
    ```

    ### Does this PR introduce _any_ user-facing change?

    No (the legacy config is false by default).

    ### How was this patch tested?

    Added unit tests.

    Closes #32501 from fhygh/master.

    Authored-by: fhygh <283452...@qq.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 3a3f8ca6f421b9bc51e0059c954262489aa41f5d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
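The failure comes from the write-side length check named at the top of the trace. Below is a rough, self-contained Scala sketch of that check's semantics; the real implementation is `CharVarcharCodegenUtils.charTypeWriteSideCheck`, which operates on `UTF8String`, so the object name and method body here are illustrative only:

```scala
// Rough sketch of the CHAR write-side check, for illustration only.
object CharWriteSideCheckSketch {
  def charTypeWriteSideCheck(str: String, limit: Int): String = {
    def rpad(s: String): String = s + " " * (limit - s.length) // CHAR pads to the declared length
    val trimmed = str.reverse.dropWhile(_ == ' ').reverse      // trailing spaces don't count
    if (str.length <= limit) rpad(str)
    else if (trimmed.length <= limit) rpad(trimmed)
    else throw new RuntimeException(s"Exceeds char/varchar type length limitation: $limit")
  }

  def main(args: Array[String]): Unit = {
    println(charTypeWriteSideCheck("aa", 3))    // "aa " (padded to length 3)
    println(charTypeWriteSideCheck("aaa  ", 3)) // "aaa" (trailing spaces trimmed)
    charTypeWriteSideCheck("aaaaa", 3)          // throws the RuntimeException from the trace
  }
}
```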
---
 .../catalyst/analysis/TableOutputResolver.scala    |  6 +++-
 .../apache/spark/sql/util/PartitioningUtils.scala  | 36 ++++++++++++----------
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 12 ++++++++
 3 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index d5c407b..32bdb82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -100,7 +100,11 @@ object TableOutputResolver {
       case _ => Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
     }
 
-    val exprWithStrLenCheck = CharVarcharUtils.stringLengthCheck(casted, tableAttr)
+    val exprWithStrLenCheck = if (conf.charVarcharAsString) {
+      casted
+    } else {
+      CharVarcharUtils.stringLengthCheck(casted, tableAttr)
+    }
     // Renaming is needed for handling the following cases like
     // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
     // 2) Target tables have column metadata
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 0047254..cf30c71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{CharType, StructType, VarcharType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -44,23 +45,24 @@ private[sql] object PartitioningUtils {
       throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
     }
 
-    val normalizedVal = normalizedFiled.dataType match {
-      case CharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
-        val v = value match {
-          case Some(str: String) => Some(charTypeWriteSideCheck(str, len))
-          case str: String => charTypeWriteSideCheck(str, len)
-          case other => other
-        }
-        v.asInstanceOf[T]
-      case VarcharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
-        val v = value match {
-          case Some(str: String) => Some(varcharTypeWriteSideCheck(str, len))
-          case str: String => varcharTypeWriteSideCheck(str, len)
-          case other => other
-        }
-        v.asInstanceOf[T]
-      case _ => value
-    }
+    val normalizedVal =
+      if (SQLConf.get.charVarcharAsString) value else normalizedFiled.dataType match {
+        case CharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
+          val v = value match {
+            case Some(str: String) => Some(charTypeWriteSideCheck(str, len))
+            case str: String => charTypeWriteSideCheck(str, len)
+            case other => other
+          }
+          v.asInstanceOf[T]
+        case VarcharType(len) if value != null && value != DEFAULT_PARTITION_NAME =>
+          val v = value match {
+            case Some(str: String) => Some(varcharTypeWriteSideCheck(str, len))
+            case str: String => varcharTypeWriteSideCheck(str, len)
+            case other => other
+          }
+          v.asInstanceOf[T]
+        case _ => value
+      }
     normalizedFiled.name -> normalizedVal
   }
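Both hunks above apply the same pattern: consult the session conf first and bypass the write-side check entirely when the legacy flag is on. A condensed, self-contained Scala sketch of that control flow, with made-up type names standing in for Spark's `CharType`/`VarcharType` handling:

```scala
// Condensed sketch of the conf gating added above. When the legacy flag is
// on, values pass through untouched (pre-3.1 behavior); otherwise the
// declared char/varchar length is enforced. All names are illustrative.
object CharVarcharGateSketch {
  sealed trait ColType
  final case class CharCol(len: Int) extends ColType
  final case class VarcharCol(len: Int) extends ColType
  case object OtherCol extends ColType

  private def lengthCheck(value: String, len: Int): String = {
    val trimmed = value.reverse.dropWhile(_ == ' ').reverse // trailing spaces don't count
    if (trimmed.length > len) {
      throw new RuntimeException(s"Exceeds char/varchar type length limitation: $len")
    }
    value
  }

  def normalize(charVarcharAsString: Boolean, col: ColType, value: String): String =
    if (charVarcharAsString) value // legacy: treat char/varchar as plain string
    else col match {
      case CharCol(len)    => lengthCheck(value, len)
      case VarcharCol(len) => lengthCheck(value, len)
      case OtherCol        => value
    }

  def main(args: Array[String]): Unit = {
    println(normalize(charVarcharAsString = true, CharCol(2), "aaa"))  // "aaa", check skipped
    println(normalize(charVarcharAsString = false, CharCol(4), "aaa")) // "aaa", within limit
    // normalize(charVarcharAsString = false, CharCol(2), "aaa")       // would throw
  }
}
```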
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 76f7f42..70e6984 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -806,6 +806,18 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
     }
   }
 
+  test("SPARK-35359: create table and insert data over length values") {
+    Seq("char", "varchar").foreach { typ =>
+      withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
+        withTable("t") {
+          sql(s"CREATE TABLE t (col $typ(2)) using $format")
+          sql("INSERT INTO t SELECT 'aaa'")
+          checkAnswer(sql("select * from t"), Row("aaa"))
+        }
+      }
+    }
+  }
+
   test("alter table set location w/ fit length values") {
     Seq("char", "varchar").foreach { typ =>
       withTempPath { dir =>
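For reference, the new behavior can also be exercised outside the test suite with a standalone job like the sketch below (assumes a build that includes this patch; the object name and local-mode setup are illustrative):

```scala
// Standalone repro of the fixed behavior, mirroring the new test above.
import org.apache.spark.sql.SparkSession

object Spark35359Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.legacy.charVarcharAsString", "true")
      .getOrCreate()

    spark.sql("CREATE TABLE tchar01 (col CHAR(2)) USING parquet")
    // Before the patch this insert aborted with
    // "Exceeds char/varchar type length limitation: 2"; with the legacy
    // config set, CHAR is treated as plain STRING and the value is kept.
    spark.sql("INSERT INTO tchar01 SELECT 'aaa'")
    spark.sql("SELECT * FROM tchar01").show() // prints a single row: aaa
    spark.stop()
  }
}
```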