anishshri-db commented on code in PR #46856:
URL: https://github.com/apache/spark/pull/46856#discussion_r1628220489


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -44,37 +42,37 @@ class StateSchemaCompatibilityChecker(
     check(keySchema, valueSchema, ignoreValueSchema = false)
   }
 
+  private def logSchemaIncompatibleError(schemaErrorMsg: String): Unit = {
+    val errorMsg = "Provided schema doesn't match the schema for existing state! " +
+      "Please note that Spark allows difference in field names: please check count of fields " +
+      "and data type of each field.\n" +
+      schemaErrorMsg +
+      s"If you want to force running query without schema validation, please set " +
+      s"${SQLConf.STATE_SCHEMA_CHECK_ENABLED.key} to false.\n" +
+      "Please note that running query with incompatible schema could cause indeterministic " +
+      "behavior."
+    logError(errorMsg)
+  }
+
   def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = {
     if (fm.exists(schemaFileLocation)) {
       logDebug(s"Schema file for provider $providerId exists. Comparing with provided schema.")
       val (storedKeySchema, storedValueSchema) = readSchemaFile()
       if (storedKeySchema.equals(keySchema) &&
         (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
         // schema is exactly same
-      } else if (!schemasCompatible(storedKeySchema, keySchema) ||
-        (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema))) {
+      } else if (!schemasCompatible(storedKeySchema, keySchema)) {
         val errorMsgForKeySchema = s"- Provided key schema: $keySchema\n" +
           s"- Existing key schema: $storedKeySchema\n"
-
-        // If it is requested to skip checking the value schema, we also don't expose the value
-        // schema information to the error message.
-        val errorMsgForValueSchema = if (!ignoreValueSchema) {
-          s"- Provided value schema: $valueSchema\n" +
-            s"- Existing value schema: $storedValueSchema\n"
-        } else {
-          ""
-        }
-        val errorMsg = "Provided schema doesn't match to the schema for existing state! " +
-          "Please note that Spark allow difference of field name: check count of fields " +
-          "and data type of each field.\n" +
-          errorMsgForKeySchema +
-          errorMsgForValueSchema +
-          s"If you want to force running query without schema validation, please set " +
-          s"${SQLConf.STATE_SCHEMA_CHECK_ENABLED.key} to false.\n" +
-          "Please note running query with incompatible schema could cause indeterministic" +
-          " behavior."
-        logError(errorMsg)
-        throw StateSchemaNotCompatible(errorMsg)
+        logSchemaIncompatibleError(errorMsgForKeySchema)

Review Comment:
   Sure - done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to