Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/21502#discussion_r193724774
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
---
@@ -152,6 +152,26 @@ class BroadcastJoinSuite extends QueryTest with
SQLTestUtils {
}
}
+ test("SPARK-22575: remove allocated blocks when they are not needed
anymore") {
+ val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
+ val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+ case b: BroadcastHashJoinExec => b
+ }.size
+ assert(numBroadCastHashJoin > 0)
+ df3.collect()
+ df3.destroy()
+ val blockManager = sparkContext.env.blockManager
+ val blocks = blockManager.getMatchingBlockIds(blockId => {
+ blockId.isBroadcast &&
blockManager.getStatus(blockId).get.storageLevel.deserialized
+ }).distinct
+ val blockValues = blocks.flatMap { id =>
+ blockManager.getSingle[Any](id)
+ }
--- End diff --
This may be the root cause of the unstable UT failure: the broadcast block cannot
be deleted immediately after `destroy()`. I added a sleep before the check and the test passed every time — you could give it a try.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]