venkata91 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r677797596



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -135,51 +150,87 @@ protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
+      int shuffleMergeId,
       int reduceId) {
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
-    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+    ConcurrentMap<Integer, Map<Integer, Map<Integer, AppShufflePartitionInfo>>> partitions =
       appShuffleInfo.partitions;
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.compute(shuffleId, (id, map) -> {
-        if (map == null) {
-          // If this partition is already finalized then the partitions map will not contain the
-          // shuffleId but the data file would exist. In that case the block is considered late.
-          if (dataFile.exists()) {
+    Map<Integer, Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitions =
+      partitions.compute(shuffleId, (id, shuffleMergePartitionsMap) -> {
+        if (shuffleMergePartitionsMap == null) {
+          logger.info("Creating a new attempt for shuffle blocks push request for"
+              + " shuffle {} with shuffleMergeId {} for application {}_{}", shuffleId,
+              shuffleMergeId, appShuffleInfo.appId, appShuffleInfo.attemptId);
+          Map<Integer, Map<Integer, AppShufflePartitionInfo>> newShuffleMergePartitions
+            = new ConcurrentHashMap<>();
+          Map<Integer, AppShufflePartitionInfo> newPartitionsMap = new ConcurrentHashMap<>();
+          newShuffleMergePartitions.put(shuffleMergeId, newPartitionsMap);
+          return newShuffleMergePartitions;
+        } else if (shuffleMergePartitionsMap.containsKey(shuffleMergeId)) {
+          return shuffleMergePartitionsMap;
+        } else {
+          int latestShuffleMergeId = shuffleMergePartitionsMap.keySet().stream()
+            .mapToInt(v -> v).max().orElse(UNDEFINED_SHUFFLE_MERGE_ID);
+          if (latestShuffleMergeId > shuffleMergeId) {
+            logger.info("Rejecting shuffle blocks push request for shuffle {} with"
+                + " shuffleMergeId {} for application {}_{} as a higher shuffleMergeId"
+                + " {} request is already seen", shuffleId, shuffleMergeId,
+                appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId);
+            // Reject the request as we have already seen a higher shuffleMergeId than the
+            // current incoming one
             return null;

Review comment:
       This currently throws a `RuntimeException`. Does it make sense to create a custom one, e.g. `StaleBlockPushException`, or is that overkill?
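
       For illustration only, here is a minimal sketch of what such a custom exception could look like and how it might be thrown from inside the `compute` callback. Only the name `StaleBlockPushException` comes from the comment above; its fields, the `MergeIdTracker` class, the `String` placeholder for partition info, and the handling of a newer `shuffleMergeId` are all assumptions and not the PR's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical exception: the name comes from the review comment, the fields are assumed.
class StaleBlockPushException extends RuntimeException {
  private final int shuffleMergeId;
  private final int latestShuffleMergeId;

  StaleBlockPushException(int shuffleId, int shuffleMergeId, int latestShuffleMergeId) {
    super(String.format(
        "Rejecting push for shuffle %d: shuffleMergeId %d is older than %d",
        shuffleId, shuffleMergeId, latestShuffleMergeId));
    this.shuffleMergeId = shuffleMergeId;
    this.latestShuffleMergeId = latestShuffleMergeId;
  }

  int getShuffleMergeId() { return shuffleMergeId; }
  int getLatestShuffleMergeId() { return latestShuffleMergeId; }
}

// Hypothetical, simplified stand-in for the resolver's nested partition maps.
class MergeIdTracker {
  // shuffleId -> shuffleMergeId -> reduceId -> placeholder partition info
  private final ConcurrentMap<Integer, Map<Integer, Map<Integer, String>>> partitions =
      new ConcurrentHashMap<>();

  Map<Integer, String> getOrCreate(int shuffleId, int shuffleMergeId) {
    Map<Integer, Map<Integer, String>> byMergeId =
        partitions.compute(shuffleId, (id, existing) -> {
          if (existing == null) {
            // First push seen for this shuffle: start a fresh nested map.
            Map<Integer, Map<Integer, String>> created = new ConcurrentHashMap<>();
            created.put(shuffleMergeId, new ConcurrentHashMap<>());
            return created;
          }
          if (existing.containsKey(shuffleMergeId)) {
            return existing;
          }
          int latest = existing.keySet().stream().mapToInt(v -> v).max().orElse(-1);
          if (latest > shuffleMergeId) {
            // Stale push: a typed exception lets callers tell it apart from other failures.
            throw new StaleBlockPushException(shuffleId, shuffleMergeId, latest);
          }
          // Assumption: a newer shuffleMergeId simply gets its own entry here.
          existing.put(shuffleMergeId, new ConcurrentHashMap<>());
          return existing;
        });
    return byMergeId.get(shuffleMergeId);
  }
}

class StaleBlockPushDemo {
  public static void main(String[] args) {
    MergeIdTracker tracker = new MergeIdTracker();
    tracker.getOrCreate(0, 2);
    try {
      tracker.getOrCreate(0, 1); // older than the merge id already seen
    } catch (StaleBlockPushException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

       One point in favor of throwing: per the `ConcurrentMap.compute` contract, an exception thrown by the remapping function is rethrown and leaves the existing mapping unchanged, whereas returning `null` removes the entry for that key.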




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


