sahnib commented on code in PR #46856:
URL: https://github.com/apache/spark/pull/46856#discussion_r1626183498


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3730,6 +3730,12 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" : {
+    "message" : [
+      "Provided key schema does not match existing state key schema. Please 
check number and type of fields. Existing key_schema=<storedKeySchema> and new 
key_schema=<newKeySchema>."

Review Comment:
   [nit] Do you think it would be useful to put existing and new key schema in 
separate lines for easier readability? Same comment for value schema error 
class below. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -44,37 +42,37 @@ class StateSchemaCompatibilityChecker(
     check(keySchema, valueSchema, ignoreValueSchema = false)
   }
 
+  private def logSchemaIncompatibleError(schemaErrorMsg: String): Unit = {
+    val errorMsg = "Provided schema doesn't match the schema for existing 
state! " +
+      "Please note that Spark allow difference of field name: check count of 
fields " +

Review Comment:
   [nit] `Spark allow difference of` -> `Spark allows differences in` 



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala:
##########
@@ -313,22 +314,13 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
     runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
       ignoreValueSchema = ignoreValueSchema)
 
-    val e = intercept[StateSchemaNotCompatible] {
+    val e = intercept[SparkUnsupportedOperationException] {
       runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
         ignoreValueSchema = ignoreValueSchema)
     }
-
-    assert(e.getMessage.contains("Provided schema doesn't match to the schema 
for existing state!"))
-    assert(e.getMessage.contains(newKeySchema.toString()))
-    assert(e.getMessage.contains(oldKeySchema.toString()))
-
-    if (ignoreValueSchema) {
-      assert(!e.getMessage.contains(newValueSchema.toString()))
-      assert(!e.getMessage.contains(oldValueSchema.toString()))
-    } else {
-      assert(e.getMessage.contains(newValueSchema.toString()))
-      assert(e.getMessage.contains(oldValueSchema.toString()))
-    }
+    assert(e.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" ||
+      e.getErrorClass === "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE")
+    assert(e.getMessage.contains("does not match existing"))

Review Comment:
   Can we validate if the key schema or value schema error class will be 
thrown, based on the actual mismatch? 



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -782,11 +782,12 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest with Assertions {
       testStream(aggregated, Update())(
         StartStream(checkpointLocation = tempDir.getAbsolutePath),
         AddData(inputData, 21),
-        ExpectFailure[SparkException] { e =>
+        ExpectFailure[
+          
org.apache.spark.sql.execution.streaming.state.StateStoreValueSchemaNotCompatible]
 { e =>

Review Comment:
   [nit] we can import 
`org.apache.spark.sql.execution.streaming.state.StateStoreValueSchemaNotCompatible`
 above to make this more readable. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to