sahnib commented on code in PR #46856:
URL: https://github.com/apache/spark/pull/46856#discussion_r1626183498
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3730,6 +3730,12 @@
],
"sqlState" : "42K06"
},
+ "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" : {
+ "message" : [
+ "Provided key schema does not match existing state key schema. Please
check number and type of fields. Existing key_schema=<storedKeySchema> and new
key_schema=<newKeySchema>."
Review Comment:
[nit] Do you think it would be useful to put the existing and the new key
schemas on separate lines for easier readability? The same comment applies to
the value schema error class below.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -44,37 +42,37 @@ class StateSchemaCompatibilityChecker(
check(keySchema, valueSchema, ignoreValueSchema = false)
}
+ private def logSchemaIncompatibleError(schemaErrorMsg: String): Unit = {
+ val errorMsg = "Provided schema doesn't match the schema for existing
state! " +
+ "Please note that Spark allow difference of field name: check count of
fields " +
Review Comment:
[nit] `Spark allow difference of` -> `Spark allows differences in`
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala:
##########
@@ -313,22 +314,13 @@ class StateSchemaCompatibilityCheckerSuite extends
SharedSparkSession {
runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
ignoreValueSchema = ignoreValueSchema)
- val e = intercept[StateSchemaNotCompatible] {
+ val e = intercept[SparkUnsupportedOperationException] {
runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
ignoreValueSchema = ignoreValueSchema)
}
-
- assert(e.getMessage.contains("Provided schema doesn't match to the schema
for existing state!"))
- assert(e.getMessage.contains(newKeySchema.toString()))
- assert(e.getMessage.contains(oldKeySchema.toString()))
-
- if (ignoreValueSchema) {
- assert(!e.getMessage.contains(newValueSchema.toString()))
- assert(!e.getMessage.contains(oldValueSchema.toString()))
- } else {
- assert(e.getMessage.contains(newValueSchema.toString()))
- assert(e.getMessage.contains(oldValueSchema.toString()))
- }
+ assert(e.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" ||
+ e.getErrorClass === "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE")
+ assert(e.getMessage.contains("does not match existing"))
Review Comment:
Can we validate whether the key schema or the value schema error class is
thrown, based on the actual mismatch?
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -782,11 +782,12 @@ class StreamingAggregationSuite extends
StateStoreMetricsTest with Assertions {
testStream(aggregated, Update())(
StartStream(checkpointLocation = tempDir.getAbsolutePath),
AddData(inputData, 21),
- ExpectFailure[SparkException] { e =>
+ ExpectFailure[
+
org.apache.spark.sql.execution.streaming.state.StateStoreValueSchemaNotCompatible]
{ e =>
Review Comment:
[nit] We can import
`org.apache.spark.sql.execution.streaming.state.StateStoreValueSchemaNotCompatible`
above to make this more readable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]