This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 0f2e3ecb994 [SPARK-35912][SQL][FOLLOW-UP] Add a legacy configuration for respecting nullability in DataFrame.schema.csv/json(ds)

0f2e3ecb994 is described below

commit 0f2e3ecb9943aec91204c168b6402f3e5de53ca2
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu May 5 16:23:28 2022 +0900

[SPARK-35912][SQL][FOLLOW-UP] Add a legacy configuration for respecting nullability in DataFrame.schema.csv/json(ds)

### What changes were proposed in this pull request?

This PR is a follow-up of https://github.com/apache/spark/pull/33436; it adds a legacy configuration. It was found that the original change can break a valid use case (https://github.com/apache/spark/pull/33436/files#r863271189):

```scala
import org.apache.spark.sql.types._

val ds = Seq("a,", "a,b").toDS
spark.read.schema(
  StructType(
    StructField("f1", StringType, nullable = false) ::
    StructField("f2", StringType, nullable = false) :: Nil)
).option("mode", "DROPMALFORMED").csv(ds).show()
```

**Before:**

```
+---+---+
| f1| f2|
+---+---+
|  a|  b|
+---+---+
```

**After:**

```
+---+----+
| f1|  f2|
+---+----+
|  a|null|
|  a|   b|
+---+----+
```

This PR adds a configuration to restore the **Before** behaviour.

### Why are the changes needed?

To avoid breaking valid use cases.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new configuration `spark.sql.legacy.respectNullabilityInTextDatasetConversion` (`false` by default) to respect the nullability in `DataFrameReader.schema(schema).csv(dataset)` and `DataFrameReader.schema(schema).json(dataset)` when the user-specified schema is provided.
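For example, the legacy behaviour can be restored at runtime. The snippet below is a sketch that assumes a `SparkSession` named `spark` (e.g. in `spark-shell`) on a build containing this patch; the flag could equally be passed via `--conf` at submit time:

```scala
import org.apache.spark.sql.types._
import spark.implicits._  // for Seq(...).toDS

// Opt back into the pre-3.3 behaviour: respect user-specified nullability.
spark.conf.set("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")

val ds = Seq("a,", "a,b").toDS
spark.read.schema(
  StructType(
    StructField("f1", StringType, nullable = false) ::
    StructField("f2", StringType, nullable = false) :: Nil)
).option("mode", "DROPMALFORMED").csv(ds).show()
// The row "a," violates the non-nullable `f2` and is dropped again,
// matching the **Before** output:
// +---+---+
// | f1| f2|
// +---+---+
// |  a|  b|
// +---+---+
```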
### How was this patch tested?

Unit tests were added.

Closes #36435 from HyukjinKwon/SPARK-35912.

Authored-by: Hyukjin Kwon <gurwls...@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
(cherry picked from commit 6689b97ec76abe5bab27f02869f8f16b32530d1a)
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 docs/sql-migration-guide.md                                 |  2 +-
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala  | 11 +++++++++++
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala   | 13 +++++++++++--
 .../spark/sql/execution/datasources/csv/CSVSuite.scala      | 10 ++++++++++
 .../spark/sql/execution/datasources/json/JsonSuite.scala    | 14 +++++++++++++-
 5 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index b6bfb0ed2be..a7757d6c9a0 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -30,7 +30,7 @@ license: |
 
   - Since Spark 3.3, the functions `lpad` and `rpad` have been overloaded to support byte sequences. When the first argument is a byte sequence, the optional padding pattern must also be a byte sequence and the result is a BINARY value. The default padding pattern in this case is the zero byte. To restore the legacy behavior of always returning string types, set `spark.sql.legacy.lpadRpadAlwaysReturnString` to `true`.
 
-  - Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields.
+  - Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields. To restore the legacy behavior of respecting the nullability, set `spark.sql.legacy.respectNullabilityInTextDatasetConversion` to `true`.
 
   - Since Spark 3.3, when the date or timestamp pattern is not specified, Spark converts an input string to a date/timestamp using the `CAST` expression approach. The changes affect CSV/JSON datasources and parsing of partition values. In Spark 3.2 or earlier, when the date or timestamp pattern is not set, Spark uses the default patterns: `yyyy-MM-dd` for dates and `yyyy-MM-dd HH:mm:ss` for timestamps. After the changes, Spark still recognizes the pattern together with

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 76f3d1f5a84..b6230f71383 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2983,6 +2983,17 @@ object SQLConf {
     .intConf
     .createOptional
 
+  val LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION =
+    buildConf("spark.sql.legacy.respectNullabilityInTextDatasetConversion")
+      .internal()
+      .doc("When true, the nullability in the user-specified schema for " +
+        "`DataFrameReader.schema(schema).json(jsonDataset)` and " +
+        "`DataFrameReader.schema(schema).csv(csvDataset)` is respected. Otherwise, they are " +
+        "turned to a nullable schema forcibly.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
     .doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
       "displayed if and only if the REPL supports the eager evaluation. Currently, the " +
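On the consuming side (the `DataFrameReader` diff below), the change boils down to the following pattern. The helper name `effectiveSchema` is hypothetical, and since `asNullable` is `private[spark]`, this sketch only compiles inside Spark's own source tree; it is shown purely to make the gating logic explicit:

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

// Hypothetical helper making the new gating logic explicit: with the legacy
// flag unset (the default), a user-specified schema is forced to nullable via
// `asNullable`; with the flag set, the declared nullability is kept as-is.
def effectiveSchema(userSpecifiedSchema: Option[StructType]): Option[StructType] =
  userSpecifiedSchema.map { s =>
    if (SQLConf.get.getConf(
        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION)) {
      s            // legacy behaviour: respect the schema exactly as given
    } else {
      s.asNullable // default since 3.3: every field becomes nullable
    }
  }
```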
Currently, the " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index cab0ea2b30a..344f40eef45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String @@ -406,7 +407,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { sparkSession.sessionState.conf.columnNameOfCorruptRecord) userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _)) - val schema = userSpecifiedSchema.map(_.asNullable).getOrElse { + val schema = userSpecifiedSchema.map { + case s if !SQLConf.get.getConf( + SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable + case other => other + }.getOrElse { TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions) } @@ -478,7 +483,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { None } - val schema = userSpecifiedSchema.map(_.asNullable).getOrElse { + val schema = userSpecifiedSchema.map { + case s if !SQLConf.get.getConf( + SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable + case other => other + }.getOrElse { TextInputCSVDataSource.inferFromDataset( sparkSession, csvDataset, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 7cbe6ed9fce..dd42f48d716 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2693,6 +2693,16 @@ abstract class CSVSuite assert(df.schema == expected) checkAnswer(df, Row(1, null) :: Nil) } + + withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") { + checkAnswer( + spark.read.schema( + StructType( + StructField("f1", StringType, nullable = false) :: + StructField("f2", StringType, nullable = false) :: Nil) + ).option("mode", "DROPMALFORMED").csv(Seq("a,", "a,b").toDS), + Row("a", "b")) + } } test("SPARK-36536: use casting when datetime pattern is not set") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0897ad2ff30..bc7c6e56ece 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3165,7 +3165,7 @@ abstract class JsonSuite Seq(missingFieldInput, nullValueInput).foreach { jsonString => Seq("DROPMALFORMED", "FAILFAST", "PERMISSIVE").foreach { mode => val json = spark.createDataset( - spark.sparkContext.parallelize(jsonString:: Nil))(Encoders.STRING) + spark.sparkContext.parallelize(jsonString :: Nil))(Encoders.STRING) val df = spark.read 
.option("mode", mode) .schema(schema) @@ -3174,6 +3174,18 @@ abstract class JsonSuite checkAnswer(df, Row(1, null) :: Nil) } } + + withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") { + checkAnswer( + spark.read.schema( + StructType( + StructField("f1", LongType, nullable = false) :: + StructField("f2", LongType, nullable = false) :: Nil) + ).option("mode", "DROPMALFORMED").json(Seq("""{"f1": 1}""").toDS), + // It is for testing legacy configuration. This is technically a bug as + // `0` has to be `null` but the schema is non-nullable. + Row(1, 0)) + } } test("SPARK-36379: proceed parsing with root nulls in permissive mode") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org