mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r677980653
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -125,63 +127,88 @@ private AbstractFetchShuffleBlocks
createFetchShuffleBlocksOrChunksMsg(
String execId,
String[] blockIds) {
if (blockIds[0].startsWith(SHUFFLE_CHUNK_PREFIX)) {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
true);
+ return createFetchShuffleChunksMsg(appId, execId, blockIds);
} else {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
false);
+ return createFetchShuffleBlocksMsg(appId, execId, blockIds);
}
}
- /**
- * Create FetchShuffleBlocks/FetchShuffleBlockChunks message and rebuild
internal blockIds by
- * analyzing the passed in blockIds.
- */
- private AbstractFetchShuffleBlocks createFetchShuffleMsgAndBuildBlockIds(
+ private AbstractFetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId,
String execId,
- String[] blockIds,
- boolean areMergedChunks) {
+ String[] blockIds) {
String[] firstBlock = splitBlockId(blockIds[0]);
int shuffleId = Integer.parseInt(firstBlock[1]);
- boolean batchFetchEnabled = firstBlock.length == 5;
-
- // In case of FetchShuffleBlocks, primaryId is mapId. For
FetchShuffleBlockChunks, primaryId
- // is reduceId.
- LinkedHashMap<Number, BlocksInfo> primaryIdToBlocksInfo = new
LinkedHashMap<>();
+ boolean batchFetchEnabled = (firstBlock.length == 5 &&
+ firstBlock[0].equals(SHUFFLE_BLOCK_SPLIT));
Review comment:
`firstBlock[0].equals(SHUFFLE_BLOCK_SPLIT)` will always be `true`, right?
If so, remove it from the condition?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java
##########
@@ -40,6 +40,11 @@
public class MergeStatuses extends BlockTransferMessage {
/** Shuffle ID **/
public final int shuffleId;
+ /**
+ * shuffleMergeId is used to uniquely identify merging process of an
indeterminate stage
Review comment:
"merging process of an indeterminate stage" -> "merging process of
shuffle by an indeterminate stage"
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -128,58 +137,86 @@ protected AppShuffleInfo
validateAndGetAppShuffleInfo(String appId) {
}
/**
- * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies
a given shuffle
- * partition of an application, retrieves the associated metadata. If not
present and the
- * corresponding merged shuffle does not exist, initializes the metadata.
+ * Given the appShuffleInfo, shuffleId, shuffleMergeId and reduceId that
uniquely identifies
+ * a given shuffle partition of an application, retrieves the associated
metadata. If not
+ * present and the corresponding merged shuffle does not exist, initializes
the metadata.
*/
private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
AppShuffleInfo appShuffleInfo,
int shuffleId,
- int reduceId) {
- File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId,
reduceId);
- ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
- appShuffleInfo.partitions;
- Map<Integer, AppShufflePartitionInfo> shufflePartitions =
- partitions.compute(shuffleId, (id, map) -> {
- if (map == null) {
+ int shuffleMergeId,
+ int reduceId) throws RuntimeException {
+ ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> partitions =
appShuffleInfo.partitions;
+ AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
+ partitions.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
+ if (appShuffleMergePartitionsInfo == null) {
+ File dataFile =
+ appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId,
reduceId);
// If this partition is already finalized then the partitions map
will not contain the
// shuffleId but the data file would exist. In that case the block
is considered late.
Review comment:
Please add that this applies only to `DETERMINATE` stages - for
`INDETERMINATE`, we will have the marker, and so
`appShuffleMergePartitionsInfo` will not be `null`
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -79,6 +81,13 @@
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;
+ // ConcurrentHashMap doesn't allow null for keys or values which is why this
is required.
+ // Marker to identify finalized indeterminate shuffle partitions in the case
of indeterminate
+ // stage retries.
+ @VisibleForTesting
+ public static final Map<Integer, AppShufflePartitionInfo>
INDETERMINATE_SHUFFLE_FINALIZED =
+ Collections.unmodifiableMap(new HashMap<>(1));
Review comment:
nit: Given there is only a single marker map now - you can make it
`Collections.emptyMap()`
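
For illustration, a minimal sketch of the marker idea, not the PR's code: `FinalizedMarkerSketch`, `FINALIZED_MARKER` and the `String` values are hypothetical stand-ins. It shows why a sentinel value is needed (a `ConcurrentHashMap` cannot store `null`) and that an empty, unmodifiable map works as that sentinel. One caveat worth checking: `Collections.emptyMap()` need not create a new object per call, so an identity check may also match any other empty map obtained the same way.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class FinalizedMarkerSketch {
  // Hypothetical stand-in for INDETERMINATE_SHUFFLE_FINALIZED.
  // Values are Strings only to keep the sketch self-contained.
  static final Map<Integer, String> FINALIZED_MARKER = Collections.emptyMap();

  // ConcurrentHashMap rejects null values, so "finalized" is recorded
  // by storing the sentinel map instead of null.
  static final ConcurrentHashMap<Integer, Map<Integer, String>> partitions =
      new ConcurrentHashMap<>();

  static boolean isFinalized(int shuffleId) {
    // Identity comparison against the sentinel, as in the diff under review.
    return partitions.get(shuffleId) == FINALIZED_MARKER;
  }
}
```

Because the marker is unmodifiable, a late writer that reaches it fails with `UnsupportedOperationException` instead of silently adding partitions to a finalized shuffle.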
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -824,12 +943,34 @@ AppShufflePartitionInfo getPartitionInfo() {
}
}
+ public static class AppShuffleMergePartitionsInfo {
+ private final int shuffleMergeId;
+ private Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
+
+ public AppShuffleMergePartitionsInfo(
+ int shuffleMergeId) {
+ this.shuffleMergeId = shuffleMergeId;
+ this.shuffleMergePartitions = new ConcurrentHashMap<>();
+ }
+
+ @VisibleForTesting
+ public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
+ return shuffleMergePartitions;
+ }
+
+ public void markIndeterminateShuffleFinalized() {
+ this.shuffleMergePartitions =
RemoteBlockPushResolver.INDETERMINATE_SHUFFLE_FINALIZED;
+ }
+ }
Review comment:
As mentioned above, let us make this class immutable.
Also, add a short javadoc for the class.
```suggestion
  private final int shuffleMergeId;
  private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;

  public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) {
    this.shuffleMergeId = shuffleMergeId;
    // A finalized shuffle shares the immutable marker map; otherwise start with a fresh map.
    this.shuffleMergePartitions = shuffleFinalized ?
      RemoteBlockPushResolver.INDETERMINATE_SHUFFLE_FINALIZED : new ConcurrentHashMap<>();
  }

  @VisibleForTesting
  public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
    return shuffleMergePartitions;
  }
}
```
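
A self-contained sketch of the immutable shape suggested here, under stated assumptions: `MergePartitionsInfoSketch`, `FINALIZED_MARKER`, `isFinalized()` and the `String` partition values are hypothetical simplifications, not the actual Spark classes. The point is that "finalized" is decided once, at construction, so no setter such as `markIndeterminateShuffleFinalized` is needed.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for AppShuffleMergePartitionsInfo. */
final class MergePartitionsInfoSketch {
  // Hypothetical stand-in for INDETERMINATE_SHUFFLE_FINALIZED.
  static final Map<Integer, String> FINALIZED_MARKER = Collections.emptyMap();

  private final int shuffleMergeId;
  private final Map<Integer, String> shuffleMergePartitions;

  MergePartitionsInfoSketch(int shuffleMergeId, boolean shuffleFinalized) {
    this.shuffleMergeId = shuffleMergeId;
    // Both fields are final: a finalized instance holds the marker forever,
    // a live instance holds its own mutable, thread-safe map.
    this.shuffleMergePartitions = shuffleFinalized
        ? FINALIZED_MARKER
        : new ConcurrentHashMap<Integer, String>();
  }

  int getShuffleMergeId() {
    return shuffleMergeId;
  }

  Map<Integer, String> getShuffleMergePartitions() {
    return shuffleMergePartitions;
  }

  boolean isFinalized() {
    return shuffleMergePartitions == FINALIZED_MARKER;
  }
}
```

With this shape, moving a shuffle to the finalized state means replacing the whole object in the enclosing map rather than mutating a field that other threads may be reading.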
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -410,17 +493,40 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
+ "with the current attempt id %s stored in shuffle service for
application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
- Map<Integer, AppShufflePartitionInfo> shufflePartitions =
- appShuffleInfo.partitions.remove(msg.shuffleId);
+ Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = null;
+ // Determinate stage can remove metadata of the shuffle Id as part of
finalizing shuffle
+ // merge
Review comment:
nit: Also add to comment that only the first successful stage attempt
for `DETERMINATE` stage will be merged/finalized - subsequent attempts for the
shuffle id will not have merge enabled (unlike `INDETERMINATE` stage).
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -128,58 +137,86 @@ protected AppShuffleInfo
validateAndGetAppShuffleInfo(String appId) {
}
/**
- * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies
a given shuffle
- * partition of an application, retrieves the associated metadata. If not
present and the
- * corresponding merged shuffle does not exist, initializes the metadata.
+ * Given the appShuffleInfo, shuffleId, shuffleMergeId and reduceId that
uniquely identifies
+ * a given shuffle partition of an application, retrieves the associated
metadata. If not
+ * present and the corresponding merged shuffle does not exist, initializes
the metadata.
*/
private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
Review comment:
The new formulation of this method is much cleaner than before,
thanks for fixing this!
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -188,49 +225,70 @@ private AppShufflePartitionInfo
getOrCreateAppShufflePartitionInfo(
AppShufflePartitionInfo newAppShufflePartitionInfo(
String appId,
int shuffleId,
+ int shuffleMergeId,
int reduceId,
File dataFile,
File indexFile,
File metaFile) throws IOException {
- return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile,
+ return new AppShufflePartitionInfo(appId, shuffleId, shuffleMergeId,
reduceId, dataFile,
new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
}
@Override
- public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int
reduceId) {
+ public MergedBlockMeta getMergedBlockMeta(
+ String appId,
+ int shuffleId,
+ int shuffleMergeId,
+ int reduceId) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+ if (appShuffleInfo.partitions.containsKey(shuffleId) &&
+ appShuffleInfo.partitions.get(shuffleId).shuffleMergeId >
shuffleMergeId) {
Review comment:
Pull the value locally and then check against that.
The same comment applies to `getMergedBlockData` as well.
```suggestion
AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.partitions.get(shuffleId);
if (null != partitionsInfo && partitionsInfo.shuffleMergeId > shuffleMergeId) {
```
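
To spell out the reasoning: `containsKey` followed by `get` is two separate lookups, and another thread can remove the entry in between, so the second lookup can return `null` and the field access then throws a `NullPointerException`. A single `get` into a local avoids that. A minimal sketch, assuming a simplified map from shuffleId to the latest shuffleMergeId (`StaleMergeIdCheckSketch` and `isStale` are hypothetical names):

```java
import java.util.concurrent.ConcurrentMap;

final class StaleMergeIdCheckSketch {
  // latestMergeIds maps shuffleId -> latest shuffleMergeId seen so far
  // (a hypothetical simplification of appShuffleInfo.partitions).
  static boolean isStale(
      ConcurrentMap<Integer, Integer> latestMergeIds,
      int shuffleId,
      int shuffleMergeId) {
    // One lookup, kept in a local: the null check and the comparison
    // both see the same snapshot of the entry.
    Integer latest = latestMergeIds.get(shuffleId);
    return latest != null && latest > shuffleMergeId;
  }
}
```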
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -408,26 +409,41 @@ protected Ratio getRatio() {
}
private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
- // For regular shuffle blocks, primaryId is mapId and secondaryIds are
reduceIds.
- // For shuffle chunks, primaryIds is reduceId and secondaryIds are
chunkIds.
- final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
Review comment:
Can you please fix this, @venkata91? This comment was marked resolved,
but I did not see it get addressed.
Please do the same for `shuffleReduceIdAndChunkIds` below as well: add a
method doc that describes the expected format.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -410,17 +493,40 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
+ "with the current attempt id %s stored in shuffle service for
application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
- Map<Integer, AppShufflePartitionInfo> shufflePartitions =
- appShuffleInfo.partitions.remove(msg.shuffleId);
+ Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = null;
+ // Determinate stage can remove metadata of the shuffle Id as part of
finalizing shuffle
+ // merge
+ if (msg.shuffleMergeId == 0) {
+ shuffleMergePartitions =
+ appShuffleInfo.partitions.remove(msg.shuffleId).shuffleMergePartitions;
+ } else {
+ // For indeterminate stage we will set shuffleMergePartitions of the
shuffleId to
+ // INDETERMINATE_SHUFFLE_FINALIZED. This is needed so as to reject old
shuffleMergeId
+ // push requests after the shuffle merge is finalized.
+ if (null == appShuffleInfo.partitions.get(msg.shuffleId) ||
+ INDETERMINATE_SHUFFLE_FINALIZED ==
+
appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergePartitions) {
+ throw new RuntimeException(
+ String.format("Shuffle merge finalize request for shuffle %s with"
+ + " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
+ ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+ } else if (msg.shuffleMergeId ==
+ appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergeId) {
+ shuffleMergePartitions =
+ appShuffleInfo.partitions.get(msg.shuffleId).shuffleMergePartitions;
+
appShuffleInfo.partitions.get(msg.shuffleId).markIndeterminateShuffleFinalized();
+ }
Review comment:
This is not thread safe, you will need to do this atomically.
```suggestion
AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
  new AtomicReference<>(null);
appShuffleInfo.partitions.compute(msg.shuffleId, (id, value) -> {
  if (null == value || INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
    throw new RuntimeException(
      String.format("Shuffle merge finalize request for shuffle %s with"
        + " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
        ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
  } else if (msg.shuffleMergeId == value.shuffleMergeId) {
    // Capture the partitions map (not the wrapper object) to match the reference type.
    shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
    // replace with marker value.
    // Note: take constructor parameter true/false to specify whether it is finalized or not,
    // and make fields of AppShuffleMergePartitionsInfo final - remove
    // markIndeterminateShuffleFinalized
    return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
  } else {
    return value;
  }
});
shuffleMergePartitions = shuffleMergePartitionsRef.get();
```
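
The same pattern as a runnable sketch, with hypothetical simplified types (`AtomicFinalizeSketch`, `Info`, `finalizeMerge`, `String` partition values, and `IllegalStateException` in place of the PR's `RuntimeException`). It relies on `ConcurrentHashMap.compute` running the remapping function atomically for the key, and on the mapping being left unchanged when the function throws.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

final class AtomicFinalizeSketch {
  // Hypothetical stand-in for INDETERMINATE_SHUFFLE_FINALIZED.
  static final Map<Integer, String> FINALIZED_MARKER = Collections.emptyMap();

  /** Hypothetical immutable stand-in for AppShuffleMergePartitionsInfo. */
  static final class Info {
    final int shuffleMergeId;
    final Map<Integer, String> partitions;

    Info(int shuffleMergeId, Map<Integer, String> partitions) {
      this.shuffleMergeId = shuffleMergeId;
      this.partitions = partitions;
    }
  }

  /** Returns the partitions to finalize, or null if the merge id does not match. */
  static Map<Integer, String> finalizeMerge(
      ConcurrentHashMap<Integer, Info> all, int shuffleId, int shuffleMergeId) {
    AtomicReference<Map<Integer, String>> captured = new AtomicReference<>();
    // Check, capture and swap happen inside one atomic compute call.
    all.compute(shuffleId, (id, value) -> {
      if (value == null || value.partitions == FINALIZED_MARKER) {
        // Throwing here leaves the existing mapping untouched.
        throw new IllegalStateException("stale finalize request for shuffle " + id);
      } else if (value.shuffleMergeId == shuffleMergeId) {
        captured.set(value.partitions);
        return new Info(shuffleMergeId, FINALIZED_MARKER);
      } else {
        return value;
      }
    });
    return captured.get();
  }
}
```

A caller would treat a `null` return as "nothing to finalize for this merge id" and an exception as a stale request.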
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.