This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 16ac82092bb7 [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning 16ac82092bb7 is described below commit 16ac82092bb775aafd010e2fb02b7ddc1eceea73 Author: Daniel Tenedorio <daniel.tenedo...@databricks.com> AuthorDate: Sat Feb 3 08:50:44 2024 +0300 [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning ### What changes were proposed in this pull request? This PR fixes a CSV parsing bug with existence default values and column pruning (https://issues.apache.org/jira/browse/SPARK-46890). The bug fix includes disabling column pruning specifically when checking the CSV header schema against the required schema expected by Catalyst. This makes the expected schema match what the CSV parser provides, since later we also happen to instruct the CSV parser to disable column pruning and instead read each entire row in order to correctly assign the default value(s) during execution. ### Why are the changes needed? Before this change, queries that select a subset of the columns in a CSV table whose `CREATE TABLE` statement contained default values would fail with an internal exception. 
For example: ``` CREATE TABLE IF NOT EXISTS products ( product_id INT, name STRING, price FLOAT default 0.0, quantity INT default 0 ) USING CSV OPTIONS ( header 'true', inferSchema 'false', enforceSchema 'false', path '/Users/maximgekk/tmp/products.csv' ); ``` The CSV file products.csv: ``` product_id,name,price,quantity 1,Apple,0.50,100 2,Banana,0.25,200 3,Orange,0.75,50 ``` The query fails: ``` spark-sql (default)> SELECT price FROM products; 24/01/28 11:43:09 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6) java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema: Header length: 4, schema size: 1 CSV file: file:///Users/Daniel.Tenedorio/tmp/products.csv ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR adds test coverage. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44939 from dtenedor/debug-csv-default. Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 15 ++++++++- .../spark/sql/catalyst/csv/UnivocityParser.scala | 4 +-- .../execution/datasources/csv/CSVFileFormat.scala | 5 ++- .../v2/csv/CSVPartitionReaderFactory.scala | 6 +++- .../sql/execution/datasources/csv/CSVSuite.scala | 38 ++++++++++++++++++++++ 5 files changed, 62 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index c5a6bf5076de..f4ade722791c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -26,8 +26,10 @@ import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, Unescape import org.apache.spark.internal.Logging import 
org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions} import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.types.StructType class CSVOptions( @transient val parameters: CaseInsensitiveMap[String], @@ -278,13 +280,24 @@ class CSVOptions( .getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT)) /** + * Returns true if column pruning is enabled and there are no existence column default values in + * the [[schema]]. + * * The column pruning feature can be enabled either via the CSV option `columnPruning` or * in non-multiline mode via initialization of CSV options by the SQL config: * `spark.sql.csv.parser.columnPruning.enabled`. * The feature is disabled in the `multiLine` mode because of the issue: * https://github.com/uniVocity/univocity-parsers/issues/529 + * + * We disable column pruning when there are any column defaults, instead preferring to reach in + * each row and then post-process it to substitute the default values after. 
*/ - val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning) + def isColumnPruningEnabled(schema: StructType): Boolean = + isColumnPruningOptionEnabled && + !schema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + + private val isColumnPruningOptionEnabled: Boolean = + getBool(COLUMN_PRUNING, !multiLine && columnPruning) def asWriterSettings: CsvWriterSettings = { val writerSettings = new CsvWriterSettings() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 34a8b3d09047..06057626461b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.sources.Filter @@ -71,8 +70,7 @@ class UnivocityParser( // positions. Generally assigned by input configuration options, except when input column(s) have // default values, in which case we omit the explicit indexes in order to know how many tokens // were present in each line instead. 
- private def columnPruning: Boolean = options.isColumnPruningEnabled && - !requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + private def columnPruning: Boolean = options.isColumnPruningEnabled(requiredSchema) // When column pruning is enabled, the parser only parses the required columns based on // their positions in the data schema. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 9516a7729481..3338006b7bf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -105,7 +105,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession.sessionState.conf.csvColumnPruning, sparkSession.sessionState.conf.sessionLocalTimeZone, sparkSession.sessionState.conf.columnNameOfCorruptRecord) - val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled + val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled(requiredSchema) // Check a field requirement for corrupt records here to throw an exception in a driver side ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord) @@ -125,6 +125,9 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { actualRequiredSchema, parsedOptions, actualFilters) + // Use column pruning when specified by Catalyst, except when one or more columns have + // existence default value(s), since in that case we instruct the CSV parser to disable column + // pruning and instead read each entire row in order to correctly assign the default value(s). 
val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema val isStartOfFile = file.start == 0 val headerChecker = new CSVHeaderChecker( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala index cef5a71ca9c6..65eff0647ee2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala @@ -58,7 +58,11 @@ case class CSVPartitionReaderFactory( actualReadDataSchema, options, filters) - val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else actualDataSchema + val schema = if (options.isColumnPruningEnabled(readDataSchema)) { + actualReadDataSchema + } else { + actualDataSchema + } val isStartOfFile = file.start == 0 val headerChecker = new CSVHeaderChecker( schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c7f25c633e0b..12a141944609 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -56,6 +56,7 @@ abstract class CSVSuite override protected def dataSourceFormat = "csv" protected val carsFile = "test-data/cars.csv" + protected val productsFile = "test-data/products.csv" private val carsMalformedFile = "test-data/cars-malformed.csv" private val carsFile8859 = "test-data/cars_iso-8859-1.csv" private val carsTsvFile = "test-data/cars.tsv" @@ -3248,6 +3249,43 @@ abstract class CSVSuite } } } + + test("SPARK-46890: CSV fails on a column 
with default and without enforcing schema") { + withTable("CarsTable") { + spark.sql( + s""" + |CREATE TABLE CarsTable( + | year INT, + | make STRING, + | model STRING, + | comment STRING DEFAULT '', + | blank STRING DEFAULT '') + |USING csv + |OPTIONS ( + | header "true", + | inferSchema "false", + | enforceSchema "false", + | path "${testFile(carsFile)}" + |) + """.stripMargin) + val expected = Seq( + Row("No comment"), + Row("Go get one now they are going fast")) + checkAnswer( + sql("SELECT comment FROM CarsTable WHERE year < 2014"), + expected) + checkAnswer( + spark.read.format("csv") + .options(Map( + "header" -> "true", + "inferSchema" -> "true", + "enforceSchema" -> "false")) + .load(testFile(carsFile)) + .select("comment") + .where("year < 2014"), + expected) + } + } } class CSVv1Suite extends CSVSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org