Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5725#discussion_r29221663
  
    --- Diff: 
unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
    @@ -0,0 +1,552 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.unsafe.map;
    +
    +import java.lang.Override;
    +import java.lang.UnsupportedOperationException;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.spark.unsafe.*;
    +import org.apache.spark.unsafe.array.ByteArrayMethods;
    +import org.apache.spark.unsafe.array.LongArray;
    +import org.apache.spark.unsafe.bitset.BitSet;
    +import org.apache.spark.unsafe.hash.Murmur3_x86_32;
    +import org.apache.spark.unsafe.memory.*;
    +
    +/**
    + * An append-only hash map where keys and values are contiguous regions of 
bytes.
    + * <p>
    + * This is backed by a power-of-2-sized hash table, using quadratic 
probing with triangular numbers,
    + * which is guaranteed to exhaust the space.
    + * <p>
    + * Note that even though we use long for indexing, the map can support up 
to 2^31 keys because
    + * we use 32-bit MurmurHash. In any case, if the key cardinality is that 
high, you should probably
    + * be using sorting instead of hashing for better cache locality.
    + * <p>
    + * This class is not thread safe.
    + */
    +public final class BytesToBytesMap {
    +
    +  private static final Murmur3_x86_32 HASHER = new Murmur3_x86_32(0);
    +
    +  private static final HashMapGrowthStrategy growthStrategy = 
HashMapGrowthStrategy.DOUBLING;
    +
    +  private final MemoryManager memoryManager;
    +
    +  /**
    +   * A linked list for tracking all allocated data pages so that we can 
free all of our memory.
    +   */
    +  private final List<MemoryBlock> dataPages = new 
LinkedList<MemoryBlock>();
    +
    +  /**
    +   * The data page that will be used to store keys and values for new 
hashtable entries. When this
    +   * page becomes full, a new page will be allocated and this pointer will 
change to point to that
    +   * new page.
    +   */
    +  private MemoryBlock currentDataPage = null;
    +
    +  /**
    +   * Offset into `currentDataPage` that points to the location where new 
data can be inserted into
    +   * the page.
    +   */
    +  private long pageCursor = 0;
    +
    +  /**
    +   * The size of the data pages that hold key and value data. Map entries 
cannot span multiple
    +   * pages, so this limits the maximum entry size.
    +   */
    +  private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes
    +
    +  // This choice of page table size and page size means that we can 
address up to 500 gigabytes
    +  // of memory.
    +
    +  /**
    +   * A single array that stores, for each map entry, a pointer to the key and the key's hashcode.
    +   *
    +   * Position {@code 2 * i} in the array is used to track a pointer to the 
key at index {@code i},
    +   * while position {@code 2 * i + 1} in the array holds key's full 32-bit 
hashcode.
    +   */
    +  private LongArray longArray;
    +  // TODO: we're wasting 32 bits of space here; we can probably store 
fewer bits of the hashcode
    +  // and exploit word-alignment to use fewer bits to hold the address.  
This might let us store
    +  // only one long per map entry, increasing the chance that this array 
will fit in cache at the
    +  // expense of maybe performing more lookups if we have hash collisions.  
Say that we stored only
    +  // 27 bits of the hashcode and 37 bits of the address.  37 bits is 
enough to address 1 terabyte
    +  // of RAM given word-alignment.  If we use 13 bits of this for our page 
table, that gives us a
    +  // maximum page size of 2^24 * 8 = ~134 megabytes per page. This change 
will require us to store
    +  // full base addresses in the page table for off-heap mode so that we 
can reconstruct the full
    +  // absolute memory addresses.
    +
    +  /**
    +   * A {@link BitSet} used to track location of the map where the key is 
set.
    +   * Size of the bitset should be half of the size of the long array.
    +   */
    +  private BitSet bitset;
    +
    +  private final double loadFactor;
    +
    +  /**
    +   * Number of keys defined in the map.
    +   */
    +  private int size;
    +
    +  /**
    +   * The map will be expanded once the number of keys exceeds this 
threshold.
    +   */
    +  private int growthThreshold;
    +
    +  /**
    +   * Mask for truncating hashcodes so that they do not exceed the long 
array's size.
    +   */
    +  private int mask;
    +
    +  /**
    +   * Return value of {@link BytesToBytesMap#lookup(Object, long, int)}.
    +   */
    +  private final Location loc;
    +
    +  private final boolean enablePerfMetrics;
    +
    +  private long timeSpentResizingMs = 0;
    +
    +  private long numProbes = 0;
    +
    +  private long numKeyLookups = 0;
    +
    +  private long numHashCollisions = 0;
    +
    +  public BytesToBytesMap(
    +      MemoryManager memoryManager,
    +      int initialCapacity,
    +      double loadFactor,
    +      boolean enablePerfMetrics) {
    +    this.memoryManager = memoryManager;
    +    this.loadFactor = loadFactor;
    +    this.loc = new Location();
    +    this.enablePerfMetrics = enablePerfMetrics;
    +    allocate(initialCapacity);
    +  }
    +
    +  public BytesToBytesMap(MemoryManager memoryManager, int initialCapacity) 
{
    +    this(memoryManager, initialCapacity, 0.70, false);
    +  }
    +
    +  public BytesToBytesMap(
    +      MemoryManager memoryManager,
    +      int initialCapacity,
    +      boolean enablePerfMetrics) {
    +    this(memoryManager, initialCapacity, 0.70, enablePerfMetrics);
    +  }
    +
    +  @Override
    +  protected void finalize() throws Throwable {
    +    try {
    +      // In case the programmer forgot to call `free()`, try to perform 
that cleanup now:
    +      free();
    +    } finally {
    +      super.finalize();
    +    }
    +  }
    +
    +  /**
    +   * Returns the number of keys defined in the map.
    +   */
    +  public int size() { return size; }
    +
    +  /**
    +   * Returns an iterator for iterating over the entries of this map.
    +   *
    +   * For efficiency, all calls to `next()` will return the same {@link 
Location} object.
    +   *
    +   * If any other lookups or operations are performed on this map while 
iterating over it, including
    +   * `lookup()`, the behavior of the returned iterator is undefined.
    +   */
    +  public Iterator<Location> iterator() {
    +    return new Iterator<Location>() {
    +
    +      private int nextPos = bitset.nextSetBit(0);
    +
    +      @Override
    +      public boolean hasNext() {
    +        return nextPos != -1;
    +      }
    +
    +      @Override
    +      public Location next() {
    +        final int pos = nextPos;
    +        nextPos = bitset.nextSetBit(nextPos + 1);
    +        return loc.with(pos, 0, true);
    +      }
    +
    +      @Override
    +      public void remove() {
    +        throw new UnsupportedOperationException();
    +      }
    +    };
    +  }
    +
    +  /**
    +   * Looks up a key and returns a {@link Location} handle that can be used 
to test existence
    +   * and read/write values.
    +   *
    +   * This function always returns the same {@link Location} instance to 
avoid object allocation.
    +   */
    +  public Location lookup(
    +      Object keyBaseObject,
    +      long keyBaseOffset,
    +      int keyRowLengthBytes) {
    +    if (enablePerfMetrics) {
    +      numKeyLookups++;
    +    }
    +    final int hashcode = HASHER.hashUnsafeWords(keyBaseObject, 
keyBaseOffset, keyRowLengthBytes);
    +    int pos = hashcode & mask;
    +    int step = 1;
    +    while (true) {
    +      if (enablePerfMetrics) {
    +        numProbes++;
    +      }
    +      if (!bitset.isSet(pos)) {
    +        // This is a new key.
    +        return loc.with(pos, hashcode, false);
    +      } else {
    +        long stored = longArray.get(pos * 2 + 1);
    +        if ((int) (stored) == hashcode) {
    +          // Full hash code matches.  Let's compare the keys for equality.
    +          loc.with(pos, hashcode, true);
    +          if (loc.getKeyLength() == keyRowLengthBytes) {
    +            final MemoryLocation keyAddress = loc.getKeyAddress();
    +            final Object storedKeyBaseObject = keyAddress.getBaseObject();
    +            final long storedKeyBaseOffset = keyAddress.getBaseOffset();
    +            final boolean areEqual = 
ByteArrayMethods.wordAlignedArrayEquals(
    +              keyBaseObject,
    +              keyBaseOffset,
    +              storedKeyBaseObject,
    +              storedKeyBaseOffset,
    +              keyRowLengthBytes
    +            );
    +            if (areEqual) {
    +              return loc;
    +            } else {
    +              if (enablePerfMetrics) {
    +                numHashCollisions++;
    +              }
    +            }
    +          }
    +        }
    +      }
    +      pos = (pos + step) & mask;
    +      step++;
    +    }
    +  }
    +
    +  /**
    +   * Handle returned by {@link BytesToBytesMap#lookup(Object, long, int)} 
function.
    +   */
    +  public final class Location {
    +    /** An index into the hash map's Long array */
    +    private int pos;
    +    /** True if this location points to a position where a key is defined, 
false otherwise */
    +    private boolean isDefined;
    +    /**
    +     * The hashcode of the most recent key passed to
    +     * {@link BytesToBytesMap#lookup(Object, long, int)}. Caching this 
hashcode here allows us to
    +     * avoid re-hashing the key when storing a value for that key.
    +     */
    +    private int keyHashcode;
    +    private final MemoryLocation keyMemoryLocation = new MemoryLocation();
    +    private final MemoryLocation valueMemoryLocation = new 
MemoryLocation();
    +    private int keyLength;
    +    private int valueLength;
    +
    +    private void updateAddressesAndSizes(long fullKeyAddress) {
    +        final Object page = memoryManager.getPage(fullKeyAddress);
    +        final long keyOffsetInPage = 
memoryManager.getOffsetInPage(fullKeyAddress);
    +        keyMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8);
    +        keyLength = (int) PlatformDependent.UNSAFE.getLong(page, 
keyOffsetInPage);
    +        valueMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8 + 
keyLength + 8);
    +        valueLength = (int) PlatformDependent.UNSAFE.getLong(page, 
keyOffsetInPage + 8 + keyLength);
    +    }
    +
    +    Location with(int pos, int keyHashcode, boolean isDefined) {
    +      this.pos = pos;
    +      this.isDefined = isDefined;
    +      this.keyHashcode = keyHashcode;
    +      if (isDefined) {
    +        final long fullKeyAddress = longArray.get(pos * 2);
    +        updateAddressesAndSizes(fullKeyAddress);
    +      }
    +      return this;
    +    }
    +
    +    /**
    +     * Returns true if the key is defined at this position, and false 
otherwise.
    +     */
    +    public boolean isDefined() {
    +      return isDefined;
    +    }
    +
    +    /**
    +     * Returns the address of the key defined at this position.
    +     * This points to the first byte of the key data.
    +     * Unspecified behavior if the key is not defined.
    +     * For efficiency reasons, calls to this method always return the 
same MemoryLocation object.
    +     */
    +    public MemoryLocation getKeyAddress() {
    +      assert (isDefined);
    +      return keyMemoryLocation;
    +    }
    +
    +    /**
    +     * Returns the length of the key defined at this position.
    +     * Unspecified behavior if the key is not defined.
    +     */
    +    public int getKeyLength() {
    +      assert (isDefined);
    +      return keyLength;
    +    }
    +
    +    /**
    +     * Returns the address of the value defined at this position.
    +     * This points to the first byte of the value data.
    +     * Unspecified behavior if the key is not defined.
    +     * For efficiency reasons, calls to this method always return the 
same MemoryLocation object.
    +     */
    +    public MemoryLocation getValueAddress() {
    +      assert (isDefined);
    +      return valueMemoryLocation;
    +    }
    +
    +    /**
    +     * Returns the length of the value defined at this position.
    +     * Unspecified behavior if the key is not defined.
    +     */
    +    public int getValueLength() {
    +      assert (isDefined);
    +      return valueLength;
    +    }
    +
    +    /**
    +     * Store a new key and value. This method may only be called once for 
a given key; if you want
    +     * to update the value associated with a key, then you can directly 
manipulate the bytes stored
    +     * at the value address.
    +     * <p>
    +     * It is only valid to call this method immediately after calling 
`lookup()` using the same key.
    +     * <p>
    +     * After calling this method, calls to `get[Key|Value]Address()` and 
`get[Key|Value]Length`
    +     * will return information on the data stored by this `putNewKey` call.
    +     * <p>
    +     * As an example usage, here's the proper way to store a new key:
    +     * <p>
    +     * <pre>
    +     *   Location loc = map.lookup(keyBaseObject, keyBaseOffset, 
keyLengthInBytes);
    +     *   if (!loc.isDefined()) {
    +     *     loc.putNewKey(keyBaseObject, keyBaseOffset, keyLengthInBytes, 
...)
    +     *   }
    +     * </pre>
    +     * <p>
    +     * Unspecified behavior if the key is already defined at this location.
    +     */
    +    public void putNewKey(
    +        Object keyBaseObject,
    +        long keyBaseOffset,
    +        int keyLengthBytes,
    +        Object valueBaseObject,
    +        long valueBaseOffset,
    +        int valueLengthBytes) {
    +      assert (!isDefined) : "Can only set value once for a key";
    +      isDefined = true;
    +      assert (keyLengthBytes % 8 == 0);
    +      assert (valueLengthBytes % 8 == 0);
    +      // Here, we'll copy the data into our data pages. Because we only 
store a relative offset from
    +      // the key address instead of storing the absolute address of the 
value, the key and value
    +      // must be stored in the same memory page.
    +      final long requiredSize = 8 + 8 + keyLengthBytes + valueLengthBytes;
    --- End diff --
    
    This should say what the `8 + 8` stands for (the key length and value length fields?)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to