ankurdave opened a new pull request #29744:
URL: https://github.com/apache/spark/pull/29744
### What changes were proposed in this pull request?
When a `BytesToBytesMap` has grown to `MAX_CAPACITY` and reaches its growth
threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 <
MAX_CAPACITY` is false. This correctly prevents the map from growing, but
`canGrowArray` incorrectly remains true. The map therefore keeps accepting new
keys and exceeds its growth threshold. If we attempt to spill the map in this
state, `UnsafeKVExternalSorter` cannot reuse the map's long array for sorting,
because its in-place sort requires roughly half of the array's slots to be
free; it must instead allocate a new pointer array. By this point the task has
typically consumed all available memory, so that allocation is likely to fail.
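To make the failure path concrete, here is a rough sketch of the spill-time decision described above. This is an illustration only; the exact condition and surrounding code in `UnsafeKVExternalSorter` differ, and the reuse condition shown is an assumption:
```java
// Illustrative sketch only; not the actual UnsafeKVExternalSorter code.
// When the map spills, its long array can double as the sorter's pointer
// array only if enough slots are free for an in-place sort.
LongArray array = map.getArray();
boolean canReuse = map.numKeys() <= array.size() / 2;  // assumed condition
if (canReuse) {
  // Sort within the map's existing long array; no new allocation needed.
} else {
  // Must allocate a fresh pointer array. Since the task has typically
  // exhausted its memory by this point, this allocation tends to fail.
}
```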
This PR fixes the issue by setting `canGrowArray` to false in this case.
This prevents the map from accepting new elements when it cannot grow to
accommodate them.
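For context, the change amounts to adding an else branch to the growth check on the map's append path. The following is a simplified sketch of that control flow, not the exact diff; surrounding code and error handling are elided:
```java
// Simplified sketch of the growth check in BytesToBytesMap's append path.
if (numKeys >= growthThreshold) {
  if (longArray.size() / 2 < MAX_CAPACITY) {
    try {
      growAndRehash();
    } catch (SparkOutOfMemoryError oom) {
      // Could not allocate a larger array; stop accepting new keys.
      canGrowArray = false;
    }
  } else {
    // The map is already at MAX_CAPACITY and cannot grow, so stop accepting
    // new elements instead of exceeding the growth threshold.
    canGrowArray = false;
  }
}
```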
### Why are the changes needed?
Without this change, hash aggregations will fail when the number of groups
per task is close to `MAX_CAPACITY = 2^29` (approximately 500 million), and
when the grouping aggregation is the only memory-consuming operator in its
stage.
For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl`
fails when `tbl` contains 1 billion distinct values and when
`spark.sql.shuffle.partitions=1`.
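As an illustration, a hypothetical standalone reproduction of this failure mode might look like the following. The class name, application name, and exact row count are assumptions for illustration, not part of this PR:
```java
import org.apache.spark.sql.SparkSession;

// Hypothetical reproduction sketch; configuration values are illustrative only.
public class CountDistinctRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("count-distinct-repro")
        // Force the final aggregation into a single task so that one
        // BytesToBytesMap sees all ~1 billion groups.
        .config("spark.sql.shuffle.partitions", "1")
        .getOrCreate();

    spark.range(1_000_000_000L).createOrReplaceTempView("tbl");
    // Before this PR, the final aggregation of this query could fail once
    // the per-task BytesToBytesMap approached MAX_CAPACITY.
    spark.sql("SELECT COUNT(DISTINCT id) FROM tbl").show();
  }
}
```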
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Reproducing this issue requires building a very large BytesToBytesMap, which
is infeasible in a regular unit test run. This PR was therefore tested manually
by adding the following test to `AbstractBytesToBytesMapSuite`. Before this PR,
the test failed after 8.5 minutes. With this PR, the test passes in 1.5 minutes.
```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...

  @Test
  public void stopAcceptingNewKeysNearMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
    try {
      // Fill the map up to its maximum capacity.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }
      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half of its capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(map.numKeys() <= capacity / 2);
      map.free();
    } finally {
      map.free();
    }
  }
}
```