jiangxb1987 commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r503017066



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+
+  boolean shouldRetryError(Throwable t);
+
+  default boolean shouldLogError(Throwable t) {
+    return true;
+  }
+
+  /**
+   * A no-op error handler instance.
+   */
+  ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+  /**
+   * The error handler for pushing shuffle blocks to remote shuffle services.
+   */
+  class BlockPushErrorHandler implements ErrorHandler {
+    /**
+     * String constant used for generating exception messages indicating a block to be merged
+     * arrives too late on the server side, and also for later checking such exceptions on the
+     * client side. When we get a block push failure because of the block arrives too late, we
+     * will not retry pushing the block nor log the exception on the client side.
+     */
+    public static final String TOO_LATE_MESSAGE_SUFFIX =
+      "received after merged shuffle is finalized";
+
+    /**
+     * String constant used for generating exception messages indicating the server couldn't
+     * append a block after all available attempts due to collision with other blocks belonging
+     * to the same shuffle partition, and also for later checking such exceptions on the client
+     * side. When we get a block push failure because of the block couldn't be written due to
+     * this reason, we will not log the exception on the client side.
+     */
+    public static final String COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX =

Review comment:
       nit: can we give it a more meaningful name, like BLOCK_APPEND_FAILED_MSG_PREFIX or BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX?
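       A sketch of one of the suggested names; the constant's value is elided
       in the diff above, so a placeholder stands in for the actual message text:

           // Hypothetical rename only; "..." is a placeholder for the elided message text.
           public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
             "...";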

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -373,6 +427,54 @@ public ManagedBuffer next() {
     }
   }
 
+  /**
+   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+   * is not enabled.
+   */
+  private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+    @Override
+    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerApplication(String appId, String user) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerExecutor(String appId, String[] localDirs) {
+      // No-Op. Do nothing.
+    }
+
+    @Override
+    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public ManagedBuffer getMergedBlockData(
+        String appId, int shuffleId, int reduceId, int chunkId) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
+      return null;

Review comment:
       Why return null instead of throwing an exception?
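       For comparison, a minimal sketch of the alternative being suggested,
       consistent with the other methods of this no-op manager:

           @Override
           public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
             // Fail loudly instead of returning null, matching the other unsupported operations.
             throw new UnsupportedOperationException("Cannot handle shuffle block merge");
           }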

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlockMeta.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Contains meta information for a merged block. Currently this information constitutes:
+ * 1. Number of chunks in a merged shuffle block.
+ * 2. Bitmaps for each chunk in the merged block. A chunk bitmap contains all the mapIds that were
+ *    merged to that merged block chunk.
+ */
+public class MergedBlockMeta {
+  private final int numChunks;
+  private final ManagedBuffer chunksBitmapBuffer;
+
+  public MergedBlockMeta(int numChunks, ManagedBuffer chunksBitmapBuffer) {
+    this.numChunks = numChunks;
+    this.chunksBitmapBuffer = Preconditions.checkNotNull(chunksBitmapBuffer);
+  }
+
+  public int getNumChunks() {
+    return numChunks;
+  }
+
+  public ManagedBuffer getChunksBitmapBuffer() {
+    return chunksBitmapBuffer;
+  }
+
+  public RoaringBitmap[] readChunkBitmaps() throws IOException {
+    ByteBuf buf = Unpooled.wrappedBuffer(chunksBitmapBuffer.nioByteBuffer());
+    List<RoaringBitmap> bitmaps = new ArrayList<>();
+    while(buf.isReadable()) {
+      bitmaps.add(Encoders.Bitmaps.decode(buf));
+    }
+    assert (bitmaps.size() == numChunks);

Review comment:
       Would it be a bit late to check the bitmaps' size here? By the time the
       assertion fails, you've already lost the context from when the
       MergedBlockMeta was created.
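       A sketch of one alternative: replace the bare assert with an exception
       that at least carries enough context to diagnose the mismatch:

           // Hedged sketch only; fails with a descriptive message instead of a bare assert.
           if (bitmaps.size() != numChunks) {
             throw new IllegalStateException(String.format(
               "Expected %d chunk bitmaps in merged block meta but decoded %d",
               numChunks, bitmaps.size()));
           }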

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -373,6 +427,54 @@ public ManagedBuffer next() {
     }
   }
 
+  /**
+   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+   * is not enabled.
+   */
+  private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+    @Override
+    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerApplication(String appId, String user) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerExecutor(String appId, String[] localDirs) {
+      // No-Op. Do nothing.
+    }
+
+    @Override
+    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");

Review comment:
       Should we also make this one a no-op?
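       i.e. something like the sketch below, mirroring registerExecutor:

           @Override
           public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
             // No-Op. Do nothing, same as registerExecutor above.
           }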

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -89,6 +110,21 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallb
     handleMessage(msgObj, client, callback);
   }
 
+  @Override
+  public StreamCallbackWithID receiveStream(

Review comment:
       rename to `receivePushBlockStream`?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -20,18 +20,21 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 import com.codahale.metrics.MetricSet;
 import com.google.common.collect.Lists;
+
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientBootstrap;
 import org.apache.spark.network.shuffle.protocol.*;
-
 import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.buffer.ManagedBuffer;

Review comment:
       nit: adjust this import's position?
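       Assuming this module's usual grouping (java.*, then third-party, then
       org.apache.spark.* sorted alphabetically), the last group might read:

           import org.apache.spark.network.TransportContext;
           import org.apache.spark.network.buffer.ManagedBuffer;
           import org.apache.spark.network.client.RpcResponseCallback;
           import org.apache.spark.network.client.TransportClient;
           import org.apache.spark.network.client.TransportClientBootstrap;
           import org.apache.spark.network.shuffle.protocol.*;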

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -116,6 +121,42 @@ public void fetchBlocks(
     }
   }
 
+  @Override
+  public void pushBlocks(
+      String host,
+      int port,
+      String[] blockIds,
+      ManagedBuffer[] buffers,
+      BlockFetchingListener listener) {
+    checkInit();
+    assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";
+
+    Map<String, ManagedBuffer> buffersWithId = new HashMap<>();
+    for (int i = 0; i < blockIds.length; i++) {
+      buffersWithId.put(blockIds[i], buffers[i]);
+    }
+    logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
+    try {
+      RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
+          (blockIds1, listener1) -> {

Review comment:
       nit: rename `(blockIds1, listener1)` to `(inputBlockIds, inputListener)`
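       i.e. roughly:

           RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
               (inputBlockIds, inputListener) -> {
                 TransportClient client = clientFactory.createClient(host, port);
                 new OneForOneBlockPusher(client, appId, inputBlockIds, inputListener,
                   buffersWithId).start();
               };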

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlockMeta.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Contains meta information for a merged block. Currently this information constitutes:
+ * 1. Number of chunks in a merged shuffle block.
+ * 2. Bitmaps for each chunk in the merged block. A chunk bitmap contains all the mapIds that were
+ *    merged to that merged block chunk.
+ */
+public class MergedBlockMeta {
+  private final int numChunks;
+  private final ManagedBuffer chunksBitmapBuffer;
+
+  public MergedBlockMeta(int numChunks, ManagedBuffer chunksBitmapBuffer) {
+    this.numChunks = numChunks;
+    this.chunksBitmapBuffer = Preconditions.checkNotNull(chunksBitmapBuffer);
+  }
+
+  public int getNumChunks() {
+    return numChunks;
+  }
+
+  public ManagedBuffer getChunksBitmapBuffer() {
+    return chunksBitmapBuffer;
+  }
+
+  public RoaringBitmap[] readChunkBitmaps() throws IOException {

Review comment:
       If the read happens multiple times, we would repeatedly wrap the bitmap
       buffer and decode it, right?
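       If so, one option (a sketch only, assuming repeated reads are worth
       optimizing) would be to memoize the decoded bitmaps:

           private RoaringBitmap[] cachedBitmaps; // lazily decoded on first read

           public synchronized RoaringBitmap[] readChunkBitmaps() throws IOException {
             if (cachedBitmaps == null) {
               ByteBuf buf = Unpooled.wrappedBuffer(chunksBitmapBuffer.nioByteBuffer());
               List<RoaringBitmap> bitmaps = new ArrayList<>();
               while (buf.isReadable()) {
                 bitmaps.add(Encoders.Bitmaps.decode(buf));
               }
               cachedBitmaps = bitmaps.toArray(new RoaringBitmap[0]);
             }
             return cachedBitmaps;
           }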

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -116,6 +121,42 @@ public void fetchBlocks(
     }
   }
 
+  @Override
+  public void pushBlocks(
+      String host,
+      int port,
+      String[] blockIds,
+      ManagedBuffer[] buffers,
+      BlockFetchingListener listener) {
+    checkInit();
+    assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";
+
+    Map<String, ManagedBuffer> buffersWithId = new HashMap<>();
+    for (int i = 0; i < blockIds.length; i++) {
+      buffersWithId.put(blockIds[i], buffers[i]);
+    }
+    logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
+    try {
+      RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
+          (blockIds1, listener1) -> {
+            TransportClient client = clientFactory.createClient(host, port);
+            new OneForOneBlockPusher(client, appId, blockIds1, listener1, buffersWithId).start();
+          };
+      int maxRetries = conf.maxIORetries();
+      if (maxRetries > 0) {
+        new RetryingBlockFetcher(
+          conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
+      } else {
+        blockPushStarter.createAndStart(blockIds, listener);

Review comment:
       What's the benefit of handling this case separately here, instead of
       just creating a `RetryingBlockFetcher` anyway?
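       i.e. whether the branch could collapse to the sketch below, assuming
       `RetryingBlockFetcher` degenerates to a single attempt when maxRetries is 0:

           // Always go through RetryingBlockFetcher; zero retries would mean one attempt.
           new RetryingBlockFetcher(
             conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();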

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -156,6 +193,20 @@ protected void handleMessage(
      Map<String, String[]> localDirs = blockManager.getLocalDirs(msg.appId, msg.execIds);
       callback.onSuccess(new LocalDirsForExecutors(localDirs).toByteBuffer());
 
+    } else if (msgObj instanceof FinalizeShuffleMerge) {
+      final Timer.Context responseDelayContext =
+          metrics.finalizeShuffleMergeLatencyMillis.time();
+      FinalizeShuffleMerge msg = (FinalizeShuffleMerge) msgObj;
+      try {
+        checkAuth(client, msg.appId);
+        MergeStatuses statuses = mergeManager.finalizeShuffleMerge(msg);
+        callback.onSuccess(statuses.toByteBuffer());
+      } catch(IOException e) {
+        throw new RuntimeException(String.format("Error while finalizing shuffle merge "

Review comment:
       Should we also append the original IOException in the new 
RuntimeException?
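       i.e. pass it as the second constructor argument so the original stack
       trace is preserved; a sketch, where "..." stands in for the message text
       elided in the diff above:

           } catch (IOException e) {
             // Chains the IOException as the cause instead of dropping it.
             throw new RuntimeException("Error while finalizing shuffle merge ...", e);
           }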



