Ngone51 commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r502917557
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -116,6 +121,42 @@ public void fetchBlocks(
}
}
+ @Override
+ public void pushBlocks(
+ String host,
+ int port,
+ String[] blockIds,
+ ManagedBuffer[] buffers,
+ BlockFetchingListener listener) {
+ checkInit();
+ assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";
+
+ Map<String, ManagedBuffer> buffersWithId = new HashMap<>();
+ for (int i = 0; i < blockIds.length; i++) {
+ buffersWithId.put(blockIds[i], buffers[i]);
Review comment:
Is `buffersWithId` necessary, since `blockIds` and `buffers` share the same index? Inside `OneForOneBlockPusher` we can also identify them by the index.
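A minimal, self-contained sketch of the index-based alternative being suggested (plain `String`s stand in for `ManagedBuffer`, and all names here are illustrative, not from the PR):

```java
// Illustrative only: blockIds[i] and buffers[i] always belong together, so the
// pusher can walk both arrays in lockstep instead of materializing a HashMap.
public class IndexPairingSketch {
  static String[] pairByIndex(String[] blockIds, String[] buffers) {
    if (blockIds.length != buffers.length) {
      throw new IllegalArgumentException("Number of block ids and buffers do not match.");
    }
    String[] pairs = new String[blockIds.length];
    for (int i = 0; i < blockIds.length; i++) {
      // Identify each buffer by the shared index, as OneForOneBlockPusher could.
      pairs[i] = blockIds[i] + "->" + buffers[i];
    }
    return pairs;
  }

  public static void main(String[] args) {
    String[] ids = {"shufflePush_0_0_0", "shufflePush_0_1_0"};
    String[] bufs = {"buf0", "buf1"};
    System.out.println(java.util.Arrays.toString(pairByIndex(ids, bufs)));
    // [shufflePush_0_0_0->buf0, shufflePush_0_1_0->buf1]
  }
}
```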
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+
+ boolean shouldRetryError(Throwable t);
+
+ default boolean shouldLogError(Throwable t) {
+ return true;
+ }
+
+ /**
+ * A no-op error handler instance.
+ */
+ ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+ /**
+ * The error handler for pushing shuffle blocks to remote shuffle services.
+ */
+ class BlockPushErrorHandler implements ErrorHandler {
+ /**
+ * String constant used for generating exception messages indicating a block to be merged
+ * arrives too late on the server side, and also for later checking such exceptions on the
+ * client side. When we get a block push failure because the block arrives too late, we
+ * will not retry pushing the block nor log the exception on the client side.
+ */
+ public static final String TOO_LATE_MESSAGE_SUFFIX =
+ "received after merged shuffle is finalized";
+
+ /**
+ * String constant used for generating exception messages indicating the server couldn't
+ * append a block after all available attempts due to collision with other blocks belonging
+ * to the same shuffle partition, and also for later checking such exceptions on the client
+ * side. When we get a block push failure because the block couldn't be written for this
+ * reason, we will not log the exception on the client side.
+ */
+ public static final String COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX =
+ "Couldn't find an opportunity to write block";
+
+
+ @Override
+ public boolean shouldRetryError(Throwable t) {
+ // If it is a connection timeout or a connection closed exception, no need to retry.
+ if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+ return false;
+ }
+ // If the block is too late, there is no need to retry it
+ return (t.getMessage() == null || !t.getMessage().contains(TOO_LATE_MESSAGE_SUFFIX)) &&
+ (t.getCause() == null || t.getCause().getMessage() == null ||
+ !t.getCause().getMessage().contains(TOO_LATE_MESSAGE_SUFFIX));
+ }
+
+ @Override
+ public boolean shouldLogError(Throwable t) {
+ return (t.getMessage() == null ||
+ (!t.getMessage().contains(COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) &&
+ !t.getMessage().contains(TOO_LATE_MESSAGE_SUFFIX))) &&
+ (t.getCause() == null || t.getCause().getMessage() == null ||
+ (!t.getCause().getMessage().contains(COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) &&
+ !t.getCause().getMessage().contains(TOO_LATE_MESSAGE_SUFFIX)));
Review comment:
Shall we use `Utils.exceptionString()` to simplify the code (same for `shouldRetryError`)? We can do like:
```scala
val errorMsg = Utils.exceptionString(t)
val noOpportunity = errorMsg.contains(COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX)
val tooLate = errorMsg.contains(TOO_LATE_MESSAGE_SUFFIX)
return !noOpportunity && !tooLate
```
BTW, IIUC, the second `&&` can be replaced with `||`?
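In Java it could look roughly like this. Self-contained sketch only: `flattenedMessages` is a hypothetical stand-in for `Utils.exceptionString` (which lives on the Scala side), and the constants just mirror `BlockPushErrorHandler`:

```java
// Sketch: check the whole message/cause chain once instead of null-guarding
// t.getMessage() and t.getCause().getMessage() separately.
public class ErrorMessageCheckSketch {
  static final String TOO_LATE_MESSAGE_SUFFIX =
      "received after merged shuffle is finalized";
  static final String COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX =
      "Couldn't find an opportunity to write block";

  // Stand-in for Utils.exceptionString: concatenate messages down the cause chain.
  static String flattenedMessages(Throwable t) {
    StringBuilder sb = new StringBuilder();
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (cur.getMessage() != null) {
        sb.append(cur.getMessage()).append('\n');
      }
    }
    return sb.toString();
  }

  static boolean shouldLogError(Throwable t) {
    String errorMsg = flattenedMessages(t);
    return !errorMsg.contains(COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX)
        && !errorMsg.contains(TOO_LATE_MESSAGE_SUFFIX);
  }

  public static void main(String[] args) {
    Throwable tooLate = new RuntimeException("wrapper",
        new RuntimeException("block received after merged shuffle is finalized"));
    System.out.println(shouldLogError(tooLate));                                   // false
    System.out.println(shouldLogError(new RuntimeException("connection reset")));  // true
  }
}
```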
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+
+public class OneForOneBlockPusherSuite {
Review comment:
Could you also add a test for the case of `COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX`?
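Something along these lines could anchor it. This is a self-contained sketch rather than the actual `OneForOneBlockPusherSuite` mock setup; the trimmed handler below only mirrors the message check in `BlockPushErrorHandler` for this case:

```java
// Illustrative: for a "Couldn't find an opportunity to write block" failure, the
// expected client-side behavior is to stay quiet (not log), per the javadoc above.
public class CollisionErrorHandlingSketch {
  static final String COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX =
      "Couldn't find an opportunity to write block";

  static boolean shouldLogError(Throwable t) {
    return t.getMessage() == null
        || !t.getMessage().contains(COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX);
  }

  public static void main(String[] args) {
    Throwable collision = new RuntimeException(
        COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX + " shufflePush_0_1_0");
    // A collision should not be logged on the client side...
    System.out.println(shouldLogError(collision)); // false
    // ...while unrelated IO errors are still logged.
    System.out.println(shouldLogError(new RuntimeException("connection reset"))); // true
  }
}
```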
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push-based shuffle.
+ * The remote shuffle service will also include this message when responding to the push
+ * requests.
+ */
+public class PushBlockStream extends BlockTransferMessage {
+ public final String appId;
+ public final String blockId;
+ // Similar to the chunkIndex in StreamChunkId, indicating the index of a block in a batch of
+ // blocks to be pushed.
+ public final int index;
+
+ public PushBlockStream(String appId, String blockId, int index) {
+ this.appId = appId;
+ this.blockId = blockId;
+ this.index = index;
+ }
+
+ @Override
+ protected Type type() {
+ return Type.PUSH_BLOCK_STREAM;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId, blockId, index);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("appId", appId)
+ .add("blockId", blockId)
+ .add("index", index)
+ .toString();
Review comment:
nit: 2 indents. Please check other places.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -116,6 +121,42 @@ public void fetchBlocks(
}
}
+ @Override
+ public void pushBlocks(
+ String host,
+ int port,
+ String[] blockIds,
+ ManagedBuffer[] buffers,
+ BlockFetchingListener listener) {
+ checkInit();
+ assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";
+
+ Map<String, ManagedBuffer> buffersWithId = new HashMap<>();
+ for (int i = 0; i < blockIds.length; i++) {
+ buffersWithId.put(blockIds[i], buffers[i]);
+ }
+ logger.debug("Push shuffle blocks to {}:{} with {} blocks", host, port, blockIds.length);
Review comment:
How about?
```suggestion
    logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
```
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -135,4 +136,23 @@ public void onFailure(Throwable t) {
hostLocalDirsCompletable.completeExceptionally(e);
}
}
+
+ /**
+ * Push a sequence of shuffle blocks to a remote node asynchronously to be merged into
+ * a larger shuffle block.
Review comment:
To be clear, the larger shuffle block is combined from part of this sequence of shuffle blocks plus blocks from other nodes, rather than from this sequence of blocks only, right? If so, could you reword the comment to make it more accurate?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to a remote shuffle
+ * service to be merged instead of fetching them from remote shuffle services. This is used
+ * by ShuffleWriter when the block push process is initiated. The supplied
+ * BlockFetchingListener is used to handle the success or failure in pushing each block.
+ */
+public class OneForOneBlockPusher {
+ private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+ private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
+
+ private final TransportClient client;
+ private final String appId;
+ private final String[] blockIds;
+ private final BlockFetchingListener listener;
+ private final Map<String, ManagedBuffer> buffers;
+
+ public OneForOneBlockPusher(
+ TransportClient client,
+ String appId,
+ String[] blockIds,
+ BlockFetchingListener listener,
+ Map<String, ManagedBuffer> buffers) {
+ this.client = client;
+ this.appId = appId;
+ this.blockIds = blockIds;
+ this.listener = listener;
+ this.buffers = buffers;
+ }
+
+ private class BlockPushCallback implements RpcResponseCallback {
+
+ private int index;
+ private String blockId;
+
+ BlockPushCallback(int index, String blockId) {
+ this.index = index;
+ this.blockId = blockId;
+ }
+
+ @Override
+ public void onSuccess(ByteBuffer response) {
+ // On receipt of a successful block push
+ listener.onBlockFetchSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ // Since block push is best effort, i.e., if we encounter a block push failure that's not
+ // retriable or exceeds the max retries, we should not fail all remaining block pushes.
+ // The best effort nature makes block push tolerant of partial completion. Thus, we only
+ // fail the block that actually failed. Note that, on the RetryingBlockFetcher side, once
+ // retry is initiated, it would still invalidate the previous active retry listener, and
+ // retry all outstanding blocks. We are preventing forwarding unnecessary block push
+ // failures to the parent listener of the retry listener. The only exceptions would be if
+ // the block push failure is due to the block arriving on the server side after merge
+ // finalization, or the client failing to establish a connection to the server. In both
+ // cases, we would fail all remaining blocks.
+ if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
+ String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
+ failRemainingBlocks(targetBlockId, e);
+ } else {
+ String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
+ failRemainingBlocks(targetBlockId, e);
+ }
+ }
+ }
+
+ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
+ for (String blockId : failedBlockIds) {
+ try {
+ listener.onBlockFetchFailure(blockId, e);
+ } catch (Exception e2) {
+ logger.error("Error in block push failure callback", e2);
+ }
+ }
+ }
+
+ /**
+ * Begins the block pushing process, calling the listener with every block pushed.
+ */
+ public void start() {
+ logger.debug("Start pushing {} blocks", blockIds.length);
+ for (int i = 0; i < blockIds.length; i++) {
+ if (!buffers.containsKey(blockIds[i])) {
Review comment:
Does this really happen?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+
+ boolean shouldRetryError(Throwable t);
+
+ default boolean shouldLogError(Throwable t) {
+ return true;
+ }
+
+ /**
+ * A no-op error handler instance.
+ */
+ ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+ /**
+ * The error handler for pushing shuffle blocks to remote shuffle services.
+ */
+ class BlockPushErrorHandler implements ErrorHandler {
+ /**
+ * String constant used for generating exception messages indicating a block to be merged
+ * arrives too late on the server side, and also for later checking such exceptions on the
+ * client side. When we get a block push failure because the block arrives too late, we
+ * will not retry pushing the block nor log the exception on the client side.
+ */
+ public static final String TOO_LATE_MESSAGE_SUFFIX =
+ "received after merged shuffle is finalized";
+
+ /**
+ * String constant used for generating exception messages indicating the server couldn't
+ * append a block after all available attempts due to collision with other blocks belonging
Review comment:
I can see how we detect the collision on the same server for multiple attempts. But how do we prevent it when it happens on different servers?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+
+ boolean shouldRetryError(Throwable t);
+
+ default boolean shouldLogError(Throwable t) {
+ return true;
+ }
+
+ /**
+ * A no-op error handler instance.
+ */
+ ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+ /**
+ * The error handler for pushing shuffle blocks to remote shuffle services.
+ */
+ class BlockPushErrorHandler implements ErrorHandler {
+ /**
+ * String constant used for generating exception messages indicating a block to be merged
+ * arrives too late on the server side, and also for later checking such exceptions on the
+ * client side. When we get a block push failure because the block arrives too late, we
+ * will not retry pushing the block nor log the exception on the client side.
+ */
+ public static final String TOO_LATE_MESSAGE_SUFFIX =
+ "received after merged shuffle is finalized";
+
+ /**
+ * String constant used for generating exception messages indicating the server couldn't
+ * append a block after all available attempts due to collision with other blocks belonging
+ * to the same shuffle partition, and also for later checking such exceptions on the client
+ * side. When we get a block push failure because the block couldn't be written for this
+ * reason, we will not log the exception on the client side.
+ */
+ public static final String COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX =
+ "Couldn't find an opportunity to write block";
+
+
Review comment:
nit: redundant empty line.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -156,6 +193,20 @@ protected void handleMessage(
Map<String, String[]> localDirs = blockManager.getLocalDirs(msg.appId, msg.execIds);
callback.onSuccess(new LocalDirsForExecutors(localDirs).toByteBuffer());
+ } else if (msgObj instanceof FinalizeShuffleMerge) {
+ final Timer.Context responseDelayContext =
+ metrics.finalizeShuffleMergeLatencyMillis.time();
+ FinalizeShuffleMerge msg = (FinalizeShuffleMerge) msgObj;
+ try {
+ checkAuth(client, msg.appId);
+ MergeStatuses statuses = mergeManager.finalizeShuffleMerge(msg);
+ callback.onSuccess(statuses.toByteBuffer());
+ } catch(IOException e) {
+ throw new RuntimeException(String.format("Error while finalizing shuffle merge "
+ + "for application %s shuffle %d", msg.appId, msg.shuffleId));
Review comment:
nit: 2 indentations.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
##########
@@ -215,8 +227,15 @@ public void onBlockFetchFailure(String blockId, Throwable exception) {
if (shouldRetry(exception)) {
initiateRetry();
} else {
- logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)",
- blockId, retryCount), exception);
+ if (errorHandler.shouldLogError(exception)) {
+ logger.error(
+ String.format("Failed to fetch block %s, and will not retry (%s retries)",
+ blockId, retryCount), exception);
+ } else {
+ logger.debug(
+ String.format("Failed to fetch block %s, and will not retry (%s retries)",
+ blockId, retryCount), exception);
Review comment:
nit: 2 indents
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
##########
@@ -215,8 +227,15 @@ public void onBlockFetchFailure(String blockId, Throwable exception) {
if (shouldRetry(exception)) {
initiateRetry();
} else {
- logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)",
- blockId, retryCount), exception);
+ if (errorHandler.shouldLogError(exception)) {
+ logger.error(
+ String.format("Failed to fetch block %s, and will not retry (%s retries)",
+ blockId, retryCount), exception);
Review comment:
nit: 2 indentations. (same for `String.format...`)
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to a remote shuffle
+ * service to be merged instead of fetching them from remote shuffle services. This is used
+ * by ShuffleWriter when the block push process is initiated. The supplied
+ * BlockFetchingListener is used to handle the success or failure in pushing each block.
+ */
+public class OneForOneBlockPusher {
+ private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+ private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
+
+ private final TransportClient client;
+ private final String appId;
+ private final String[] blockIds;
+ private final BlockFetchingListener listener;
+ private final Map<String, ManagedBuffer> buffers;
+
+ public OneForOneBlockPusher(
+ TransportClient client,
+ String appId,
+ String[] blockIds,
+ BlockFetchingListener listener,
+ Map<String, ManagedBuffer> buffers) {
+ this.client = client;
+ this.appId = appId;
+ this.blockIds = blockIds;
+ this.listener = listener;
+ this.buffers = buffers;
+ }
+
+ private class BlockPushCallback implements RpcResponseCallback {
+
+ private int index;
+ private String blockId;
+
+ BlockPushCallback(int index, String blockId) {
+ this.index = index;
+ this.blockId = blockId;
+ }
+
+ @Override
+ public void onSuccess(ByteBuffer response) {
+ // On receipt of a successful block push
+ listener.onBlockFetchSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ // Since block push is best effort, i.e., if we encounter a block push failure that's not
+ // retriable or exceeds the max retries, we should not fail all remaining block pushes.
+ // The best effort nature makes block push tolerant of partial completion. Thus, we only
+ // fail the block that actually failed. Note that, on the RetryingBlockFetcher side, once
+ // retry is initiated, it would still invalidate the previous active retry listener, and
+ // retry all outstanding blocks. We are preventing forwarding unnecessary block push
+ // failures to the parent listener of the retry listener. The only exceptions would be if
+ // the block push failure is due to the block arriving on the server side after merge
+ // finalization, or the client failing to establish a connection to the server. In both
+ // cases, we would fail all remaining blocks.
+ if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
+ String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
+ failRemainingBlocks(targetBlockId, e);
+ } else {
+ String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
Review comment:
Every block has a separate `BlockPushCallback`. So what's the motivation to fail the following blocks in batch? (Note I'm not aware of the details of the listener in `failRemainingBlocks`.)
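For context on the two branches being discussed, a self-contained sketch of the `copyOfRange` ranges (names illustrative, the real decision comes from `BlockPushErrorHandler`):

```java
import java.util.Arrays;

// Sketch: a retriable failure fails only blockIds[index]; a non-retriable one
// (too-late block or connection failure) fails blockIds[index..end] in batch.
public class FailRangeSketch {
  static String[] blocksToFail(String[] blockIds, int index, boolean retriable) {
    return retriable
        ? Arrays.copyOfRange(blockIds, index, index + 1)        // just the failed block
        : Arrays.copyOfRange(blockIds, index, blockIds.length); // failed block and the rest
  }

  public static void main(String[] args) {
    String[] ids = {"b0", "b1", "b2", "b3"};
    System.out.println(Arrays.toString(blocksToFail(ids, 1, true)));  // [b1]
    System.out.println(Arrays.toString(blocksToFail(ids, 1, false))); // [b1, b2, b3]
  }
}
```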
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.protocol.Encoders;
+
+
+/**
+ * Result returned by an ExternalShuffleService to the DAGScheduler. This represents the
+ * result of all the remote shuffle block merge operations performed by an
+ * ExternalShuffleService for a given shuffle ID. It includes the shuffle ID, an array of
+ * bitmaps each representing the set of mapper partition blocks that are merged for a given
+ * reducer partition, an array of reducer IDs, and an array of merged shuffle partition
+ * sizes. The 3 arrays list information about all the reducer partitions merged by the
+ * ExternalShuffleService in the same order.
+ */
+public class MergeStatuses extends BlockTransferMessage {
+ /** Shuffle ID **/
+ public final int shuffleId;
+ /**
+ * Array of bitmaps tracking the set of mapper partition blocks merged for each
+ * reducer partition
+ */
+ public final RoaringBitmap[] bitmaps;
+ /** Array of reducer IDs **/
+ public final int[] reduceIds;
+ /**
+ * Array of merged shuffle partition block sizes. Each represents the total size of all
+ * merged shuffle partition blocks for one reducer partition.
+ */
+ public final long[] sizes;
+
+ public MergeStatuses(
+ int shuffleId,
+ RoaringBitmap[] bitmaps,
+ int[] reduceIds,
+ long[] sizes) {
+ this.shuffleId = shuffleId;
+ this.bitmaps = bitmaps;
+ this.reduceIds = reduceIds;
+ this.sizes = sizes;
+ }
+
+ @Override
+ protected Type type() {
+ return Type.MERGE_STATUSES;
+ }
+
+ @Override
+ public int hashCode() {
+ int objectHashCode = Objects.hashCode(shuffleId);
+ return (objectHashCode * 41 + Arrays.hashCode(reduceIds) * 41
+ + Arrays.hashCode(bitmaps) * 41 + Arrays.hashCode(sizes));
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("shuffleId", shuffleId)
+ .add("reduceId size", reduceIds.length)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof MergeStatuses) {
+ MergeStatuses o = (MergeStatuses) other;
+ return Objects.equal(shuffleId, o.shuffleId)
+ && Arrays.equals(bitmaps, o.bitmaps)
+ && Arrays.equals(reduceIds, o.reduceIds)
+ && Arrays.equals(sizes, o.sizes);
Review comment:
nit: 2 indents
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.protocol.Encoders;
+
+
+/**
+ * Result returned by an ExternalShuffleService to the DAGScheduler. This represents the
+ * result of all the remote shuffle block merge operations performed by an
+ * ExternalShuffleService for a given shuffle ID. It includes the shuffle ID, an array of
+ * bitmaps each representing the set of mapper partition blocks that are merged for a given
+ * reducer partition, an array of reducer IDs, and an array of merged shuffle partition
+ * sizes. The 3 arrays list information about all the reducer partitions merged by the
+ * ExternalShuffleService in the same order.
+ */
+public class MergeStatuses extends BlockTransferMessage {
+ /** Shuffle ID **/
+ public final int shuffleId;
+ /**
+ * Array of bitmaps tracking the set of mapper partition blocks merged for each
+ * reducer partition
+ */
+ public final RoaringBitmap[] bitmaps;
+ /** Array of reducer IDs **/
+ public final int[] reduceIds;
+ /**
+ * Array of merged shuffle partition block size. Each represents the total size of all
+ * merged shuffle partition blocks for one reducer partition.
+ * **/
+ public final long[] sizes;
+
+ public MergeStatuses(
+ int shuffleId,
+ RoaringBitmap[] bitmaps,
+ int[] reduceIds,
+ long[] sizes) {
+ this.shuffleId = shuffleId;
+ this.bitmaps = bitmaps;
+ this.reduceIds = reduceIds;
+ this.sizes = sizes;
+ }
+
+ @Override
+ protected Type type() {
+ return Type.MERGE_STATUSES;
+ }
+
+ @Override
+ public int hashCode() {
+ int objectHashCode = Objects.hashCode(shuffleId);
+ return (objectHashCode * 41 + Arrays.hashCode(reduceIds) * 41
+ + Arrays.hashCode(bitmaps) * 41 + Arrays.hashCode(sizes));
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("shuffleId", shuffleId)
+ .add("reduceId size", reduceIds.length)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof MergeStatuses) {
+ MergeStatuses o = (MergeStatuses) other;
+ return Objects.equal(shuffleId, o.shuffleId)
+ && Arrays.equals(bitmaps, o.bitmaps)
+ && Arrays.equals(reduceIds, o.reduceIds)
+ && Arrays.equals(sizes, o.sizes);
+ }
+ return false;
+ }
+
+ @Override
+ public int encodedLength() {
+ return 4 // int
+ + Encoders.BitmapArrays.encodedLength(bitmaps)
+ + Encoders.IntArrays.encodedLength(reduceIds)
+ + Encoders.LongArrays.encodedLength(sizes);
Review comment:
nit: 2 indents
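On the `encodedLength` being reviewed above: the `4 // int` accounts for the fixed-width `shuffleId`, and each array contributes its own encoded size. A common wire convention for such arrays is a 4-byte length prefix followed by the fixed-width elements; the sketch below illustrates that arithmetic for an `int[]` (this is an assumption about the convention, not the actual `Encoders` implementation):

```java
// Illustrative sketch of length-prefixed array sizing: a 4-byte length
// prefix plus 4 bytes per int element. The real Encoders classes in the
// PR may compute this differently.
public class IntArrayLengthSketch {
  public static int encodedLength(int[] values) {
    return 4 + 4 * values.length; // length prefix + elements
  }

  public static void main(String[] args) {
    System.out.println(encodedLength(new int[]{10, 20, 30})); // 16
  }
}
```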
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test suite for {@link ErrorHandler}
+ */
+public class ErrorHandlerSuite {
+
+ @Test
+ public void testPushErrorRetry() {
+ ErrorHandler.BlockPushErrorHandler handler = new ErrorHandler.BlockPushErrorHandler();
+ assertFalse(handler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
+ ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX))));
Review comment:
nit: 2 indents. Could you check other places?
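For context on the decision this test exercises: a block push that arrives after the merged shuffle has been finalized cannot succeed on retry, so the handler should report it as non-retryable, while transient failures like connection errors remain retryable. The sketch below mimics that logic with an illustrative suffix string and class name; it is not the actual Spark `BlockPushErrorHandler`:

```java
// Minimal sketch of a push-error retry decision. The suffix string and
// class name are placeholders, not the real Spark constants.
public class PushErrorRetrySketch {
  static final String TOO_LATE_MESSAGE_SUFFIX = "received after merge was finalized";

  public static boolean shouldRetryError(Throwable t) {
    Throwable cause = t.getCause();
    if (cause != null && cause.getMessage() != null
        && cause.getMessage().contains(TOO_LATE_MESSAGE_SUFFIX)) {
      return false; // the block arrived too late; retrying cannot help
    }
    return true; // e.g. transient connection issues are worth retrying
  }

  public static void main(String[] args) {
    Throwable tooLate = new RuntimeException(
        new IllegalArgumentException("block " + TOO_LATE_MESSAGE_SUFFIX));
    System.out.println(shouldRetryError(tooLate)); // false
    Throwable transientErr = new RuntimeException(new java.net.ConnectException());
    System.out.println(shouldRetryError(transientErr)); // true
  }
}
```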
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]