This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a90a49828f4 [SPARK-39211][SQL] Support JSON scans with DEFAULT values
a90a49828f4 is described below

commit a90a49828f4484fa6c3dcfe5183bd4181f7cfd91
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Tue May 24 21:31:17 2022 +0800

    [SPARK-39211][SQL] Support JSON scans with DEFAULT values
    
    ### What changes were proposed in this pull request?
    
    Support JSON scans when the table schema has associated DEFAULT column values.
    
    Example:
    
    ```
    create table t(i int) using json;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    select * from t;
    > 42, 'abcdef'
    ```
    
    Interesting note: JSON does not distinguish between NULL values and the absence of values. Therefore, inserting NULL and then selecting back the same column yields the default value (if any), since the insert did not change any storage.
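    
    A quick way to see this behavior (a sketch mirroring the new test added to InsertSuite in this patch):
    
    ```
    create table t(a string default 'abc') using json;
    insert into t values(null);
    select * from t;
    > 'abc'
    ```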
    
    ### Why are the changes needed?
    
    This change makes it easier to build, query, and maintain tables backed by 
JSON data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. After this change, scanning a JSON table returns the column DEFAULT value (instead of NULL) for any field that is absent from storage.
    
    ### How was this patch tested?
    
    This PR includes new test coverage.
    
    Closes #36583 from dtenedor/default-json.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +-
 .../spark/sql/catalyst/json/JacksonParser.scala    |  5 +-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 63 ++++++++++++++++++++++
 .../org/apache/spark/sql/types/StructType.scala    | 30 +++--------
 .../apache/spark/sql/types/StructTypeSuite.scala   | 14 ++---
 .../org/apache/spark/sql/sources/InsertSuite.scala | 35 +++++++++---
 6 files changed, 110 insertions(+), 39 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index ff46672e67f..56ebfcc26c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -327,7 +327,7 @@ class UnivocityParser(
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
          // Use the corresponding DEFAULT value associated with the column, if any.
-          row.update(i, requiredSchema.defaultValues(i))
+          row.update(i, requiredSchema.existenceDefaultValues(i))
       }
       i += 1
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index abcbdb83813..7004d2a8f16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
@@ -421,12 +422,14 @@ class JacksonParser(
     var skipRow = false
 
     structFilters.reset()
+    resetExistenceDefaultsBitmask(schema)
     while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
           try {
             row.update(index, fieldConverters(index).apply(parser))
             skipRow = structFilters.skipRow(row, index)
+            schema.existenceDefaultsBitmask(index) = false
           } catch {
             case e: SparkUpgradeException => throw e
             case NonFatal(e) if isRoot =>
@@ -437,10 +440,10 @@ class JacksonParser(
           parser.skipChildren()
       }
     }
-
     if (skipRow) {
       None
     } else if (badRecordException.isEmpty) {
+      applyExistenceDefaultValuesToRow(schema, row)
       Some(row)
     } else {
       throw PartialResultException(row, badRecordException.get)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index d2963a60409..262150174ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -18,11 +18,14 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
 import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -168,4 +171,64 @@ object ResolveDefaultColumns {
       str.toLowerCase()
     }
   }
+
+  /**
+   * Parses the text representing constant-folded default column literal values. These are known as
+   * "existence" default values because each one is the constant-folded result of the original
+   * default value first assigned to the column at table/column creation time. When scanning a field
+   * from any data source, if the corresponding value is not present in storage, the output row
+   * returns this "existence" default value instead of NULL.
+   * @return a sequence of either (1) NULL, if the column had no default value, or (2) an object of
+   *         Any type suitable for assigning into a row using the InternalRow.update method.
+   */
+  def getExistenceDefaultValues(schema: StructType): Array[Any] = {
+    schema.fields.map { field: StructField =>
+      val defaultValue: Option[String] = field.getExistenceDefaultValue()
+      defaultValue.map { text: String =>
+        val expr = try {
+          val expr = CatalystSqlParser.parseExpression(text)
+          expr match {
+            case _: ExprLiteral | _: AnsiCast | _: Cast => expr
+          }
+        } catch {
+          case _: ParseException | _: MatchError =>
+            throw QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text)
+        }
+        // The expression should be a literal value by this point, possibly wrapped in a cast
+        // function. This is enforced by the execution of commands that assign default values.
+        expr.eval()
+      }.orNull
+    }
+  }
+
+  /**
+   * Returns an array of boolean values equal in size to the result of [[getExistenceDefaultValues]]
+   * above, for convenience.
+   */
+  def getExistenceDefaultsBitmask(schema: StructType): Array[Boolean] = {
+    Array.fill[Boolean](schema.existenceDefaultValues.size)(true)
+  }
+
+  /**
+   * Resets the elements of the array initially returned from [[getExistenceDefaultsBitmask]] above.
+   * Afterwards, set element(s) to false before calling [[applyExistenceDefaultValuesToRow]] below.
+   */
+  def resetExistenceDefaultsBitmask(schema: StructType): Unit = {
+    for (i <- 0 until schema.existenceDefaultValues.size) {
+      schema.existenceDefaultsBitmask(i) = (schema.existenceDefaultValues(i) != null)
+    }
+  }
+
+  /**
+   * Updates a subset of columns in the row with default values from the metadata in the schema.
+   */
+  def applyExistenceDefaultValuesToRow(schema: StructType, row: InternalRow): Unit = {
+    if (schema.hasExistenceDefaultValues) {
+      for (i <- 0 until schema.existenceDefaultValues.size) {
+        if (schema.existenceDefaultsBitmask(i)) {
+          row.update(i, schema.existenceDefaultValues(i))
+        }
+      }
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 464d1ba1ef9..06460513c8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -25,10 +25,11 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Attribute, AttributeReference, Cast, InterpretedOrdering, Literal => ExprLiteral}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser, ParseException}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.util.{truncatedString, StringUtils}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
@@ -513,28 +514,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     InterpretedOrdering.forSchema(this.fields.map(_.dataType))
 
   /**
-   * Parses the text representing constant-folded default column literal values.
-   * @return a sequence of either (1) NULL, if the column had no default value, or (2) an object of
-   *         Any type suitable for assigning into a row using the InternalRow.update method.
+   * These define and cache existence default values for the struct fields for efficiency purposes.
    */
-  private [sql] lazy val defaultValues: Array[Any] =
-    fields.map { field: StructField =>
-      val defaultValue: Option[String] = field.getExistenceDefaultValue()
-      defaultValue.map { text: String =>
-        val expr = try {
-          val expr = CatalystSqlParser.parseExpression(text)
-          expr match {
-            case _: ExprLiteral | _: AnsiCast | _: Cast => expr
-          }
-        } catch {
-          case _: ParseException | _: MatchError =>
-            throw QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text)
-        }
-        // The expression should be a literal value by this point, possibly wrapped in a cast
-        // function. This is enforced by the execution of commands that assign default values.
-        expr.eval()
-      }.getOrElse(null)
-    }
+  private[sql] lazy val existenceDefaultValues: Array[Any] = getExistenceDefaultValues(this)
+  private[sql] lazy val existenceDefaultsBitmask: Array[Boolean] = getExistenceDefaultsBitmask(this)
+  private[sql] lazy val hasExistenceDefaultValues = existenceDefaultValues.exists(_ != null)
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 3aca7b1e52e..940a8e5e2ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -457,10 +457,10 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
           .build()),
       StructField("c3", BooleanType)))
-    assert(source1.defaultValues.size == 3)
-    assert(source1.defaultValues(0) == 42)
-    assert(source1.defaultValues(1) == UTF8String.fromString("abc"))
-    assert(source1.defaultValues(2) == null)
+    assert(source1.existenceDefaultValues.size == 3)
+    assert(source1.existenceDefaultValues(0) == 42)
+    assert(source1.existenceDefaultValues(1) == UTF8String.fromString("abc"))
+    assert(source1.existenceDefaultValues(2) == null)
 
    // Negative test: StructType.defaultValues fails because the existence default value parses and
     // resolves successfully, but evaluates to a non-literal expression.
@@ -472,7 +472,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
           .build())))
     val error = "fails to parse as a valid literal value"
    assert(intercept[AnalysisException] {
-      source2.defaultValues
+      source2.existenceDefaultValues
     }.getMessage.contains(error))
 
    // Negative test: StructType.defaultValues fails because the existence default value fails to
@@ -484,7 +484,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "invalid")
           .build())))
     assert(intercept[AnalysisException] {
-      source3.defaultValues
+      source3.existenceDefaultValues
     }.getMessage.contains(error))
 
    // Negative test: StructType.defaultValues fails because the existence default value fails to
@@ -500,7 +500,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
             "(SELECT 'abc' FROM missingtable)")
           .build())))
     assert(intercept[AnalysisException] {
-      source4.defaultValues
+      source4.existenceDefaultValues
     }.getMessage.contains(error))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 1580a33a9eb..247c6bdb355 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1572,23 +1572,44 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
 
     // This represents one test configuration over a data source.
-    case class Config(dataSource: String, sqlConf: Seq[(String, String)] = Seq())
+    case class Config(
+      dataSource: String,
+      sqlConf: Seq[Option[(String, String)]] = Seq())
+    // Run the test several times using each configuration.
     Seq(
+      Config(dataSource = "json",
+        Seq(
+          Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false"))),
       Config(dataSource = "csv",
         Seq(
-          SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false"))
+          None,
+          Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")))
     ).foreach { config: Config =>
-      // First run the test with default settings.
-      runTest(config.dataSource)
-      // Then run the test again with each pair of custom SQLConf values.
-      config.sqlConf.foreach { kv: (String, String) =>
-        withSQLConf(kv) {
+      config.sqlConf.foreach {
+        _.map { kv: (String, String) =>
+          withSQLConf(kv) {
+            // Run the test with the pair of custom SQLConf values.
+            runTest(config.dataSource)
+          }
+        }.getOrElse {
+          // Run the test with default settings.
           runTest(config.dataSource)
         }
       }
     }
   }
 
+  test("SPARK-39211 INSERT into JSON table, ADD COLUMNS with DEFAULTs, then 
SELECT them") {
+    // By default, INSERT commands into JSON tables do not store NULL values. 
Therefore, if such
+    // destination table columns have DEFAULT values, SELECTing out the same 
columns will return the
+    // default values (instead of NULL) since nothing is present in storage.
+    withTable("t") {
+      sql("create table t(a string default 'abc') using json")
+      sql("insert into t values(null)")
+      checkAnswer(spark.table("t"), Row("abc"))
+    }
+  }
+
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> 
classOf[FileExistingTestFileSystem].getName,

