GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/9241#discussion_r43461487
--- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -531,113 +645,59 @@ public int getValueLength() {
* @return true if the put() was successful and false if the put() failed because memory could
* not be acquired.
*/
- public boolean putNewKey(
- Object keyBaseObject,
- long keyBaseOffset,
- int keyLengthBytes,
- Object valueBaseObject,
- long valueBaseOffset,
- int valueLengthBytes) {
+ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
+ Object valueBase, long valueOffset, int valueLength) {
assert (!isDefined) : "Can only set value once for a key";
- assert (keyLengthBytes % 8 == 0);
- assert (valueLengthBytes % 8 == 0);
+ assert (keyLength % 8 == 0);
+ assert (valueLength % 8 == 0);
assert(bitset != null);
assert(longArray != null);
- if (numElements == MAX_CAPACITY) {
- throw new IllegalStateException("BytesToBytesMap has reached maximum capacity");
+ if (numElements == MAX_CAPACITY || !canGrowArray) {
+ return false;
}
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (8 byte key length) (key) (value)
- final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;
-
- // --- Figure out where to insert the new record ---------------------------------------------
-
- final MemoryBlock dataPage;
- final Object dataPageBaseObject;
- final long dataPageInsertOffset;
- boolean useOverflowPage = requiredSize > pageSizeBytes - 8;
- if (useOverflowPage) {
- // The record is larger than the page size, so allocate a special overflow page just to hold
- // that record.
- final long overflowPageSize = requiredSize + 8;
- MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- logger.debug("Failed to acquire {} bytes of memory", overflowPageSize);
- return false;
- }
- dataPages.add(overflowPage);
- dataPage = overflowPage;
- dataPageBaseObject = overflowPage.getBaseObject();
- dataPageInsertOffset = overflowPage.getBaseOffset();
- } else if (currentDataPage == null || pageSizeBytes - 8 - pageCursor < requiredSize) {
- // The record can fit in a data page, but either we have not allocated any pages yet or
- // the current page does not have enough space.
- if (currentDataPage != null) {
- // There wasn't enough space in the current page, so write an end-of-page marker:
- final Object pageBaseObject = currentDataPage.getBaseObject();
- final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
- Platform.putInt(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
- }
- if (!acquireNewPage()) {
+ final long recordLength = 8 + keyLength + valueLength;
+ if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
+ if (!acquireNewPage(recordLength + 4L)) {
return false;
}
- dataPage = currentDataPage;
- dataPageBaseObject = currentDataPage.getBaseObject();
- dataPageInsertOffset = currentDataPage.getBaseOffset();
- } else {
- // There is enough space in the current data page.
- dataPage = currentDataPage;
- dataPageBaseObject = currentDataPage.getBaseObject();
- dataPageInsertOffset = currentDataPage.getBaseOffset() + pageCursor;
}
// --- Append the key and value data to the current data page --------------------------------
-
- long insertCursor = dataPageInsertOffset;
-
- // Compute all of our offsets up-front:
- final long recordOffset = insertCursor;
- insertCursor += 4;
- final long keyLengthOffset = insertCursor;
- insertCursor += 4;
- final long keyDataOffsetInPage = insertCursor;
- insertCursor += keyLengthBytes;
- final long valueDataOffsetInPage = insertCursor;
- insertCursor += valueLengthBytes; // word used to store the value size
-
- Platform.putInt(dataPageBaseObject, recordOffset,
- keyLengthBytes + valueLengthBytes + 4);
- Platform.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
- // Copy the key
- Platform.copyMemory(
- keyBaseObject, keyBaseOffset, dataPageBaseObject, keyDataOffsetInPage, keyLengthBytes);
- // Copy the value
- Platform.copyMemory(valueBaseObject, valueBaseOffset, dataPageBaseObject,
- valueDataOffsetInPage, valueLengthBytes);
-
- // --- Update bookeeping data structures -----------------------------------------------------
-
- if (useOverflowPage) {
- // Store the end-of-page marker at the end of the data page
- Platform.putInt(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
- } else {
- pageCursor += requiredSize;
- }
-
+ final Object base = currentPage.getBaseObject();
+ long offset = currentPage.getBaseOffset() + pageCursor;
+ final long recordOffset = offset;
+ Platform.putInt(base, offset, keyLength + valueLength + 4);
+ Platform.putInt(base, offset + 4, keyLength);
+ offset += 8;
+ Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
+ offset += keyLength;
+ Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength);
+
+ // --- Update bookkeeping data structures -----------------------------------------------------
+ offset = currentPage.getBaseOffset();
+ Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
+ pageCursor += recordLength;
numElements++;
bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
- dataPage, recordOffset);
+ currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
isDefined = true;
+
if (numElements > growthThreshold && longArray.size() < MAX_CAPACITY) {
- growAndRehash();
+ try {
+ growAndRehash();
+ } catch (OutOfMemoryError oom) {
--- End diff --
As a counter-argument to catching OutOfMemoryError: we have to treat an OOM as
an interrupt that can occur at any point where the methods we call here might
allocate memory, so we should assume that an adversary gets to pick the worst
possible places for those OOMs to occur.
For example, imagine that we're in `growAndRehash()`, which calls `allocate()`
to obtain a new bit set and array:
https://github.com/davies/spark/blob/cda4b2afa45acef529eb24afa7ec0c5370de237f/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L865
Now imagine that we've already mutated `longArray` and then hit an OOM while
trying to mutate the `bitset`:
https://github.com/davies/spark/blob/cda4b2afa45acef529eb24afa7ec0c5370de237f/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L742
In this case, I think that the map will be left in an inconsistent state.
In summary, I'm not necessarily opposed to catching OOM in a small handful of
places, but I'm wary of doing it because of subtle corner cases like this.
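To make the hazard concrete, here is a minimal, self-contained sketch. It is
not Spark's code: `GrowSketch`, `FailingAllocator`, `growUnsafe`, `growSafe`,
and `isConsistent` are hypothetical names, and the OOM is injected by a fake
allocator rather than raised by the JVM. It assumes the grow step replaces two
parallel structures one field at a time, and contrasts that with allocating
into locals first and assigning the fields only once nothing else can throw.

```java
import java.util.BitSet;

/** Toy stand-in for a map backed by two parallel structures (not BytesToBytesMap). */
final class GrowSketch {

  /** Hypothetical allocator that throws a simulated OOM once its budget is used up. */
  static final class FailingAllocator {
    private int remaining;

    FailingAllocator(int allocationsBeforeFailure) {
      this.remaining = allocationsBeforeFailure;
    }

    long[] newLongArray(int size) { tick(); return new long[size]; }

    BitSet newBitSet(int bits) { tick(); return new BitSet(bits); }

    private void tick() {
      if (remaining-- <= 0) {
        throw new OutOfMemoryError("simulated allocation failure");
      }
    }
  }

  // Assumed invariant for this sketch: longArray holds two slots per unit of capacity.
  long[] longArray = new long[4];
  BitSet bitset = new BitSet(2);
  int capacity = 2;

  /** Replaces fields one at a time; a failure in between leaves them mismatched. */
  void growUnsafe(FailingAllocator alloc) {
    longArray = alloc.newLongArray(capacity * 4); // field is already replaced...
    bitset = alloc.newBitSet(capacity * 2);       // ...if this allocation throws
    capacity *= 2;
  }

  /** Allocates into locals first, then commits with plain assignments. */
  void growSafe(FailingAllocator alloc) {
    long[] newArray = alloc.newLongArray(capacity * 4);
    BitSet newBits = alloc.newBitSet(capacity * 2);
    longArray = newArray;
    bitset = newBits;
    capacity *= 2;
  }

  boolean isConsistent() {
    return longArray.length == capacity * 2;
  }

  /** Runs one grow whose second allocation fails, catches the OOM, and checks the state. */
  static boolean survivesFailedGrow(boolean safe) {
    GrowSketch map = new GrowSketch();
    FailingAllocator alloc = new FailingAllocator(1); // first allocation succeeds
    try {
      if (safe) {
        map.growSafe(alloc);
      } else {
        map.growUnsafe(alloc);
      }
    } catch (OutOfMemoryError oom) {
      // Swallowed on purpose, mirroring the catch block under discussion.
    }
    return map.isConsistent();
  }

  public static void main(String[] args) {
    System.out.println("field-at-a-time consistent:      " + survivesFailedGrow(false));
    System.out.println("allocate-then-commit consistent: " + survivesFailedGrow(true));
  }
}
```

Under these assumptions, the field-at-a-time version ends up with a
`longArray` sized for the new capacity while `capacity` still holds the old
value, whereas the allocate-then-commit version is unchanged after the
failure. A real OOM could also strike in the middle of rehashing, which this
sketch does not model.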