Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21700#discussion_r202925739
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
---
@@ -99,43 +102,84 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1))
- updateVersionTo(3)
+ // this trigger exceeding cache and 1 will be evicted
+ currentVersion = updateVersionTo(provider, currentVersion, 3)
assert(getData(provider) === Set("a" -> 3))
loadedMaps = provider.getClonedLoadedMaps()
- assert(loadedMaps.size() === 3)
+ assert(loadedMaps.size() === 2)
assert(loadedMaps.firstKey() === 3L)
- assert(loadedMaps.lastKey() === 1L)
+ assert(loadedMaps.lastKey() === 2L)
assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3))
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
+ }
+
+ test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set
to 1") {
+ val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
+ numOfVersToRetainInMemory = 1)
+
+ var currentVersion = 0
+
+ def restoreOriginValues(map: provider.MapType): Map[String, Int] = {
--- End diff --
+1 on making it a ConcurrentMap. Maybe even better, you can use Scala
implicit classes to add methods to `HDFSBackedStateStoreProvider`
```
implicit class ProviderHelper(provider: StateStoreProvider) {
def toStringIntMap(): Map[String, Int] = { .... }
}
```
This should avoid this problem. Either way, I hate having these duplicate
methods, so we should fix it one way or the other.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]