Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/9241#discussion_r43461576
--- Diff:
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -531,113 +645,59 @@ public int getValueLength() {
* @return true if the put() was successful and false if the put()
failed because memory could
* not be acquired.
*/
- public boolean putNewKey(
- Object keyBaseObject,
- long keyBaseOffset,
- int keyLengthBytes,
- Object valueBaseObject,
- long valueBaseOffset,
- int valueLengthBytes) {
+ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
+ Object valueBase, long valueOffset, int valueLength) {
assert (!isDefined) : "Can only set value once for a key";
- assert (keyLengthBytes % 8 == 0);
- assert (valueLengthBytes % 8 == 0);
+ assert (keyLength % 8 == 0);
+ assert (valueLength % 8 == 0);
assert(bitset != null);
assert(longArray != null);
- if (numElements == MAX_CAPACITY) {
- throw new IllegalStateException("BytesToBytesMap has reached
maximum capacity");
+ if (numElements == MAX_CAPACITY || !canGrowArray) {
+ return false;
}
// Here, we'll copy the data into our data pages. Because we only
store a relative offset from
// the key address instead of storing the absolute address of the
value, the key and value
// must be stored in the same memory page.
// (8 byte key length) (key) (value)
- final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;
-
- // --- Figure out where to insert the new record
---------------------------------------------
-
- final MemoryBlock dataPage;
- final Object dataPageBaseObject;
- final long dataPageInsertOffset;
- boolean useOverflowPage = requiredSize > pageSizeBytes - 8;
- if (useOverflowPage) {
- // The record is larger than the page size, so allocate a special
overflow page just to hold
- // that record.
- final long overflowPageSize = requiredSize + 8;
- MemoryBlock overflowPage =
taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- logger.debug("Failed to acquire {} bytes of memory",
overflowPageSize);
- return false;
- }
- dataPages.add(overflowPage);
- dataPage = overflowPage;
- dataPageBaseObject = overflowPage.getBaseObject();
- dataPageInsertOffset = overflowPage.getBaseOffset();
- } else if (currentDataPage == null || pageSizeBytes - 8 - pageCursor
< requiredSize) {
- // The record can fit in a data page, but either we have not
allocated any pages yet or
- // the current page does not have enough space.
- if (currentDataPage != null) {
- // There wasn't enough space in the current page, so write an
end-of-page marker:
- final Object pageBaseObject = currentDataPage.getBaseObject();
- final long lengthOffsetInPage = currentDataPage.getBaseOffset()
+ pageCursor;
- Platform.putInt(pageBaseObject, lengthOffsetInPage,
END_OF_PAGE_MARKER);
- }
- if (!acquireNewPage()) {
+ final long recordLength = 8 + keyLength + valueLength;
+ if (currentPage == null || currentPage.size() - pageCursor <
recordLength) {
+ if (!acquireNewPage(recordLength + 4L)) {
return false;
}
- dataPage = currentDataPage;
- dataPageBaseObject = currentDataPage.getBaseObject();
- dataPageInsertOffset = currentDataPage.getBaseOffset();
- } else {
- // There is enough space in the current data page.
- dataPage = currentDataPage;
- dataPageBaseObject = currentDataPage.getBaseObject();
- dataPageInsertOffset = currentDataPage.getBaseOffset() +
pageCursor;
}
// --- Append the key and value data to the current data page
--------------------------------
-
- long insertCursor = dataPageInsertOffset;
-
- // Compute all of our offsets up-front:
- final long recordOffset = insertCursor;
- insertCursor += 4;
- final long keyLengthOffset = insertCursor;
- insertCursor += 4;
- final long keyDataOffsetInPage = insertCursor;
- insertCursor += keyLengthBytes;
- final long valueDataOffsetInPage = insertCursor;
- insertCursor += valueLengthBytes; // word used to store the value
size
-
- Platform.putInt(dataPageBaseObject, recordOffset,
- keyLengthBytes + valueLengthBytes + 4);
- Platform.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
- // Copy the key
- Platform.copyMemory(
- keyBaseObject, keyBaseOffset, dataPageBaseObject,
keyDataOffsetInPage, keyLengthBytes);
- // Copy the value
- Platform.copyMemory(valueBaseObject, valueBaseOffset,
dataPageBaseObject,
- valueDataOffsetInPage, valueLengthBytes);
-
- // --- Update bookeeping data structures
-----------------------------------------------------
-
- if (useOverflowPage) {
- // Store the end-of-page marker at the end of the data page
- Platform.putInt(dataPageBaseObject, insertCursor,
END_OF_PAGE_MARKER);
- } else {
- pageCursor += requiredSize;
- }
-
+ final Object base = currentPage.getBaseObject();
+ long offset = currentPage.getBaseOffset() + pageCursor;
+ final long recordOffset = offset;
+ Platform.putInt(base, offset, keyLength + valueLength + 4);
+ Platform.putInt(base, offset + 4, keyLength);
+ offset += 8;
+ Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
+ offset += keyLength;
+ Platform.copyMemory(valueBase, valueOffset, base, offset,
valueLength);
+
+ // --- Update bookkeeping data structures
-----------------------------------------------------
+ offset = currentPage.getBaseOffset();
+ Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
+ pageCursor += recordLength;
numElements++;
bitset.set(pos);
final long storedKeyAddress =
taskMemoryManager.encodePageNumberAndOffset(
- dataPage, recordOffset);
+ currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
isDefined = true;
+
if (numElements > growthThreshold && longArray.size() <
MAX_CAPACITY) {
- growAndRehash();
+ try {
+ growAndRehash();
+ } catch (OutOfMemoryError oom) {
--- End diff --
In other words, I think that we need to make sure that any operation where
we're catching OOM is atomic no matter where the OOM occurs.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]