jingz-db commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1489982761
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream
+ )
+
+ def checkOpChangeError(OpsInMetadataSeq: Seq[String],
Review Comment:
Is there any automated tool for checking this besides `./dev/scalafmt`? That
command is listed on the [spark developer tool
wiki](https://spark.apache.org/developer-tools.html#:~:text=the%20style%20guide.-,Formatting%20code,-To%20format%20Scala),
but in practice it is quite messy: it reformats all existing files instead of
only formatting my code change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]