Ngone51 commented on a change in pull request #33340:
URL: https://github.com/apache/spark/pull/33340#discussion_r670932154



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -138,23 +140,29 @@ public void pushBlocks(
     }
     logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
     try {
-      RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
+      RetryingBlockTransferor.BlockTransferStarter blockPushStarter =
           (inputBlockId, inputListener) -> {
-            TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId)
-              .start();
+            if (clientFactory != null) {
+              assert inputListener instanceof BlockPushingListener :
+                "Expecting a BlockPushingListener, but got a BlockFetchingListener";

Review comment:
       ditto

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
##########
@@ -34,44 +34,48 @@
 import org.apache.spark.network.util.TransportConf;
 
 /**
- * Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to
- * IOExceptions, which we hope are due to transient network conditions.
+ * Wraps another BlockFetcher or BlockPusher with the ability to automatically retry fetches or
+ * pushes which fail due to IOExceptions, which we hope are due to transient network conditions.
  *
- * This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In
+ * This transferor provides stronger guarantees regarding the parent BlockTransferListener. In
  * particular, the listener will be invoked exactly once per blockId, with a success or failure.
  */
-public class RetryingBlockFetcher {
+public class RetryingBlockTransferor {

Review comment:
       err... I feel `RetryingBlockTransfer` is enough. `Transfer` can also be a noun, right?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -95,13 +95,15 @@ public void fetchBlocks(
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       int maxRetries = conf.maxIORetries();
-      RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
+      RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
           (inputBlockId, inputListener) -> {
             // Unless this client is closed.
             if (clientFactory != null) {
+              assert inputListener instanceof BlockFetchingListener :
+                "Expecting a BlockFetchingListener, but got a BlockPushingListener";;

Review comment:
       maybe: `"Expecting a BlockFetchingListener, but got " + inputListener`

       (It's always good to show the unexpected value.)
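Below is a minimal sketch of this suggestion. It assumes simplified stand-ins for the three listener interfaces, not Spark's actual definitions. It also uses a hypothetical helper, `unexpectedListenerMessage`, that builds the assertion message from the actual `inputListener`, so a failure names the offending object instead of a guessed type. Java `assert` statements are only evaluated when assertions are enabled (`-ea`).

```java
public class ListenerCheckSketch {
  // Simplified stand-ins for the listener types in this PR, not Spark's real interfaces.
  interface BlockTransferListener {}
  interface BlockFetchingListener extends BlockTransferListener {}
  interface BlockPushingListener extends BlockTransferListener {}

  // Hypothetical helper: the message includes the actual listener, not a guessed type.
  static String unexpectedListenerMessage(Class<?> expected, BlockTransferListener actual) {
    return "Expecting a " + expected.getSimpleName() + ", but got " + actual;
  }

  static void startFetch(BlockTransferListener inputListener) {
    // Evaluated only when assertions are enabled (java -ea).
    assert inputListener instanceof BlockFetchingListener
        : unexpectedListenerMessage(BlockFetchingListener.class, inputListener);
    // ... create the TransportClient and start the fetch here ...
  }

  public static void main(String[] args) {
    startFetch(new BlockFetchingListener() {});  // passes the check
    BlockPushingListener wrong = new BlockPushingListener() {
      @Override
      public String toString() {
        return "PushListener@test";
      }
    };
    // Prints: Expecting a BlockFetchingListener, but got PushListener@test
    System.out.println(unexpectedListenerMessage(BlockFetchingListener.class, wrong));
  }
}
```

The push-side assertion in `pushBlocks` can build its message the same way, with `BlockPushingListener.class` as the expected type.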

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
##########
@@ -34,44 +34,48 @@
 import org.apache.spark.network.util.TransportConf;
 
 /**
- * Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to
- * IOExceptions, which we hope are due to transient network conditions.
+ * Wraps another BlockFetcher or BlockPusher with the ability to automatically retry fetches or
+ * pushes which fail due to IOExceptions, which we hope are due to transient network conditions.
  *
- * This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In
+ * This transferor provides stronger guarantees regarding the parent BlockTransferListener. In
  * particular, the listener will be invoked exactly once per blockId, with a success or failure.
  */
-public class RetryingBlockFetcher {
+public class RetryingBlockTransferor {
 
   /**
-   * Used to initiate the first fetch for all blocks, and subsequently for retrying the fetch on any
-   * remaining blocks.
+   * Used to initiate the first fetch or push for all blocks, and subsequently for retrying the

Review comment:
       Since this class has been renamed with the general suffix `Transferor`, I think we can just say `... first transfer for all blocks ...` here and leave out the specific transfer type.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
##########
@@ -34,44 +34,48 @@
 import org.apache.spark.network.util.TransportConf;
 
 /**
- * Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to
- * IOExceptions, which we hope are due to transient network conditions.
+ * Wraps another BlockFetcher or BlockPusher with the ability to automatically retry fetches or
+ * pushes which fail due to IOExceptions, which we hope are due to transient network conditions.
  *
- * This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In
+ * This transferor provides stronger guarantees regarding the parent BlockTransferListener. In
  * particular, the listener will be invoked exactly once per blockId, with a success or failure.
  */
-public class RetryingBlockFetcher {
+public class RetryingBlockTransferor {
 
   /**
-   * Used to initiate the first fetch for all blocks, and subsequently for retrying the fetch on any
-   * remaining blocks.
+   * Used to initiate the first fetch or push for all blocks, and subsequently for retrying the
+   * fetch or push on any remaining blocks.
    */
-  public interface BlockFetchStarter {
+  public interface BlockTransferStarter {
     /**
-     * Creates a new BlockFetcher to fetch the given block ids which may do some synchronous
-     * bootstrapping followed by fully asynchronous block fetching.
-     * The BlockFetcher must eventually invoke the Listener on every input blockId, or else this
-     * method must throw an exception.
+     * Creates a new BlockFetcher or BlockPusher to fetch or push the given block ids which may do
+     * some synchronous bootstrapping followed by fully asynchronous block transferring.
+     * The BlockFetcher or BlockPusher must eventually invoke the Listener on every input blockId,
+     * or else this method must throw an exception.
      *
      * This method should always attempt to get a new TransportClient from the
      * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
      * issues.
      */
-    void createAndStart(String[] blockIds, BlockFetchingListener listener)
+    void createAndStart(String[] blockIds, BlockTransferListener listener)
          throws IOException, InterruptedException;
   }
 
   /** Shared executor service used for waiting and retrying. */
   private static final ExecutorService executorService = Executors.newCachedThreadPool(
-    NettyUtils.createThreadFactory("Block Fetch Retry"));
+    NettyUtils.createThreadFactory("Block Transfer Retry"));

Review comment:
       nit: prefix the thread name with `transferTerm`?
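There is one catch with this nit. `executorService` is `static`, while `transferTerm` is a per-instance field, so the prefix cannot simply be concatenated into the existing static thread factory. The sketch below shows one possible approach: a separate shared pool per transfer type. It rests on several assumptions:

- `namedDaemonFactory` is a hypothetical stand-in for `NettyUtils.createThreadFactory`.
- The `"<prefix>-<n>"` thread-name format belongs to this sketch and is not Netty's.
- `transferTerm` is assumed to hold `"fetch"` or `"push"`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryThreadNamingSketch {
  // Hypothetical stand-in for NettyUtils.createThreadFactory: daemon threads named "<prefix>-<n>".
  static ThreadFactory namedDaemonFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
      t.setDaemon(true);  // idle retry threads should not keep the JVM alive
      return t;
    };
  }

  // Assumption: one shared pool per transfer type, since a static pool cannot read an instance field.
  static final ExecutorService FETCH_RETRY_POOL =
      Executors.newCachedThreadPool(namedDaemonFactory("Block Fetch Retry"));
  static final ExecutorService PUSH_RETRY_POOL =
      Executors.newCachedThreadPool(namedDaemonFactory("Block Push Retry"));

  // Assumption: transferTerm holds "fetch" or "push".
  static ExecutorService poolFor(String transferTerm) {
    return "push".equals(transferTerm) ? PUSH_RETRY_POOL : FETCH_RETRY_POOL;
  }

  public static void main(String[] args) throws Exception {
    String name = poolFor("push").submit(() -> Thread.currentThread().getName()).get();
    System.out.println(name);  // Block Push Retry-1
  }
}
```

Whether two pools are worth it over a single `Block Transfer Retry` pool is a judgment call. Including `transferTerm` in the retry log messages themselves would also make fetch and push retries distinguishable.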

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
##########
@@ -86,104 +90,114 @@ void createAndStart(String[] blockIds, BlockFetchingListener listener)
   private int retryCount = 0;
 
   /**
-   * Set of all block ids which have not been fetched successfully or with a non-IO Exception.
+   * Set of all block ids which have not been transferred successfully or with a non-IO Exception.
    * A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet,
    * input ordering is preserved, so we always request blocks in the same order the user provided.
    */
   private final LinkedHashSet<String> outstandingBlocksIds;
 
   /**
-   * The BlockFetchingListener that is active with our current BlockFetcher.
+   * The BlockTransferListener that is active with our current BlockFetcher.
    * When we start a retry, we immediately replace this with a new Listener, which causes all any
    * old Listeners to ignore all further responses.
    */
-  private RetryingBlockFetchListener currentListener;
+  private RetryingBlockTransferListener currentListener;
 
   private final ErrorHandler errorHandler;
 
-  public RetryingBlockFetcher(
+  /**
+   * Term indicating whether this RetryingBlockTransferor is for block fetch or push. Useful for
+   * printing more meaningful logs.
+   */
+  private final String transferTerm;

Review comment:
       How about `transferType`?
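The bookkeeping documented in this hunk can be sketched as below. It has two parts: an insertion-ordered set of outstanding block ids, and a current listener that a retry replaces so that late responses from older attempts are ignored. This is a simplified illustration, not Spark's `RetryingBlockTransferor`. `TransferType` is a hypothetical enum reading of the `transferType` suggestion (the PR itself uses a `String` field), and `Attempt` is a hypothetical stand-in for the per-attempt listener.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;

/** Simplified sketch of retry bookkeeping; not Spark's RetryingBlockTransferor. */
public class RetryBookkeepingSketch {
  // Hypothetical enum in place of a String transferTerm/transferType field.
  enum TransferType { FETCH, PUSH }

  /** Hypothetical stand-in for a per-attempt listener; only the current one may update state. */
  final class Attempt {
    void onSuccess(String blockId) {
      synchronized (RetryBookkeepingSketch.this) {
        // A retry replaces currentAttempt, so responses from older attempts are dropped.
        if (this == currentAttempt && outstandingBlockIds.remove(blockId)) {
          System.out.println(transferType + " succeeded for " + blockId);
        }
      }
    }
  }

  private final TransferType transferType;
  // Insertion-ordered, so every retry requests blocks in the caller's original order.
  private final LinkedHashSet<String> outstandingBlockIds;
  private Attempt currentAttempt;

  RetryBookkeepingSketch(TransferType transferType, String... blockIds) {
    this.transferType = transferType;
    this.outstandingBlockIds = new LinkedHashSet<>(Arrays.asList(blockIds));
  }

  /** Starts the first attempt, or a retry over all still-outstanding blocks. */
  synchronized Attempt startAttempt() {
    currentAttempt = new Attempt();
    System.out.println("Starting " + transferType + " of " + outstandingBlockIds);
    return currentAttempt;
  }

  synchronized String[] remaining() {
    return outstandingBlockIds.toArray(new String[0]);
  }

  public static void main(String[] args) {
    RetryBookkeepingSketch t = new RetryBookkeepingSketch(TransferType.PUSH, "b1", "b2", "b3");
    Attempt first = t.startAttempt();  // Starting PUSH of [b1, b2, b3]
    first.onSuccess("b2");             // PUSH succeeded for b2
    Attempt retry = t.startAttempt();  // Starting PUSH of [b1, b3]
    first.onSuccess("b1");             // stale attempt: ignored
    retry.onSuccess("b3");             // PUSH succeeded for b3
    System.out.println(Arrays.toString(t.remaining()));  // [b1]
  }
}
```

An enum rules out misspelled values at compile time, while a `String` is simpler to drop into log format strings. Either would serve the "more meaningful logs" purpose stated in the Javadoc.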




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
