c21 commented on a change in pull request #32242:
URL: https://github.com/apache/spark/pull/32242#discussion_r616409848
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
##########
@@ -87,6 +87,18 @@ class RowBasedHashMapGenerator(
|
| buckets = new int[numBuckets];
| java.util.Arrays.fill(buckets, -1);
+ |
+ | // Register a cleanup task with TaskContext to ensure that memory is
+ | // guaranteed to be freed at the end of the task. This is necessary to
+ | // avoid memory leaks when the downstream operator does not fully
+ | // consume the aggregation map's output (e.g. aggregate followed by limit).
+ | taskContext.addTaskCompletionListener(
Review comment:
This is needed because we have to clean up resources for queries with a limit
on top of a hash aggregate. Generally no query does limit early-stop for a
partial aggregate, so this was not a problem before this PR. However, we do
limit early-stop for a final aggregate, and that caused the test failure of
`SQLQuerySuite.SPARK-21743: top-most limit should not cause memory leak` in
https://github.com/c21/spark/runs/2386397792?check_suite_focus=true . So here
I am adding a listener to clean up the first-level hash map's resources,
similar to what https://github.com/apache/spark/pull/21738 did for the second
level hash map (`UnsafeFixedWidthAggregationMap`).
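
To make the scenario concrete, below is a hypothetical query shape (not taken
from this PR or the test) that exercises the leak path: a final hash aggregate
whose output is cut short by a limit, so the aggregation map is never fully
drained before the task ends.
```java
// Hypothetical repro shape: final hash aggregate followed by limit.
// The limit stops consuming the aggregate's output early, so without a
// task-completion listener the map's memory would not be freed promptly.
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LimitAfterAggregate {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("limit-after-aggregate")
        .getOrCreate();

    // groupBy + count compiles to a hash aggregate; limit(1) early-stops it.
    Dataset<Row> rows = spark.range(100)
        .groupBy(col("id").mod(10))
        .count()
        .limit(1);

    rows.show();
    spark.stop();
  }
}
```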
I use the older Java syntax of an anonymous class here instead of a lambda
expression, because Janino does not support compiling lambda expressions yet -
https://github.com/janino-compiler/janino/blob/master/janino/src/main/java/org/codehaus/janino/UnitCompiler.java#L6998
.
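
For reference, here is a minimal sketch of the Java fragment the generated
string would emit, written as an anonymous class since Janino cannot compile
lambdas. `freeMemory()` is a hypothetical stand-in for the map's actual
cleanup hook, not the name used in this PR.
```java
// Sketch of the generated Java. Assumptions: `taskContext` is in scope in
// the generated class, and `freeMemory()` is a hypothetical cleanup method.
taskContext.addTaskCompletionListener(
  new org.apache.spark.util.TaskCompletionListener() {
    @Override
    public void onTaskCompletion(org.apache.spark.TaskContext context) {
      // Release the first-level hash map's backing memory even if the
      // downstream operator (e.g. limit) did not drain the iterator.
      freeMemory();
    }
  });
```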