HeartSaVioR commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1490087695


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
     checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
       Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData.toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        StopStream
+      )
+
+      // Delete operator metadata path
+      val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+      val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+      fm.delete(metadataPath)
+
+      // Restart the query
+      testStream(aggregated, Complete)(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 2)),
+        StopStream
+      )
+    }
+  }
+
+  test("Changing operator - " +
+    "replace, add, remove operators will trigger error with debug message") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1),
+        ProcessAllAvailable(),
+        StopStream
+      )
+
+      def checkOpChangeError(OpsInMetadataSeq: Seq[String],

Review Comment:
   Maybe the community has to run the formatter once over the whole codebase. 
I'm not sure scalafmt can handle all of the style rules, though. It is still 
good to familiarize yourself with the Databricks Scala style guide; it covers 
more than just the styles that automation can handle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to