Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21239#discussion_r186321416
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
---
@@ -71,8 +72,15 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
StateStoreId(checkpointLocation, operatorId, partition.index),
queryRunId)
+ // If we're in continuous processing mode, we should get the store version for the current
+ // epoch rather than the one at planning time.
+ val currentVersion = EpochTracker.getCurrentEpoch match {
+ case -1 => storeVersion
--- End diff --
I don't like the fact that -1 is used as the value for non-existence, especially
since this value is defined deep inside the EpochTracker. Might as well have it
return an Option, with None when there is no current epoch. Then the use of -1
will be contained within the EpochTracker class.
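
A rough sketch of what I mean, not the actual EpochTracker code: `EpochTrackerSketch`, `initializeCurrentEpoch`, and `resolveVersion` are hypothetical names, and using the epoch directly as the store version is an assumption, since the non-(-1) case is not shown in the hunk above.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for EpochTracker; not Spark's actual implementation.
object EpochTrackerSketch {
  // The -1 sentinel stays private to this object and never leaks to callers.
  private val currentEpoch: ThreadLocal[AtomicLong] = new ThreadLocal[AtomicLong] {
    override def initialValue(): AtomicLong = new AtomicLong(-1)
  }

  // None when no epoch has been set on this thread, Some(epoch) otherwise.
  def getCurrentEpoch: Option[Long] = currentEpoch.get().get() match {
    case n if n < 0 => None
    case e => Some(e)
  }

  // Hypothetical setter, included only so the sketch can be exercised.
  def initializeCurrentEpoch(startEpoch: Long): Unit =
    currentEpoch.get().set(startEpoch)
}

object StateStoreVersionSketch {
  // Assumption: in continuous mode the current epoch is the version to load;
  // otherwise fall back to the version chosen at planning time.
  def resolveVersion(storeVersion: Long): Long =
    EpochTrackerSketch.getCurrentEpoch.getOrElse(storeVersion)
}
```

With this shape the call site in StateStoreRDD would match on None / Some(...) (or use getOrElse) instead of comparing against -1.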