jingz-db commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1490116430


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],

Review Comment:
   Got it! Thanks Jungtaek!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to