Ngone51 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r677061512
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -150,6 +155,22 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C:
ClassTag](
}
}
+ def newShuffleMergeState(): Unit = {
+ _shuffleMergeEnabled = canShuffleMergeBeEnabled()
Review comment:
Isn't this a constant value? Why do we need to reset it every time?
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -124,6 +122,13 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C:
ClassTag](
*/
private[this] var _shuffleMergedFinalized: Boolean = false
+ /**
+ * shuffleMergeId is used to uniquely identify a indeterminate stage attempt
of a shuffle Id.
Review comment:
How about:
```suggestion
* shuffleMergeId is used to uniquely identify a merging process of an
indeterminate stage attempt.
```
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
##########
@@ -206,12 +206,15 @@ public long sendRpc(ByteBuffer message,
RpcResponseCallback callback) {
*
* @param appId applicationId.
* @param shuffleId shuffle id.
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
Review comment:
nit: "... identify a merging process of an indeterminate stage attempt."
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -185,6 +188,8 @@ public void finalizeShuffleMerge(
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param shuffleId shuffle id.
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
Review comment:
ditto.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -125,63 +125,87 @@ private AbstractFetchShuffleBlocks
createFetchShuffleBlocksOrChunksMsg(
String execId,
String[] blockIds) {
if (blockIds[0].startsWith(SHUFFLE_CHUNK_PREFIX)) {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
true);
+ return createFetchShuffleChunksMsg(appId, execId, blockIds);
} else {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
false);
+ return createFetchShuffleBlocksMsg(appId, execId, blockIds);
}
}
- /**
- * Create FetchShuffleBlocks/FetchShuffleBlockChunks message and rebuild
internal blockIds by
- * analyzing the passed in blockIds.
- */
- private AbstractFetchShuffleBlocks createFetchShuffleMsgAndBuildBlockIds(
+ private AbstractFetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId,
String execId,
- String[] blockIds,
- boolean areMergedChunks) {
+ String[] blockIds) {
String[] firstBlock = splitBlockId(blockIds[0]);
int shuffleId = Integer.parseInt(firstBlock[1]);
boolean batchFetchEnabled = firstBlock.length == 5;
-
- // In case of FetchShuffleBlocks, primaryId is mapId. For
FetchShuffleBlockChunks, primaryId
- // is reduceId.
- LinkedHashMap<Number, BlocksInfo> primaryIdToBlocksInfo = new
LinkedHashMap<>();
+ Map<Long, BlocksInfo> mapIdToBlocksInfo = new LinkedHashMap<>();
for (String blockId : blockIds) {
String[] blockIdParts = splitBlockId(blockId);
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
- throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
- ", got:" + blockId);
- }
- Number primaryId;
- if (!areMergedChunks) {
- primaryId = Long.parseLong(blockIdParts[2]);
- } else {
- primaryId = Integer.parseInt(blockIdParts[2]);
+ throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
- BlocksInfo blocksInfoByPrimaryId =
primaryIdToBlocksInfo.computeIfAbsent(primaryId,
- id -> new BlocksInfo());
- blocksInfoByPrimaryId.blockIds.add(blockId);
- // If blockId is a regular shuffle block, then blockIdParts[3] =
reduceId. If blockId is a
- // shuffleChunk block, then blockIdParts[3] = chunkId
- blocksInfoByPrimaryId.ids.add(Integer.parseInt(blockIdParts[3]));
+
+ long mapId = Long.parseLong(blockIdParts[2]);
+ BlocksInfo blocksInfoByMapId = mapIdToBlocksInfo.computeIfAbsent(mapId,
+ id -> new BlocksInfo());
+ blocksInfoByMapId.blockIds.add(blockId);
+ blocksInfoByMapId.ids.add(Integer.parseInt(blockIdParts[3]));
+
if (batchFetchEnabled) {
- // It comes here only if the blockId is a regular shuffle block not a
shuffleChunk block.
// When we read continuous shuffle blocks in batch, we will reuse
reduceIds in
// FetchShuffleBlocks to store the start and end reduce id for range
// [startReduceId, endReduceId).
assert(blockIdParts.length == 5);
// blockIdParts[4] is the end reduce id for the batch range
- blocksInfoByPrimaryId.ids.add(Integer.parseInt(blockIdParts[4]));
+ blocksInfoByMapId.ids.add(Integer.parseInt(blockIdParts[4]));
+ }
+ }
+
+ int[][] reduceIdsArray = getSecondaryIds(mapIdToBlocksInfo);
+ long[] mapIds = Longs.toArray(mapIdToBlocksInfo.keySet());
+ return new FetchShuffleBlocks(
+ appId, execId, shuffleId, mapIds, reduceIdsArray, batchFetchEnabled);
+ }
+
+ private AbstractFetchShuffleBlocks createFetchShuffleChunksMsg(
+ String appId,
+ String execId,
+ String[] blockIds) {
+ String[] firstBlock = splitBlockId(blockIds[0]);
+ int shuffleId = Integer.parseInt(firstBlock[1]);
+ int shuffleMergeId = Integer.parseInt(firstBlock[2]);
+
+ Map<Integer, BlocksInfo> reduceIdToBlocksInfo = new LinkedHashMap<>();
+ for (String blockId : blockIds) {
+ String[] blockIdParts = splitBlockId(blockId);
+ if (Integer.parseInt(blockIdParts[1]) != shuffleId ||
+ Integer.parseInt(blockIdParts[2]) != shuffleMergeId) {
+ throw new IllegalArgumentException(String.format("Expected shuffleId =
%s and"
+ + " shuffleMergeId = %s but got %s", shuffleId, shuffleMergeId,
blockId));
}
+
+ int reduceId = Integer.parseInt(blockIdParts[3]);
+ BlocksInfo blocksInfoByReduceId =
reduceIdToBlocksInfo.computeIfAbsent(reduceId,
+ id -> new BlocksInfo());
Review comment:
2 indents
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -78,6 +78,27 @@
public static final String MERGE_DIR_KEY = "mergeDir";
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;
+ private static final int UNDEFINED_SHUFFLE_MERGE_ID = Integer.MIN_VALUE;
+
+ // ConcurrentHashMap doesn't allow null for keys or values which is why this
is required.
+ // Marker to identify stale shuffle partitions typically happens in the case
of
+ // indeterminate stage retries.
+ @VisibleForTesting
+ public static final Map<Integer, AppShufflePartitionInfo>
STALE_SHUFFLE_PARTITIONS =
+ new ConcurrentHashMap<>();
+
+ // Marker for finalized shuffle partitions, used to identify late blocks
getting merged.
+ @VisibleForTesting
+ public static final Map<Integer, AppShufflePartitionInfo>
FINALIZED_SHUFFLE_PARTITIONS =
+ new ConcurrentHashMap<>();
+
+ @VisibleForTesting
+ public static final AppShufflePartitionInfo STALE_APP_SHUFFLE_PARTITION_INFO
=
+ new AppShufflePartitionInfo();
+
+ @VisibleForTesting
+ public static final AppShufflePartitionInfo
FINALIZED_APP_SHUFFLE_PARTITION_INFO =
+ new AppShufflePartitionInfo();
Review comment:
2 indents
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlocksMetaListener.java
##########
@@ -30,17 +30,21 @@
* Called after successfully receiving the meta of a merged block.
*
* @param shuffleId shuffle id.
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
Review comment:
ditto.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlocksMetaListener.java
##########
@@ -30,17 +30,21 @@
* Called after successfully receiving the meta of a merged block.
*
* @param shuffleId shuffle id.
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
* @param reduceId reduce id.
* @param meta contains meta information of a merged block.
*/
- void onSuccess(int shuffleId, int reduceId, MergedBlockMeta meta);
+ void onSuccess(int shuffleId, int shuffleMergeId, int reduceId,
MergedBlockMeta meta);
/**
* Called when there is an exception while fetching the meta of a merged
block.
*
* @param shuffleId shuffle id.
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
Review comment:
ditto
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
##########
@@ -85,21 +85,27 @@
*
* @param appId application ID
* @param shuffleId shuffle ID
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
* @param reduceId reducer ID
* @param chunkId merged shuffle file chunk ID
* @return The {@link ManagedBuffer} for the given merged shuffle chunk
*/
- ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId,
int chunkId);
+ ManagedBuffer getMergedBlockData(
+ String appId, int shuffleId, int shuffleMergeId, int reduceId, int
chunkId);
/**
* Get the meta information of a merged block.
*
* @param appId application ID
* @param shuffleId shuffle ID
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
Review comment:
ditto
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -408,26 +409,41 @@ protected Ratio getRatio() {
}
private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
- // For regular shuffle blocks, primaryId is mapId and secondaryIds are
reduceIds.
- // For shuffle chunks, primaryIds is reduceId and secondaryIds are
chunkIds.
- final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
Review comment:
Please add a comment to describe the format of the ShuffleBlock before
we do the parse below.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -408,26 +409,41 @@ protected Ratio getRatio() {
}
private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
- // For regular shuffle blocks, primaryId is mapId and secondaryIds are
reduceIds.
- // For shuffle chunks, primaryIds is reduceId and secondaryIds are
chunkIds.
- final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
+ final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
- if (blockIdParts.length != 4
- || (!requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID))
- || (requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_CHUNK_ID))) {
+ if (blockIdParts.length != 4 ||
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
throw new IllegalArgumentException("Unexpected shuffle block id
format: " + blockIds[i]);
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId
+
", got:" + blockIds[i]);
}
- // For regular blocks, blockIdParts[2] is mapId. For chunks, it is
reduceId.
- primaryIdAndSecondaryIds[2 * i] = Integer.parseInt(blockIdParts[2]);
- // For regular blocks, blockIdParts[3] is reduceId. For chunks, it is
chunkId.
- primaryIdAndSecondaryIds[2 * i + 1] =
Integer.parseInt(blockIdParts[3]);
+ mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+ mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
Review comment:
Add comment:
```suggestion
// mapId
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
// reduceId
mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
```
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
##########
@@ -85,21 +85,27 @@
*
* @param appId application ID
* @param shuffleId shuffle ID
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
Review comment:
ditto
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -408,26 +409,41 @@ protected Ratio getRatio() {
}
private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
- // For regular shuffle blocks, primaryId is mapId and secondaryIds are
reduceIds.
- // For shuffle chunks, primaryIds is reduceId and secondaryIds are
chunkIds.
- final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
+ final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
- if (blockIdParts.length != 4
- || (!requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID))
- || (requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_CHUNK_ID))) {
+ if (blockIdParts.length != 4 ||
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
throw new IllegalArgumentException("Unexpected shuffle block id
format: " + blockIds[i]);
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId
+
", got:" + blockIds[i]);
}
- // For regular blocks, blockIdParts[2] is mapId. For chunks, it is
reduceId.
- primaryIdAndSecondaryIds[2 * i] = Integer.parseInt(blockIdParts[2]);
- // For regular blocks, blockIdParts[3] is reduceId. For chunks, it is
chunkId.
- primaryIdAndSecondaryIds[2 * i + 1] =
Integer.parseInt(blockIdParts[3]);
+ mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+ mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
}
- return primaryIdAndSecondaryIds;
+ return mapIdAndReduceIds;
+ }
+
+ private int[] shuffleReduceIdAndChunkIds(
+ String[] blockIds,
+ int shuffleId,
+ int shuffleMergeId) {
+ final int[] reduceIdAndChunkIds = new int[2 * blockIds.length];
+ for(int i = 0; i < blockIds.length; i++) {
+ String[] blockIdParts = blockIds[i].split("_");
+ if (blockIdParts.length != 5 ||
!blockIdParts[0].equals(SHUFFLE_CHUNK_ID)) {
+ throw new IllegalArgumentException("Unexpected shuffle chunk id
format: " + blockIds[i]);
+ }
+ if (Integer.parseInt(blockIdParts[1]) != shuffleId ||
+ Integer.parseInt(blockIdParts[2]) != shuffleMergeId) {
+ throw new IllegalArgumentException(String.format("Expected shuffleId
= %s"
+ + " and shuffleMergeId = %s but got %s", shuffleId,
shuffleMergeId, blockIds[i]));
+ }
+ reduceIdAndChunkIds[2 * i] = Integer.parseInt(blockIdParts[3]);
+ reduceIdAndChunkIds[2 * i + 1] = Integer.parseInt(blockIdParts[4]);
Review comment:
Add comment:
```suggestion
// reduceId
reduceIdAndChunkIds[2 * i] = Integer.parseInt(blockIdParts[3]);
// chunkId
reduceIdAndChunkIds[2 * i + 1] = Integer.parseInt(blockIdParts[4]);
```
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/protocol/MergedBlockMetaRequest.java
##########
@@ -32,13 +32,20 @@
public final long requestId;
public final String appId;
public final int shuffleId;
+ public final int shuffleMergeId;
public final int reduceId;
- public MergedBlockMetaRequest(long requestId, String appId, int shuffleId,
int reduceId) {
+ public MergedBlockMetaRequest(
+ long requestId,
+ String appId,
+ int shuffleId,
+ int shuffleMergeId,
+ int reduceId) {
super(null, false);
this.requestId = requestId;
this.appId = appId;
this.shuffleId = shuffleId;
+ this.shuffleMergeId = shuffleMergeId;
Review comment:
Shall we also update `equals` & `toString` with this field?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -408,26 +409,41 @@ protected Ratio getRatio() {
}
private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
- // For regular shuffle blocks, primaryId is mapId and secondaryIds are
reduceIds.
- // For shuffle chunks, primaryIds is reduceId and secondaryIds are
chunkIds.
- final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
+ final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
- if (blockIdParts.length != 4
- || (!requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID))
- || (requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_CHUNK_ID))) {
+ if (blockIdParts.length != 4 ||
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
throw new IllegalArgumentException("Unexpected shuffle block id
format: " + blockIds[i]);
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId
+
", got:" + blockIds[i]);
}
- // For regular blocks, blockIdParts[2] is mapId. For chunks, it is
reduceId.
- primaryIdAndSecondaryIds[2 * i] = Integer.parseInt(blockIdParts[2]);
- // For regular blocks, blockIdParts[3] is reduceId. For chunks, it is
chunkId.
- primaryIdAndSecondaryIds[2 * i + 1] =
Integer.parseInt(blockIdParts[3]);
+ mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+ mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
}
- return primaryIdAndSecondaryIds;
+ return mapIdAndReduceIds;
+ }
+
+ private int[] shuffleReduceIdAndChunkIds(
+ String[] blockIds,
+ int shuffleId,
+ int shuffleMergeId) {
+ final int[] reduceIdAndChunkIds = new int[2 * blockIds.length];
Review comment:
Please add a comment to describe the format of the ShuffleChunk before
we do the parse below.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
##########
@@ -85,21 +85,27 @@
*
* @param appId application ID
* @param shuffleId shuffle ID
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
* @param reduceId reducer ID
* @param chunkId merged shuffle file chunk ID
* @return The {@link ManagedBuffer} for the given merged shuffle chunk
*/
- ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId,
int chunkId);
+ ManagedBuffer getMergedBlockData(
+ String appId, int shuffleId, int shuffleMergeId, int reduceId, int
chunkId);
Review comment:
```suggestion
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId,
int chunkId);
```
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -125,63 +125,87 @@ private AbstractFetchShuffleBlocks
createFetchShuffleBlocksOrChunksMsg(
String execId,
String[] blockIds) {
if (blockIds[0].startsWith(SHUFFLE_CHUNK_PREFIX)) {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
true);
+ return createFetchShuffleChunksMsg(appId, execId, blockIds);
} else {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
false);
+ return createFetchShuffleBlocksMsg(appId, execId, blockIds);
}
}
- /**
- * Create FetchShuffleBlocks/FetchShuffleBlockChunks message and rebuild
internal blockIds by
- * analyzing the passed in blockIds.
- */
- private AbstractFetchShuffleBlocks createFetchShuffleMsgAndBuildBlockIds(
+ private AbstractFetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId,
String execId,
- String[] blockIds,
- boolean areMergedChunks) {
+ String[] blockIds) {
String[] firstBlock = splitBlockId(blockIds[0]);
int shuffleId = Integer.parseInt(firstBlock[1]);
boolean batchFetchEnabled = firstBlock.length == 5;
-
- // In case of FetchShuffleBlocks, primaryId is mapId. For
FetchShuffleBlockChunks, primaryId
- // is reduceId.
- LinkedHashMap<Number, BlocksInfo> primaryIdToBlocksInfo = new
LinkedHashMap<>();
+ Map<Long, BlocksInfo> mapIdToBlocksInfo = new LinkedHashMap<>();
for (String blockId : blockIds) {
String[] blockIdParts = splitBlockId(blockId);
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
- throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
- ", got:" + blockId);
- }
- Number primaryId;
- if (!areMergedChunks) {
- primaryId = Long.parseLong(blockIdParts[2]);
- } else {
- primaryId = Integer.parseInt(blockIdParts[2]);
+ throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
- BlocksInfo blocksInfoByPrimaryId =
primaryIdToBlocksInfo.computeIfAbsent(primaryId,
- id -> new BlocksInfo());
- blocksInfoByPrimaryId.blockIds.add(blockId);
- // If blockId is a regular shuffle block, then blockIdParts[3] =
reduceId. If blockId is a
- // shuffleChunk block, then blockIdParts[3] = chunkId
- blocksInfoByPrimaryId.ids.add(Integer.parseInt(blockIdParts[3]));
+
+ long mapId = Long.parseLong(blockIdParts[2]);
+ BlocksInfo blocksInfoByMapId = mapIdToBlocksInfo.computeIfAbsent(mapId,
+ id -> new BlocksInfo());
Review comment:
2 indents
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -125,63 +125,87 @@ private AbstractFetchShuffleBlocks
createFetchShuffleBlocksOrChunksMsg(
String execId,
String[] blockIds) {
if (blockIds[0].startsWith(SHUFFLE_CHUNK_PREFIX)) {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
true);
+ return createFetchShuffleChunksMsg(appId, execId, blockIds);
} else {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
false);
+ return createFetchShuffleBlocksMsg(appId, execId, blockIds);
}
}
- /**
- * Create FetchShuffleBlocks/FetchShuffleBlockChunks message and rebuild
internal blockIds by
- * analyzing the passed in blockIds.
- */
- private AbstractFetchShuffleBlocks createFetchShuffleMsgAndBuildBlockIds(
+ private AbstractFetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId,
String execId,
- String[] blockIds,
- boolean areMergedChunks) {
+ String[] blockIds) {
String[] firstBlock = splitBlockId(blockIds[0]);
int shuffleId = Integer.parseInt(firstBlock[1]);
boolean batchFetchEnabled = firstBlock.length == 5;
-
- // In case of FetchShuffleBlocks, primaryId is mapId. For
FetchShuffleBlockChunks, primaryId
- // is reduceId.
- LinkedHashMap<Number, BlocksInfo> primaryIdToBlocksInfo = new
LinkedHashMap<>();
+ Map<Long, BlocksInfo> mapIdToBlocksInfo = new LinkedHashMap<>();
for (String blockId : blockIds) {
String[] blockIdParts = splitBlockId(blockId);
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
- throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
- ", got:" + blockId);
- }
- Number primaryId;
- if (!areMergedChunks) {
- primaryId = Long.parseLong(blockIdParts[2]);
- } else {
- primaryId = Integer.parseInt(blockIdParts[2]);
+ throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockId);
}
- BlocksInfo blocksInfoByPrimaryId =
primaryIdToBlocksInfo.computeIfAbsent(primaryId,
- id -> new BlocksInfo());
- blocksInfoByPrimaryId.blockIds.add(blockId);
- // If blockId is a regular shuffle block, then blockIdParts[3] =
reduceId. If blockId is a
- // shuffleChunk block, then blockIdParts[3] = chunkId
- blocksInfoByPrimaryId.ids.add(Integer.parseInt(blockIdParts[3]));
+
+ long mapId = Long.parseLong(blockIdParts[2]);
+ BlocksInfo blocksInfoByMapId = mapIdToBlocksInfo.computeIfAbsent(mapId,
+ id -> new BlocksInfo());
+ blocksInfoByMapId.blockIds.add(blockId);
+ blocksInfoByMapId.ids.add(Integer.parseInt(blockIdParts[3]));
+
if (batchFetchEnabled) {
- // It comes here only if the blockId is a regular shuffle block not a
shuffleChunk block.
// When we read continuous shuffle blocks in batch, we will reuse
reduceIds in
// FetchShuffleBlocks to store the start and end reduce id for range
// [startReduceId, endReduceId).
assert(blockIdParts.length == 5);
// blockIdParts[4] is the end reduce id for the batch range
- blocksInfoByPrimaryId.ids.add(Integer.parseInt(blockIdParts[4]));
+ blocksInfoByMapId.ids.add(Integer.parseInt(blockIdParts[4]));
+ }
+ }
+
+ int[][] reduceIdsArray = getSecondaryIds(mapIdToBlocksInfo);
+ long[] mapIds = Longs.toArray(mapIdToBlocksInfo.keySet());
+ return new FetchShuffleBlocks(
+ appId, execId, shuffleId, mapIds, reduceIdsArray, batchFetchEnabled);
Review comment:
2 indents
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -125,63 +125,87 @@ private AbstractFetchShuffleBlocks
createFetchShuffleBlocksOrChunksMsg(
String execId,
String[] blockIds) {
if (blockIds[0].startsWith(SHUFFLE_CHUNK_PREFIX)) {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
true);
+ return createFetchShuffleChunksMsg(appId, execId, blockIds);
} else {
- return createFetchShuffleMsgAndBuildBlockIds(appId, execId, blockIds,
false);
+ return createFetchShuffleBlocksMsg(appId, execId, blockIds);
}
}
- /**
- * Create FetchShuffleBlocks/FetchShuffleBlockChunks message and rebuild
internal blockIds by
- * analyzing the passed in blockIds.
- */
- private AbstractFetchShuffleBlocks createFetchShuffleMsgAndBuildBlockIds(
+ private AbstractFetchShuffleBlocks createFetchShuffleBlocksMsg(
String appId,
String execId,
- String[] blockIds,
- boolean areMergedChunks) {
+ String[] blockIds) {
String[] firstBlock = splitBlockId(blockIds[0]);
int shuffleId = Integer.parseInt(firstBlock[1]);
boolean batchFetchEnabled = firstBlock.length == 5;
Review comment:
```suggestion
boolean batchFetchEnabled = firstBlock.length == 5 &&
blockIds[0].equals("shuffle");
```
?
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -150,6 +155,22 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C:
ClassTag](
}
}
+ def newShuffleMergeState(): Unit = {
+ _shuffleMergeEnabled = canShuffleMergeBeEnabled()
+ _shuffleMergedFinalized = false
+ mergerLocs = Nil
+ _shuffleMergeId += 1
+ }
+
+ private def canShuffleMergeBeEnabled(): Boolean = {
+ if (rdd.isBarrier()) {
+ logWarning("Push-based shuffle is currently not supported for barrier
stages")
+ }
Review comment:
We should only log this when push-based shuffle is enabled.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
##########
@@ -85,21 +85,27 @@
*
* @param appId application ID
* @param shuffleId shuffle ID
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
* @param reduceId reducer ID
* @param chunkId merged shuffle file chunk ID
* @return The {@link ManagedBuffer} for the given merged shuffle chunk
*/
- ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId,
int chunkId);
+ ManagedBuffer getMergedBlockData(
+ String appId, int shuffleId, int shuffleMergeId, int reduceId, int
chunkId);
/**
* Get the meta information of a merged block.
*
* @param appId application ID
* @param shuffleId shuffle ID
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
* @param reduceId reducer ID
* @return meta information of a merged block
*/
- MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int
reduceId);
+ MergedBlockMeta getMergedBlockMeta(
+ String appId, int shuffleId, int shuffleMergeId, int reduceId);
Review comment:
ditto
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -167,6 +167,8 @@ public void pushBlocks(
* @param host host of shuffle server
* @param port port of shuffle server.
* @param shuffleId shuffle ID of the shuffle to be finalized
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify a
indeterminate stage
+ * attempt of a shuffle Id.
Review comment:
ditto.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -135,13 +135,14 @@ public void start() {
assert buffers.containsKey(blockIds[i]) : "Could not find the block
buffer for block "
+ blockIds[i];
String[] blockIdParts = blockIds[i].split("_");
- if (blockIdParts.length != 4 ||
!blockIdParts[0].equals(SHUFFLE_PUSH_BLOCK_PREFIX)) {
+ if (blockIdParts.length != 5 ||
!blockIdParts[0].equals(SHUFFLE_PUSH_BLOCK_PREFIX)) {
Review comment:
We should exclude batched block id too.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -78,6 +78,27 @@
public static final String MERGE_DIR_KEY = "mergeDir";
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;
+ private static final int UNDEFINED_SHUFFLE_MERGE_ID = Integer.MIN_VALUE;
+
+ // ConcurrentHashMap doesn't allow null for keys or values which is why this
is required.
+ // Marker to identify stale shuffle partitions typically happens in the case
of
+ // indeterminate stage retries.
+ @VisibleForTesting
+ public static final Map<Integer, AppShufflePartitionInfo>
STALE_SHUFFLE_PARTITIONS =
+ new ConcurrentHashMap<>();
+
+ // Marker for finalized shuffle partitions, used to identify late blocks
getting merged.
+ @VisibleForTesting
+ public static final Map<Integer, AppShufflePartitionInfo>
FINALIZED_SHUFFLE_PARTITIONS =
+ new ConcurrentHashMap<>();
+
+ @VisibleForTesting
+ public static final AppShufflePartitionInfo STALE_APP_SHUFFLE_PARTITION_INFO
=
+ new AppShufflePartitionInfo();
Review comment:
2 indents
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]