HeartSaVioR commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1490093460


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +215,79 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  Seq("Replace", "Add", "Remove").foreach { operation =>
+    test(s"$operation stateful operator will trigger error with guidance") {
+      withTempDir { checkpointDir =>
+        val inputData = MemoryStream[Int]
+        val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value"))
+
+        testStream(stream.dropDuplicates())(
+          StartStream(checkpointLocation = checkpointDir.toString),
+          AddData(inputData, 1),
+          ProcessAllAvailable(),
+          StopStream)
+
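+        // Tuple of (operators recorded in the checkpoint metadata,
+        // operators expected in the restarted batch, restarted query).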
+        val (opsInMetadataSeq, opsInCurBatchSeq, restartStream) = operation match {
+          case "Add" =>
+            (
+              Map(0L -> "dedupe"),
+              Map(0L -> "stateStoreSave", 1L -> "dedupe"),
+              stream.dropDuplicates().groupBy("value").count())
+          case "Replace" =>
+            (Map(0L -> "dedupe"), Map(0L -> "stateStoreSave"), 
stream.groupBy("value").count())
+          case "Remove" =>
+            (Map(0L -> "dedupe"), Map.empty[Long, String], stream)
+        }
+
+        testStream(restartStream, Update)(
+          StartStream(checkpointLocation = checkpointDir.toString),
+          AddData(inputData, 3),
+          ExpectFailure[SparkRuntimeException] { t => {

Review Comment:
   super nit: the extra `{}` after `{ t =>` is unnecessary; a multi-line body is allowed after `=>` without it.
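
   For illustration, a minimal sketch of the suggested shape (a fragment meant to slot into the `testStream(...)` call above; the assertion body here is only a placeholder, not the actual check from this PR):

   ```scala
   ExpectFailure[SparkRuntimeException] { t =>
     // `{ t =>` already opens a block, so the body can span multiple
     // lines without a second `{}` pair.
     val e = t.asInstanceOf[SparkRuntimeException]  // placeholder
     assert(e.getMessage != null)                   // placeholder assertion
   }
   ```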



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

