GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5868#discussion_r30176759
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle.unsafe;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import scala.Tuple2;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.spark.SparkConf;
    +import org.apache.spark.TaskContext;
    +import org.apache.spark.executor.ShuffleWriteMetrics;
    +import org.apache.spark.serializer.SerializerInstance;
    +import org.apache.spark.shuffle.ShuffleMemoryManager;
    +import org.apache.spark.storage.*;
    +import org.apache.spark.unsafe.PlatformDependent;
    +import org.apache.spark.unsafe.memory.MemoryBlock;
    +import org.apache.spark.unsafe.memory.TaskMemoryManager;
    +
    +/**
    + * An external sorter that is specialized for sort-based shuffle.
    + * <p>
    + * Incoming records are appended to data pages. When all records have been inserted (or when the
    + * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
    + * their partition ids (using a {@link UnsafeShuffleSorter}). The sorted records are then written
    + * to a single output file (or multiple files, if we've spilled). The format of the output files is
    + * the same as the format of the final output file written by
    + * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
    + * written as a single serialized, compressed stream that can be read with a new decompression and
    + * deserialization stream.
    + * <p>
    + * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
    + * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
    + * specialized merge procedure that avoids extra serialization/deserialization.
    + */
    +final class UnsafeShuffleExternalSorter {
    +
    +  private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
    +
    +  private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
    +  @VisibleForTesting
    +  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
    +  @VisibleForTesting
    +  static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
    +
    +  private final int initialSize;
    +  private final int numPartitions;
    +  private final TaskMemoryManager memoryManager;
    +  private final ShuffleMemoryManager shuffleMemoryManager;
    +  private final BlockManager blockManager;
    +  private final TaskContext taskContext;
    +  private final boolean spillingEnabled;
    +  private final ShuffleWriteMetrics writeMetrics;
    +
    +  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
    +  private final int fileBufferSize;
    +
    +  /**
    +   * Memory pages that hold the records being sorted. The pages in this list are freed when
    +   * spilling, although in principle we could recycle these pages across spills (on the other hand,
    +   * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
    +   * itself).
    +   */
    +  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
    +
    +  private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
    +
    +  // All three of these variables are reset after spilling:
    +  private UnsafeShuffleSorter sorter;
    +  private MemoryBlock currentPage = null;
    +  private long currentPagePosition = -1;
    +  private long freeSpaceInCurrentPage = 0;
    +
    +  public UnsafeShuffleExternalSorter(
    +      TaskMemoryManager memoryManager,
    +      ShuffleMemoryManager shuffleMemoryManager,
    +      BlockManager blockManager,
    +      TaskContext taskContext,
    +      int initialSize,
    +      int numPartitions,
    +      SparkConf conf,
    +      ShuffleWriteMetrics writeMetrics) throws IOException {
    +    this.memoryManager = memoryManager;
    +    this.shuffleMemoryManager = shuffleMemoryManager;
    +    this.blockManager = blockManager;
    +    this.taskContext = taskContext;
    +    this.initialSize = initialSize;
    +    this.numPartitions = numPartitions;
    +    this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true);
    +    // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
    +    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    +    this.writeMetrics = writeMetrics;
    +    openSorter();
    +  }
    +
    +  /**
    +   * Allocates a new sorter. Called when opening the spill writer for the first time and after
    +   * each spill.
    +   */
    +  private void openSorter() throws IOException {
    +    // TODO: move this sizing calculation logic into a static method of sorter:
    +    final long memoryRequested = initialSize * 8L;
    +    if (spillingEnabled) {
    +      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
    +      if (memoryAcquired != memoryRequested) {
    +        shuffleMemoryManager.release(memoryAcquired);
    +        throw new IOException("Could not acquire memory!");
    +      }
    +    }
    +
    +    this.sorter = new UnsafeShuffleSorter(initialSize);
    +  }
    +
    +  /**
    +   * Sorts the in-memory records and writes the sorted records to a spill file.
    +   * This method does not free the sort data structures.
    +   *
    +   * @param isSpill if true, this indicates that we're writing a spill and that bytes written should
    +   *                be counted towards shuffle spill metrics rather than shuffle write metrics.
    +   */
    +  private void writeSpillFile(boolean isSpill) throws IOException {
    --- End diff --
    
    I went with `writeSortedFile` to emphasize that this writes a new file.
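    
    For reference, here's a minimal sketch of the method after the rename (the body is elided, and the exact Javadoc wording in the PR may differ slightly from this):
    
    ```java
    /**
     * Sorts the in-memory records and writes the sorted records to a new file on disk.
     * This method does not free the sort data structures.
     *
     * @param isSpill if true, bytes written should be counted towards shuffle spill
     *                metrics rather than shuffle write metrics.
     */
    private void writeSortedFile(boolean isSpill) throws IOException {
      // ... sort the in-memory records by partition id, then write each
      // partition's records as a separate compressed stream (elided) ...
    }
    ```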
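    
    For anyone reading along, here is a rough sketch of how the `currentPage` / `currentPagePosition` / `freeSpaceInCurrentPage` bookkeeping above is meant to be used when appending a record. The helper name and the exact `sorter.insertRecord` signature are illustrative assumptions rather than the PR's code, but the 4-byte length prefix is why `MAX_RECORD_SIZE` is `PAGE_SIZE - 4`:
    
    ```java
    // Illustrative sketch only; assumes the page layout [4-byte length][record bytes].
    private void insertRecordSketch(
        Object recordBase, long recordOffset, int lengthInBytes, int partitionId) {
      // Encode a pointer to where this record will start, so the sorter can find it.
      final long recordAddress =
        memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
      final Object pageBase = currentPage.getBaseObject();
      // Write the length prefix, then copy the record bytes after it.
      PlatformDependent.UNSAFE.putInt(pageBase, currentPagePosition, lengthInBytes);
      currentPagePosition += 4;
      PlatformDependent.copyMemory(
        recordBase, recordOffset, pageBase, currentPagePosition, lengthInBytes);
      currentPagePosition += lengthInBytes;
      freeSpaceInCurrentPage -= (4 + lengthInBytes);
      // Track the (record pointer, partition id) pair so records can be sorted by partition.
      sorter.insertRecord(recordAddress, partitionId);
    }
    ```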

