Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9241#discussion_r42953237
  
    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
    @@ -312,107 +320,46 @@ private void growPointerArrayIfNecessary() throws IOException {
        * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
        * obtained.
        *
    -   * @param requiredSpace the required space in the data page, in bytes, including space for storing
    +   * @param required the required space in the data page, in bytes, including space for storing
        *                      the record size. This must be less than or equal to the page size (records
        *                      that exceed the page size are handled via a different code path which uses
        *                      special overflow pages).
        */
    -  private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
    -    assert (requiredSpace <= pageSizeBytes);
    -    if (requiredSpace > freeSpaceInCurrentPage) {
    -      logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
    -        freeSpaceInCurrentPage);
    +  private void acquireNewPageIfNecessary(int required) throws IOException {
    +    if (currentPage == null ||
    +      pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) {
           // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
           // without using the free space at the end of the current page. We should also do this for
           // BytesToBytesMap.
    -      if (requiredSpace > pageSizeBytes) {
    -        throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
    -          pageSizeBytes + ")");
    -      } else {
    -        acquireNewPage();
    +      long granted = shuffleMemoryManager.tryToAcquire(Math.max(required, pageSizeBytes), this);
    +      if (granted < required) {
    +        shuffleMemoryManager.release(granted, this);
    +        throw new IOException("Unable to acquire " + required + " bytes of memory");
           }
    +      currentPage = taskMemoryManager.allocatePage(granted);
    +      pageCursor = currentPage.getBaseOffset();
    +      allocatedPages.add(currentPage);
         }
       }
     
       /**
    -   * Acquire a new page from the {@link ShuffleMemoryManager}.
    -   *
    -   * If there is not enough space to allocate the new page, spill all existing ones
    -   * and try again. If there is still not enough space, report error to the caller.
    -   */
    -  private void acquireNewPage() throws IOException {
    -    final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
    -    if (memoryAcquired < pageSizeBytes) {
    -      shuffleMemoryManager.release(memoryAcquired);
    -      spill();
    -      final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
    -      if (memoryAcquiredAfterSpilling != pageSizeBytes) {
    -        shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
    -        throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
    -      }
    -    }
    -    currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
    -    currentPagePosition = currentPage.getBaseOffset();
    -    freeSpaceInCurrentPage = pageSizeBytes;
    -    allocatedPages.add(currentPage);
    -  }
    -
    -  /**
        * Write a record to the sorter.
        */
    -  public void insertRecord(
    -      Object recordBaseObject,
    -      long recordBaseOffset,
    -      int lengthInBytes,
    -      long prefix) throws IOException {
    +  public void insertRecord(Object recordBase, long recordOffset, int length, long prefix)
    +    throws IOException {
     
         growPointerArrayIfNecessary();
         // Need 4 bytes to store the record length.
    -    final int totalSpaceRequired = lengthInBytes + 4;
    -
    -    // --- Figure out where to insert the new record ----------------------------------------------
    -
    -    final MemoryBlock dataPage;
    -    long dataPagePosition;
    -    boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
    -    if (useOverflowPage) {
    -      long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
    --- End diff --
    
    Do we still need this `roundNumberOfBytesToNearestWord` method now that the memory allocators do the rounding themselves?
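
For context on the question above: rounding a byte count to the nearest word means rounding it up to the next multiple of the word size. The following is a minimal sketch, assuming 8-byte words. `WordRounding` and `roundUpToWord` are hypothetical names used only for illustration; this is not the Spark method itself and may differ from it in detail.

```java
// Hypothetical helper, for illustration only (not part of Spark).
final class WordRounding {
  private WordRounding() {}

  /**
   * Rounds numBytes up to the next multiple of 8 (assumed word size).
   * Values that are already a multiple of 8 are returned unchanged.
   * Note: inputs within 7 of Integer.MAX_VALUE would overflow.
   */
  static int roundUpToWord(int numBytes) {
    int remainder = numBytes & 0x07;  // numBytes mod 8 for non-negative input
    return remainder == 0 ? numBytes : numBytes + (8 - remainder);
  }
}
```

If the allocator already applies this kind of rounding to every request, a caller that rounds first gets the same page size either way, which is why the call at the overflow-page site may be redundant.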


