jingz-db commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1489977792
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream
+ )
+
+ def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+ OpsInCurBatchSeq: Seq[String],
+ ex: Throwable): Unit = {
+ checkError(ex.asInstanceOf[SparkRuntimeException],
+ "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+ Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+ "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+ )
+ }
+
+ // replace dropDuplicates with dropDuplicatesWithinWatermark
+ testStream(stream.withWatermark("eventTime", "10 seconds")
+ .dropDuplicatesWithinWatermark(), Append)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 2),
+ ExpectFailure[SparkRuntimeException] {
+ (t: Throwable) => {
+ checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
+ }
+ }
+ )
+
+ // replace operator
+ testStream(stream.groupBy("value").count(), Update)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ ExpectFailure[SparkRuntimeException] {
+ (t: Throwable) => {
+ checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave"), t)
+ }
+ }
+ )
+
+ // add operator
+ testStream(stream.dropDuplicates()
+ .groupBy("value").count(), Update)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ ExpectFailure[SparkRuntimeException] {
+ (t: Throwable) => {
+ checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave", "dedupe"),
t)
+ }
+ }
+ )
+
+ // remove operator
Review Comment:
> Do we disallow stateful query to be stateless?
We didn't allow this even before the operator check was added: streaming throws an
error with the message "state path not found".
> E.g. could you simply test the removal of stateful operator with
checkpointDir rather than spinning up another checkpoint?
Done. Restarting a stateless query from a stateful query's checkpoint now triggers an
error with the following message:
```bash
[STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful
operator name does not match with the operator in state metadata. This likely
to happen when user adds/removes/changes stateful operator of existing
streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName:
dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]