HeartSaVioR commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1490093460
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +215,79 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ Seq("Replace", "Add", "Remove").foreach { operation =>
+ test(s"$operation stateful operator will trigger error with guidance") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream)
+
+ val (opsInMetadataSeq, opsInCurBatchSeq, restartStream) = operation
match {
+ case "Add" =>
+ (
+ Map(0L -> "dedupe"),
+ Map(0L -> "stateStoreSave", 1L -> "dedupe"),
+ stream.dropDuplicates().groupBy("value").count())
+ case "Replace" =>
+ (Map(0L -> "dedupe"), Map(0L -> "stateStoreSave"),
stream.groupBy("value").count())
+ case "Remove" =>
+ (Map(0L -> "dedupe"), Map.empty[Long, String], stream)
+ }
+
+ testStream(restartStream, Update)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ ExpectFailure[SparkRuntimeException] { t => {
Review Comment:
super nit: the extra pair of braces after `{ t =>` is unnecessary — a lambda body
may already span multiple lines after `=>`, so `{ t => ... }` alone suffices.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]