GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/9241#discussion_r43464654
--- Diff:
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -219,61 +230,158 @@ public BytesToBytesMap(
*/
public int numElements() { return numElements; }
- public static final class BytesToBytesMapIterator implements
Iterator<Location> {
+ public final class MapIterator implements Iterator<Location> {
- private final int numRecords;
- private final Iterator<MemoryBlock> dataPagesIterator;
+ private int numRecords;
private final Location loc;
private MemoryBlock currentPage = null;
- private int currentRecordNumber = 0;
+ private int recordsInPage = 0;
private Object pageBaseObject;
private long offsetInPage;
// If this iterator destructive or not. When it is true, it frees each
page as it moves onto
// next one.
private boolean destructive = false;
- private BytesToBytesMap bmap;
+ private LinkedList<UnsafeSorterSpillWriter> spillWriters = new
LinkedList<>();
+ private UnsafeSorterSpillReader reader = null;
- private BytesToBytesMapIterator(
- int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location
loc,
- boolean destructive, BytesToBytesMap bmap) {
+ private MapIterator(int numRecords, Location loc, boolean destructive)
{
this.numRecords = numRecords;
- this.dataPagesIterator = dataPagesIterator;
this.loc = loc;
this.destructive = destructive;
- this.bmap = bmap;
- if (dataPagesIterator.hasNext()) {
- advanceToNextPage();
+ if (destructive) {
+ destructiveIterator = this;
}
}
private void advanceToNextPage() {
- if (destructive && currentPage != null) {
- dataPagesIterator.remove();
- this.bmap.taskMemoryManager.freePage(currentPage);
+ synchronized (this) {
+ int nextIdx = dataPages.indexOf(currentPage) + 1;
+ if (destructive && currentPage != null) {
+ dataPages.remove(currentPage);
+ freePage(currentPage);
+ nextIdx --;
+ }
+ if (dataPages.size() > nextIdx) {
+ currentPage = dataPages.get(nextIdx);
+ pageBaseObject = currentPage.getBaseObject();
+ offsetInPage = currentPage.getBaseOffset();
+ recordsInPage = Platform.getInt(pageBaseObject, offsetInPage);
+ offsetInPage += 4;
+ } else {
+ currentPage = null;
+ if (reader != null) {
+ // remove the spill file from disk
+ File file = spillWriters.removeFirst().getFile();
+ if (file != null && file.exists()) {
+ if (!file.delete()) {
+ logger.error("Was unable to delete spill file {}",
file.getAbsolutePath());
+ }
+ }
+ }
+ try {
+ reader = spillWriters.getFirst().getReader(blockManager);
+ recordsInPage = -1;
--- End diff --
Ah, that's because we rely on `reader.hasNext()`, rather than `recordsInPage`, when
we're dealing with on-disk (spilled) data, so `recordsInPage` is just set to -1 here.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]