Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7849#discussion_r36036068
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
 ---
    @@ -225,4 +232,93 @@ public void printPerfMetrics() {
         System.out.println("Total memory consumption (bytes): " + 
map.getTotalMemoryConsumption());
       }
     
    +  /**
    +   * Sorts the key, value data in this map in place, and return them as an 
iterator.
    +   *
    +   * The only memory that is allocated is the address/prefix array, 16 
bytes per record.
    +   */
    +  public KVIterator<UnsafeRow, UnsafeRow> sortedIterator() {
    +    int numElements = map.numElements();
    +    final int numKeyFields = groupingKeySchema.size();
    +    TaskMemoryManager memoryManager = map.getTaskMemoryManager();
    +
    +    UnsafeExternalRowSorter.PrefixComputer prefixComp =
    +      SortPrefixUtils.createPrefixGenerator(groupingKeySchema);
    +    PrefixComparator prefixComparator = 
SortPrefixUtils.getPrefixComparator(groupingKeySchema);
    +
    +    final BaseOrdering ordering = 
GenerateOrdering.create(groupingKeySchema);
    +    RecordComparator recordComparator = new RecordComparator() {
    +      private final UnsafeRow row1 = new UnsafeRow();
    +      private final UnsafeRow row2 = new UnsafeRow();
    +
    +      @Override
    +      public int compare(Object baseObj1, long baseOff1, Object baseObj2, 
long baseOff2) {
    +        row1.pointTo(baseObj1, baseOff1 + 4, numKeyFields, -1);
    +        row2.pointTo(baseObj2, baseOff2 + 4, numKeyFields, -1);
    +        return ordering.compare(row1, row2);
    +      }
    +    };
    +
    +    // Insert the records into the in-memory sorter.
    +    final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(
    +      memoryManager, recordComparator, prefixComparator, numElements);
    +
    +    BytesToBytesMap.BytesToBytesMapIterator iter = map.iterator();
    +    UnsafeRow row = new UnsafeRow();
    +    while (iter.hasNext()) {
    +      final BytesToBytesMap.Location loc = iter.next();
    +      final Object baseObject = loc.getKeyAddress().getBaseObject();
    +      final long baseOffset = loc.getKeyAddress().getBaseOffset();
    +
    +      // Get encoded memory address
    +      MemoryBlock page = loc.getMemoryPage();
    +      long address = memoryManager.encodePageNumberAndOffset(page, 
baseOffset - 8);
    --- End diff --
    
    Maybe add a one-line comment here (or at the top of the loop) to explain 
that `baseObject` + `baseOffset` point to the beginning of the key data in the 
map, but that the KV-pair's length data is stored in the word immediately 
before that address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to