Repository: spark Updated Branches: refs/heads/master a11db942a -> 7c92351f4
[MINOR][CORE] Cleanup dead code and duplication in Mem. Management ## What changes were proposed in this pull request? * Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`. It became unused as a result of 85b0a157543201895557d66306b38b3ca52f2151 (SPARK-15962) introducing word alignment for unsafe arrays. * Cleaned up duplicate code in memory management and unsafe sorters. * The change extracting the exception paths is more than just cosmetic, since it definitely reduces the size that the affected methods compile to. ## How was this patch tested? * Build still passes after removing the method; grepping the codebase for `alignToWords` shows no reference to it anywhere either. * The deduplicated (DRYed-up) code is covered by existing tests. Author: Armin <m...@obrown.io> Closes #19254 from original-brownbear/cleanup-mem-consumer. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92351f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92351f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92351f Branch: refs/heads/master Commit: 7c92351f43ac4b1710e3c80c78f7978dad491ed2 Parents: a11db94 Author: Armin <m...@obrown.io> Authored: Tue Sep 19 10:06:32 2017 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Sep 19 10:06:32 2017 +0100 ---------------------------------------------------------------------- .../org/apache/spark/memory/MemoryConsumer.java | 26 ++++++++-------- .../spark/unsafe/map/BytesToBytesMap.java | 24 ++++++--------- .../unsafe/sort/UnsafeExternalSorter.java | 32 +++++++++----------- .../expressions/codegen/UnsafeRowWriter.java | 16 ---------- 4 files changed, 37 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java index 4099fb0..0efae16 100644 --- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java +++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java @@ -89,13 +89,7 @@ public abstract class MemoryConsumer { long required = size * 8L; MemoryBlock page = taskMemoryManager.allocatePage(required, this); if (page == null || page.size() < required) { - long got = 0; - if (page != null) { - got = page.size(); - taskMemoryManager.freePage(page, this); - } - taskMemoryManager.showMemoryUsage(); - throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got); + throwOom(page, required); } used += required; return new LongArray(page); @@ -116,13 +110,7 @@ public abstract class MemoryConsumer { protected MemoryBlock allocatePage(long required) { MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this); if (page == null || page.size() < required) { - long got = 0; - if (page != null) { - got = page.size(); - taskMemoryManager.freePage(page, this); - } - taskMemoryManager.showMemoryUsage(); - throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got); + throwOom(page, required); } used += page.size(); return page; @@ -152,4 +140,14 @@ public abstract class MemoryConsumer { taskMemoryManager.releaseExecutionMemory(size, this); used -= size; } + + private void throwOom(final MemoryBlock page, final long required) { + long got = 0; + if (page != null) { + got = page.size(); + taskMemoryManager.freePage(page, this); + } + taskMemoryManager.showMemoryUsage(); + throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got); + } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 610ace3..4fadfe3 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -283,13 +283,7 @@ public final class BytesToBytesMap extends MemoryConsumer { } else { currentPage = null; if (reader != null) { - // remove the spill file from disk - File file = spillWriters.removeFirst().getFile(); - if (file != null && file.exists()) { - if (!file.delete()) { - logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); - } - } + handleFailedDelete(); } try { Closeables.close(reader, /* swallowIOException = */ false); @@ -307,13 +301,7 @@ public final class BytesToBytesMap extends MemoryConsumer { public boolean hasNext() { if (numRecords == 0) { if (reader != null) { - // remove the spill file from disk - File file = spillWriters.removeFirst().getFile(); - if (file != null && file.exists()) { - if (!file.delete()) { - logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); - } - } + handleFailedDelete(); } } return numRecords > 0; @@ -403,6 +391,14 @@ public final class BytesToBytesMap extends MemoryConsumer { public void remove() { throw new UnsupportedOperationException(); } + + private void handleFailedDelete() { + // remove the spill file from disk + File file = spillWriters.removeFirst().getFile(); + if (file != null && file.exists() && !file.delete()) { + logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); + } + } } /** 
http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index de44640..39eda00 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -219,15 +219,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, inMemSorter.numRecords()); spillWriters.add(spillWriter); - final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator(); - while (sortedRecords.hasNext()) { - sortedRecords.loadNext(); - final Object baseObject = sortedRecords.getBaseObject(); - final long baseOffset = sortedRecords.getBaseOffset(); - final int recordLength = sortedRecords.getRecordLength(); - spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix()); - } - spillWriter.close(); + spillIterator(inMemSorter.getSortedIterator(), spillWriter); } final long spillSize = freeMemory(); @@ -488,6 +480,18 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } } + private static void spillIterator(UnsafeSorterIterator inMemIterator, + UnsafeSorterSpillWriter spillWriter) throws IOException { + while (inMemIterator.hasNext()) { + inMemIterator.loadNext(); + final Object baseObject = inMemIterator.getBaseObject(); + final long baseOffset = inMemIterator.getBaseOffset(); + final int recordLength = inMemIterator.getRecordLength(); + spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix()); + } + spillWriter.close(); + } + /** * An 
UnsafeSorterIterator that support spilling. */ @@ -503,6 +507,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { this.numRecords = inMemIterator.getNumRecords(); } + @Override public int getNumRecords() { return numRecords; } @@ -521,14 +526,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // Iterate over the records that have not been returned and spill them. final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords); - while (inMemIterator.hasNext()) { - inMemIterator.loadNext(); - final Object baseObject = inMemIterator.getBaseObject(); - final long baseOffset = inMemIterator.getBaseOffset(); - final int recordLength = inMemIterator.getRecordLength(); - spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix()); - } - spillWriter.close(); + spillIterator(inMemIterator, spillWriter); spillWriters.add(spillWriter); nextUpstream = spillWriter.getReader(serializerManager); http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java index 4776617..5d9515c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java @@ -109,22 +109,6 @@ public class UnsafeRowWriter { Platform.putLong(holder.buffer, fieldOffset, offsetAndSize); } - // Do word alignment for this row and grow the row buffer if needed. - // todo: remove this after we make unsafe array data word align. 
- public void alignToWords(int numBytes) { - final int remainder = numBytes & 0x07; - - if (remainder > 0) { - final int paddingBytes = 8 - remainder; - holder.grow(paddingBytes); - - for (int i = 0; i < paddingBytes; i++) { - Platform.putByte(holder.buffer, holder.cursor, (byte) 0); - holder.cursor++; - } - } - } - public void write(int ordinal, boolean value) { final long offset = getFieldOffset(ordinal); Platform.putLong(holder.buffer, offset, 0L); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org