HeartSaVioR commented on issue #27333: [SPARK-29438][SS][FOLLOWUP] Add regression tests for Streaming Aggregation and flatMapGroupsWithState
URL: https://github.com/apache/spark/pull/27333#issuecomment-580955411
 
 
> If possible, introduce the same bug in aggregate and mapGroupsWithState and see whether these tests fail. You may be able to introduce this by injecting the incorrect partition id in the StateStoreRDD when getting the state store.
   
Confirmed that the code change below makes the new tests fail for the same reason as SPARK-29438:
   
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 4a69a48fed..6e58017a64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -69,9 +69,6 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
   override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
     var store: StateStore = null
-    val storeProviderId = StateStoreProviderId(
-      StateStoreId(checkpointLocation, operatorId, partition.index),
-      queryRunId)
 
     // If we're in continuous processing mode, we should get the store version for the current
     // epoch rather than the one at planning time.
@@ -85,6 +82,16 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
       storeVersion
     }
 
+    val newPartIdx = if (currentVersion > 0) {
+      2 + partition.index
+    } else {
+      partition.index
+    }
+
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, newPartIdx),
+      queryRunId)
+
     store = StateStore.get(
       storeProviderId, keySchema, valueSchema, indexOrdinal, currentVersion,
       storeConf, hadoopConfBroadcast.value.value)
```
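To make the failure mode of the diff above concrete, here is a minimal toy model in Scala. It is a sketch under stated assumptions, not Spark code: `ToyStoreId`, `ToyCheckpoint`, and `PartitionIdDemo.runBatch` are hypothetical names, and the "checkpoint" is a plain in-memory map keyed by store id and version rather than Spark's HDFS-backed state store. It only mirrors the idea that state committed under one partition id must be read back under the same id in the next batch, and that shifting the index by 2 for every version after the first breaks that.

```scala
import scala.collection.mutable

// Hypothetical toy model (NOT Spark's StateStore API): state is addressed by
// (operatorId, partitionId) plus a version, loosely like StateStoreId in the diff.
final case class ToyStoreId(operatorId: Long, partitionId: Int)

final class ToyCheckpoint {
  private val committed =
    mutable.Map.empty[(ToyStoreId, Long), Map[String, Long]]

  // Version 0 is the empty initial state; any other version must have been
  // committed earlier, otherwise this toy throws.
  def load(id: ToyStoreId, version: Long): Map[String, Long] =
    if (version == 0L) Map.empty
    else committed.getOrElse((id, version),
      throw new IllegalStateException(s"no state for $id at version $version"))

  def commit(id: ToyStoreId, version: Long, state: Map[String, Long]): Unit =
    committed((id, version)) = state
}

object PartitionIdDemo {
  // Runs one micro-batch of a running count for one partition: loads the state
  // at currentVersion, adds one per key, and commits it as currentVersion + 1
  // under the same (possibly shifted) id. With injectBug = true the partition
  // index is shifted by 2 whenever currentVersion > 0, as in the diff above.
  def runBatch(
      ckpt: ToyCheckpoint,
      partitionIndex: Int,
      currentVersion: Long,
      keys: Seq[String],
      injectBug: Boolean): Map[String, Long] = {
    val idx =
      if (injectBug && currentVersion > 0) 2 + partitionIndex else partitionIndex
    val id = ToyStoreId(operatorId = 0L, partitionId = idx)
    val before = ckpt.load(id, currentVersion)
    val after = keys.foldLeft(before) { (acc, k) =>
      acc.updated(k, acc.getOrElse(k, 0L) + 1L)
    }
    ckpt.commit(id, currentVersion + 1, after)
    after
  }
}
```

Suppose a first batch (version 0) runs on three partitions with keys `"a"`, `"b"`, and `"c"`. With `injectBug = false`, a second batch on partition 0 with key `"a"` sees its own earlier count and returns `Map("a" -> 2)`. With `injectBug = true`, the same call loads partition 2's state and returns `Map("c" -> 1, "a" -> 1)`, while partition 1 is shifted to index 3, which has no committed state, so loading it fails. Both outcomes are the kind of state mismatch the new regression tests are meant to surface.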
   
> Also, for completeness' sake, we should add a test for streaming deduplication.
   
OK, I'll add it soon. Thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.