This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new a2854ba5d852 [SPARK-46862][SQL][FOLLOWUP] Fix column pruning without schema enforcing in V1 CSV datasource a2854ba5d852 is described below commit a2854ba5d852e2001b96636a8964494c45fc27d3 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Sat Jan 27 19:22:52 2024 +0300 [SPARK-46862][SQL][FOLLOWUP] Fix column pruning without schema enforcing in V1 CSV datasource ### What changes were proposed in this pull request? In the PR, I propose to invoke `CSVOptions.isColumnPruningEnabled` introduced by https://github.com/apache/spark/pull/44872 while matching the CSV header to a schema in the V1 CSV datasource. ### Why are the changes needed? To fix the failure when column pruning happens and a schema is not enforced: ```scala scala> spark.read. | option("multiLine", true). | option("header", true). | option("escape", "\""). | option("enforceSchema", false). | csv("/Users/maximgekk/tmp/es-939111-data.csv"). | count() 24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema: Header length: 4, schema size: 0 CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44910 from MaxGekk/check-header-column-pruning. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> (cherry picked from commit bc51c9fea3645c6ae1d9e1e83b0f94f8b849be20) Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../sql/execution/datasources/csv/CSVFileFormat.scala | 6 +++--- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 15 +++++++++------ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 069ad9562a7d..0ff96f073f03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -100,12 +100,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val columnPruning = sparkSession.sessionState.conf.csvColumnPruning val parsedOptions = new CSVOptions( options, - columnPruning, + sparkSession.sessionState.conf.csvColumnPruning, sparkSession.sessionState.conf.sessionLocalTimeZone, sparkSession.sessionState.conf.columnNameOfCorruptRecord) + val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled // Check a field requirement for corrupt records here to throw an exception in a driver side ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord) @@ -125,7 +125,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { actualRequiredSchema, parsedOptions, actualFilters) - val schema = if (columnPruning) actualRequiredSchema else actualDataSchema + val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema val isStartOfFile = file.start == 
0 val headerChecker = new CSVHeaderChecker( schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 6690bf101fa7..a91adb787838 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3215,12 +3215,15 @@ abstract class CSVSuite withTempPath { path => Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8)) - val df = spark.read - .option("multiline", "true") - .option("header", "true") - .option("escape", "\"") - .csv(path.getCanonicalPath) - assert(df.count() === 5) + Seq(true, false).foreach { enforceSchema => + val df = spark.read + .option("multiLine", true) + .option("header", true) + .option("escape", "\"") + .option("enforceSchema", enforceSchema) + .csv(path.getCanonicalPath) + assert(df.count() === 5) + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org