This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5e1776051ac [SPARK-42572][SQL][SS] Fix behavior for StateStoreProvider.validateStateRowFormat 5e1776051ac is described below commit 5e1776051ac7ee58e41ffb9ced0f43c9e4bdbcc9 Author: Wei Liu <wei....@databricks.com> AuthorDate: Tue Feb 28 12:13:18 2023 +0900 [SPARK-42572][SQL][SS] Fix behavior for StateStoreProvider.validateStateRowFormat ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/40073 accidentally changed the relationship of the two `if` statement in `StateStoreProvider.validateStateRowFormat`. Before they were inclusive, i.e. ``` if (a) { // <code> if (b) { // <code> } } ``` It was changed to parallel, i.e. ``` if (a) { // <code> } if (b) { // <code> } ``` This PR change it back to the original behavior and add a unit test to prevent it in the future. ### Why are the changes needed? As above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #40187 from WweiL/SPARK-42572-stateStore-logic-test. Authored-by: Wei Liu <wei....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/catalyst/util/UnsafeRowUtils.scala | 5 +++-- .../sql/execution/streaming/state/StateStore.scala | 11 +++++----- .../streaming/state/StateStoreSuite.scala | 25 ++++++++++++++++++++++ 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala index 81b06cb466c..c31a51e67cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala @@ -136,7 +136,7 @@ object UnsafeRowUtils { * @return None if all the checks pass. 
An error message if the row is not matched with the schema */ def validateStructuralIntegrityWithReason( - row: UnsafeRow, expectedSchema: StructType): Option[String] = { + row: UnsafeRow, expectedSchema: StructType): Option[String] = { validateStructuralIntegrityWithReasonImpl(row, expectedSchema).map { errorMessage => s"Error message is: $errorMessage, " + s"UnsafeRow status: ${getStructuralIntegrityStatus(row, expectedSchema)}" @@ -177,7 +177,8 @@ object UnsafeRowUtils { } def getStructuralIntegrityStatus(row: UnsafeRow, expectedSchema: StructType): String = { - val fieldStatusArr = expectedSchema.fields.zipWithIndex.map { + val minLength = Math.min(row.numFields(), expectedSchema.fields.length) + val fieldStatusArr = expectedSchema.fields.take(minLength).zipWithIndex.map { case (field, index) => val offsetAndSizeStr = if (!UnsafeRow.isFixedLength(field.dataType)) { val (offset, size) = getOffsetAndSize(row, index) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index beb6500fe3a..30e660eb2ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -343,11 +343,12 @@ object StateStoreProvider { if (conf.formatValidationEnabled) { val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema) validationError.foreach { error => throw new InvalidUnsafeRowException(error) } - } - if (conf.formatValidationCheckValue) { - val validationError = - UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema) - validationError.foreach { error => throw new InvalidUnsafeRowException(error) } + + if (conf.formatValidationCheckValue) { + val validationError = + UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema) + 
validationError.foreach { error => throw new InvalidUnsafeRowException(error) } + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 9651d8bb687..6d38aec363f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -1253,6 +1253,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] } } + test("SPARK-42572: StateStoreProvider.validateStateRowFormat shouldn't check" + + " value row format when SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED is false") { + // By default, when there is an invalid pair of value row and value schema, it should throw + val keyRow = dataToKeyRow("key", 1) + val valueRow = dataToValueRow(2) + val e = intercept[InvalidUnsafeRowException] { + // Here valueRow doesn't match with prefixKeySchema + StateStoreProvider.validateStateRowFormat( + keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf()) + } + assert(e.getMessage.contains("The streaming query failed by state format invalidation")) + + // When sqlConf.stateStoreFormatValidationEnabled is set to false and + // StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true, + // don't check value row + val sqlConf = getDefaultSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, + SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get) + sqlConf.setConf(SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED, false) + val storeConf = new StateStoreConf(sqlConf, + Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "true")) + // Shouldn't throw + StateStoreProvider.validateStateRowFormat( + keyRow, keySchema, valueRow, keySchema, storeConf) + } + /** Return a new provider with a random id */ def newStoreProvider(): ProviderClass 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org