Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21502#discussion_r193742604
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
    @@ -152,6 +152,26 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
         }
       }
     
    +  test("SPARK-22575: remove allocated blocks when they are not needed 
anymore") {
    +    val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value")
    +    val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
    +    val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
    +    val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
    +      case b: BroadcastHashJoinExec => b
    +    }.size
    +    assert(numBroadCastHashJoin > 0)
    +    df3.collect()
    +    df3.destroy()
    +    val blockManager = sparkContext.env.blockManager
    +    val blocks = blockManager.getMatchingBlockIds(blockId => {
    +      blockId.isBroadcast && 
blockManager.getStatus(blockId).get.storageLevel.deserialized
    +    }).distinct
    +    val blockValues = blocks.flatMap { id =>
    +      blockManager.getSingle[Any](id)
    +    }
    --- End diff --
    
    I ran the test 10,000 times and could not reproduce the issue locally. Can 
you?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to