xuanyuanking opened a new pull request #26164: [SPARK-21492][SQL] Fix memory leak in SortMergeJoin
URL: https://github.com/apache/spark/pull/26164
 
 
   ### What changes were proposed in this pull request?
   We add a new mechanism by which a downstream (consumer) operator can notify its upstream (children) operators that their output streams are no longer needed, so they can release the associated resources. In this PR, we implement the mechanism as below (see the sketch after this list):
   - Add a function named `cleanupResources` to `SparkPlan`, whose default implementation calls `cleanupResources` on all children. An operator that needs resource cleanup overrides it to perform its own cleanup and then calls `super.cleanupResources`, as `SortExec` does in this PR.
   - Add the trigger-side logic, in this PR in `SortMergeJoinExec`, which calls `cleanupResources` at the right moment to trigger the cleanup job for all of its upstream (children) operators.
   - Add a conf `spark.sql.sortMergeJoinExec.eagerCleanupResources` to guard this behavior for safety; the default value is true.
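   
   A minimal, self-contained sketch of the mechanism (simplified names and state for illustration only; the real `SparkPlan`, `SortExec`, and `SortMergeJoinExec` carry much more than shown here):
   
   ```scala
   // Sketch only: illustrates the propagation pattern, not the actual Spark code.
   abstract class PlanNode {
     def children: Seq[PlanNode]
   
     // Default: forward the cleanup request to all children, so one call from
     // the consumer reaches the entire upstream subtree.
     def cleanupResources(): Unit = children.foreach(_.cleanupResources())
   }
   
   // An operator holding releasable state (like SortExec's external sorter)
   // overrides cleanupResources: free its own resources, then keep propagating.
   class SortLike(child: PlanNode) extends PlanNode {
     override def children: Seq[PlanNode] = Seq(child)
     private var sorterState: Option[AutoCloseable] = None // stand-in for sort buffers/spill files
   
     override def cleanupResources(): Unit = {
       sorterState.foreach(_.close())
       sorterState = None
       super.cleanupResources()
     }
   }
   
   // The trigger side (SortMergeJoinExec in the PR): once the join has consumed
   // everything it needs from its sorted inputs, it eagerly releases the
   // upstream resources, guarded by the new conf.
   class MergeJoinLike(left: PlanNode, right: PlanNode, eagerCleanup: Boolean) extends PlanNode {
     override def children: Seq[PlanNode] = Seq(left, right)
   
     def onInputsExhausted(): Unit = {
       if (eagerCleanup) cleanupResources()
     }
   }
   ```
   
   The key point is that any operator in the subtree that holds memory or spill files receives the signal through the same `cleanupResources` chain, without the join needing to know which of its descendants hold resources.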
   
   ### Why are the changes needed?
   Fixes a memory leak in SortMergeJoin, and implements a general framework for SparkPlan resource cleanup.
   
   ### Does this PR introduce any user-facing change?
   No.
   
   ### How was this patch tested?
   UT: Add a new test suite, `JoinWithResourceCleanSuite`, to check both the standard and whole-stage code generation scenarios.
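   
   To illustrate the property the suite verifies, here is a hypothetical, self-contained ScalaTest sketch (not the actual suite, which runs real SortMergeJoin plans with and without codegen):
   
   ```scala
   import org.scalatest.funsuite.AnyFunSuite
   
   class CleanupPropagationSketchSuite extends AnyFunSuite {
   
     // Toy plan node that records whether cleanup reached it.
     class Node(val children: Seq[Node]) {
       var cleaned = false
       def cleanupResources(): Unit = {
         cleaned = true
         children.foreach(_.cleanupResources())
       }
     }
   
     test("cleanup triggered at the join reaches both sorted children") {
       val leftSort  = new Node(Nil)
       val rightSort = new Node(Nil)
       val join      = new Node(Seq(leftSort, rightSort))
   
       join.cleanupResources() // what the join does once its input streams are exhausted
   
       assert(leftSort.cleaned && rightSort.cleaned)
     }
   }
   ```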
   
   Integration test: Tested with driver/executor memory at the default 1g, in local mode with 10 threads. With `spark.sql.sortMergeJoinExec.eagerCleanupResources=false`, the test below (thanks @taosaildrone for providing it [here](https://github.com/apache/spark/pull/23762#issuecomment-463303175)) fails with OOM; with the conf enabled, it passes.
   
   ```
   from pyspark.sql.functions import rand, col
   
   # Force a sort-merge join: prefer SMJ and disable broadcast joins.
   spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   # spark.conf.set("spark.sql.sortMergeJoinExec.eagerCleanupResources", "true")
   
   # r1 holds ids 1..1000; r2 holds the single id 1000, so the join matches one row.
   r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
   r1 = r1.withColumn('value', rand())
   r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
   r2 = r2.withColumn('value2', rand())
   joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
   # coalesce(1) folds all the join's shuffle partitions into a single task;
   # without eager cleanup, every partition's sorter stays in memory until the task ends.
   joined = joined.coalesce(1)
   joined.explain()
   joined.show()
   ```
   
