This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e1776051ac [SPARK-42572][SQL][SS] Fix behavior for StateStoreProvider.validateStateRowFormat
5e1776051ac is described below

commit 5e1776051ac7ee58e41ffb9ced0f43c9e4bdbcc9
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Tue Feb 28 12:13:18 2023 +0900

    [SPARK-42572][SQL][SS] Fix behavior for StateStoreProvider.validateStateRowFormat
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/40073 accidentally changed the relationship between the two `if` statements in `StateStoreProvider.validateStateRowFormat`. Previously the second check was nested inside the first, i.e.
    ```
    if (a) {
      // <code>
      if (b) {
        // <code>
      }
    }
    ```
    
    The change made them parallel (independent) checks, i.e.
    ```
    if (a) {
      // <code>
    }
    if (b) {
      // <code>
    }
    ```
    
    This PR changes it back to the original nested behavior and adds a unit test to prevent this regression in the future.
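
    For reference, a minimal sketch of the restored control flow, using the names that appear in the diff below (simplified, not the exact method body):
    ```
    if (conf.formatValidationEnabled) {
      // The key row format is always checked when format validation is enabled.
      UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
        .foreach { error => throw new InvalidUnsafeRowException(error) }

      // The value row format is only checked when, in addition, the value check is enabled.
      if (conf.formatValidationCheckValue) {
        UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
          .foreach { error => throw new InvalidUnsafeRowException(error) }
      }
    }
    ```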
    
    ### Why are the changes needed?
    
    As above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test
    
    Closes #40187 from WweiL/SPARK-42572-stateStore-logic-test.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/catalyst/util/UnsafeRowUtils.scala   |  5 +++--
 .../sql/execution/streaming/state/StateStore.scala | 11 +++++-----
 .../streaming/state/StateStoreSuite.scala          | 25 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 81b06cb466c..c31a51e67cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -136,7 +136,7 @@ object UnsafeRowUtils {
    * @return None if all the checks pass. An error message if the row is not matched with the schema
    */
   def validateStructuralIntegrityWithReason(
-    row: UnsafeRow, expectedSchema: StructType): Option[String] = {
+      row: UnsafeRow, expectedSchema: StructType): Option[String] = {
     validateStructuralIntegrityWithReasonImpl(row, expectedSchema).map {
       errorMessage => s"Error message is: $errorMessage, " +
           s"UnsafeRow status: ${getStructuralIntegrityStatus(row, 
expectedSchema)}"
@@ -177,7 +177,8 @@ object UnsafeRowUtils {
   }
 
   def getStructuralIntegrityStatus(row: UnsafeRow, expectedSchema: StructType): String = {
-    val fieldStatusArr = expectedSchema.fields.zipWithIndex.map {
+    val minLength = Math.min(row.numFields(), expectedSchema.fields.length)
+    val fieldStatusArr = expectedSchema.fields.take(minLength).zipWithIndex.map {
       case (field, index) =>
         val offsetAndSizeStr = if (!UnsafeRow.isFixedLength(field.dataType)) {
           val (offset, size) = getOffsetAndSize(row, index)
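
(Not spelled out in the description: the `take(minLength)` guard above appears to keep the diagnostic string built by `getStructuralIntegrityStatus` from indexing past the end of a row that has fewer fields than the expected schema, e.g. a value row reported against the key schema, as exercised by the new test below. A tiny illustration with made-up field counts:)

```
// Illustration only: inspect per-field status just for the fields the row actually has.
val rowNumFields = 1      // e.g. a one-column value row
val schemaNumFields = 2   // e.g. the two-column key schema it is compared against
val minLength = math.min(rowNumFields, schemaNumFields)
assert((0 until minLength) == (0 until 1))  // only index 0 is ever inspected
```
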
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index beb6500fe3a..30e660eb2ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -343,11 +343,12 @@ object StateStoreProvider {
     if (conf.formatValidationEnabled) {
       val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
       validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
-    }
-    if (conf.formatValidationCheckValue) {
-      val validationError =
-        UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
-      validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
+
+      if (conf.formatValidationCheckValue) {
+        val validationError =
+          UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
+        validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
+      }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 9651d8bb687..6d38aec363f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1253,6 +1253,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     }
   }
 
+  test("SPARK-42572: StateStoreProvider.validateStateRowFormat shouldn't check" +
+    " value row format when SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED is false") {
+    // By default, when there is an invalid pair of value row and value schema, it should throw
+    val keyRow = dataToKeyRow("key", 1)
+    val valueRow = dataToValueRow(2)
+    val e = intercept[InvalidUnsafeRowException] {
+      // Here valueRow doesn't match with prefixKeySchema
+      StateStoreProvider.validateStateRowFormat(
+        keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
+    }
+    assert(e.getMessage.contains("The streaming query failed by state format invalidation"))
+
+    // When sqlConf.stateStoreFormatValidationEnabled is set to false and
+    // StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true,
+    // don't check value row
+    val sqlConf = getDefaultSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
+      SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get)
+    sqlConf.setConf(SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED, false)
+    val storeConf = new StateStoreConf(sqlConf,
+      Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "true"))
+    // Shouldn't throw
+    StateStoreProvider.validateStateRowFormat(
+      keyRow, keySchema, valueRow, keySchema, storeConf)
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
 


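In short, after this fix the value row check is gated on both flags: enabling the value check alone no longer triggers any validation. A minimal standalone sketch of the resulting gating logic (illustration only; the `ValidationGating` helper below is hypothetical and not part of the patch):

```
object ValidationGating {
  // Returns (checkKeyRow, checkValueRow) for a given combination of the two
  // flags consulted by StateStoreProvider.validateStateRowFormat above.
  def checks(
      formatValidationEnabled: Boolean,
      formatValidationCheckValue: Boolean): (Boolean, Boolean) = {
    val checkKey = formatValidationEnabled
    // The value check is nested inside the key check, so it only fires when
    // format validation itself is enabled.
    val checkValue = formatValidationEnabled && formatValidationCheckValue
    (checkKey, checkValue)
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the new unit test: the value check is requested but format
    // validation is disabled, so neither row is validated and nothing throws.
    val (checkKey, checkValue) =
      checks(formatValidationEnabled = false, formatValidationCheckValue = true)
    assert(!checkKey && !checkValue)
  }
}
```
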
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
