otterc commented on a change in pull request #33340:
URL: https://github.com/apache/spark/pull/33340#discussion_r670904793
##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
##########
@@ -86,104 +90,114 @@ void createAndStart(String[] blockIds, BlockFetchingListener listener)
private int retryCount = 0;
/**
- * Set of all block ids which have not been fetched successfully or with a non-IO Exception.
+ * Set of all block ids which have not been transferred successfully or with a non-IO Exception.
* A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet,
* input ordering is preserved, so we always request blocks in the same order the user provided.
*/
private final LinkedHashSet<String> outstandingBlocksIds;
/**
- * The BlockFetchingListener that is active with our current BlockFetcher.
+ * The BlockTransferListener that is active with our current BlockFetcher.
* When we start a retry, we immediately replace this with a new Listener, which causes all any
* old Listeners to ignore all further responses.
*/
- private RetryingBlockFetchListener currentListener;
+ private RetryingBlockTransferListener currentListener;
private final ErrorHandler errorHandler;
- public RetryingBlockFetcher(
+ /**
+ * Term indicating whether this RetryingBlockTransferor is for block fetch or push. Useful for
+ * printing more meaningful logs.
+ */
+ private final String transferTerm;
+
+ public RetryingBlockTransferor(
TransportConf conf,
- RetryingBlockFetcher.BlockFetchStarter fetchStarter,
+ BlockTransferStarter transferStarter,
String[] blockIds,
- BlockFetchingListener listener,
+ BlockTransferListener listener,
ErrorHandler errorHandler) {
- this.fetchStarter = fetchStarter;
+ this.transferStarter = transferStarter;
this.listener = listener;
+ this.transferTerm = (listener instanceof BlockFetchingListener) ?
Review comment:
Right now this can only be "fetch" or "push".
Since we have introduced `BlockTransferListener`, it could also be extended for some future use case.
So I am wondering whether this `transferTerm` should be passed in the constructor instead of assuming that it can only be push or fetch.
Another option would be to add a method to `BlockTransferListener`, let's say `getLogPrefix`, which by default returns `transfer`; `BlockFetchingListener` can override it to return `fetch`, and `BlockPushListener` can override it to return `push`.
Also, other places in the comments assume it is just push or fetch.
Maybe we should just say "transfer" in the comments.
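A minimal sketch of the second option. It assumes simplified stand-in interfaces that model only the log-prefix method (the real Spark listeners also declare their success/failure callbacks, which are omitted here), uses the reviewer's placeholder name `getLogPrefix` and the proposed `BlockPushListener` name, and `TransferorSketch` and its `retryMessage` method are hypothetical; none of these are existing Spark APIs. The transferor asks the listener for its term instead of checking `instanceof`:

```java
// Simplified stand-ins: only the hypothetical log-prefix method is modeled.
interface BlockTransferListener {
  // Hypothetical method (reviewer's placeholder name); generic fallback term.
  default String getLogPrefix() {
    return "transfer";
  }
}

interface BlockFetchingListener extends BlockTransferListener {
  @Override
  default String getLogPrefix() {
    return "fetch";
  }
}

// Reviewer's proposed name; the PR currently calls this BlockPushingListener.
interface BlockPushListener extends BlockTransferListener {
  @Override
  default String getLogPrefix() {
    return "push";
  }
}

// Hypothetical, heavily reduced transferor: shows only how the term is derived.
class TransferorSketch {
  private final String transferTerm;

  TransferorSketch(BlockTransferListener listener) {
    // No instanceof chain: each listener type reports its own term,
    // and any future listener falls back to "transfer".
    this.transferTerm = listener.getLogPrefix();
  }

  // Illustrative log text only; not the exact message Spark logs.
  String retryMessage(int retryCount, int maxRetries, int outstanding) {
    return String.format("Retrying %s (%d/%d) for %d outstanding blocks",
        transferTerm, retryCount, maxRetries, outstanding);
  }
}
```

For example, `new TransferorSketch(new BlockPushListener() {}).retryMessage(2, 3, 1)` yields `Retrying push (2/3) for 1 outstanding blocks`. Compared with passing `transferTerm` into the constructor, the default method keeps the term next to the listener type, so a new listener only has to override one method; the constructor parameter instead makes the term explicit at every call site.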
##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+public interface BlockPushingListener extends BlockTransferListener {
Review comment:
Nit: I would prefer `BlockPushListener`. I think it is currently called `BlockPushing...` because the other one is called `BlockFetching...`?
However, `BlockPushListener` sounds better and is consistent with `BlockTransferListener`.
##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -95,13 +95,15 @@ public void fetchBlocks(
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
int maxRetries = conf.maxIORetries();
- RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
+ RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
(inputBlockId, inputListener) -> {
// Unless this client is closed.
if (clientFactory != null) {
+ assert inputListener instanceof BlockFetchingListener :
+ "Expecting a BlockFetchingListener, but got a BlockPushingListener";;
Review comment:
nit: `... but got a different listener`. If another type of listener is added in the future, we will not need to change this error message.
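A minimal sketch of the suggested wording, assuming empty stand-in interfaces in place of the real Spark listeners; `ListenerChecks` and `asFetchingListener` are hypothetical helper names used only for illustration. Note that a Java `assert` only runs when the JVM is started with `-ea`; without it, a wrong listener type surfaces as a `ClassCastException` at the cast instead:

```java
// Empty stand-ins for the real listener interfaces.
interface BlockTransferListener {}
interface BlockFetchingListener extends BlockTransferListener {}
interface BlockPushingListener extends BlockTransferListener {}

// Hypothetical helper mirroring the assertion inside the fetch starter lambda.
class ListenerChecks {
  // Generic wording: stays accurate if more listener types are added later.
  static final String UNEXPECTED_LISTENER =
      "Expecting a BlockFetchingListener, but got a different listener";

  static BlockFetchingListener asFetchingListener(BlockTransferListener listener) {
    // Fires only with -ea; otherwise the cast below rejects the wrong type.
    assert listener instanceof BlockFetchingListener : UNEXPECTED_LISTENER;
    return (BlockFetchingListener) listener;
  }
}
```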