cloud-fan commented on code in PR #56293:
URL: https://github.com/apache/spark/pull/56293#discussion_r3432991495
##########
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java:
##########
@@ -394,33 +556,107 @@ private MemoryBlock allocatePage(
allocatedPages.set(pageNumber);
}
MemoryBlock page = null;
+ boolean pageAllocated = false;
+ int retryCount = 0;
+ long allocationSize = acquired;
+ long partialAllocationSize = 0;
+ boolean tryingPartialAllocation = false;
+ boolean minimumRetryAfterPartialAllocationFailure = false;
try {
- page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
- } catch (OutOfMemoryError e) {
- if (retryCount == 0) {
- logger.warn("Failed to allocate a page ({} bytes) for {} times, try again.", e,
- MDC.of(LogKeys.PAGE_SIZE, acquired),
- MDC.of(LogKeys.NUM_RETRY, retryCount));
- } else {
- logger.warn("Failed to allocate a page ({} bytes) for {} times, try again.",
- MDC.of(LogKeys.PAGE_SIZE, acquired),
- MDC.of(LogKeys.NUM_RETRY, retryCount));
+ while (true) {
+ try {
+ page = tungstenMemoryAllocator.allocate(allocationSize);
+ break;
+ } catch (OutOfMemoryError e) {
+ logPageAllocationFailure(allocationSize, retryCount, e);
+ if (tryingPartialAllocation) {
+ if (minimumSize > 0 && minimumSize < allocationSize) {
+ // Reuse the retained grant for one final attempt at the caller's usable minimum.
+ long surplus = Math.subtractExact(acquired, minimumSize);
+ releaseExecutionMemory(surplus, consumer);
+ acquired = minimumSize;
+ allocationSize = minimumSize;
+ minimumRetryAfterPartialAllocationFailure = true;
+ retryCount++;
+ continue;
+ }
+ return null;
+ }
+ long released = recoverFromPageAllocationFailure(allocationSize, consumer);
+ if (released > 0) {
+ long remaining = allocationSize - partialAllocationSize;
+ partialAllocationSize += Math.min(released, remaining);
+ } else if (partialAllocationSize > 0 && partialAllocationSize < allocationSize) {
+ // Preserve one bounded attempt to combine the memory made available by spilling with
+ // any remaining free-tail grant, then fall back to a partial page for callers that can
+ // use one. The additional grant may already include the spilled memory, so do not add
+ // it to partialAllocationSize.
+ long additionalAcquired =
+ acquireAdditionalExecutionMemoryForPageAllocation(size, consumer);
+ if (additionalAcquired > 0) {
+ long overlap =
+ additionalAcquired >= partialAllocationSize ? partialAllocationSize : 0L;
+ if (overlap > 0) {
+ releaseExecutionMemory(overlap, consumer);
+ }
+ acquired = Math.addExact(acquired, additionalAcquired - overlap);
+ }
+ allocationSize =
+ additionalAcquired > 0 ? additionalAcquired : partialAllocationSize;
+ tryingPartialAllocation = true;
+ } else if (partialAllocationSize == 0) {
+ // Preserve one bounded attempt to acquire a smaller free-tail grant. The previous
+ // recursive implementation could return a partial page this way after retaining the
+ // rejected grant, but could also retry without bound.
+ long additionalAcquired =
+ acquireAdditionalExecutionMemoryForPageAllocation(size, consumer);
+ if (additionalAcquired <= 0) {
+ return null;
+ }
+ acquired = Math.addExact(acquired, additionalAcquired);
+ allocationSize = additionalAcquired;
+ tryingPartialAllocation = true;
+ } else if (minimumSize > 0 && minimumSize < allocationSize) {
+ // The original grant is still reserved. If the caller padded a smaller allocation to
+ // the configured page size, make one bounded attempt at the minimum usable size without
Review Comment:
This minimum-drop branch returns an exact-fit page (`allocationSize =
minimumSize`) just like the new branch at L573-581, but it doesn't set
`minimumRetryAfterPartialAllocationFailure`, so the page is never recorded in
`pagesAllocatedFromMinimumRetry` (set only at L639). It fires when spilling has
made progress but the allocator still throws OOM and the surplus is released,
i.e. while execution memory is available, so its page is pathological in
exactly the same way as the page from the branch that does set the flag.
Because `BytesToBytesMap.acquireNewPage` rejects only *marked* pages (L847),
it will accept a page from this branch and re-enter the per-record
page-table-slot exhaustion this delta is meant to prevent. This is reachable
whenever a second spillable consumer is present (so recovery spilling makes
progress).
Consider setting the flag in both minimum-drop branches, e.g. via a shared
helper. (No test currently drives this path for B2B: the new B2B tests inject
allocator OOM but register no spillable peer, so they only exercise L573.)
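To make the shared-helper suggestion concrete, here is a heavily simplified, hypothetical model of the retry loop. It is not the `TaskMemoryManager` code: spilling, free-tail grants and the real page table are left out, and `MinimumDropSketch`, `dropToMinimum`, `allocatorSucceeds` and `releasedToPool` are invented names for illustration. What it shows is that when every minimum-drop branch goes through one helper that both shrinks the grant and sets the flag, no branch can produce an unmarked exact-fit page.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongPredicate;

// Hypothetical, heavily simplified model of the page-allocation retry loop.
// Only the minimum-drop fallback is modeled; spilling and free-tail grants are omitted.
final class MinimumDropSketch {
  // Stand-in for pagesAllocatedFromMinimumRetry, keyed by page number.
  final Set<Integer> pagesAllocatedFromMinimumRetry = new HashSet<>();
  long acquired;        // execution memory currently reserved for the page being allocated
  long releasedToPool;  // running total handed back (stand-in for releaseExecutionMemory)
  private long allocationSize;
  private boolean minimumRetry;
  private int nextPageNumber = 0;

  // allocatorSucceeds stands in for the Tungsten allocator: false simulates an OutOfMemoryError.
  // Returns the new page number, or -1 when no page could be allocated.
  int allocatePage(long requested, long minimumSize, LongPredicate allocatorSucceeds) {
    acquired = requested;
    allocationSize = requested;
    minimumRetry = false;
    while (true) {
      if (allocatorSucceeds.test(allocationSize)) {
        int pageNumber = nextPageNumber++;
        if (minimumRetry) {
          pagesAllocatedFromMinimumRetry.add(pageNumber);
        }
        return pageNumber;
      }
      // Every recovery branch that falls back to minimumSize goes through the same helper.
      if (!dropToMinimum(minimumSize)) {
        releasedToPool += acquired;
        acquired = 0;
        return -1;
      }
    }
  }

  // Shared helper: release the surplus, shrink to minimumSize, and set the flag in one place.
  private boolean dropToMinimum(long minimumSize) {
    if (minimumSize <= 0 || minimumSize >= allocationSize) {
      return false;
    }
    releasedToPool += Math.subtractExact(acquired, minimumSize);
    acquired = minimumSize;
    allocationSize = minimumSize;
    minimumRetry = true;
    return true;
  }
}
```

Because the helper returns `false` once `allocationSize` already equals `minimumSize`, the model makes at most one minimum-size attempt, in line with the bounded-retry intent of the diff.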
##########
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java:
##########
@@ -836,11 +836,19 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
* @return whether there is enough space to allocate the new page.
*/
private boolean acquireNewPage(long required) {
+ final MemoryBlock page;
try {
- currentPage = allocatePage(required);
+ page = allocatePage(required);
} catch (SparkOutOfMemoryError e) {
return false;
}
+ // Retaining exact-fit minimum-retry pages would consume one page-table slot per map entry.
+ if (required < pageSizeBytes && page.size() == required &&
+ isPageAllocationFromMinimumRetry(page)) {
Review Comment:
This rejection is gated on `isPageAllocationFromMinimumRetry`, which
`TaskMemoryManager` sets only for the new branch (L573-581), not for the
pre-existing minimum-drop at L619-627, yet both yield pathological exact-fit
pages. As written, an exact-fit page from the L619 path slips through and is
accepted, re-introducing the page-table-slot exhaustion in spill-active
scenarios. See the comment on `TaskMemoryManager.java:621` for the full trace.
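A toy model can make the consequence concrete. The sketch below is hypothetical: `ExactFitGuardSketch`, `rejectExactFit` and `slotsConsumed` are invented names, `rejectExactFit` only copies the shape of the new guard, a `Set` of page numbers stands in for `isPageAllocationFromMinimumRetry`, and the page table is assumed to have 8192 (`1 << 13`) slots. When the producing branch marks its pages, the guard stops at the first exact-fit page; when it does not, every record takes its own slot until the table is full.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model of the exact-fit guard in BytesToBytesMap.acquireNewPage.
// markedPages stands in for TaskMemoryManager.isPageAllocationFromMinimumRetry.
final class ExactFitGuardSketch {
  static final int PAGE_TABLE_SLOTS = 1 << 13; // assumed page-table size

  // Same shape as the condition added in the diff.
  static boolean rejectExactFit(long required, long pageSizeBytes, long pageSize,
                                int pageNumber, Set<Integer> markedPages) {
    return required < pageSizeBytes && pageSize == required && markedPages.contains(pageNumber);
  }

  // Appends `records` records of `recordSize` bytes when the allocator can only return
  // exact-fit pages, one page per record. markPages says whether the branch that produced
  // the page set the minimum-retry flag. Returns the number of page-table slots consumed.
  static int slotsConsumed(int records, long recordSize, long pageSizeBytes, boolean markPages) {
    Set<Integer> marked = new HashSet<>();
    int slots = 0;
    for (int i = 0; i < records && slots < PAGE_TABLE_SLOTS; i++) {
      int pageNumber = slots;      // each accepted page occupies the next free slot
      long pageSize = recordSize;  // exact fit: the page holds exactly one record
      if (markPages) {
        marked.add(pageNumber);
      }
      if (rejectExactFit(recordSize, pageSizeBytes, pageSize, pageNumber, marked)) {
        return slots;              // acquireNewPage reports failure instead of keeping the page
      }
      slots++;
    }
    return slots;
  }
}
```

With 10,000 records of 64 bytes and a 1 MiB configured page size, `slotsConsumed` returns 8192 when pages are unmarked (the toy table fills up) and 0 when they are marked (the first exact-fit page is rejected).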