GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7891#discussion_r36065681
  
    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
    @@ -378,32 +407,72 @@ public void insertRecord(
       public void insertKVRecord(
           Object keyBaseObj, long keyOffset, int keyLen,
           Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {
    +
    +    growPointerArrayIfNecessary();
         final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
    -    if (!haveSpaceForRecord(totalSpaceRequired)) {
    -      allocateSpaceForRecord(totalSpaceRequired);
    +
    +    // --- Figure out where to insert the new record ----------------------------------------------
    +
    +    final MemoryBlock dataPage;
    +    long dataPagePosition;
    +    boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
    +    if (useOverflowPage) {
    +      long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
    +      // The record is larger than the page size, so allocate a special overflow page just to hold
    +      // that record.
    +      final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
    +      if (memoryGranted != overflowPageSize) {
    +        shuffleMemoryManager.release(memoryGranted);
    +        spill();
    +        final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
    +        if (memoryGrantedAfterSpill != overflowPageSize) {
    +          shuffleMemoryManager.release(memoryGrantedAfterSpill);
    +          throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
    +        }
    +      }
    +      MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
    +      allocatedPages.add(overflowPage);
    +      dataPage = overflowPage;
    +      dataPagePosition = overflowPage.getBaseOffset();
    +    } else {
    +      // The record is small enough to fit in a regular data page, but the current page might not
    +      // have enough space to hold it (or no pages have been allocated yet).
    +      acquireNewPageIfNecessary(totalSpaceRequired);
    +      dataPage = currentPage;
    +      dataPagePosition = currentPagePosition;
         }
    +    final Object dataPageBaseObject = dataPage.getBaseObject();
    +
    +    // --- Insert the record ----------------------------------------------------------------------
     
         final long recordAddress =
    -      taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
    -    final Object dataPageBaseObject = currentPage.getBaseObject();
    -    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4);
    -    currentPagePosition += 4;
    +      taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
    +    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4);
    +    dataPagePosition += 4;
     
    -    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen);
    -    currentPagePosition += 4;
    +    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen);
    +    dataPagePosition += 4;
     
         PlatformDependent.copyMemory(
    -      keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen);
    -    currentPagePosition += keyLen;
    +      keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen);
    +    dataPagePosition += keyLen;
     
         PlatformDependent.copyMemory(
    -      valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen);
    -    currentPagePosition += valueLen;
    +      valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen);
     
    -    freeSpaceInCurrentPage -= totalSpaceRequired;
         inMemSorter.insertRecord(recordAddress, prefix);
    +
    +    // --- Update bookeeping data structures ------------------------------------------------------
    +    if (!useOverflowPage) {
    --- End diff --
    
    Same here -- maybe it makes more sense to move these bookkeeping updates into the else branch above?
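
    A rough sketch of what the suggestion could look like, using a simplified stand-in rather than the real sorter. Everything here (the class name, the `reserve` method, the tiny page size, and the byte-array "pages") is a hypothetical illustration and not Spark's API. The point is only that the shared-page counters are advanced inside the branch that actually uses the shared page, so no trailing `if (!useOverflowPage)` check is needed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in; NOT the real UnsafeExternalSorter. */
class PageBookkeepingSketch {
  static final int PAGE_SIZE_BYTES = 64; // assumed tiny page size, for illustration only

  final List<byte[]> allocatedPages = new ArrayList<>();
  int currentPageIndex = -1;       // -1 means no regular page allocated yet
  int currentPagePosition = 0;
  int freeSpaceInCurrentPage = 0;

  /** Reserves space for one record and returns {pageIndex, offsetWithinPage}. */
  int[] reserve(int totalSpaceRequired) {
    final int pageIndex;
    final int position;
    if (totalSpaceRequired > PAGE_SIZE_BYTES) {
      // Overflow page: dedicated to this single record, so there is
      // no shared-page state to update afterwards.
      allocatedPages.add(new byte[totalSpaceRequired]);
      pageIndex = allocatedPages.size() - 1;
      position = 0;
    } else {
      if (currentPageIndex < 0 || freeSpaceInCurrentPage < totalSpaceRequired) {
        // Start a fresh regular page when none exists or the current one is full.
        allocatedPages.add(new byte[PAGE_SIZE_BYTES]);
        currentPageIndex = allocatedPages.size() - 1;
        currentPagePosition = 0;
        freeSpaceInCurrentPage = PAGE_SIZE_BYTES;
      }
      pageIndex = currentPageIndex;
      position = currentPagePosition;
      // Bookkeeping sits in the else branch, next to the only path that needs it.
      currentPagePosition += totalSpaceRequired;
      freeSpaceInCurrentPage -= totalSpaceRequired;
    }
    return new int[] {pageIndex, position};
  }
}
```

    One trade-off: in this form the counters are advanced before the record bytes are written, so it only works if nothing between the reservation and the write can fail in a way that would need the counters rolled back.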

