venkata91 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r676933588
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -78,6 +80,19 @@
   public static final String MERGE_DIR_KEY = "mergeDir";
   public static final String ATTEMPT_ID_KEY = "attemptId";
   private static final int UNDEFINED_ATTEMPT_ID = -1;
+  private static final int UNDEFINED_SHUFFLE_MERGE_ID = Integer.MIN_VALUE;
+
+  // ConcurrentHashMap doesn't allow null for keys or values which is why this is required.
+  // Marker to identify stale shuffle partitions typically happens in the case of
+  // indeterminate stage retries.
+  @VisibleForTesting
+  public static final Map<Integer, AppShufflePartitionInfo> STALE_SHUFFLE_PARTITIONS =
+    new ConcurrentHashMap<>();
Review comment:
Since we check `STALE_SHUFFLE_PARTITIONS` and `FINALIZED_SHUFFLE_PARTITIONS` separately so that we can respond accordingly, making both of them `Collections.emptyMap` won't work, right?
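For reference, here is a minimal, hypothetical sketch of why two distinct sentinel instances are used rather than a single shared `Collections.emptyMap()`: the resolver tells the markers apart by reference identity. The class, value type, and response strings below are illustrative only, not the PR's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: two distinct empty marker maps. A non-null sentinel is
// needed because the real partitions map (a ConcurrentHashMap) rejects null values.
class SentinelMarkersSketch {
  static final Map<Integer, Object> STALE_SHUFFLE_PARTITIONS = new ConcurrentHashMap<>();
  static final Map<Integer, Object> FINALIZED_SHUFFLE_PARTITIONS = new ConcurrentHashMap<>();

  // The two markers are distinguished by identity; a single shared
  // Collections.emptyMap() would make "stale" and "finalized" indistinguishable here.
  static String classify(Map<Integer, Object> partitions) {
    if (partitions == STALE_SHUFFLE_PARTITIONS) {
      return "stale shuffle merge id: reject the pushed block";
    } else if (partitions == FINALIZED_SHUFFLE_PARTITIONS) {
      return "already finalized: treat the pushed block as late";
    }
    return "active: merge the pushed block";
  }

  public static void main(String[] args) {
    System.out.println(classify(STALE_SHUFFLE_PARTITIONS));
    System.out.println(classify(FINALIZED_SHUFFLE_PARTITIONS));
    System.out.println(classify(new ConcurrentHashMap<>()));
  }
}
```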
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -78,6 +80,19 @@
   public static final String MERGE_DIR_KEY = "mergeDir";
   public static final String ATTEMPT_ID_KEY = "attemptId";
   private static final int UNDEFINED_ATTEMPT_ID = -1;
+  private static final int UNDEFINED_SHUFFLE_MERGE_ID = Integer.MIN_VALUE;
+
+  // ConcurrentHashMap doesn't allow null for keys or values which is why this is required.
+  // Marker to identify stale shuffle partitions typically happens in the case of
+  // indeterminate stage retries.
+  @VisibleForTesting
+  public static final Map<Integer, AppShufflePartitionInfo> STALE_SHUFFLE_PARTITIONS =
+    new ConcurrentHashMap<>();
+
+  // Marker for finalized shuffle partitions, used to identify late blocks getting merged.
+  @VisibleForTesting
+  public static final Map<Integer, AppShufflePartitionInfo> FINALIZED_SHUFFLE_PARTITIONS =
+    new ConcurrentHashMap<>();
Review comment:
Same as above
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -135,51 +150,87 @@ protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
+      int shuffleMergeId,
       int reduceId) {
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
-    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+    ConcurrentMap<Integer, Map<Integer, Map<Integer, AppShufflePartitionInfo>>> partitions =
       appShuffleInfo.partitions;
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.compute(shuffleId, (id, map) -> {
-        if (map == null) {
-          // If this partition is already finalized then the partitions map will not contain the
-          // shuffleId but the data file would exist. In that case the block is considered late.
-          if (dataFile.exists()) {
+    Map<Integer, Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitions =
+      partitions.compute(shuffleId, (id, shuffleMergePartitionsMap) -> {
+        if (shuffleMergePartitionsMap == null) {
+          logger.info("Creating a new attempt for shuffle blocks push request for"
+            + " shuffle {} with shuffleMergeId {} for application {}_{}", shuffleId,
+            shuffleMergeId, appShuffleInfo.appId, appShuffleInfo.attemptId));
Review comment:
Fixed it. Sorry about that.
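For readers following along: the hunk above changes the per-shuffle partitions map into a nested map keyed by shuffleId, then shuffleMergeId, then reduceId, created lazily inside `ConcurrentHashMap.compute`. The standalone sketch below illustrates that pattern only; all names are hypothetical and `String` stands in for `AppShufflePartitionInfo`, so it is not the PR's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the nested-map, compute-based lazy creation pattern:
// shuffleId -> (shuffleMergeId -> (reduceId -> partition info)).
class NestedPartitionsSketch {
  static final ConcurrentMap<Integer, Map<Integer, Map<Integer, String>>> partitions =
      new ConcurrentHashMap<>();

  static Map<Integer, Map<Integer, String>> getOrCreate(int shuffleId, int shuffleMergeId) {
    // compute runs atomically per shuffleId key, so concurrent pushes for the
    // same shuffle cannot race while the nested maps are being created.
    return partitions.compute(shuffleId, (id, mergeMap) -> {
      if (mergeMap == null) {
        // First push for this shuffleId: start a fresh per-shuffleMergeId map.
        mergeMap = new ConcurrentHashMap<>();
      }
      mergeMap.computeIfAbsent(shuffleMergeId, m -> new ConcurrentHashMap<>());
      return mergeMap;
    });
  }

  public static void main(String[] args) {
    getOrCreate(0, 1).get(1).put(5, "partition info for reduceId 5");
    System.out.println(partitions);
  }
}
```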
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]