huanliwang-db commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589740422
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
}
override def close(): Unit = {
- synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
+ synchronized {
loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }
Review Comment:
The implementation of `TreeMap.clear` is:
```
public void clear() {
modCount++;
size = 0;
root = null;
}
```
where
```
private transient Entry<K,V> root;
```
So I think it only resets `root` (along with `size` and `modCount`) and doesn't clear the values themselves.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -388,6 +388,33 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
+ test("SPARK-48105: state store unload/close happens during the maintenance")
{
+ tryWithProviderResource(
+ newStoreProvider(opId = Random.nextInt(), partition = 0,
minDeltasForSnapshot = 1)) {
+ provider =>
+ val store =
provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore]
+ val keys = (1 to 20).map(i => ("a" + i))
+ keys.foreach(put(store, _, 0, 0))
+ // commit state store with 20 keys.
+ store.commit()
+ // get the state store iterator: mimic the case which the iterator is
hold in the
+ // maintenance thread.
+ val storeIterator = store.iterator()
+ // If the provider is loaded in another executor, it will be unloaded
and closed in
+ // current executor.
+ provider.close()
+ // the store iterator should still be valid as the maintenance thread
may have already
+ // hold it and is doing snapshotting even thought the state store is
unloaded.
+ val outputKeys = new mutable.ArrayBuffer[String]
+ while (storeIterator.hasNext) {
Review Comment:
Yes, it fails consistently without the change to the `close` function:
```
- SPARK-48105: state store unload/close happens during the maintenance ***
FAILED *** (725 milliseconds)
[info] Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16",
"a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did
not equal ArrayBuffer("a1", "a11", "a12", "a13", "a15", "a18", "a19", "a4",
"a6", "a8", "a9") (StateStoreSuite.scala:424)
[info] Analysis:
[info] Vector1(1: "a10" -> "a11", 2: "a11" -> "a12", 3: "a12" -> "a13", 4:
"a13" -> "a15", 5: "a14" -> "a18", 6: "a15" -> "a19", 7: "a16" -> "a4", 8:
"a17" -> "a6", 9: "a18" -> "a8", 10: "a19" -> "a9", 11: "a2" -> , 12: "a20" ->
, 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18:
"a8" -> , 19: "a9" -> )
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]