ankurdave opened a new pull request #29753:
URL: https://github.com/apache/spark/pull/29753
### What changes were proposed in this pull request?
When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold,
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY`
is false. This correctly prevents the map from growing, but `canGrowArray`
incorrectly remains true. Therefore the map keeps accepting new keys and
exceeds its growth threshold. If we attempt to spill the map in this state, the
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By
this point the task has typically consumed all available memory, so the
allocation of the new pointer array is likely to fail.
This PR fixes the issue by setting `canGrowArray` to false in this case.
This prevents the map from accepting new elements when it cannot grow to
accommodate them.
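For illustration, the shape of the growth check after this change looks roughly like the sketch below. This is not the literal patch: the field names (`numKeys`, `growthThreshold`, `longArray`, `canGrowArray`, `MAX_CAPACITY`) are the ones referenced above, while `growAndRehash()` and the caught exception type are assumptions about the surrounding code.
```java
// Sketch of the growth check on the append path (not the literal patch).
if (numKeys >= growthThreshold) {
  if (longArray.size() / 2 < MAX_CAPACITY) {
    try {
      growAndRehash();         // assumed grow path: double the array and rehash
    } catch (SparkOutOfMemoryError e) {
      canGrowArray = false;    // allocation failed; stop accepting new keys
    }
  } else {
    // The fix: the map is already at MAX_CAPACITY and can never grow, so stop
    // accepting new keys instead of exceeding the growth threshold.
    canGrowArray = false;
  }
}
```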
### Why are the changes needed?
Without this change, hash aggregations will fail when the number of groups
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million),
and when the grouping aggregation is the only memory-consuming operator in its
stage.
For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl`
fails when `tbl` contains 1 billion distinct values and when
`spark.sql.shuffle.partitions=1`.
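As a quick sanity check of these numbers (assuming `MAX_CAPACITY = 2^29`, which is what `MAX_CAPACITY / 2 = 2^28` above implies, and a growth threshold of half the capacity):
```java
// Why ~268 million groups per task is the breaking point (sketch).
long maxCapacity = 1L << 29;     // MAX_CAPACITY = 536,870,912 slots
long maxKeys = maxCapacity / 2;  // growth threshold = 2^28 = 268,435,456 keys
// With spark.sql.shuffle.partitions=1, all 1 billion distinct ids go to a
// single task, and 1_000_000_000 > 268_435_456, so the map hits this limit.
```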
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Reproducing this issue requires building a very large BytesToBytesMap.
Because this is infeasible to do in a unit test, this PR was tested manually by
adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the
test fails after 8.5 minutes; with this PR, it passes in 1.5 minutes.
```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 = new TestMemoryManager(
      new SparkConf()
        .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
        .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
        .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
        .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map =
      new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc =
          map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success = loc.append(
          value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);
    } finally {
      map.free();
    }
  }
}
```