Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21700#discussion_r202927538
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala ---
    @@ -99,43 +102,84 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
         assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
         assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1))
     
    -    updateVersionTo(3)
    +    // this trigger exceeding cache and 1 will be evicted
    +    currentVersion = updateVersionTo(provider, currentVersion, 3)
         assert(getData(provider) === Set("a" -> 3))
         loadedMaps = provider.getClonedLoadedMaps()
    -    assert(loadedMaps.size() === 3)
    +    assert(loadedMaps.size() === 2)
         assert(loadedMaps.firstKey() === 3L)
    -    assert(loadedMaps.lastKey() === 1L)
    +    assert(loadedMaps.lastKey() === 2L)
         assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3))
         assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
    +  }
    +
    +  test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") {
    +    val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
    +      numOfVersToRetainInMemory = 1)
    +
    +    var currentVersion = 0
    +
    +    def restoreOriginValues(map: provider.MapType): Map[String, Int] = {
    --- End diff --
    
    On second thought, if you make the convenience method `checkVersion` I mentioned above, you may not have to do this at all.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to