Victsm commented on a change in pull request #30062:
URL: https://github.com/apache/spark/pull/30062#discussion_r516167838



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -23,24 +23,31 @@
 import org.apache.spark.network.protocol.Encoders;
 
 // Needed by ScalaDoc. See SPARK-7726
-import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
 
 
 /**
  * Request to push a block to a remote shuffle service to be merged in push based shuffle.
  * The remote shuffle service will also include this message when responding the push requests.
  */
 public class PushBlockStream extends BlockTransferMessage {
+  public static final String SHUFFLE_PUSH_BLOCK_PREFIX = "shufflePush";
   public final String appId;
-  public final String blockId;
+  public final int shuffleId;
+  public final int mapIndex;
+  public final int reduceId;
   // Similar to the chunkIndex in StreamChunkId, indicating the index of a block in a batch of
   // blocks to be pushed.
   public final int index;
+  public final String streamId;

Review comment:
      Where's this streamId used? shuffleId is later used as streamId in toString and decode, which seems a bit confusing.
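The following minimal sketch shows one way to remove the ambiguity. It assumes the stream id can be derived from the block coordinates instead of being stored as a separate field. The class name PushBlockStreamSketch and the "_"-separated layout of the id are hypothetical. Only the "shufflePush" prefix and the field names come from the diff.

```java
// Hypothetical sketch, not the PR's code: the stream id is derived from the
// block coordinates instead of being stored as its own field, so toString()
// and decode() have no reason to treat shuffleId as the streamId.
final class PushBlockStreamSketch {
  // Prefix value taken from the diff; the "_"-joined id layout is an assumption.
  static final String SHUFFLE_PUSH_BLOCK_PREFIX = "shufflePush";

  final String appId;
  final int shuffleId;
  final int mapIndex;
  final int reduceId;
  final int index;

  PushBlockStreamSketch(String appId, int shuffleId, int mapIndex, int reduceId, int index) {
    this.appId = appId;
    this.shuffleId = shuffleId;
    this.mapIndex = mapIndex;
    this.reduceId = reduceId;
    this.index = index;
  }

  // Computed on demand, so it can never disagree with the three ids it is built from.
  String streamId() {
    return SHUFFLE_PUSH_BLOCK_PREFIX + "_" + shuffleId + "_" + mapIndex + "_" + reduceId;
  }

  @Override
  public String toString() {
    // Each field is printed under its own name; shuffleId is never relabeled as streamId.
    return "PushBlockStream[appId=" + appId
        + ", shuffleId=" + shuffleId
        + ", mapIndex=" + mapIndex
        + ", reduceId=" + reduceId
        + ", index=" + index + "]";
  }
}
```

Because the id is derived, decode() would only need to read shuffleId, mapIndex and reduceId back from the wire. There is also no stored streamId that could drift out of sync with those fields.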

##########
File path: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
##########
@@ -363,4 +363,38 @@ public boolean useOldFetchProtocol() {
     return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
   }
 
+  /**
+   * Class name of the implementation of MergedShuffleFileManager that merges the blocks
+   * pushed to it when push-based shuffle is enabled. Default implementation for merging the blocks
+   * remotely is 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
+   * To turn off push-based shuffle at a cluster level, set the configuration to
+   * 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
+   */
+  public String mergeShuffleFileManagerImpl() {
+    return conf.get("spark.shuffle.push.based.mergedShuffleFileManagerImpl",
+      "org.apache.spark.network.shuffle.RemoteBlockPushResolver");

Review comment:
       Should the default be NoOpMergedShuffleFileManager instead?
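The following minimal sketch shows what the suggested default would look like. It assumes a plain Map stands in for the config provider behind TransportConf, and MergeManagerConfSketch is a hypothetical name. The key and both class names are copied from the diff. The lookup falls back to the no-op manager, so push-based merging becomes opt-in rather than opt-out.

```java
import java.util.Map;

// Hypothetical sketch: a Map stands in for the real config provider, and the
// fallback is the no-op manager, as the review comment suggests.
final class MergeManagerConfSketch {
  // Key and class names copied from the diff under review.
  static final String KEY = "spark.shuffle.push.based.mergedShuffleFileManagerImpl";
  static final String NO_OP_IMPL =
      "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager";
  static final String REMOTE_IMPL = "org.apache.spark.network.shuffle.RemoteBlockPushResolver";

  private final Map<String, String> conf;

  MergeManagerConfSketch(Map<String, String> conf) {
    this.conf = conf;
  }

  // Returns the configured implementation, or the no-op manager when the key is unset,
  // so merging of pushed blocks stays off unless an operator opts in explicitly.
  String mergeShuffleFileManagerImpl() {
    return conf.getOrDefault(KEY, NO_OP_IMPL);
  }
}
```

With this default, a cluster only starts merging pushed blocks once the key is set to RemoteBlockPushResolver. The trade-off is that enabling push-based shuffle takes an extra configuration step.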




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
