GitHub user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/5868#discussion_r30109992
--- Diff:
core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+
+import scala.Tuple2;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.storage.*;
+import org.apache.spark.unsafe.PlatformDependent;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+
+/**
+ * An external sorter that is specialized for sort-based shuffle.
+ * <p>
+ * Incoming records are appended to data pages. When all records have been
inserted (or when the
+ * current thread's shuffle memory limit is reached), the in-memory
records are sorted according to
+ * their partition ids (using a {@link UnsafeShuffleSorter}). The sorted
records are then written
+ * to a single output file (or multiple files, if we've spilled). The
format of the output files is
+ * the same as the format of the final output file written by
+ * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output
partition's records are
+ * written as a single serialized, compressed stream that can be read with
a new decompression and
+ * deserialization stream.
+ * <p>
+ * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this
sorter does not merge its
+ * spill files. Instead, this merging is performed in {@link
UnsafeShuffleWriter}, which uses a
+ * specialized merge procedure that avoids extra
serialization/deserialization.
+ */
+final class UnsafeShuffleExternalSorter {
+
+ private final Logger logger =
LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
+
+ private static final int PAGE_SIZE =
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
+ @VisibleForTesting
+ static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
+ @VisibleForTesting
+ static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
+
+ private final int initialSize;
+ private final int numPartitions;
+ private final TaskMemoryManager memoryManager;
+ private final ShuffleMemoryManager shuffleMemoryManager;
+ private final BlockManager blockManager;
+ private final TaskContext taskContext;
+ private final boolean spillingEnabled;
+ private final ShuffleWriteMetrics writeMetrics;
+
+ /** The buffer size to use when writing spills using
DiskBlockObjectWriter */
+ private final int fileBufferSize;
+
+ /**
+ * Memory pages that hold the records being sorted. The pages in this
list are freed when
+ * spilling, although in principle we could recycle these pages across
spills (on the other hand,
+ * this might not be necessary if we maintained a pool of re-usable
pages in the TaskMemoryManager
+ * itself).
+ */
+ private final LinkedList<MemoryBlock> allocatedPages = new
LinkedList<MemoryBlock>();
+
+ private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
+
+ // All three of these variables are reset after spilling:
+ private UnsafeShuffleSorter sorter;
+ private MemoryBlock currentPage = null;
+ private long currentPagePosition = -1;
+ private long freeSpaceInCurrentPage = 0;
+
+ public UnsafeShuffleExternalSorter(
+ TaskMemoryManager memoryManager,
+ ShuffleMemoryManager shuffleMemoryManager,
+ BlockManager blockManager,
+ TaskContext taskContext,
+ int initialSize,
+ int numPartitions,
+ SparkConf conf,
+ ShuffleWriteMetrics writeMetrics) throws IOException {
+ this.memoryManager = memoryManager;
+ this.shuffleMemoryManager = shuffleMemoryManager;
+ this.blockManager = blockManager;
+ this.taskContext = taskContext;
+ this.initialSize = initialSize;
+ this.numPartitions = numPartitions;
+ this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true);
+ // Use getSizeAsKb (not bytes) to maintain backwards compatibility for
units
+ this.fileBufferSize = (int)
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+ this.writeMetrics = writeMetrics;
+ openSorter();
+ }
+
+ /**
+ * Allocates a new sorter. Called when opening the spill writer for the
first time and after
+ * each spill.
+ */
+ private void openSorter() throws IOException {
+ // TODO: move this sizing calculation logic into a static method of
sorter:
+ final long memoryRequested = initialSize * 8L;
+ if (spillingEnabled) {
+ final long memoryAcquired =
shuffleMemoryManager.tryToAcquire(memoryRequested);
+ if (memoryAcquired != memoryRequested) {
+ shuffleMemoryManager.release(memoryAcquired);
+ throw new IOException("Could not acquire memory!");
+ }
+ }
+
+ this.sorter = new UnsafeShuffleSorter(initialSize);
+ }
+
+ /**
+ * Sorts the in-memory records and writes the sorted records to a spill
file.
+ * This method does not free the sort data structures.
+ *
+ * @param isSpill if true, this indicates that we're writing a spill and
that bytes written should
+ * be counted towards shuffle spill metrics rather than
shuffle write metrics.
+ */
+ private void writeSpillFile(boolean isSpill) throws IOException {
+
+ final ShuffleWriteMetrics writeMetricsToUse;
+
+ if (isSpill) {
+ // We're spilling, so bytes written should be counted towards spill
rather than write.
+ // Create a dummy WriteMetrics object to absorb these metrics, since
we don't want to count
+ // them towards shuffle bytes written.
+ writeMetricsToUse = new ShuffleWriteMetrics();
+ } else {
+ // We're writing the final non-spill file, so we _do_ want to count
this as shuffle bytes.
+ writeMetricsToUse = writeMetrics;
+ }
+
+ // This call performs the actual sort.
+ final UnsafeShuffleSorter.UnsafeShuffleSorterIterator sortedRecords =
+ sorter.getSortedIterator();
+
+ // Currently, we need to open a new DiskBlockObjectWriter for each
partition; we can avoid this
+ // after SPARK-5581 is fixed.
+ BlockObjectWriter writer;
+
+ // Small writes to DiskBlockObjectWriter will be fairly inefficient.
Since there doesn't seem to
+ // be an API to directly transfer bytes from managed memory to the
disk writer, we buffer
+ // data through a byte array. This array does not need to be large
enough to hold a single
+ // record;
+ final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+
+ // Because this output will be read during shuffle, its compression
codec must be controlled by
+ // spark.shuffle.compress instead of spark.shuffle.spill.compress, so
we need to use
+ // createTempShuffleBlock here; see SPARK-3426 for more details.
+ final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
+ blockManager.diskBlockManager().createTempShuffleBlock();
+ final File file = spilledFileInfo._2();
+ final TempShuffleBlockId blockId = spilledFileInfo._1();
+ final SpillInfo spillInfo = new SpillInfo(numPartitions, file,
blockId);
+
+ // Unfortunately, we need a serializer instance in order to construct
a DiskBlockObjectWriter.
+ // Our write path doesn't actually use this serializer (since we end
up calling the `write()`
+ // OutputStream methods), but DiskBlockObjectWriter still calls some
methods on it. To work
+ // around this, we pass a dummy no-op serializer.
+ final SerializerInstance ser = DummySerializerInstance.INSTANCE;
+
+ writer = blockManager.getDiskWriter(blockId, file, ser,
fileBufferSize, writeMetricsToUse);
+
+ int currentPartition = -1;
+ while (sortedRecords.hasNext()) {
+ sortedRecords.loadNext();
+ final int partition =
sortedRecords.packedRecordPointer.getPartitionId();
+ assert (partition >= currentPartition);
+ if (partition != currentPartition) {
+ // Switch to the new partition
+ if (currentPartition != -1) {
+ writer.commitAndClose();
+ spillInfo.partitionLengths[currentPartition] =
writer.fileSegment().length();
+ }
+ currentPartition = partition;
+ writer = blockManager.getDiskWriter(blockId, file, ser,
fileBufferSize, writeMetricsToUse);
+ }
+
+ final long recordPointer =
sortedRecords.packedRecordPointer.getRecordPointer();
+ final Object recordPage = memoryManager.getPage(recordPointer);
+ final long recordOffsetInPage =
memoryManager.getOffsetInPage(recordPointer);
+ int dataRemaining = PlatformDependent.UNSAFE.getInt(recordPage,
recordOffsetInPage);
+ long recordReadPosition = recordOffsetInPage + 4; // skip over
record length
+ while (dataRemaining > 0) {
+ final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE,
dataRemaining);
+ PlatformDependent.copyMemory(
+ recordPage,
+ recordReadPosition,
+ writeBuffer,
+ PlatformDependent.BYTE_ARRAY_OFFSET,
+ toTransfer);
+ writer.write(writeBuffer, 0, toTransfer);
+ recordReadPosition += toTransfer;
+ dataRemaining -= toTransfer;
+ }
+ // TODO: add a test that detects whether we leave this call out:
+ writer.recordWritten();
+ }
+
+ if (writer != null) {
+ writer.commitAndClose();
+ // If `writeSpillFile()` was called from `closeAndGetSpills()` and
no records were inserted,
+ // then the spill file might be empty. Note that it might be better
to avoid calling
+ // writeSpillFile() in that case.
+ if (currentPartition != -1) {
+ spillInfo.partitionLengths[currentPartition] =
writer.fileSegment().length();
+ spills.add(spillInfo);
+ }
+ }
+
+ if (isSpill) {
+
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
+ // Consistent with ExternalSorter, we do not count this IO towards
shuffle write time.
+ // This means that this IO time is not accounted for anywhere;
SPARK-3577 will fix this.
+ //
writeMetrics.incShuffleWriteTime(writeMetricsToUse.shuffleWriteTime());
+
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+ }
+ }
+
+ /**
+ * Sort and spill the current records in response to memory pressure.
+ */
+ @VisibleForTesting
+ void spill() throws IOException {
+ final long threadId = Thread.currentThread().getId();
+ logger.info("Thread " + threadId + " spilling sort data of " +
+ org.apache.spark.util.Utils.bytesToString(getMemoryUsage()) + " to
disk (" +
+ (spills.size() + (spills.size() > 1 ? " times" : " time")) + " so
far)");
+
+ writeSpillFile(true);
+ final long sorterMemoryUsage = sorter.getMemoryUsage();
+ sorter = null;
+ shuffleMemoryManager.release(sorterMemoryUsage);
+ final long spillSize = freeMemory();
+ taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+
+ openSorter();
+ }
+
+ private long getMemoryUsage() {
+ return sorter.getMemoryUsage() + (allocatedPages.size() * (long)
PAGE_SIZE);
+ }
+
+ private long freeMemory() {
+ long memoryFreed = 0;
+ for (MemoryBlock block : allocatedPages) {
+ memoryManager.freePage(block);
+ shuffleMemoryManager.release(block.size());
+ memoryFreed += block.size();
+ }
+ allocatedPages.clear();
+ currentPage = null;
+ currentPagePosition = -1;
+ freeSpaceInCurrentPage = 0;
+ return memoryFreed;
+ }
+
+ /**
+ * Force all memory and spill files to be deleted; called by shuffle
error-handling code.
+ */
+ public void cleanupAfterError() {
+ freeMemory();
+ for (SpillInfo spill : spills) {
+ if (spill.file.exists() && !spill.file.delete()) {
+ logger.error("Unable to delete spill file {}",
spill.file.getPath());
+ }
+ }
+ if (spillingEnabled && sorter != null) {
+ shuffleMemoryManager.release(sorter.getMemoryUsage());
+ sorter = null;
+ }
+ }
+
+ /**
+ * Checks whether there is enough space to insert a new record into the
sorter.
+ *
+ * @param requiredSpace the required space in the data page, in bytes,
including space for storing
+ * the record size.
+
+ * @return true if the record can be inserted without requiring more
allocations, false otherwise.
+ */
+ private boolean haveSpaceForRecord(int requiredSpace) {
+ assert (requiredSpace > 0);
+ // The sort array will automatically expand when inserting a new
record, so we only need to
+ // worry about it having free space when spilling is enabled.
+ final boolean sortBufferHasSpace = !spillingEnabled ||
sorter.hasSpaceForAnotherRecord();
+ final boolean dataPageHasSpace = requiredSpace <=
freeSpaceInCurrentPage;
+ return (sortBufferHasSpace && dataPageHasSpace);
+ }
+
+ /**
+ * Allocates more memory in order to insert an additional record. If
spilling is enabled, this
+ * will request additional memory from the {@link ShuffleMemoryManager}
and spill if the requested
+ * memory can not be obtained. If spilling is disabled, then this will
allocate memory without
+ * coordinating with the ShuffleMemoryManager.
+ *
+ * @param requiredSpace the required space in the data page, in bytes,
including space for storing
+ * the record size.
+ */
+ private void allocateSpaceForRecord(int requiredSpace) throws
IOException {
+ if (spillingEnabled && !sorter.hasSpaceForAnotherRecord()) {
+ logger.debug("Attempting to expand sort buffer");
+ final long oldSortBufferMemoryUsage = sorter.getMemoryUsage();
+ final long memoryToGrowSortBuffer = oldSortBufferMemoryUsage * 2;
--- End diff --
I thought we were growing one page at a time. Isn't doubling too much?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]