GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/9241#discussion_r42953237
--- Diff:
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
---
@@ -312,107 +320,46 @@ private void growPointerArrayIfNecessary() throws
IOException {
* memory from the {@link ShuffleMemoryManager} and spill if the
requested memory can not be
* obtained.
*
- * @param requiredSpace the required space in the data page, in bytes,
including space for storing
+ * @param required the required space in the data page, in bytes,
including space for storing
* the record size. This must be less than or equal
to the page size (records
* that exceed the page size are handled via a
different code path which uses
* special overflow pages).
*/
- private void acquireNewPageIfNecessary(int requiredSpace) throws
IOException {
- assert (requiredSpace <= pageSizeBytes);
- if (requiredSpace > freeSpaceInCurrentPage) {
- logger.trace("Required space {} is less than free space in current
page ({})", requiredSpace,
- freeSpaceInCurrentPage);
+ private void acquireNewPageIfNecessary(int required) throws IOException {
+ if (currentPage == null ||
+ pageCursor + required > currentPage.getBaseOffset() +
currentPage.size()) {
// TODO: we should track metrics on the amount of space wasted when
we roll over to a new page
// without using the free space at the end of the current page. We
should also do this for
// BytesToBytesMap.
- if (requiredSpace > pageSizeBytes) {
- throw new IOException("Required space " + requiredSpace + " is
greater than page size (" +
- pageSizeBytes + ")");
- } else {
- acquireNewPage();
+ long granted = shuffleMemoryManager.tryToAcquire(Math.max(required,
pageSizeBytes), this);
+ if (granted < required) {
+ shuffleMemoryManager.release(granted, this);
+ throw new IOException("Unable to acquire " + required + " bytes of
memory");
}
+ currentPage = taskMemoryManager.allocatePage(granted);
+ pageCursor = currentPage.getBaseOffset();
+ allocatedPages.add(currentPage);
}
}
/**
- * Acquire a new page from the {@link ShuffleMemoryManager}.
- *
- * If there is not enough space to allocate the new page, spill all
existing ones
- * and try again. If there is still not enough space, report error to
the caller.
- */
- private void acquireNewPage() throws IOException {
- final long memoryAcquired =
shuffleMemoryManager.tryToAcquire(pageSizeBytes);
- if (memoryAcquired < pageSizeBytes) {
- shuffleMemoryManager.release(memoryAcquired);
- spill();
- final long memoryAcquiredAfterSpilling =
shuffleMemoryManager.tryToAcquire(pageSizeBytes);
- if (memoryAcquiredAfterSpilling != pageSizeBytes) {
- shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
- throw new IOException("Unable to acquire " + pageSizeBytes + "
bytes of memory");
- }
- }
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = pageSizeBytes;
- allocatedPages.add(currentPage);
- }
-
- /**
* Write a record to the sorter.
*/
- public void insertRecord(
- Object recordBaseObject,
- long recordBaseOffset,
- int lengthInBytes,
- long prefix) throws IOException {
+ public void insertRecord(Object recordBase, long recordOffset, int
length, long prefix)
+ throws IOException {
growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
- final int totalSpaceRequired = lengthInBytes + 4;
-
- // --- Figure out where to insert the new record
----------------------------------------------
-
- final MemoryBlock dataPage;
- long dataPagePosition;
- boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
- if (useOverflowPage) {
- long overflowPageSize =
ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
--- End diff --
Do we still need this `roundNumberOfBytesToNearestWord` method now that the
memory allocators do the rounding themselves?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]