[jira] [Commented] (SPARK-25998) TorrentBroadcast holds strong reference to broadcast object
[ https://issues.apache.org/jira/browse/SPARK-25998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769708#comment-16769708 ]

t oo commented on SPARK-25998:
--

Any chance of a backport to 2.x?

> TorrentBroadcast holds strong reference to broadcast object
> ---
>
> Key: SPARK-25998
> URL: https://issues.apache.org/jira/browse/SPARK-25998
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Brandon Krieger
> Assignee: Brandon Krieger
> Priority: Major
> Fix For: 3.0.0
>
>
> If we perform a large number of broadcast joins while holding onto the Dataset references, those Datasets keep a large amount of memory alive for the values of the broadcast objects. The broadcast object is also held in the MemoryStore, but the MemoryStore evicts entries to keep its memory usage below a certain limit. In my use case, I don't want to release the reference to the Dataset (which would allow the broadcast object to be GCed) because I want to be able to unpersist it at some point in the future (when it is no longer relevant).
>
> See the following repro in Spark shell:
> {code:scala}
> // spark-shell already imports spark.implicits._, which provides toDF and $"...".
> import org.apache.spark.sql.functions._
> import org.apache.spark.SparkEnv
>
> val startDf = (1 to 100).toDF("num").withColumn("num", $"num".cast("string")).cache()
> val leftDf = startDf.withColumn("num", concat($"num", lit("0")))
> val rightDf = startDf.withColumn("num", concat($"num", lit("1")))
> val broadcastJoinedDf = leftDf.join(broadcast(rightDf), leftDf.col("num").eqNullSafe(rightDf.col("num")))
> broadcastJoinedDf.count()
>
> // Take a heap dump: the UnsafeHashedRelation is strongly referenced from both the MemoryStore and the Dataset.
> // Force the MemoryStore to clear itself.
> SparkEnv.get.blockManager.stop()
> // Trigger GC, then take another heap dump. The UnsafeHashedRelation is now referenced only by the Dataset.
> {code}
>
> If we make the TorrentBroadcast hold a weak reference to the broadcast object, the second heap dump no longer contains the UnsafeHashedRelation, because it has been GCed.
>
> Given that the broadcast object can be reloaded from the MemoryStore, it seems reasonable to use a WeakReference instead.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25998) TorrentBroadcast holds strong reference to broadcast object
[ https://issues.apache.org/jira/browse/SPARK-25998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682017#comment-16682017 ]

Apache Spark commented on SPARK-25998:
--

User 'bkrieger' has created a pull request for this issue:
https://github.com/apache/spark/pull/22995
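The weak-reference idea from the issue description can be illustrated outside Spark. The following is a minimal Scala sketch under stated assumptions: `WeakBroadcastValue` is a hypothetical class, not Spark's `TorrentBroadcast` and not the change in the linked pull request; its `reload` function stands in for re-reading the value from the block store; and `simulateCollection()` is a demo-only hook that clears the weak reference the way the garbage collector would.

```scala
import java.lang.ref.WeakReference

// Hypothetical, simplified holder; NOT Spark's TorrentBroadcast.
// `reload` stands in for re-reading the broadcast value from the block store.
final class WeakBroadcastValue[T <: AnyRef](reload: () => T) {
  // Only a weak reference is held, so the GC may reclaim the value once
  // nothing else (e.g. a MemoryStore entry) references it strongly.
  private var cached: WeakReference[T] = null

  def value: T = synchronized {
    val current: T = if (cached == null) null.asInstanceOf[T] else cached.get()
    if (current != null) current
    else {
      // First access, or the referent was collected: rebuild and re-wrap it.
      val loaded = reload()
      cached = new WeakReference[T](loaded)
      loaded
    }
  }

  // Demo-only hook (hypothetical): clears the weak reference as the GC would.
  def simulateCollection(): Unit = synchronized {
    if (cached != null) cached.clear()
  }
}
```

With only a `WeakReference`, the value stays alive exactly as long as something else holds it strongly; once it is collected, the next `value` call pays for a reload. A `java.lang.ref.SoftReference` would be a possible alternative that the JVM tends to keep until memory runs low, trading longer retention for fewer reloads.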