Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21738#discussion_r201208356
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java ---
@@ -82,27 +82,34 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
* @param emptyAggregationBuffer the default value for new keys (a "zero" of the agg. function)
* @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
* @param groupingKeySchema the schema of the grouping key, used for row conversion.
- * @param taskMemoryManager the memory manager used to allocate our Unsafe memory structures.
+ * @param taskContext the current task context.
* @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
* @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
*/
public UnsafeFixedWidthAggregationMap(
InternalRow emptyAggregationBuffer,
StructType aggregationBufferSchema,
StructType groupingKeySchema,
- TaskMemoryManager taskMemoryManager,
+ TaskContext taskContext,
int initialCapacity,
long pageSizeBytes) {
this.aggregationBufferSchema = aggregationBufferSchema;
this.currentAggregationBuffer = new UnsafeRow(aggregationBufferSchema.length());
this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
this.groupingKeySchema = groupingKeySchema;
- this.map =
- new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, true);
+ this.map = new BytesToBytesMap(
+ taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true);
// Initialize the buffer for aggregation value
final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);
this.emptyAggregationBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
+
+ // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
+ // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
+ // does not fully consume the sorter's output (e.g. sort followed by limit).
--- End diff --
super nit: "the sorter" -> "the aggregation", and "sort followed by limit" -> "aggregation followed by limit".
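The cleanup pattern described in the new code comment (free the map's memory when the task ends, even if a downstream limit stops reading early) can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions: `SketchTaskContext` and `SketchAggregationMap` are hypothetical stand-ins, not Spark classes, and the fixed 64-byte cost per key is invented for illustration. In Spark itself such a registration would typically go through `TaskContext.addTaskCompletionListener`; the exact call used in this PR is not shown in the diff above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Spark's TaskContext: it only records
// completion callbacks and runs them when the task finishes.
class SketchTaskContext {
  private final List<Runnable> completionListeners = new ArrayList<>();

  void addCompletionListener(Runnable listener) {
    completionListeners.add(listener);
  }

  // Called once by the "task runner" after the task body returns.
  void markTaskCompleted() {
    for (Runnable listener : completionListeners) {
      listener.run();
    }
    completionListeners.clear();
  }
}

// Hypothetical aggregation map that tracks the bytes it holds and
// registers its own cleanup in the constructor, mirroring the diff's idea.
class SketchAggregationMap {
  private final Map<String, Long> sums = new HashMap<>();
  private long allocatedBytes = 0;

  SketchAggregationMap(SketchTaskContext taskContext) {
    // Free memory at task end even if the consumer stops reading early.
    taskContext.addCompletionListener(this::free);
  }

  void add(String key, long value) {
    if (!sums.containsKey(key)) {
      allocatedBytes += 64; // assumption: each new key costs a fixed 64 bytes
    }
    sums.merge(key, value, Long::sum);
  }

  Iterator<Map.Entry<String, Long>> iterator() {
    return sums.entrySet().iterator();
  }

  void free() {
    sums.clear();
    allocatedBytes = 0;
  }

  long allocatedBytes() {
    return allocatedBytes;
  }
}

public class AggregationCleanupSketch {
  public static void main(String[] args) {
    SketchTaskContext ctx = new SketchTaskContext();
    SketchAggregationMap map = new SketchAggregationMap(ctx);
    for (int i = 0; i < 10; i++) {
      map.add("key" + (i % 5), i); // 5 distinct keys
    }
    // "Aggregation followed by limit": downstream reads only one row.
    Iterator<Map.Entry<String, Long>> it = map.iterator();
    if (it.hasNext()) {
      it.next();
    }
    System.out.println("before task end: " + map.allocatedBytes()); // prints 320
    ctx.markTaskCompleted();
    System.out.println("after task end: " + map.allocatedBytes()); // prints 0
  }
}
```

Because the listener is registered in the constructor, the map is freed at task completion whether or not the downstream operator drains the iterator, which is the "aggregation followed by limit" case the nit refers to.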
---