tgravescs commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r495972834



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+  boolean shouldRetryError(Throwable t);
+
+  default boolean shouldLogError(Throwable t) {
+    return true;
+  }
+
+  /**
+   * A no-op error handler instance.
+   */
+  ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+  /**
+   * The error handler for pushing shuffle blocks to remote shuffle services.
+   */
+  class BlockPushErrorHandler implements ErrorHandler {
+
+    @Override
+    public boolean shouldRetryError(Throwable t) {
+      // If it is a connection timeout or a connection closed exception, no need to retry.
+      if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+        return false;
+      }
+      // If the block is too late, there is no need to retry it
+      return (t.getMessage() == null ||
+          !t.getMessage().contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX)) &&
+          (t.getCause() == null || t.getCause().getMessage() == null ||
+          !t.getCause().getMessage().contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX));
+    }
+
+    @Override
+    public boolean shouldLogError(Throwable t) {
+      return (t.getMessage() == null ||
+          (!t.getMessage().contains(BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) &&

Review comment:
       Why don't we want to log if it's BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX?
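
For reference, the retry decision in this handler reduces to substring matching against both the exception's own message and its cause's message. A self-contained sketch of that pattern (the marker text and class name here are illustrative, not Spark's actual constants):

```java
// Illustrative sketch of the message-matching pattern in BlockPushErrorHandler.
// The marker string and class name are hypothetical, not Spark's definitions.
public class ErrorHandlerSketch {
  static final String TOO_LATE_SUFFIX = " is received after merged shuffle is finalized";

  // Checks the throwable's own message and, if present, its cause's message.
  static boolean messageContains(Throwable t, String marker) {
    if (t.getMessage() != null && t.getMessage().contains(marker)) {
      return true;
    }
    Throwable cause = t.getCause();
    return cause != null && cause.getMessage() != null
        && cause.getMessage().contains(marker);
  }

  // A "too late" block has already missed its merge window, so retrying
  // the push would never succeed.
  static boolean shouldRetry(Throwable t) {
    return !messageContains(t, TOO_LATE_SUFFIX);
  }
}
```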

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushException.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * A special exception type that would decode the encoded {@link PushBlockStream} from the
+ * exception String. This complements the encoding logic in
+ * {@link org.apache.spark.network.server.TransportRequestHandler}.
+ */
+public class BlockPushException extends RuntimeException {
+  private PushBlockStream header;
+
+  /**
+   * String constant used for generating exception messages indicating a block to be merged
+   * arrives too late on the server side, and also for later checking such exceptions on the
+   * client side.
+   */
+  public static final String TOO_LATE_MESSAGE_SUFFIX =

Review comment:
       I don't see these used anywhere right now, so I assume the implementer would use them. Perhaps we should comment on the specific ramifications of failures (not retrying or not logging).
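
One way to document the contract these constants imply: the server embeds the marker when it builds the exception message, and the client recognizes it to decide not to retry. A hedged sketch (the marker text below is made up for illustration, not the actual Spark constant):

```java
// Sketch of the encode/recognize contract around a message marker.
// The marker text below is illustrative, not the actual Spark constant.
public class PushMessageSketch {
  static final String TOO_LATE_SUFFIX = " is received after merged shuffle is finalized";

  // Server side: build the exception message with the marker appended.
  static String tooLateMessage(String blockId) {
    return "Block " + blockId + TOO_LATE_SUFFIX;
  }

  // Client side: a message carrying the marker means the merge window has
  // closed, so the push should not be retried.
  static boolean isTooLate(String message) {
    return message != null && message.endsWith(TOO_LATE_SUFFIX);
  }
}
```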

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to a remote shuffle service to
+ * be merged instead of for fetching them from remote shuffle services. This is used by
+ * ShuffleWriter when the block push process is initiated. The supplied BlockFetchingListener
+ * is used to handle the success or failure in pushing each block.
+ */
+public class OneForOneBlockPusher {
+  private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+
+  private final TransportClient client;
+  private final String appId;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final RpcResponseCallback callback;
+  private final Map<String, ManagedBuffer> buffers;
+
+  public OneForOneBlockPusher(
+      TransportClient client,
+      String appId,
+      String[] blockIds,
+      BlockFetchingListener listener,

Review comment:
       So just to clarify, you are going to rename it later? (You said you were thinking about it.) It is a public interface as well, though, so we wouldn't want to do that in a minor release.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -373,6 +427,54 @@ public ManagedBuffer next() {
     }
   }
 
+  /**
+   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+   * is not enabled.
+   */
+  private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+    @Override
+    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerApplication(String appId, String user) {

Review comment:
       That is definitely a good question. For other external shuffle services, is this enough, or should it be behind another interface so that different implementations could specify different arguments?
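
One shape that idea could take: make the registration arguments a type of their own, so other shuffle service implementations can carry different fields without changing the manager's method signature. A purely hypothetical sketch, not Spark's API:

```java
// Hypothetical sketch: registration arguments behind their own interface so
// different shuffle services can pass different data. Not Spark's actual API.
public class RegistrationSketch {
  interface AppRegistration {
    String appId();
  }

  // A YARN-flavoured registration additionally carries the username needed
  // for generating host-local merged shuffle directories.
  static class YarnAppRegistration implements AppRegistration {
    private final String appId;
    private final String user;

    YarnAppRegistration(String appId, String user) {
      this.appId = appId;
      this.user = user;
    }

    public String appId() { return appId; }
    public String user() { return user; }
  }

  // The manager only depends on the common interface.
  interface MergedShuffleManager {
    void registerApplication(AppRegistration reg);
  }
}
```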

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+
+/**
+ * The MergedShuffleFileManager is used to process push based shuffle when enabled. It works
+ * along side {@link ExternalBlockHandler} and serves as an RPCHandler for
+ * {@link org.apache.spark.network.server.RpcHandler#receiveStream}, where it processes the
+ * remotely pushed streams of shuffle blocks to merge them into merged shuffle files. Right
+ * now, push based shuffle can only be enabled when external shuffle service in YARN mode

Review comment:
       Why is this only YARN? What if I wrote one using the external shuffle service on standalone, for instance; is there anything preventing me? It would be nice to add to the comment as to why.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+
+/**
+ * The MergedShuffleFileManager is used to process push based shuffle when enabled. It works
+ * along side {@link ExternalBlockHandler} and serves as an RPCHandler for
+ * {@link org.apache.spark.network.server.RpcHandler#receiveStream}, where it processes the
+ * remotely pushed streams of shuffle blocks to merge them into merged shuffle files. Right
+ * now, push based shuffle can only be enabled when external shuffle service in YARN mode
+ * is used.
+ */
+public interface MergedShuffleFileManager {
+  /**
+   * Provides the stream callback used to process a remotely pushed block. The callback is
+   * used by the {@link org.apache.spark.network.client.StreamInterceptor} installed on the
+   * channel to process the block data in the channel outside of the message frame.
+   *
+   * @param msg metadata of the remotely pushed blocks. This is processed inside the message frame
+   * @return A stream callback to process the block data in streaming fashion as it arrives
+   */
+  StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg);
+
+  /**
+   * Handles the request to finalize shuffle merge for a given shuffle.
+   *
+   * @param msg contains appId and shuffleId to uniquely identify a shuffle to be finalized
+   * @return The statuses of the merged shuffle partitions for the given shuffle on this
+   *         shuffle service
+   * @throws IOException
+   */
+  MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException;
+
+  /**
+   * Registers an application when it starts. It also stores the username which is necessary
+   * for generating the host local directories for merged shuffle files.
+   * Right now, this is invoked by YarnShuffleService.

Review comment:
       I don't see any changes to YarnShuffleService; is that coming later?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
