jingz-db commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1489977792


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+         OpsInCurBatchSeq: Seq[String],
+         ex: Throwable): Unit = {
+        checkError(ex.asInstanceOf[SparkRuntimeException],
+          "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+          Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+            "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+        )
+      }
+
+      // replace dropDuplicates with dropDuplicatesWithinWatermark
+      testStream(stream.withWatermark("eventTime", "10 seconds")
+        .dropDuplicatesWithinWatermark(), Append)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 2),
+        ExpectFailure[SparkRuntimeException] {
+          (t: Throwable) => {
+            checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
+          }
+        }
+      )
+
+      // replace operator
+      testStream(stream.groupBy("value").count(), Update)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        ExpectFailure[SparkRuntimeException] {
+          (t: Throwable) => {
+            checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave"), t)
+          }
+        }
+      )
+
+      // add operator
+      testStream(stream.dropDuplicates()
+        .groupBy("value").count(), Update)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        ExpectFailure[SparkRuntimeException] {
+          (t: Throwable) => {
+            checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave", "dedupe"), 
t)
+          }
+        }
+      )
+
+      // remove operator

Review Comment:
   > Do we disallow stateful query to be stateless?
   
   No, this was not allowed even before the operator check was added. Streaming 
would throw an error with the message "state path not found".
   
   > E.g. could you simply test the removal of stateful operator with 
checkpointDir rather than spinning up another checkpoint?
   
   Done. Restarting a stateless query from a stateful query's checkpoint will 
now trigger an error with the following message:
   ```text
   [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful 
operator name does not match with the operator in state metadata. This likely 
to happen when user adds/removes/changes stateful operator of existing 
streaming query.
   Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: 
dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to