Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5868#discussion_r29816469
  
    --- Diff: 
core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSpillWriter.java
 ---
    @@ -0,0 +1,344 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle.unsafe;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +
    +import org.apache.spark.storage.*;
    +import scala.Tuple2;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.spark.SparkConf;
    +import org.apache.spark.TaskContext;
    +import org.apache.spark.executor.ShuffleWriteMetrics;
    +import org.apache.spark.serializer.SerializerInstance;
    +import org.apache.spark.shuffle.ShuffleMemoryManager;
    +import org.apache.spark.unsafe.PlatformDependent;
    +import org.apache.spark.unsafe.memory.MemoryBlock;
    +import org.apache.spark.unsafe.memory.TaskMemoryManager;
    +
    +/**
    + * An external sorter that is specialized for sort-based shuffle.
    + * <p>
    + * Incoming records are appended to data pages. When all records have been 
inserted (or when the
    + * current thread's shuffle memory limit is reached), the in-memory 
records are sorted according to
    + * their partition ids (using a {@link UnsafeShuffleSorter}). The sorted 
records are then written
    + * to a single output file (or multiple files, if we've spilled). The 
format of the output files is
    + * the same as the format of the final output file written by
    + * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output 
partition's records are
    + * written as a single serialized, compressed stream that can be read with 
a new decompression and
    + * deserialization stream.
    + * <p>
    + * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this 
sorter does not merge its
    + * spill files. Instead, this merging is performed in {@link 
UnsafeShuffleWriter}, which uses a
    + * specialized merge procedure that avoids extra 
serialization/deserialization.
    + */
    +public final class UnsafeShuffleSpillWriter {
    +
    +  private final Logger logger = 
LoggerFactory.getLogger(UnsafeShuffleSpillWriter.class);
    +
    +  private static final int SER_BUFFER_SIZE = 1024 * 1024;  // TODO: tune 
this / don't duplicate
    +  private static final int PAGE_SIZE = 1024 * 1024;  // TODO: tune this
    +
    +  private final int initialSize;
    +  private final int numPartitions;
    +  private final TaskMemoryManager memoryManager;
    +  private final ShuffleMemoryManager shuffleMemoryManager;
    +  private final BlockManager blockManager;
    +  private final TaskContext taskContext;
    +  private final boolean spillingEnabled;
    +  private ShuffleWriteMetrics writeMetrics;
    +
    +  /** The buffer size to use when writing spills using 
DiskBlockObjectWriter */
    +  private final int fileBufferSize;
    +
    +  /**
    +   * Memory pages that hold the records being sorted. The pages in this 
list are freed when
    +   * spilling, although in principle we could recycle these pages across 
spills (on the other hand,
    +   * this might not be necessary if we maintained a pool of re-usable 
pages in the TaskMemoryManager
    +   * itself).
    +   */
    +  private final LinkedList<MemoryBlock> allocatedPages = new 
LinkedList<MemoryBlock>();
    +
    +  private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
    +
    +  // All three of these variables are reset after spilling:
    +  private UnsafeShuffleSorter sorter;
    +  private MemoryBlock currentPage = null;
    +  private long currentPagePosition = -1;
    +
    +  public UnsafeShuffleSpillWriter(
    +      TaskMemoryManager memoryManager,
    +      ShuffleMemoryManager shuffleMemoryManager,
    +      BlockManager blockManager,
    +      TaskContext taskContext,
    +      int initialSize,
    +      int numPartitions,
    +      SparkConf conf) throws IOException {
    +    this.memoryManager = memoryManager;
    +    this.shuffleMemoryManager = shuffleMemoryManager;
    +    this.blockManager = blockManager;
    +    this.taskContext = taskContext;
    +    this.initialSize = initialSize;
    +    this.numPartitions = numPartitions;
    +    this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true);
    +    // Use getSizeAsKb (not bytes) to maintain backwards compatibility for 
units
    +    this.fileBufferSize = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    +    openSorter();
    +  }
    +
    +  // TODO: metrics tracking + integration with shuffle write metrics
    +
    +  /**
    +   * Allocates a new sorter. Called when opening the spill writer for the 
first time and after
    +   * each spill.
    +   */
    +  private void openSorter() throws IOException {
    +    this.writeMetrics = new ShuffleWriteMetrics();
    +    // TODO: connect write metrics to task metrics?
    +    // TODO: move this sizing calculation logic into a static method of 
sorter:
    +    final long memoryRequested = initialSize * 8L;
    +    if (spillingEnabled) {
    +      final long memoryAcquired = 
shuffleMemoryManager.tryToAcquire(memoryRequested);
    +      if (memoryAcquired != memoryRequested) {
    +        shuffleMemoryManager.release(memoryAcquired);
    +        throw new IOException("Could not acquire memory!");
    +      }
    +    }
    +
    +    this.sorter = new UnsafeShuffleSorter(initialSize);
    +  }
    +
    +  /**
    +   * Sorts the in-memory records, writes the sorted records to a spill 
file, and frees the in-memory
    +   * data structures associated with this sort. New data structures are 
not automatically allocated.
    +   */
    +  private SpillInfo writeSpillFile() throws IOException {
    +    // This call performs the actual sort.
    +    final UnsafeShuffleSorter.UnsafeShuffleSorterIterator sortedRecords =
    +      sorter.getSortedIterator();
    +
    +    // Currently, we need to open a new DiskBlockObjectWriter for each 
partition; we can avoid this
    +    // after SPARK-5581 is fixed.
    +    BlockObjectWriter writer = null;
    +
    +    // Small writes to DiskBlockObjectWriter will be fairly inefficient. 
Since there doesn't seem to
    +    // be an API to directly transfer bytes from managed memory to the 
disk writer, we buffer
    +    // records in a byte array. This array only needs to be big enough to 
hold a single record.
    +    final byte[] arr = new byte[SER_BUFFER_SIZE];
    +
    +    // Because this output will be read during shuffle, its compression 
codec must be controlled by
    +    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so 
we need to use
    +    // createTempShuffleBlock here; see SPARK-3426 for more details.
    +    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    +      blockManager.diskBlockManager().createTempShuffleBlock();
    +    final File file = spilledFileInfo._2();
    +    final BlockId blockId = spilledFileInfo._1();
    +    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, 
blockId);
    +
    +    // Unfortunately, we need a serializer instance in order to construct 
