HeartSaVioR commented on issue #27333: [SPARK-29438][SS][FOLLOWUP] Add regression tests for Streaming Aggregation and flatMapGroupsWithState
URL: https://github.com/apache/spark/pull/27333#issuecomment-580955411

> If possible, introduce the same bug in aggregate and mapGroupsWithState and see whether these tests fail. You may be able to introduce this by injecting the incorrect partition id in the StateStoreRDD when getting the state store.

Confirmed that the code change below makes the new tests fail for the same reason as SPARK-29438:

```
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 4a69a48fed..6e58017a64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -69,9 +69,6 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
   override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
     var store: StateStore = null
-    val storeProviderId = StateStoreProviderId(
-      StateStoreId(checkpointLocation, operatorId, partition.index),
-      queryRunId)
 
     // If we're in continuous processing mode, we should get the store version for the current
     // epoch rather than the one at planning time.
@@ -85,6 +82,16 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
       storeVersion
     }
 
+    val newPartIdx = if (currentVersion > 0) {
+      2 + partition.index
+    } else {
+      partition.index
+    }
+
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, newPartIdx),
+      queryRunId)
+
     store = StateStore.get(
       storeProviderId, keySchema, valueSchema, indexOrdinal, currentVersion,
       storeConf, hadoopConfBroadcast.value.value)
```

> Also for completeness sake, we should add a test for streaming deduplication.

OK, I'll add it soon. Thanks!
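
For anyone reading along, here is a toy sketch of why shifting the partition id after the first version should break stateful results. It does not use Spark's `StateStore` API: `PartitionIdSketch`, `runBatch`, `buggyPartitionId`, and the `injectBug` flag are hypothetical names made up for illustration, and the "store" is just an in-memory map keyed by (operator id, partition id). The assumption is that state written under one partition id is looked up under a different one in the next batch, so the earlier state is not found.

```scala
import scala.collection.mutable

// Toy stand-in for a state store registry keyed by (operatorId, partitionId).
// This is NOT Spark's StateStore API; all names here are hypothetical.
object PartitionIdSketch {
  type StoreKey = (Long, Int)

  private val stores = mutable.Map.empty[StoreKey, mutable.Map[String, Long]]

  def reset(): Unit = stores.clear()

  // Mirrors the injected change: for any version after 0, shift the partition index by 2.
  def buggyPartitionId(partitionIndex: Int, version: Long): Int =
    if (version > 0) 2 + partitionIndex else partitionIndex

  // Runs one batch of a running count for a single partition and
  // returns the counts held by the store that was actually looked up.
  def runBatch(
      partitionIndex: Int,
      version: Long,
      keys: Seq[String],
      injectBug: Boolean): Map[String, Long] = {
    val partId =
      if (injectBug) buggyPartitionId(partitionIndex, version) else partitionIndex
    val store = stores.getOrElseUpdate((0L, partId), mutable.Map.empty[String, Long])
    keys.foreach { k => store.update(k, store.getOrElse(k, 0L) + 1L) }
    store.toMap
  }
}
```

Calling `runBatch(0, 0L, Seq("a"), injectBug = false)` and then `runBatch(0, 1L, Seq("a"), injectBug = false)` accumulates both batches into the same store. With `injectBug = true`, the second call looks up partition id 2 instead of 0 and starts from empty state, which is the kind of wrong answer the regression tests are meant to catch.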
