venkata91 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r677797596
##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -135,51 +150,87 @@ protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
+      int shuffleMergeId,
       int reduceId) {
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
-    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+    ConcurrentMap<Integer, Map<Integer, Map<Integer, AppShufflePartitionInfo>>> partitions =
       appShuffleInfo.partitions;
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.compute(shuffleId, (id, map) -> {
-        if (map == null) {
-          // If this partition is already finalized then the partitions map will not contain the
-          // shuffleId but the data file would exist. In that case the block is considered late.
-          if (dataFile.exists()) {
+    Map<Integer, Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitions =
+      partitions.compute(shuffleId, (id, shuffleMergePartitionsMap) -> {
+        if (shuffleMergePartitionsMap == null) {
+          logger.info("Creating a new attempt for shuffle blocks push request for"
+            + " shuffle {} with shuffleMergeId {} for application {}_{}", shuffleId,
+            shuffleMergeId, appShuffleInfo.appId, appShuffleInfo.attemptId);
+          Map<Integer, Map<Integer, AppShufflePartitionInfo>> newShuffleMergePartitions
+            = new ConcurrentHashMap<>();
+          Map<Integer, AppShufflePartitionInfo> newPartitionsMap = new ConcurrentHashMap<>();
+          newShuffleMergePartitions.put(shuffleMergeId, newPartitionsMap);
+          return newShuffleMergePartitions;
+        } else if (shuffleMergePartitionsMap.containsKey(shuffleMergeId)) {
+          return shuffleMergePartitionsMap;
+        } else {
+          int latestShuffleMergeId = shuffleMergePartitionsMap.keySet().stream()
+            .mapToInt(v -> v).max().orElse(UNDEFINED_SHUFFLE_MERGE_ID);
+          if (latestShuffleMergeId > shuffleMergeId) {
+            logger.info("Rejecting shuffle blocks push request for shuffle {} with"
+              + " shuffleMergeId {} for application {}_{} as a higher shuffleMergeId"
+              + " {} request is already seen", shuffleId, shuffleMergeId,
+              appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId);
+            // Reject the request as we have already seen a higher shuffleMergeId than the
+            // current incoming one
             return null;

Review comment:
   Currently this throws a `RuntimeException`. Does it make sense to create a custom exception such as `StaleBlockPushException`, or is that overkill?
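   For illustration only, a minimal sketch of what such a custom exception could look like; the name `StaleBlockPushException`, its package, and its use at this rejection point are assumptions for discussion, not something this PR defines:

   ```java
   // Hypothetical sketch only: a dedicated unchecked exception for stale block pushes.
   // The class name, package, and javadoc wording are illustrative assumptions.
   package org.apache.spark.network.shuffle;

   /**
    * Thrown when a block push targets a shuffleMergeId that is older than the latest
    * shuffleMergeId already seen for the same shuffle, i.e. the push is stale.
    */
   public class StaleBlockPushException extends RuntimeException {
     public StaleBlockPushException(String message) {
       super(message);
     }
   }
   ```

   Extending `RuntimeException` would keep the current method signatures unchanged while letting callers match on the exception class instead of inspecting the message.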