GitHub user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21700#discussion_r201866930
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
---
@@ -99,43 +102,84 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1))
- updateVersionTo(3)
+ // this update exceeds the cache size, so version 1 will be evicted
+ currentVersion = updateVersionTo(provider, currentVersion, 3)
assert(getData(provider) === Set("a" -> 3))
loadedMaps = provider.getClonedLoadedMaps()
- assert(loadedMaps.size() === 3)
+ assert(loadedMaps.size() === 2)
assert(loadedMaps.firstKey() === 3L)
- assert(loadedMaps.lastKey() === 1L)
+ assert(loadedMaps.lastKey() === 2L)
assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3))
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
+ }
+
+ test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") {
+ val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
+ numOfVersToRetainInMemory = 1)
+
+ var currentVersion = 0
+
+ def restoreOriginValues(map: provider.MapType): Map[String, Int] = {
--- End diff --
I've allowed the redundant function definition because there's no way to use
`provider.MapType` as a parameter type unless `provider` is already defined.
If we really want to get rid of the redundant definition, we may have to
change the parameter type to `ConcurrentMap` directly.
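To illustrate the trade-off described in the comment, here is a minimal Scala sketch with no Spark dependency. `ToyProvider`, `PathDependentDemo`, and all helper names are hypothetical stand-ins; the real definition of `HDFSBackedStateStoreProvider.MapType` is not shown in this diff, so the sketch assumes it is a concrete alias for a concurrent map. Variant (a) mirrors the per-test local helper from the diff, and (b) is the `ConcurrentMap` alternative the comment mentions. Variant (c), a type projection, is an extra assumption that the comment does not raise.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

// Hypothetical stand-in for the provider: it only models a class that
// exposes a `MapType` type member (assumed to be a concrete alias).
class ToyProvider {
  type MapType = ConcurrentHashMap[String, Int]
  def newMap(): MapType = new ConcurrentHashMap[String, Int]()
}

object PathDependentDemo {
  // Shared conversion from any java.util.Map to an immutable Scala Map.
  private def toScalaMap(map: java.util.Map[String, Int]): Map[String, Int] = {
    val builder = Map.newBuilder[String, Int]
    val it = map.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      builder += (entry.getKey -> entry.getValue)
    }
    builder.result()
  }

  // (a) Pattern from the diff: `provider.MapType` is a path-dependent type,
  // so the helper can only be written once `provider` is in scope, which is
  // why it gets redefined inside every test that creates a provider.
  def withLocalHelper(): Map[String, Int] = {
    val provider = new ToyProvider
    def restoreOriginValues(map: provider.MapType): Map[String, Int] = toScalaMap(map)
    val m = provider.newMap()
    m.put("a", 1)
    restoreOriginValues(m)
  }

  // (b) Take the concrete interface type directly; a single shared helper
  // then accepts maps coming from any provider instance.
  def restoreFromConcurrentMap(map: ConcurrentMap[String, Int]): Map[String, Int] =
    toScalaMap(map)

  // (c) Assumption, not from the review: since MapType is a concrete alias
  // here, a type projection names it without needing an instance in scope.
  def restoreFromProjection(map: ToyProvider#MapType): Map[String, Int] =
    toScalaMap(map)
}
```

Variant (b) gives up the link to the provider's alias, so the helper would not follow a future change to `MapType`; (a) keeps the exact type at the cost of duplicating the helper in each test.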