HeartSaVioR commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1488847231


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -82,6 +84,39 @@ class IncrementalExecution(
     .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
     .getOrElse(sparkSession.sessionState.conf.numShufflePartitions)
 
+  private def stateCheckpointLocationExists(stateCheckpointLocation: Path): 
Boolean = {
+    val fileManager =
+      CheckpointFileManager.create(stateCheckpointLocation, hadoopConf)
+    fileManager.exists(stateCheckpointLocation)
+  }
+
+  // A map of all (operatorId -> operatorName) in the state metadata
+  private lazy val opMapInMetadata: Map[Long, String] = {
+    var ret = Map.empty[Long, String]
+    if (stateCheckpointLocationExists(new Path(checkpointLocation))) {

Review Comment:
   nit: let's just inline this method if it is referenced only once.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -387,8 +433,29 @@ class IncrementalExecution(
       rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
     }
 
+    private def checkOperatorValidWithMetadata(): Unit = {
+     (opMapInMetadata.keySet ++ opMapInPhysicalPlan.keySet).foreach { opId =>
+       val opInMetadata = opMapInMetadata.getOrElse(opId, "not found")
+       val opInCurBatch = opMapInPhysicalPlan.getOrElse(opId, "not found")
+       if (opInMetadata != opInCurBatch) {
+         throw 
QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(
+           opMapInMetadata.values.toSeq,

Review Comment:
   Shall we print out association between opId and opName in error message? It 
may be uneasy to understand what is mismatching only with opNames.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+         OpsInCurBatchSeq: Seq[String],

Review Comment:
   nit: indentation is off, 4 spaces



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -1702,6 +1702,18 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
       new NoSuchElementException("State is either not defined or has already 
been removed")
   }
 
+  def statefulOperatorNotMatchInStateMetadataError(
+      opsInMetadataSeq: Seq[String],

Review Comment:
   Likewise I commented, should have provided information about association 
between opId and opName. Only opNames does not seem to be sufficient.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {

Review Comment:
   nit: "with debug message" - maybe better to say "with guidance".



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -45,7 +47,6 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     assert(operatorMetadata.operatorInfo == expectedMetadata.operatorInfo &&
       
operatorMetadata.stateStoreInfo.sameElements(expectedMetadata.stateStoreInfo))
   }
-

Review Comment:
   nit: revert this change



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -387,8 +433,29 @@ class IncrementalExecution(
       rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
     }
 
+    private def checkOperatorValidWithMetadata(): Unit = {

Review Comment:
   Shall we inline all the logic e.g. building opMapInMetadata and 
opMapInPhysicalPlan to here? I don't see we use these fields other than here. 
Let's scope fields and methods be narrower whenever possible. 
   
   That said, You don't need to use rule to build opMapInPhysicalPlan. Let's 
just use foreach to traverse the plan and build opMapInPhysicalPlan.



##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -3311,6 +3311,13 @@
     ],
     "sqlState" : "42601"
   },
+  "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
+    "message" : [
+      "Streaming stateful operator name does not match with the operator in 
state metadata. This likely to happen when user changes stateful operator of 
existing streaming query.",

Review Comment:
   nit: when user adds/removes/changes



##########
docs/sql-error-conditions.md:
##########
@@ -2085,6 +2085,13 @@ The checkpoint seems to be only run with older Spark 
version(s). Run the streami
 
 '`<optionName>`' must be specified.
 
+### STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA
+
+[SQLSTATE: 
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Streaming stateful operator name does not match with the operator in state 
metadata. This likely to happen when user changes stateful operator of existing 
streaming query.

Review Comment:
   nit: when user adds/removes/changes



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+         OpsInCurBatchSeq: Seq[String],
+         ex: Throwable): Unit = {
+        checkError(ex.asInstanceOf[SparkRuntimeException],
+          "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+          Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+            "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+        )
+      }
+
+      // replace dropDuplicates with dropDuplicatesWithinWatermark
+      testStream(stream.withWatermark("eventTime", "10 seconds")
+        .dropDuplicatesWithinWatermark(), Append)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 2),
+        ExpectFailure[SparkRuntimeException] {
+          (t: Throwable) => {
+            checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
+          }
+        }
+      )
+
+      // replace operator

Review Comment:
   I guess it's redundant?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+         OpsInCurBatchSeq: Seq[String],
+         ex: Throwable): Unit = {
+        checkError(ex.asInstanceOf[SparkRuntimeException],
+          "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+          Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+            "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+        )
+      }
+
+      // replace dropDuplicates with dropDuplicatesWithinWatermark
+      testStream(stream.withWatermark("eventTime", "10 seconds")
+        .dropDuplicatesWithinWatermark(), Append)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 2),
+        ExpectFailure[SparkRuntimeException] {

Review Comment:
   nit: use `{ t =>` to save one indentation & two lines



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+         OpsInCurBatchSeq: Seq[String],
+         ex: Throwable): Unit = {
+        checkError(ex.asInstanceOf[SparkRuntimeException],
+          "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+          Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+            "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+        )
+      }
+
+      // replace dropDuplicates with dropDuplicatesWithinWatermark
+      testStream(stream.withWatermark("eventTime", "10 seconds")
+        .dropDuplicatesWithinWatermark(), Append)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 2),
+        ExpectFailure[SparkRuntimeException] {
+          (t: Throwable) => {
+            checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
+          }
+        }
+      )
+
+      // replace operator
+      testStream(stream.groupBy("value").count(), Update)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        ExpectFailure[SparkRuntimeException] {
+          (t: Throwable) => {
+            checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave"), t)
+          }
+        }
+      )
+
+      // add operator
+      testStream(stream.dropDuplicates()
+        .groupBy("value").count(), Update)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        ExpectFailure[SparkRuntimeException] {
+          (t: Throwable) => {
+            checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave", "dedupe"), 
t)
+          }
+        }
+      )
+
+      // remove operator

Review Comment:
   Please split down to separate test case if this is fully isolated with other 
check.
   
   Btw, this actually brings up food for thought. Do we disallow stateful query 
to be stateless? E.g. could you simply test the removal of stateful operator 
with checkpointDir rather than spinning up another checkpoint?
   
   It's OK if we have been supporting the case (although undocumented) and we 
keep supporting the case. If not, we could just test the case via using 
checkpointDir.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],

Review Comment:
   nit: if you are using more than two lines to define the method, param should 
start at second line of definition. (In other words, all params should appear 
at the same indentation.)
   
   https://github.com/databricks/scala-style-guide?tab=readme-ov-file#indent
   
   Also, param should start with lowercase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to