Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21733#discussion_r208489469
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -871,6 +871,16 @@ object SQLConf {
      .intConf
      .createWithDefault(2)
+  val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.aggregation.stateFormatVersion")
+      .internal()
+      .doc("State format version used by streaming aggregation operations in a streaming query. " +
+        "State between versions tends to be incompatible, so the state format version shouldn't " +
+        "be modified after running.")
+      .intConf
+      .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+      .createWithDefault(2)
--- End diff --
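For reference, this conf is only meant to take effect when a query starts for the first time; on restart, the state format version recorded in the existing checkpoint should take precedence. A minimal sketch of pinning the old format explicitly before the first start (the session setup below is illustrative, not part of the PR):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: pin the internal conf to the old state format (version 1)
// before the query is ever started. Changing it afterwards is not supported,
// since state written by the two format versions is not interchangeable.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("state-format-version-sketch")
  .getOrCreate()

spark.conf.set("spark.sql.streaming.aggregation.stateFormatVersion", "1")
```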
If you intend to change the default to the new version, then you HAVE TO
add a test that ensures that existing streaming aggregation checkpoints
(generated in Spark 2.3.1, for example) can still be recovered.
Similar to this test:
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala#L883
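Not the PR's actual test, but a rough sketch of the shape such a test could take in a StreamTest-based suite; the resource path, query, and expected counts below are placeholders that would have to match a checkpoint actually generated with Spark 2.3.1:

```scala
import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.util.Utils

test("recover streaming aggregation checkpoint written by Spark 2.3.1") {
  // Placeholder: a checkpoint committed by Spark 2.3.1 (state format version 1),
  // checked in as a test resource for this exact query.
  val resourceUri = this.getClass.getResource(
    "/structured-streaming/checkpoint-version-2.3.1-streaming-aggregation/").toURI
  val checkpointDir = Utils.createTempDir().getCanonicalFile
  // Copy the resource so the restarted query can append new batches to it.
  FileUtils.copyDirectory(new File(resourceUri), checkpointDir)

  val inputData = MemoryStream[Int]
  val aggregated = inputData.toDF().groupBy("value").count()

  // Data that was already committed when the checkpoint was generated.
  inputData.addData(3)

  testStream(aggregated, OutputMode.Complete)(
    // Restarting from the old checkpoint must not fail even though the default
    // stateFormatVersion is now 2; the version recorded in the checkpoint should win.
    StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
    AddData(inputData, 3, 2),
    CheckLastBatch((3, 2), (2, 1)) // placeholder expected counts
  )
}
```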
---