viirya commented on a change in pull request #31219:
URL: https://github.com/apache/spark/pull/31219#discussion_r559271273
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
##########
@@ -67,63 +63,21 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
require(!StateStore.isMaintenanceRunning)
}
- def updateVersionTo(
- provider: StateStoreProvider,
- currentVersion: Int,
- targetVersion: Int): Int = {
- var newCurrentVersion = currentVersion
- for (i <- newCurrentVersion until targetVersion) {
- newCurrentVersion = incrementVersion(provider, i)
- }
- require(newCurrentVersion === targetVersion)
- newCurrentVersion
- }
-
- def incrementVersion(provider: StateStoreProvider, currentVersion: Int): Int
= {
- val store = provider.getStore(currentVersion)
- put(store, "a", currentVersion + 1)
- store.commit()
- currentVersion + 1
- }
-
- def checkLoadedVersions(
- loadedMaps: util.SortedMap[Long, ProviderMapType],
- count: Int,
- earliestKey: Long,
- latestKey: Long): Unit = {
- assert(loadedMaps.size() === count)
- assert(loadedMaps.firstKey() === earliestKey)
- assert(loadedMaps.lastKey() === latestKey)
- }
-
- def checkVersion(
- loadedMaps: util.SortedMap[Long, ProviderMapType],
- version: Long,
- expectedData: Map[String, Int]): Unit = {
-
- val originValueMap = loadedMaps.get(version).asScala.map { entry =>
- rowToString(entry._1) -> rowToInt(entry._2)
- }.toMap
-
- assert(originValueMap === expectedData)
- }
-
test("retaining only two latest versions when
MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2") {
- val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
- numOfVersToRetainInMemory = 2)
+ val provider = newStoreProvider(minDeltasForSnapshot = 10,
numOfVersToRetainInMemory = 2)
var currentVersion = 0
// commit the ver 1 : cache will have one element
currentVersion = incrementVersion(provider, currentVersion)
- assert(getData(provider) === Set("a" -> 1))
+ assert(getLatestData(provider) === Set("a" -> 1))
Review comment:
`getData(provider)` actually calls `getData(provider, -1)`, so it is
semantically equivalent to `getLatestData(provider)`. Because I moved `getData`
to `StateStoreSuiteBase` as an abstract method here, I can no longer rely on its
default parameter when calling it from `StateStoreSuiteBase`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]