a DiskBlockObjectWriter.
    +    // Our write path doesn't actually use this serializer (since we end 
up calling the `write()`
    +    // OutputStream methods), but DiskBlockObjectWriter still calls some 
methods on it. To work
    +    // around this, we pass a dummy no-op serializer.
    +    final SerializerInstance ser = new DummySerializerInstance();
    +    writer = blockManager.getDiskWriter(blockId, file, ser, 
fileBufferSize, writeMetrics);
    +
    +    int currentPartition = -1;
    +    while (sortedRecords.hasNext()) {
    +      sortedRecords.loadNext();
    +      final int partition = 
sortedRecords.packedRecordPointer.getPartitionId();
    +      assert (partition >= currentPartition);
    +      if (partition != currentPartition) {
    +        // Switch to the new partition
    +        if (currentPartition != -1) {
    +          writer.commitAndClose();
    +          spillInfo.partitionLengths[currentPartition] = 
writer.fileSegment().length();
    +        }
    +        currentPartition = partition;
    +        writer = blockManager.getDiskWriter(blockId, file, ser, 
fileBufferSize, writeMetrics);
    +      }
    +
    +      final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
    +      final int recordLength = PlatformDependent.UNSAFE.getInt(
    +        memoryManager.getPage(recordPointer), 
memoryManager.getOffsetInPage(recordPointer));
    +      PlatformDependent.copyMemory(
    +        memoryManager.getPage(recordPointer),
    +        memoryManager.getOffsetInPage(recordPointer) + 4, // skip over 
record length
    +        arr,
    +        PlatformDependent.BYTE_ARRAY_OFFSET,
    +        recordLength);
    +      assert (writer != null);  // To suppress an IntelliJ warning
    +      writer.write(arr, 0, recordLength);
    +      // TODO: add a test that detects whether we leave this call out:
    +      writer.recordWritten();
    +    }
    +
    +    if (writer != null) {
    +      writer.commitAndClose();
    +      // If `writeSpillFile()` was called from `closeAndGetSpills()` and 
no records were inserted,
    +      // then the spill file might be empty. Note that it might be better 
to avoid calling
    +      // writeSpillFile() in that case.
    +      if (currentPartition != -1) {
    +        spillInfo.partitionLengths[currentPartition] = 
writer.fileSegment().length();
    +        spills.add(spillInfo);
    +      }
    +    }
    +    return spillInfo;
    +  }
    +
    +  /**
    +   * Sort and spill the current records in response to memory pressure.
    +   */
    +  private void spill() throws IOException {
    +    final long threadId = Thread.currentThread().getId();
    +    logger.info("Thread " + threadId + " spilling sort data of " +
    --- End diff --
    
    debug level


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to