HeartSaVioR commented on a change in pull request #30210:
URL: https://github.com/apache/spark/pull/30210#discussion_r521376594
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1382,6 +1382,21 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED =
+ buildConf("spark.sql.streaming.statefulOperator.correctnessCheck")
Review comment:
I guess we need to add the `.enabled` suffix according to the naming policy;
it also sounds more natural to say `checkCorrectness`.
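To make the naming point concrete, here is a minimal sketch of the convention being referenced, assuming the policy is simply "boolean config keys end with `.enabled`". `ConfNaming` and its methods are hypothetical helpers for illustration, not part of Spark, and `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` is only the name suggested above, not a confirmed final key.

```scala
// Hypothetical helper (not part of Spark): encodes the assumed convention
// that boolean config keys end with ".enabled".
object ConfNaming {
  private val EnabledSuffix = ".enabled"

  /** True if the key already follows the ".enabled" suffix convention. */
  def followsBooleanNaming(key: String): Boolean = key.endsWith(EnabledSuffix)

  /** Appends the suffix when it is missing; compliant keys are returned unchanged. */
  def withEnabledSuffix(key: String): String =
    if (followsBooleanNaming(key)) key else key + EnabledSuffix
}
```

Under that assumption, the key currently in the diff would not pass the check, while the suggested `...checkCorrectness.enabled` form would.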
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging {
}
}
+ /**
+ * Checks for possible correctness issue in chained stateful operators. The behavior is
+ * controlled by SQL config `spark.sql.streaming.statefulOperator.correctnessCheck"`. Once it
Review comment:
nit: remove `"`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging {
}
}
+ /**
+ * Checks for possible correctness issue in chained stateful operators. The behavior is
+ * controlled by SQL config `spark.sql.streaming.statefulOperator.correctnessCheck"`. Once it
+ * is enabled, an analysis exception will be thrown. Otherwise, Spark will just print a warning
+ * message which is the behavior before Spark 3.1.0.
Review comment:
I'm not sure about the convention here, so just my 2 cents: since we already
put this information in both the config doc and the migration guide, it feels a
bit verbose to repeat `which is the behavior before Spark 3.1.0` here.
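The documented behavior (fail when the check is enabled, only warn otherwise) can be sketched as below. This is a simplified model under stated assumptions, not the checker's actual code: `StandInAnalysisException` replaces Spark's `AnalysisException` so the sketch compiles without Spark, and `CorrectnessCheckSketch.check` and its parameters are hypothetical names.

```scala
// Hypothetical stand-in for Spark's AnalysisException, so the sketch
// compiles without Spark on the classpath.
final class StandInAnalysisException(message: String) extends Exception(message)

object CorrectnessCheckSketch {
  // Placeholder text; not the message Spark actually emits.
  private val Message = "Possible correctness issue in chained stateful operators."

  /**
   * Mirrors the documented behavior: with the check enabled, a detected issue
   * fails analysis; with it disabled, the issue is only reported as a warning.
   * Returns the warning text (if any) so a caller could log it.
   */
  def check(checkEnabled: Boolean, hasCorrectnessIssue: Boolean): Option[String] =
    if (!hasCorrectnessIssue) None
    else if (checkEnabled) throw new StandInAnalysisException(Message)
    else Some(Message)
}
```

Returning the warning instead of logging it directly keeps the sketch free of a logging dependency; the real checker logs through its `Logging` trait.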
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
##########
@@ -1324,7 +1324,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
def testWithAllStateVersions(name: String)(func: => Unit): Unit = {
for (version <- FlatMapGroupsWithStateExecHelper.supportedVersions) {
test(s"$name - state format version $version") {
- withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString) {
+ withSQLConf(
+ SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString,
+ SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "false") {
Review comment:
If we need to turn off the config to run the test, I think the test itself
needs to be reconsidered: otherwise the tests may present misleading,
discouraged patterns.
This doesn't necessarily need to be addressed in this PR (a follow-up or a new
JIRA issue is fine), since removing tests has always been a concern and leads
to a different kind of discussion.
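For context on what the test wrapper in the diff does, here is a simplified, hypothetical model of a `withSQLConf`-style helper: it sets several keys only for the duration of the body and then restores the previous values, removing keys that were previously unset. `ConfScope`, its in-memory map, and `withConf` are illustrative assumptions, not Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a withSQLConf-style test helper.
object ConfScope {
  // In-memory stand-in for the session configuration.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    // Capture the prior value (or absence) of each key before overriding it.
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old // restore the earlier value
      case (k, None)      => conf.remove(k) // the key was unset before
    }
  }
}
```

Because the override is scoped and always undone, disabling the check inside such a block affects only that test. That scoping is also why it is easy to overlook that the test depends on a pattern the check is meant to reject.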
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1382,6 +1382,21 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED =
+ buildConf("spark.sql.streaming.statefulOperator.correctnessCheck")
Review comment:
And once we rename it, we also need to update UnsupportedOperationChecker
accordingly.
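One way to limit how many places a rename touches is to build user-facing text from the config entry's key instead of copying the string literal, much like the test above already uses `.key`. The sketch below assumes a hypothetical `BoolConfEntry` stand-in and hypothetical `Confs`/`CheckerMessages` objects, and it uses the suggested (not confirmed) key name. Scaladoc comments cannot interpolate values, so the doc comment in UnsupportedOperationChecker would still need a manual update.

```scala
// Hypothetical, simplified stand-in for a config entry; only the key matters here.
final case class BoolConfEntry(key: String)

object Confs {
  // Placeholder using the suggested ".enabled" naming; not a confirmed final key.
  val StatefulOperatorCheckCorrectness: BoolConfEntry =
    BoolConfEntry("spark.sql.streaming.statefulOperator.checkCorrectness.enabled")
}

object CheckerMessages {
  // The hint is derived from the entry's key, so renaming the key updates it too.
  def disableHint: String =
    s"Set ${Confs.StatefulOperatorCheckCorrectness.key} to false to only log a warning."
}
```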