Victsm commented on a change in pull request #30062:
URL: https://github.com/apache/spark/pull/30062#discussion_r511088423



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -0,0 +1,899 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * An implementation of {@link MergedShuffleFileManager} that provides the 
most essential shuffle
+ * service processing logic to support push based shuffle.
+ */
+public class RemoteBlockPushResolver implements MergedShuffleFileManager {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RemoteBlockPushResolver.class);
+  private static final String MERGE_MANAGER_DIR = "merge_manager";
+
+  private final ConcurrentMap<String, AppPathsInfo> appsPathInfo;
+  private final ConcurrentMap<AppShufflePartitionId, AppShufflePartitionInfo> 
partitions;
+
+  private final Executor directoryCleaner;
+  private final TransportConf conf;
+  private final int minChunkSize;
+  private final String relativeMergeDirPathPattern;
+  private final ErrorHandler.BlockPushErrorHandler errorHandler;
+
+  @SuppressWarnings("UnstableApiUsage")
+  private final LoadingCache<File, ShuffleIndexInformation> indexCache;
+
+  @SuppressWarnings("UnstableApiUsage")
+  public RemoteBlockPushResolver(TransportConf conf, String 
relativeMergeDirPathPattern) {
+    this.conf = conf;
+    this.partitions = Maps.newConcurrentMap();
+    this.appsPathInfo = Maps.newConcurrentMap();
+    this.directoryCleaner = Executors.newSingleThreadExecutor(
+      // Add `spark` prefix because it will run in NM in Yarn mode.
+      
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
+    this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
+    CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
+      new CacheLoader<File, ShuffleIndexInformation>() {
+        public ShuffleIndexInformation load(File file) throws IOException {
+          return new ShuffleIndexInformation(file);
+        }
+      };
+    indexCache = CacheBuilder.newBuilder()
+      .maximumWeight(conf.mergedIndexCacheSize())
+      .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> 
indexInfo.getSize())
+      .build(indexCacheLoader);
+    this.relativeMergeDirPathPattern = relativeMergeDirPathPattern;
+    this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
+  }
+
+  /**
+   * Given an ID that uniquely identifies a given shuffle partition of an 
application, retrieves the
+   * associated metadata. If not present and the corresponding merged shuffle 
does not exist,
+   * initializes the metadata.
+   */
+  private AppShufflePartitionInfo 
getOrCreateAppShufflePartitionInfo(AppShufflePartitionId id) {
+    return partitions.computeIfAbsent(id, key -> {
+      // It only gets here when the key is not present in the map. This could 
either
+      // be the first time the merge manager receives a pushed block for a 
given application
+      // shuffle partition, or after the merged shuffle file is finalized. We 
handle these
+      // two cases accordingly by checking if the file already exists.
+      File dataFile = getMergedShuffleDataFile(id);
+      File indexFile = getMergedShuffleIndexFile(id);
+      File metaFile = getMergedShuffleMetaFile(id);
+      try {
+        if (dataFile.exists()) {
+          return null;
+        } else {
+          return new AppShufflePartitionInfo(id, dataFile, indexFile, 
metaFile);
+        }
+      } catch (IOException e) {
+        logger.error(
+          "Cannot create merged shuffle partition {} with shuffle file {}, 
index file {}, and "
+            + "meta file {}", key, indexFile.getAbsolutePath(),
+            indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
+        throw new RuntimeException(
+          String.format("Cannot initialize merged shuffle partition %s", 
key.toString()), e);
+      }
+    });
+  }
+
+  @Override
+  public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int 
reduceId) {
+    AppShufflePartitionId id = new AppShufflePartitionId(appId, shuffleId, 
reduceId);
+    File indexFile = getMergedShuffleIndexFile(id);
+    if (!indexFile.exists()) {
+      throw new RuntimeException(String.format(
+        "Merged shuffle index file %s of %s not found", indexFile.getPath(), 
id.toString()));
+    }
+    int size = (int) indexFile.length();
+    // First entry is the zero offset
+    int numChunks = (size / Long.BYTES) - 1;
+    File metaFile = getMergedShuffleMetaFile(id);
+    if (!metaFile.exists()) {
+      throw new RuntimeException(String.format("Merged shuffle meta file %s of 
%s not found",
+        metaFile.getPath(), id.toString()));
+    }
+    FileSegmentManagedBuffer chunkBitMaps =
+      new FileSegmentManagedBuffer(conf, metaFile, 0L, metaFile.length());
+    logger.trace(
+      "{} shuffleId {} reduceId {} num chunks {}", appId, shuffleId, reduceId, 
numChunks);
+    return new MergedBlockMeta(numChunks, chunkBitMaps);
+  }
+
+  @SuppressWarnings("UnstableApiUsage")
+  @Override
+  public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int 
reduceId, int chunkId) {
+    AppShufflePartitionId id = new AppShufflePartitionId(appId, shuffleId, 
reduceId);
+    File dataFile = getMergedShuffleDataFile(id);
+    if (!dataFile.exists()) {
+      throw new RuntimeException(String.format("Merged shuffle data file %s of 
%s not found",
+        dataFile.getPath(), id.toString()));
+    }
+    File indexFile = getMergedShuffleIndexFile(id);
+    try {
+      // If we get here, the merged shuffle file should have been properly 
finalized. Thus we can
+      // use the file length to determine the size of the merged shuffle block.
+      ShuffleIndexInformation shuffleIndexInformation = 
indexCache.get(indexFile);
+      ShuffleIndexRecord shuffleIndexRecord = 
shuffleIndexInformation.getIndex(chunkId);
+      return new FileSegmentManagedBuffer(
+        conf, dataFile, shuffleIndexRecord.getOffset(), 
shuffleIndexRecord.getLength());
+    } catch (ExecutionException e) {
+      throw new RuntimeException(String.format(
+        "Failed to open merged shuffle index file %s of %s", 
indexFile.getPath(), id.toString()),
+          e);
+    }
+  }
+
+  /**
+   * The logic here is consistent with
+   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   */
+  // TODO should we use subDirsPerLocalDir to potentially reduce inode size?
+  private File getFile(String appId, String filename) {
+    int hash = JavaUtils.nonNegativeHash(filename);
+    // TODO: Change the message when this service is able to handle NM restart
+    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
+      "application " + appId + " is not registered or NM was restarted.");
+    Path[] activeLocalDirs = getActiveLocalDirs(appPathsInfo.activeLocalDirs);
+    Path localDir = activeLocalDirs[hash % activeLocalDirs.length];
+    String relativePath = getRelativePath(appPathsInfo.user, appId);
+    Path filePath = localDir.resolve(relativePath);
+    File targetFile = new File(filePath.toFile(), filename);
+    logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+    return targetFile;
+  }
+
+  private Path[] getActiveLocalDirs(String[] activeLocalDirs) {
+    Preconditions.checkNotNull(activeLocalDirs,
+      "Active local dirs list has not been updated by any executor 
registration");
+    return Arrays.stream(activeLocalDirs).map(localDir -> 
Paths.get(localDir)).toArray(Path[]::new);
+  }
+
+  private String getRelativePath(String user, String appId) {
+    return String.format(relativeMergeDirPathPattern + MERGE_MANAGER_DIR, 
user, appId);
+  }
+
+  private File getMergedShuffleDataFile(AppShufflePartitionId id) {
+    String fileName = String.format("%s.data", id.generateFileName());
+    return getFile(id.appId, fileName);
+  }
+
+  private File getMergedShuffleIndexFile(AppShufflePartitionId id) {
+    String indexName = String.format("%s.index", id.generateFileName());
+    return getFile(id.appId, indexName);
+  }
+
+  private File getMergedShuffleMetaFile(AppShufflePartitionId id) {
+    String metaName = String.format("%s.meta", id.generateFileName());
+    return getFile(id.appId, metaName);
+  }
+
+  @Override
+  public String[] getMergedBlockDirs(String appId) {
+    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
+      "application " + appId + " is not registered or NM was restarted.");
+    String[] activeLocalDirs = 
Preconditions.checkNotNull(appsPathInfo.get(appId).activeLocalDirs,
+      "application " + appId
+      + " active local dirs list has not been updated by any executor 
registration");
+    return Arrays.stream(activeLocalDirs)
+      .map(dir -> dir + getRelativePath(appPathsInfo.user, appId))
+      .toArray(String[]::new);
+  }
+
+  @Override
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
+    // TODO: Change the message when this service is able to handle NM restart
+    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.remove(appId),
+      "application " + appId + " is not registered or NM was restarted.");
+    Iterator<Map.Entry<AppShufflePartitionId, AppShufflePartitionInfo>> 
iterator =
+      partitions.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<AppShufflePartitionId, AppShufflePartitionInfo> entry = 
iterator.next();
+      AppShufflePartitionId partitionId = entry.getKey();
+      AppShufflePartitionInfo partition = entry.getValue();
+      if (appId.equals(partitionId.appId)) {
+        iterator.remove();
+        try {
+          partition.channel.close();
+        } catch (IOException e) {
+          logger.error("Error closing merged shuffle file for {}", 
partitionId);
+        }
+      }
+    }
+
+    if (cleanupLocalDirs) {
+      Path[] dirs = 
Arrays.stream(getActiveLocalDirs(appPathsInfo.activeLocalDirs))
+        .map(dir -> dir.resolve(getRelativePath(appPathsInfo.user, appId)))
+        .toArray(Path[]::new);
+      directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
+    }
+  }
+
+  /**
+   * Synchronously delete local dirs, executed in a separate thread.
+   */
+  private void deleteExecutorDirs(Path[] dirs) {
+    for (Path localDir : dirs) {
+      try {
+        if (Files.exists(localDir)) {
+          JavaUtils.deleteRecursively(localDir.toFile());
+          logger.debug("Successfully cleaned up directory: {}", localDir);
+        }
+      } catch (Exception e) {
+        logger.error("Failed to delete directory: {}", localDir, e);
+      }
+    }
+  }
+
+  @Override
+  public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+    // Retrieve merged shuffle file metadata
+    String[] blockIdParts = msg.blockId.split("_");
+    if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
+      throw new IllegalArgumentException("Unexpected shuffle block id format: 
" + msg.blockId);
+    }
+    AppShufflePartitionId partitionId = new AppShufflePartitionId(
+      msg.appId, Integer.parseInt(blockIdParts[1]), 
Integer.parseInt(blockIdParts[3]));
+    int mapIndex = Integer.parseInt(blockIdParts[2]);
+    AppShufflePartitionInfo partitionInfoBeforeCheck =
+      getOrCreateAppShufflePartitionInfo(partitionId);
+
+    // Here partitionInfo will be null in 2 cases:
+    // 1) The request is received for a block that has already been merged, 
this is possible due
+    // to the retry logic.
+    // 2) The request is received after the merged shuffle is finalized, thus 
is too late.
+    //
+    // For case 1, we will drain the data in the channel and just respond 
success
+    // to the client. This is required because the response of the previously 
merged
+    // block will be ignored by the client, per the logic in 
RetryingBlockFetcher.
+    // Note that the netty server should receive data for a given block id 
only from 1 channel
+    // at any time. The block should be pushed only from successful maps, thus 
there should be
+    // only 1 source for a given block at any time. Although the netty client 
might retry sending
+    // this block to the server multiple times, the data of the same block 
always arrives from the
+    // same channel thus the server should have already processed the previous 
request of this
+    // block before seeing it again in the channel. This guarantees that we 
can simply just
+    // check the bitmap to determine if a block is a duplicate or not.
+    //
+    // For case 2, we will also drain the data in the channel, but throw an 
exception in
+    // {@link 
org.apache.spark.network.client.StreamCallback#onComplete(String)}. This way,
+    // the client will be notified of the failure but the channel will remain 
active. Keeping
+    // the channel alive is important because the same channel could be reused 
by multiple map
+    // tasks in the executor JVM, which belongs to different stages. While one 
of the shuffles
+    // in these stages is finalized, the others might still be active. Tearing 
down the channel
+    // on the server side will disrupt these other on-going shuffle merges. 
It's also important
+    // to notify the client of the failure, so that it can properly halt 
pushing the remaining
+    // blocks upon receiving such failures to preserve resources on the 
server/client side.
+    //
+    // Speculative execution would also raise a possible scenario with 
duplicate blocks. Although
+    // speculative execution would kill the slower task attempt, leading to 
only 1 task attempt
+    // succeeding in the end, there is no guarantee that only one copy of the 
block will be
+    // pushed. This is due to our handling of block push process outside of 
the map task, thus
+    // it is possible for the speculative task attempt to initiate the block 
push process before
+    // getting killed. When this happens, we need to distinguish the duplicate 
blocks as they
+    // arrive. More details on this is explained in later comments.
+
+    // Track if the block is received after shuffle merge finalize
+    final boolean isTooLate = partitionInfoBeforeCheck == null;
+    // Check if the given block is already merged by checking the bitmap 
against the given map index
+    final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != 
null
+      && partitionInfoBeforeCheck.mapTracker.contains(mapIndex) ? null : 
partitionInfoBeforeCheck;
+
+    return new StreamCallbackWithID() {
+      private int length = 0;
+      // This indicates that this stream got the opportunity to write the 
blocks to the merged file.
+      // Once this is set to true and the stream encounters a failure then it 
will take necessary
+      // action to overwrite any partial written data. This is reset to false 
when the stream
+      // completes without any failures.
+      private boolean isWriting = false;
+      // Use on-heap instead of direct ByteBuffer since these buffers will be 
GC'ed very quickly
+      private List<ByteBuffer> deferredBufs;
+
+      @Override
+      public String getID() {
+        return msg.blockId;
+      }
+
+      /**
+       * Write a ByteBuffer to the merged shuffle file. Here we keep track of 
the length of the
+       * block data written to file. In case of failure during writing block 
to file, we use the
+       * information tracked in partitionInfo to overwrite the corrupt block 
when writing the new
+       * block.
+       */
+      private void writeBuf(ByteBuffer buf) throws IOException {
+        while (buf.hasRemaining()) {
+          assert partitionInfo != null;
+          if (partitionInfo.isEncounteredFailure()) {
+            long updatedPos = partitionInfo.getPosition() + length;
+            logger.debug(
+              "{} shuffleId {} reduceId {} encountered failure current pos {} 
updated pos {}",
+              partitionId.appId, partitionId.shuffleId, partitionId.reduceId,
+              partitionInfo.getPosition(), updatedPos);
+            length += partitionInfo.channel.write(buf, updatedPos);
+          } else {
+            length += partitionInfo.channel.write(buf);
+          }
+        }
+      }
+
+      /**
+       * There will be multiple streams of map blocks belonging to the same 
reduce partition. At any
+       * given point of time, only a single map stream can write it's data to 
the merged file. Until

Review comment:
       Nit: "it's" -> "its" (possessive) in "write it's data".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to