GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/9241#discussion_r43451586
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -370,96 +344,45 @@ private void growPointerArrayIfNecessary() throws IOException {
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the memory manager and spill if the requested memory can not be obtained.
*
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
+ * @param required the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
* that exceed the page size are handled via a different code path which uses
* special overflow pages).
*/
- private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
- growPointerArrayIfNecessary();
- if (requiredSpace > freeSpaceInCurrentPage) {
- logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
- freeSpaceInCurrentPage);
- // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
- // without using the free space at the end of the current page. We should also do this for
- // BytesToBytesMap.
- if (requiredSpace > pageSizeBytes) {
- throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
- pageSizeBytes + ")");
- } else {
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- spill();
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
- }
- }
- currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = pageSizeBytes;
- allocatedPages.add(currentPage);
- }
+ private void acquireNewPageIfNecessary(int required) {
+ if (currentPage == null ||
+ pageCursor + required > currentPage.getBaseOffset() + currentPage.size() ) {
+ // TODO: try to find space in previous pages
+ currentPage = allocatePage(required);
+ pageCursor = currentPage.getBaseOffset();
+ allocatedPages.add(currentPage);
}
}
/**
* Write a record to the shuffle sorter.
*/
- public void insertRecord(
- Object recordBaseObject,
- long recordBaseOffset,
- int lengthInBytes,
- int partitionId) throws IOException {
+ public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
+ throws IOException {
- if (numRecordsInsertedSinceLastSpill > numElementsForSpillThreshold) {
+ // for tests
+ assert(inMemSorter != null);
+ if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
spill();
}
growPointerArrayIfNecessary();
- // Need 4 bytes to store the record length.
- final int totalSpaceRequired = lengthInBytes + 4;
-
- // --- Figure out where to insert the new record ----------------------------------------------
-
- final MemoryBlock dataPage;
- long dataPagePosition;
- boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
- if (useOverflowPage) {
--- End diff ---
Sure; just add a JIRA subtask so we don't forget.
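In the new version shown in the diff, `acquireNewPageIfNecessary` drops the `freeSpaceInCurrentPage` counter and instead compares `pageCursor` against the end of the current page, calling `allocatePage(required)` when the record does not fit. The following is a minimal, hypothetical Java sketch of that cursor idea, not Spark's code: pages are plain heap `ByteBuffer`s rather than `MemoryBlock`s, there is no `TaskMemoryManager` and no spilling, and `allocatePage` is assumed to return at least `max(pageSize, required)` bytes so an oversized record simply gets a page of its own. The class name `PageCursorSketch` and the helpers `readRecordLength` and `numPages` are illustrative only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of cursor-based page allocation.
// Pages are heap ByteBuffers, not Spark MemoryBlocks; there is no
// TaskMemoryManager, no spilling, and no off-heap memory here.
public class PageCursorSketch {
  private final int pageSize;
  private final List<ByteBuffer> allocatedPages = new ArrayList<>();
  private ByteBuffer currentPage = null;
  private int pageCursor = 0; // offset of the next free byte in currentPage

  public PageCursorSketch(int pageSize) {
    this.pageSize = pageSize;
  }

  // Assumption: a page is at least max(pageSize, required) bytes, so a
  // record larger than pageSize gets a dedicated page of its own.
  private ByteBuffer allocatePage(int required) {
    return ByteBuffer.allocate(Math.max(pageSize, required));
  }

  // Same shape as the new acquireNewPageIfNecessary: open a new page if
  // there is none yet or the record would run past the end of this one.
  private void acquireNewPageIfNecessary(int required) {
    if (currentPage == null || pageCursor + required > currentPage.capacity()) {
      currentPage = allocatePage(required);
      pageCursor = 0;
      allocatedPages.add(currentPage);
    }
  }

  // Writes a 4-byte length prefix followed by the record bytes and
  // returns the index of the page that received the record.
  public int insertRecord(byte[] record) {
    int required = record.length + 4;
    acquireNewPageIfNecessary(required);
    currentPage.putInt(pageCursor, record.length);
    System.arraycopy(record, 0, currentPage.array(), pageCursor + 4, record.length);
    pageCursor += required;
    return allocatedPages.size() - 1;
  }

  // Reads back the length prefix stored at the given page and offset.
  public int readRecordLength(int pageIndex, int offset) {
    return allocatedPages.get(pageIndex).getInt(offset);
  }

  public int numPages() {
    return allocatedPages.size();
  }

  public static void main(String[] args) {
    PageCursorSketch sketch = new PageCursorSketch(16);
    System.out.println(sketch.insertRecord(new byte[8]));  // 12 bytes: page 0
    System.out.println(sketch.insertRecord(new byte[2]));  // 12 + 6 > 16: page 1
    System.out.println(sketch.insertRecord(new byte[40])); // 44 > 16: own page 2
    System.out.println(sketch.numPages());                 // 3
    System.out.println(sketch.readRecordLength(2, 0));     // 40
  }
}
```

Because the cursor only moves forward, any bytes left at the end of a page are wasted once a new page is started (the 4 bytes left in page 0 above); that is the gap the `// TODO: try to find space in previous pages` comment in the diff points at.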