This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f5f67b851e2  [SPARK-39143][SQL] Support CSV scans with DEFAULT values
f5f67b851e2 is described below

commit f5f67b851e28afd898a2e3844c088c3041a199fe
Author:     Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Wed May 18 08:38:43 2022 +0900

    [SPARK-39143][SQL] Support CSV scans with DEFAULT values

    ### What changes were proposed in this pull request?

    Support CSV scans when the table schema has associated DEFAULT column values.

    Example:

    ```
    create table t(i int) using csv;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    select * from t;
    > 42, 'abcdef'
    ```

    ### Why are the changes needed?

    This change makes it easier to build, query, and maintain tables backed by CSV data.

    ### Does this PR introduce _any_ user-facing change?

    Yes: CSV scans now return the column DEFAULT value, rather than NULL, for rows written before the column was added.

    ### How was this patch tested?

    This PR includes new test coverage.

    Closes #36501 from dtenedor/default-csv.

    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 17 +++-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 ++
 .../org/apache/spark/sql/types/StructField.scala   | 11 ++++
 .../org/apache/spark/sql/types/StructType.scala    | 28 ++++++++-
 .../apache/spark/sql/types/StructTypeSuite.scala   | 63 +++++++++++++++++++
 .../org/apache/spark/sql/sources/InsertSuite.scala | 73 ++++++++++++++++++++++
 6 files changed, 192 insertions(+), 6 deletions(-)
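To try the change end to end, here is a minimal sketch of the user-facing behavior on a local session. The SQL mirrors the example above and the new InsertSuite coverage; the session setup and the `spark.sql.defaultColumn.enabled` flag are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

object CsvDefaultScanDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]") // illustrative local setup, not part of the patch
    .appName("SPARK-39143-demo")
    .getOrCreate()

  // Assumption: some builds gate DEFAULT column support behind this flag.
  spark.conf.set("spark.sql.defaultColumn.enabled", "true")

  spark.sql("create table t(i int) using csv")
  spark.sql("insert into t values (42)")
  // The existing row predates column s, so scans fill it in from the DEFAULT.
  spark.sql("alter table t add column (s string default concat('abc', 'def'))")
  spark.sql("select * from t").show() // expected: 42, abcdef

  spark.stop()
}
```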
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 56166950e67..ff46672e67f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
@@ -67,9 +68,16 @@ class UnivocityParser(
   private val tokenIndexArr =
     requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray

+  // True if we should inform the Univocity CSV parser to select which fields to read by their
+  // positions. Generally assigned by input configuration options, except when input column(s) have
+  // default values, in which case we omit the explicit indexes in order to know how many tokens
+  // were present in each line instead.
+  private def columnPruning: Boolean = options.columnPruning &&
+    !requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+
   // When column pruning is enabled, the parser only parses the required columns based on
   // their positions in the data schema.
-  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
+  private val parsedSchema = if (columnPruning) requiredSchema else dataSchema

   val tokenizer: CsvParser = {
     val parserSetting = options.asParserSettings
@@ -266,7 +274,7 @@ class UnivocityParser(
    */
   val parse: String => Option[InternalRow] = {
     // This is intentionally a val to create a function once and reuse.
-    if (options.columnPruning && requiredSchema.isEmpty) {
+    if (columnPruning && requiredSchema.isEmpty) {
       // If `columnPruning` enabled and partition attributes scanned only,
       // `schema` gets empty.
       (_: String) => Some(InternalRow.empty)
     } else {
@@ -276,7 +284,7 @@ class UnivocityParser(
     }
   }

-  private val getToken = if (options.columnPruning) {
+  private val getToken = if (columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
   } else {
     (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
   }
@@ -318,7 +326,8 @@ class UnivocityParser(
         case e: SparkUpgradeException => throw e
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
-          row.setNullAt(i)
+          // Use the corresponding DEFAULT value associated with the column, if any.
+          row.update(i, requiredSchema.defaultValues(i))
       }
       i += 1
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 912f65aa58c..3d133d6cfab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2445,4 +2445,10 @@ object QueryCompilationErrors extends QueryErrorsBase {
       "Failed to execute MERGE INTO command because one of its INSERT or UPDATE assignments " +
         "contains a DEFAULT column reference as part of another expression; this is not allowed")
   }
+
+  def failedToParseExistenceDefaultAsLiteral(fieldName: String, defaultValue: String): Throwable = {
+    throw new AnalysisException(
+      s"Invalid DEFAULT value for column $fieldName: $defaultValue fails to parse as a valid " +
+        "literal value")
+  }
 }
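The helper added to StructField below reads the default text back out of the field's metadata. A standalone sketch of that convention using only public types follows; the literal key string "EXISTS_DEFAULT" is an assumption standing in for the private[sql] constant EXISTS_DEFAULT_COLUMN_METADATA_KEY:

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

object ExistenceDefaultSketch extends App {
  // Assumed literal behind ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY.
  val existsDefaultKey = "EXISTS_DEFAULT"

  // A field carrying constant-folded default text in its metadata, as written
  // by the ALTER TABLE ... ADD COLUMN ... DEFAULT command path.
  val field = StructField("s", StringType, nullable = true,
    new MetadataBuilder().putString(existsDefaultKey, "'abcdef'").build())

  // Mirrors the new private[sql] StructField.getExistenceDefaultValue() helper.
  val existenceDefault: Option[String] =
    if (field.metadata.contains(existsDefaultKey)) {
      Option(field.metadata.getString(existsDefaultKey))
    } else {
      None
    }

  println(existenceDefault) // Some('abcdef')
}
```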
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index c80745ff6b5..1fdde3e5219 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -118,6 +118,17 @@ case class StructField(
     }
   }

+  /**
+   * Return the existence default value of this StructField.
+   */
+  private[sql] def getExistenceDefaultValue(): Option[String] = {
+    if (metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) {
+      Option(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
   private def getDDLComment = getComment()
     .map(escapeSingleQuotedString)
     .map(" COMMENT '" + _ + "'")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index ec08ee4838f..464d1ba1ef9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -25,8 +25,8 @@ import org.json4s.JsonDSL._

 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
+import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Attribute, AttributeReference, Cast, InterpretedOrdering, Literal => ExprLiteral}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser, ParseException}
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.util.{truncatedString, StringUtils}
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
@@ -511,6 +511,30 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {

   @transient
   private[sql] lazy val interpretedOrdering =
     InterpretedOrdering.forSchema(this.fields.map(_.dataType))
+
+  /**
+   * Parses the text representing constant-folded default column literal values.
+   * @return a sequence of either (1) NULL, if the column had no default value, or (2) an object of
+   *         Any type suitable for assigning into a row using the InternalRow.update method.
+   */
+  private[sql] lazy val defaultValues: Array[Any] =
+    fields.map { field: StructField =>
+      val defaultValue: Option[String] = field.getExistenceDefaultValue()
+      defaultValue.map { text: String =>
+        val expr = try {
+          val expr = CatalystSqlParser.parseExpression(text)
+          expr match {
+            case _: ExprLiteral | _: AnsiCast | _: Cast => expr
+          }
+        } catch {
+          case _: ParseException | _: MatchError =>
+            throw QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text)
+        }
+        // The expression should be a literal value by this point, possibly wrapped in a cast
+        // function. This is enforced by the execution of commands that assign default values.
+        expr.eval()
+      }.getOrElse(null)
+    }
 }

 /**
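In isolation, the contract that defaultValues enforces looks roughly like the sketch below: the stored text must parse to a literal, optionally wrapped in a cast, and is then constant-folded. Note that CatalystSqlParser and the expression classes are internal APIs, and AnsiCast is omitted here for brevity:

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

object DefaultLiteralFolding {
  // Parse the stored default text and fold it to a value, or reject it,
  // approximating the new StructType.defaultValues behavior.
  def evalExistenceDefault(text: String): Any = {
    val expr =
      try {
        CatalystSqlParser.parseExpression(text) match {
          case e @ (_: Literal | _: Cast) => e
        }
      } catch {
        case _: ParseException | _: MatchError =>
          throw new IllegalArgumentException(s"Not a literal default: $text")
      }
    expr.eval() // e.g. "CAST(42 AS BIGINT)" folds to 42L, "'abc'" to a UTF8String
  }
}
```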
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 16f122334f3..ef29f7b9cbb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -22,12 +22,14 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DayTimeIntervalType => DT}
 import org.apache.spark.sql.types.{YearMonthIntervalType => YM}
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.StructType.fromDDL
 import org.apache.spark.sql.types.YearMonthIntervalType._
+import org.apache.spark.unsafe.types.UTF8String

 class StructTypeSuite extends SparkFunSuite with SQLHelper {

@@ -436,4 +438,65 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
     }
     assert(e.getMessage.contains("Failed to merge decimal types"))
   }
+
+  test("SPARK-39143: Test parsing default column values out of struct types") {
+    // Positive test: the StructType.defaultValues evaluation is successful.
+    val source1 = StructType(Array(
+      StructField("c1", LongType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
+          .build()),
+      StructField("c2", StringType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
+          .build()),
+      StructField("c3", BooleanType)))
+    assert(source1.defaultValues.size == 3)
+    assert(source1.defaultValues(0) == 42)
+    assert(source1.defaultValues(1) == UTF8String.fromString("abc"))
+    assert(source1.defaultValues(2) == null)
+
+    // Negative test: StructType.defaultValues fails because the existence default value parses and
+    // resolves successfully, but evaluates to a non-literal expression.
+    val source2 = StructType(
+      Array(StructField("c1", IntegerType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "1 + 1")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "1 + 1")
+          .build())))
+    val error = "fails to parse as a valid literal value"
+    assert(intercept[AnalysisException] {
+      source2.defaultValues
+    }.getMessage.contains(error))
+
+    // Negative test: StructType.defaultValues fails because the existence default value fails to
+    // parse.
+    val source3 = StructType(Array(
+      StructField("c1", IntegerType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "invalid")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "invalid")
+          .build())))
+    assert(intercept[AnalysisException] {
+      source3.defaultValues
+    }.getMessage.contains(error))
+
+    // Negative test: StructType.defaultValues fails because the existence default value fails to
+    // resolve.
+    val source4 = StructType(Array(
+      StructField("c1", IntegerType, true,
+        new MetadataBuilder()
+          .putString(
+            ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
+            "(SELECT 'abc' FROM missingtable)")
+          .putString(
+            ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+            "(SELECT 'abc' FROM missingtable)")
+          .build())))
+    assert(intercept[AnalysisException] {
+      source4.defaultValues
+    }.getMessage.contains(error))
+  }
 }
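The InsertSuite addition below runs its scenarios both under default settings and with CSV column pruning disabled, since the parser now bypasses pruning for schemas that carry existence defaults. Outside the test harness, the equivalent toggle is roughly the following; the conf key string is an assumption read off SQLConf.CSV_PARSER_COLUMN_PRUNING:

```scala
import org.apache.spark.sql.SparkSession

object CsvPruningToggle {
  // Run `body` with CSV column pruning disabled, then restore the prior value,
  // mirroring the withSQLConf pattern used by the test below.
  def withCsvPruningDisabled(spark: SparkSession)(body: => Unit): Unit = {
    val key = "spark.sql.csv.parser.columnPruning.enabled" // assumed key string
    val previous = spark.conf.get(key, "true")
    spark.conf.set(key, "false")
    try body finally spark.conf.set(key, previous)
  }
}
```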
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index a2237b377cf..8fbaafbead3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1512,6 +1512,79 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }

+  test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them: Positive tests") {
+    def runTest(dataSource: String): Unit = {
+      val createTableIntCol = s"create table t(a string, i int) using $dataSource"
+      // Adding a column with a valid default value into a table containing existing data works
+      // successfully. Querying data from the altered table returns the new value.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
+        checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
+      }
+      // Same as above, but a following command alters the column to change the default value.
+      // This returns the previous value, not the new value, since the behavior semantics are
+      // the same as if the first command had performed a backfill of the new default value in
+      // the existing rows.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        sql("alter table t alter column s set default concat('ghi', 'jkl')")
+        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
+        checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
+      }
+      // Adding a column with a default value and then inserting explicit NULL values works.
+      // Querying data back from the table differentiates between the explicit NULL values and
+      // default values.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        sql("insert into t values(null, null, null)")
+        sql("alter table t add column (x boolean default true)")
+        checkAnswer(spark.table("t"),
+          Seq(
+            Row("xyz", 42, "abcdef", true),
+            Row(null, null, null, true)))
+        checkAnswer(sql("select i, s, x from t"),
+          Seq(
+            Row(42, "abcdef", true),
+            Row(null, null, true)))
+      }
+      // Adding two columns where only the first has a valid default value works successfully.
+      // Querying data from the altered table returns the default value as well as NULL for the
+      // second column.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        sql("alter table t add column (x string)")
+        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef", null))
+        checkAnswer(sql("select i, s, x from t"), Row(42, "abcdef", null))
+      }
+    }
+
+    // This represents one test configuration over a data source.
+    case class Config(dataSource: String, sqlConf: Seq[(String, String)] = Seq())
+    Seq(
+      Config(dataSource = "csv",
+        Seq(
+          SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false"))
+    ).foreach { config: Config =>
+      // First run the test with default settings.
+      runTest(config.dataSource)
+      // Then run the test again with each pair of custom SQLConf values.
+      config.sqlConf.foreach { kv: (String, String) =>
+        withSQLConf(kv) {
+          runTest(config.dataSource)
+        }
+      }
+    }
+  }
+
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,
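One semantic worth calling out from the second positive test above: the added column's default is an "existence" default, so later changes to the default do not rewrite rows that predate it. A short sketch of that behavior, reusing the same SQL as the test and assuming a SparkSession named `spark` as in the first example:

```scala
// Same SQL as the second positive test above; `spark` is assumed in scope.
spark.sql("create table t(a string, i int) using csv")
spark.sql("insert into t values('xyz', 42)")
spark.sql("alter table t add column (s string default concat('abc', 'def'))")
// Changing the default afterwards does not alter the already-written row:
spark.sql("alter table t alter column s set default concat('ghi', 'jkl')")
spark.sql("select i, s from t").show() // expected: 42, abcdef
```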
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org