HeartSaVioR commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1488847231
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -82,6 +84,39 @@ class IncrementalExecution(
.map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
.getOrElse(sparkSession.sessionState.conf.numShufflePartitions)
+ private def stateCheckpointLocationExists(stateCheckpointLocation: Path):
Boolean = {
+ val fileManager =
+ CheckpointFileManager.create(stateCheckpointLocation, hadoopConf)
+ fileManager.exists(stateCheckpointLocation)
+ }
+
+ // A map of all (operatorId -> operatorName) in the state metadata
+ private lazy val opMapInMetadata: Map[Long, String] = {
+ var ret = Map.empty[Long, String]
+ if (stateCheckpointLocationExists(new Path(checkpointLocation))) {
Review Comment:
nit: let's just inline this method if it is referenced only once.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -387,8 +433,29 @@ class IncrementalExecution(
rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
}
+ private def checkOperatorValidWithMetadata(): Unit = {
+ (opMapInMetadata.keySet ++ opMapInPhysicalPlan.keySet).foreach { opId =>
+ val opInMetadata = opMapInMetadata.getOrElse(opId, "not found")
+ val opInCurBatch = opMapInPhysicalPlan.getOrElse(opId, "not found")
+ if (opInMetadata != opInCurBatch) {
+ throw
QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(
+ opMapInMetadata.values.toSeq,
Review Comment:
Shall we print out the association between opId and opName in the error message? It
may be hard to understand what is mismatching with only the opNames.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream
+ )
+
+ def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+ OpsInCurBatchSeq: Seq[String],
Review Comment:
nit: indentation is off, 4 spaces
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -1702,6 +1702,18 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
new NoSuchElementException("State is either not defined or has already
been removed")
}
+ def statefulOperatorNotMatchInStateMetadataError(
+ opsInMetadataSeq: Seq[String],
Review Comment:
As I commented elsewhere, this should provide information about the association
between opId and opName. The opNames alone do not seem to be sufficient.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
Review Comment:
nit: "with debug message" - maybe better to say "with guidance".
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -45,7 +47,6 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
assert(operatorMetadata.operatorInfo == expectedMetadata.operatorInfo &&
operatorMetadata.stateStoreInfo.sameElements(expectedMetadata.stateStoreInfo))
}
-
Review Comment:
nit: revert this change
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -387,8 +433,29 @@ class IncrementalExecution(
rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
}
+ private def checkOperatorValidWithMetadata(): Unit = {
Review Comment:
Shall we inline all the logic, e.g. building opMapInMetadata and
opMapInPhysicalPlan, here? I don't see these fields being used anywhere else.
Let's keep the scope of fields and methods as narrow as possible.
Also, you don't need to use a rule to build opMapInPhysicalPlan. Let's
just use foreach to traverse the plan and build opMapInPhysicalPlan.
##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -3311,6 +3311,13 @@
],
"sqlState" : "42601"
},
+ "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
+ "message" : [
+ "Streaming stateful operator name does not match with the operator in
state metadata. This likely to happen when user changes stateful operator of
existing streaming query.",
Review Comment:
nit: when user adds/removes/changes
##########
docs/sql-error-conditions.md:
##########
@@ -2085,6 +2085,13 @@ The checkpoint seems to be only run with older Spark
version(s). Run the streami
'`<optionName>`' must be specified.
+### STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA
+
+[SQLSTATE:
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Streaming stateful operator name does not match with the operator in state
metadata. This likely to happen when user changes stateful operator of existing
streaming query.
Review Comment:
nit: when user adds/removes/changes
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream
+ )
+
+ def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+ OpsInCurBatchSeq: Seq[String],
+ ex: Throwable): Unit = {
+ checkError(ex.asInstanceOf[SparkRuntimeException],
+ "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+ Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+ "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+ )
+ }
+
+ // replace dropDuplicates with dropDuplicatesWithinWatermark
+ testStream(stream.withWatermark("eventTime", "10 seconds")
+ .dropDuplicatesWithinWatermark(), Append)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 2),
+ ExpectFailure[SparkRuntimeException] {
+ (t: Throwable) => {
+ checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
+ }
+ }
+ )
+
+ // replace operator
Review Comment:
I guess it's redundant?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream
+ )
+
+ def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+ OpsInCurBatchSeq: Seq[String],
+ ex: Throwable): Unit = {
+ checkError(ex.asInstanceOf[SparkRuntimeException],
+ "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+ Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+ "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+ )
+ }
+
+ // replace dropDuplicates with dropDuplicatesWithinWatermark
+ testStream(stream.withWatermark("eventTime", "10 seconds")
+ .dropDuplicatesWithinWatermark(), Append)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 2),
+ ExpectFailure[SparkRuntimeException] {
Review Comment:
nit: use `{ t =>` to save one indentation & two lines
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream
+ )
+
+ def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+ OpsInCurBatchSeq: Seq[String],
+ ex: Throwable): Unit = {
+ checkError(ex.asInstanceOf[SparkRuntimeException],
+ "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+ Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+ "OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+ )
+ }
+
+ // replace dropDuplicates with dropDuplicatesWithinWatermark
+ testStream(stream.withWatermark("eventTime", "10 seconds")
+ .dropDuplicatesWithinWatermark(), Append)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 2),
+ ExpectFailure[SparkRuntimeException] {
+ (t: Throwable) => {
+ checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
+ }
+ }
+ )
+
+ // replace operator
+ testStream(stream.groupBy("value").count(), Update)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ ExpectFailure[SparkRuntimeException] {
+ (t: Throwable) => {
+ checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave"), t)
+ }
+ }
+ )
+
+ // add operator
+ testStream(stream.dropDuplicates()
+ .groupBy("value").count(), Update)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ ExpectFailure[SparkRuntimeException] {
+ (t: Throwable) => {
+ checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave", "dedupe"),
t)
+ }
+ }
+ )
+
+ // remove operator
Review Comment:
Please split this into a separate test case if it is fully isolated from the other
checks.
Btw, this actually brings up food for thought. Do we disallow a stateful query
from becoming stateless? E.g. could you simply test the removal of the stateful
operator with checkpointDir rather than spinning up another checkpoint?
It's OK if we have been supporting the case (although undocumented) and we
keep supporting it. If not, we could just test the case by using
checkpointDir.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}
+
+ test("Operator metadata path non-existence should not fail query") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ StopStream
+ )
+
+ // Delete operator metadata path
+ val metadataPath = new Path(checkpointDir.toString,
s"state/0/_metadata/metadata")
+ val fm = CheckpointFileManager.create(new
Path(checkpointDir.getCanonicalPath), hadoopConf)
+ fm.delete(metadataPath)
+
+ // Restart the query
+ testStream(aggregated, Complete)(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 2)),
+ StopStream
+ )
+ }
+ }
+
+ test("Changing operator - " +
+ "replace, add, remove operators will trigger error with debug message") {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[Int]
+ val stream = inputData.toDF().withColumn("eventTime",
timestamp_seconds($"value"))
+
+ testStream(stream.dropDuplicates())(
+ StartStream(checkpointLocation = checkpointDir.toString),
+ AddData(inputData, 1),
+ ProcessAllAvailable(),
+ StopStream
+ )
+
+ def checkOpChangeError(OpsInMetadataSeq: Seq[String],
Review Comment:
nit: if you are using more than two lines to define the method, params should
start at the second line of the definition. (In other words, all params should
appear at the same indentation.)
https://github.com/databricks/scala-style-guide?tab=readme-ov-file#indent
Also, param names should start with a lowercase letter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]