HeartSaVioR commented on a change in pull request #30210:
URL: https://github.com/apache/spark/pull/30210#discussion_r521376594



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1382,6 +1382,21 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED =
+    buildConf("spark.sql.streaming.statefulOperator.correctnessCheck")

Review comment:
       I guess we need to add `.enabled` according to the naming policy, and it also sounds more natural to say `checkCorrectness`.
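   Roughly something like the sketch below, for example - the doc text and default here are just illustrative, not a concrete ask:

```scala
  val STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED =
    buildConf("spark.sql.streaming.statefulOperator.checkCorrectness.enabled")
      .doc("When true, the streaming query analyzer throws an AnalysisException when it " +
        "detects a possible correctness issue in chained stateful operators; when false, " +
        "Spark only logs a warning message.")
      .version("3.1.0")
      .booleanConf
      .createWithDefault(true)
```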

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  /**
+   * Checks for possible correctness issue in chained stateful operators. The behavior is
+   * controlled by SQL config `spark.sql.streaming.statefulOperator.correctnessCheck"`. Once it

Review comment:
       nit: remove `"`

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  /**
+   * Checks for possible correctness issue in chained stateful operators. The behavior is
+   * controlled by SQL config `spark.sql.streaming.statefulOperator.correctnessCheck"`. Once it
+   * is enabled, an analysis exception will be thrown. Otherwise, Spark will just print a warning
+   * message which is the behavior before Spark 3.1.0.

Review comment:
       I'm not sure about the practice here, so just my 2 cents - since we put this information in both the config doc and the migration doc, it feels a bit verbose to repeat ` which is the behavior before Spark 3.1.0` here as well.
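   For example, the scaladoc could read something like this (just a wording suggestion; it drops the migration-guide note and keeps the rest):

```scala
  /**
   * Checks for possible correctness issues in chained stateful operators. The behavior is
   * controlled by the SQL config `spark.sql.streaming.statefulOperator.correctnessCheck`.
   * When it is enabled, an AnalysisException is thrown; otherwise Spark only logs a warning
   * message.
   */
```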

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
##########
@@ -1324,7 +1324,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
   def testWithAllStateVersions(name: String)(func: => Unit): Unit = {
     for (version <- FlatMapGroupsWithStateExecHelper.supportedVersions) {
       test(s"$name - state format version $version") {
-        withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString) {
+        withSQLConf(
+            SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString,
+            SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "false") {

Review comment:
       If we need to turn off the config to run the test, I think the test needs to be reconsidered - otherwise the tests may showcase a misleading and discouraged pattern.
   
   That doesn't necessarily have to be addressed in this PR (a follow-up or a new JIRA issue is fine), as removing tests has always been a concern and leads to another kind of discussion.
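   Just to illustrate the direction (a rough sketch, not something that must land in this PR): keep the check at its default in the shared test helper, and disable it only in a dedicated test that explicitly exercises the legacy warning-only behavior, e.g.:

```scala
  test("legacy behavior: disabling the correctness check only logs a warning") {
    // The discouraged setting is explicit and scoped to this test, rather than
    // hidden in a shared test helper that all tests go through.
    withSQLConf(SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "false") {
      // run the chained stateful query here and assert it is not rejected at analysis time
    }
  }
```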

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1382,6 +1382,21 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED =
+    buildConf("spark.sql.streaming.statefulOperator.correctnessCheck")

Review comment:
       And once we change the config name, we also need to update UnsupportedOperationChecker accordingly.
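   e.g. the key string spelled out in the new UnsupportedOperationChecker scaladoc would need to follow the rename, roughly (assuming the key suggested above):

```scala
  /**
   * Checks for possible correctness issue in chained stateful operators. The behavior is
   * controlled by the SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
   */
```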



