Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/5725#discussion_r29202027
--- Diff:
unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.map;
+
+import java.lang.Override;
+import java.lang.UnsupportedOperationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.spark.unsafe.*;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.array.LongArray;
+import org.apache.spark.unsafe.bitset.BitSet;
+import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.*;
+
+/**
+ * An append-only hash map where keys and values are contiguous regions of
bytes.
+ * <p>
+ * This is backed by a power-of-2-sized hash table, using quadratic
probing with triangular numbers,
+ * which is guaranteed to exhaust the space.
+ * <p>
+ * Note that even though we use long for indexing, the map can support up
to 2^31 keys because
+ * we use 32 bit MurmurHash. In either case, if the key cardinality is so
high, you should probably
+ * be using sorting instead of hashing for better cache locality.
+ * <p>
+ * This class is not thread safe.
+ */
+public final class BytesToBytesMap {
+
+ private static final Murmur3_x86_32 HASHER = new Murmur3_x86_32(0);
+
+ private static final HashMapGrowthStrategy growthStrategy =
HashMapGrowthStrategy.DOUBLING;
+
+ /** Bit mask for the lower 51 bits of a long. */
+ private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
+
+ /** Bit mask for the upper 13 bits of a long */
+ private static final long MASK_LONG_UPPER_13_BITS =
~MASK_LONG_LOWER_51_BITS;
+
+ /** Bit mask for the lower 32 bits of a long */
+ private static final long MASK_LONG_LOWER_32_BITS = 0xFFFFFFFFL;
+
+ private final MemoryAllocator allocator;
+
+ /**
+ * Tracks whether we're using in-heap or off-heap addresses.
+ */
+ private final boolean inHeap;
+
+ /**
+ * A linked list for tracking all allocated data pages so that we can
free all of our memory.
+ */
+ private final List<MemoryBlock> dataPages = new
LinkedList<MemoryBlock>();
+
+ /**
+ * The data page that will be used to store keys and values for new
hashtable entries. When this
+ * page becomes full, a new page will be allocated and this pointer will
change to point to that
+ * new page.
+ */
+ private MemoryBlock currentDataPage = null;
+
+ /**
+ * Offset into `currentDataPage` that points to the location where new
data can be inserted into
+ * the page.
+ */
+ private long pageCursor = 0;
+
+ /**
+ * Similar to an operating system's page table, this array maps page
numbers into base object
+ * pointers, allowing us to translate between the hashtable's internal
64-bit address
+ * representation and the baseObject+offset representation which we use
to support both in- and
+ * off-heap addresses. When using an off-heap allocator, every entry in
this map will be `null`.
+ * When using an in-heap allocator, the entries in this map will point
to pages' base objects.
+ * Entries are added to this map as new data pages are allocated.
+ */
+ private final Object[] pageTable = new Object[PAGE_TABLE_SIZE];
+
+ /**
+ * When using an in-heap allocator, this holds the current page number.
+ */
+ private int currentPageNumber = -1;
+
+ /**
+ * The number of entries in the page table.
+ */
+ private static final int PAGE_TABLE_SIZE = (int) 1L << 13;
+
+ /**
+ * The size of the data pages that hold key and value data. Map entries
cannot span multiple
+ * pages, so this limits the maximum entry size.
+ */
+ private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes
+
+ // This choice of page table size and page size means that we can
address up to 512 gigabytes
+ // of memory (2^13 pages * 64 megabytes per page).
+
+ /**
+ * A single array to store the key and value.
+ *
+ * Position {@code 2 * i} in the array is used to track a pointer to the
key at index {@code i},
+ * while position {@code 2 * i + 1} in the array holds the key's 32-bit
hashcode in its lower
+ * 32 bits and the relative offset from the key pointer to the value at
index {@code i} in its
+ * upper 32 bits.
+ */
+ private LongArray longArray;
+
+ /**
+ * A {@link BitSet} used to track location of the map where the key is
set.
+ * Size of the bitset should be half of the size of the long array.
+ */
+ private BitSet bitset;
+
+ private final double loadFactor;
+
+ /**
+ * Number of keys defined in the map.
+ */
+ private int size;
+
+ /**
+ * The map will be expanded once the number of keys exceeds this
threshold.
+ */
+ private int growthThreshold;
+
+ /**
+ * Mask for truncating hashcodes so that they do not exceed the long
array's size.
+ */
+ private int mask;
+
+ /**
+ * Return value of {@link BytesToBytesMap#lookup(Object, long, int)}.
+ */
+ private final Location loc;
+
+ private final boolean enablePerfMetrics;
+
+ private long timeSpentResizingMs = 0;
+
+ private long numProbes = 0;
+
+ private long numKeyLookups = 0;
+
+ private long numHashCollisions = 0;
+
+ public BytesToBytesMap(
+ MemoryAllocator allocator,
+ int initialCapacity,
+ double loadFactor,
+ boolean enablePerfMetrics) {
+ this.inHeap = allocator instanceof HeapMemoryAllocator;
+ this.allocator = allocator;
+ this.loadFactor = loadFactor;
+ this.loc = new Location();
+ this.enablePerfMetrics = enablePerfMetrics;
+ allocate(initialCapacity);
+ }
+
+ public BytesToBytesMap(MemoryAllocator allocator, int initialCapacity) {
+ this(allocator, initialCapacity, 0.70, false);
+ }
+
+ public BytesToBytesMap(
+ MemoryAllocator allocator,
+ int initialCapacity,
+ boolean enablePerfMetrics) {
+ this(allocator, initialCapacity, 0.70, enablePerfMetrics);
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ // In case the programmer forgot to call `free()`, try to perform
that cleanup now:
+ free();
+ } finally {
+ super.finalize();
+ }
+ }
+
+ /**
+ * Returns the number of keys defined in the map.
+ */
+ public int size() { return size; }
+
+ /**
+ * Returns an iterator for iterating over the entries of this map.
+ *
+ * For efficiency, all calls to `next()` will return the same {@link
Location} object.
+ *
+ * If any other lookups or operations are performed on this map while
iterating over it, including
+ * `lookup()`, the behavior of the returned iterator is undefined.
+ */
+ public Iterator<Location> iterator() {
+ return new Iterator<Location>() {
+
+ private int nextPos = bitset.nextSetBit(0);
+
+ @Override
+ public boolean hasNext() {
+ return nextPos != -1;
+ }
+
+ @Override
+ public Location next() {
+ final int pos = nextPos;
+ nextPos = bitset.nextSetBit(nextPos + 1);
+ return loc.with(pos, 0, true);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Looks up a key, and return a {@link Location} handle that can be used
to test existence
+ * and read/write values.
+ *
+ * This function always returns the same {@link Location} instance to
avoid object allocation.
+ */
+ public Location lookup(
+ Object keyBaseObject,
+ long keyBaseOffset,
+ int keyRowLengthBytes) {
+ if (enablePerfMetrics) {
+ numKeyLookups++;
+ }
+ final int hashcode = HASHER.hashUnsafeWords(keyBaseObject,
keyBaseOffset, keyRowLengthBytes);
+ int pos = hashcode & mask;
+ int step = 1;
+ while (true) {
+ if (enablePerfMetrics) {
+ numProbes++;
+ }
+ if (!bitset.isSet(pos)) {
+ // This is a new key.
+ return loc.with(pos, hashcode, false);
+ } else {
+ long stored = longArray.get(pos * 2 + 1);
+ if (((int) (stored & MASK_LONG_LOWER_32_BITS)) == hashcode) {
+ // Full hash code matches. Let's compare the keys for equality.
+ loc.with(pos, hashcode, true);
+ if (loc.getKeyLength() == keyRowLengthBytes) {
+ final MemoryLocation keyAddress = loc.getKeyAddress();
+ final Object storedKeyBaseObject = keyAddress.getBaseObject();
+ final long storedKeyBaseOffset = keyAddress.getBaseOffset();
+ final boolean areEqual =
ByteArrayMethods.wordAlignedArrayEquals(
+ keyBaseObject,
+ keyBaseOffset,
+ storedKeyBaseObject,
+ storedKeyBaseOffset,
+ keyRowLengthBytes
+ );
+ if (areEqual) {
+ return loc;
+ } else {
+ if (enablePerfMetrics) {
+ numHashCollisions++;
+ }
+ }
+ }
+ }
+ }
+ pos = (pos + step) & mask;
+ step++;
+ }
+ }
+
+ /**
+ * Handle returned by {@link BytesToBytesMap#lookup(Object, long, int)}
function.
+ */
+ public final class Location {
+ /** An index into the hash map's Long array */
+ private int pos;
+ /** True if this location points to a position where a key is defined,
false otherwise */
+ private boolean isDefined;
+ /**
+ * The hashcode of the most recent key passed to
+ * {@link BytesToBytesMap#lookup(Object, long, int)}. Caching this
hashcode here allows us to
+ * avoid re-hashing the key when storing a value for that key.
+ */
+ private int keyHashcode;
+ private final MemoryLocation keyMemoryLocation = new MemoryLocation();
+ private final MemoryLocation valueMemoryLocation = new
MemoryLocation();
+ private int keyLength;
+ private int valueLength;
+
+ private void updateAddressesAndSizes(long fullKeyAddress, long
offsetFromKeyToValue) {
+ if (inHeap) {
+ final Object page = getPage(fullKeyAddress);
+ final long keyOffsetInPage = getOffsetInPage(fullKeyAddress);
+ keyMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8);
+ valueMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8 +
offsetFromKeyToValue);
+ keyLength = (int) PlatformDependent.UNSAFE.getLong(page,
keyOffsetInPage);
+ valueLength =
+ (int) PlatformDependent.UNSAFE.getLong(page, keyOffsetInPage +
offsetFromKeyToValue);
+ } else {
+ keyMemoryLocation.setObjAndOffset(null, fullKeyAddress + 8);
+ valueMemoryLocation.setObjAndOffset(null, fullKeyAddress + 8 +
offsetFromKeyToValue);
+ keyLength = (int) PlatformDependent.UNSAFE.getLong(fullKeyAddress);
+ valueLength = (int)
PlatformDependent.UNSAFE.getLong(fullKeyAddress + offsetFromKeyToValue);
+ }
+ }
+
+ Location with(int pos, int keyHashcode, boolean isDefined) {
+ this.pos = pos;
+ this.isDefined = isDefined;
+ this.keyHashcode = keyHashcode;
+ if (isDefined) {
+ final long fullKeyAddress = longArray.get(pos * 2);
+ final long offsetFromKeyToValue =
+ (longArray.get(pos * 2 + 1) & ~MASK_LONG_LOWER_32_BITS) >>> 32;
+ updateAddressesAndSizes(fullKeyAddress, offsetFromKeyToValue);
+ }
+ return this;
+ }
+
+ /**
+ * Returns true if the key is defined at this position, and false
otherwise.
+ */
+ public boolean isDefined() {
+ return isDefined;
+ }
+
+ private Object getPage(long fullKeyAddress) {
+ assert (inHeap);
+ final int keyPageNumber = (int) ((fullKeyAddress &
MASK_LONG_UPPER_13_BITS) >>> 51);
+ assert (keyPageNumber >= 0 && keyPageNumber < PAGE_TABLE_SIZE);
+ assert (keyPageNumber <= currentPageNumber);
+ final Object page = pageTable[keyPageNumber];
+ assert (page != null);
+ return page;
+ }
+
+ private long getOffsetInPage(long fullKeyAddress) {
+ assert (inHeap);
+ return (fullKeyAddress & MASK_LONG_LOWER_51_BITS);
+ }
+
+ /**
+ * Returns the address of the key defined at this position.
+ * This points to the first byte of the key data.
+ * Unspecified behavior if the key is not defined.
+ * For efficiency reasons, calls to this method always return the
same MemoryLocation object.
+ */
+ public MemoryLocation getKeyAddress() {
+ assert (isDefined);
+ return keyMemoryLocation;
+ }
+
+ /**
+ * Returns the length of the key defined at this position.
+ * Unspecified behavior if the key is not defined.
+ */
+ public int getKeyLength() {
+ assert (isDefined);
+ return keyLength;
+ }
+
+ /**
+ * Returns the address of the value defined at this position.
+ * This points to the first byte of the value data.
+ * Unspecified behavior if the key is not defined.
+ * For efficiency reasons, calls to this method always return the
same MemoryLocation object.
+ */
+ public MemoryLocation getValueAddress() {
+ assert (isDefined);
+ return valueMemoryLocation;
+ }
+
+ /**
+ * Returns the length of the value defined at this position.
+ * Unspecified behavior if the key is not defined.
+ */
+ public int getValueLength() {
+ assert (isDefined);
+ return valueLength;
+ }
+
+ /**
+ * Store a new key and value. This method may only be called once for
a given key; if you want
+ * to update the value associated with a key, then you can directly
manipulate the bytes stored
+ * at the value address.
+ * <p>
+ * It is only valid to call this method immediately after calling
`lookup()` using the same key.
+ * <p>
+ * After calling this method, calls to `get[Key|Value]Address()` and
`get[Key|Value]Length`
+ * will return information on the data stored by this `putNewKey` call.
+ * <p>
+ * As an example usage, here's the proper way to store a new key:
+ * <p>
+ * <pre>
+ * Location loc = map.lookup(keyBaseObject, keyBaseOffset,
keyLengthInBytes);
+ * if (!loc.isDefined()) {
+ * loc.putNewKey(keyBaseObject, keyBaseOffset, keyLengthInBytes,
...)
+ * }
+ * </pre>
+ * <p>
+ * Unspecified behavior if the key is already defined.
+ */
+ public void putNewKey(
+ Object keyBaseObject,
+ long keyBaseOffset,
+ int keyLengthBytes,
+ Object valueBaseObject,
+ long valueBaseOffset,
+ int valueLengthBytes) {
+ assert (!isDefined) : "Can only set value once for a key";
+ isDefined = true;
+ assert (keyLengthBytes % 8 == 0);
+ assert (valueLengthBytes % 8 == 0);
+ // Here, we'll copy the data into our data pages. Because we only
store a relative offset from
+ // the key address instead of storing the absolute address of the
value, the key and value
+ // must be stored in the same memory page.
+ final long requiredSize = 8 + 8 + keyLengthBytes + valueLengthBytes;
+ assert(requiredSize <= PAGE_SIZE_BYTES);
+ size++;
+ bitset.set(pos);
+
+ // If there's not enough space in the current page, allocate a new
page:
+ if (currentDataPage == null || PAGE_SIZE_BYTES - pageCursor <
requiredSize) {
+ MemoryBlock newPage = allocator.allocate(PAGE_SIZE_BYTES);
+ dataPages.add(newPage);
+ pageCursor = 0;
+ currentPageNumber++;
+ pageTable[currentPageNumber] = newPage.getBaseObject();
+ currentDataPage = newPage;
+ }
+
+ // Compute all of our offsets up-front:
+ final Object pageBaseObject = currentDataPage.getBaseObject();
+ final long pageBaseOffset = currentDataPage.getBaseOffset();
+ final long keySizeOffsetInPage = pageBaseOffset + pageCursor;
+ pageCursor += 8;
+ final long keyDataOffsetInPage = pageBaseOffset + pageCursor;
+ pageCursor += keyLengthBytes;
+ final long valueSizeOffsetInPage = pageBaseOffset + pageCursor;
+ pageCursor += 8;
+ final long valueDataOffsetInPage = pageBaseOffset + pageCursor;
+ pageCursor += valueLengthBytes;
+ final long relativeOffsetFromKeyToValue = valueSizeOffsetInPage -
keySizeOffsetInPage;
--- End diff --
I think that storing this relative offset is actually redundant: the value
always appears after the key, we know the key's length, and we always store
the key and value in the same memory block, so the value's location can be
computed from the key's address.
If I remove this, we'll have an extra 32 bits of space in the long array
that we can use for something else. Moving to a 64-bit hashcode probably won't
make a huge difference. Is there another good use for this space?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]