zhouyejoe commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r679691086
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -128,58 +138,88 @@ protected AppShuffleInfo
validateAndGetAppShuffleInfo(String appId) {
}
/**
- * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies
a given shuffle
- * partition of an application, retrieves the associated metadata. If not
present and the
- * corresponding merged shuffle does not exist, initializes the metadata.
+ * Given the appShuffleInfo, shuffleId, shuffleMergeId and reduceId that
uniquely identifies
+ * a given shuffle partition of an application, retrieves the associated
metadata. If not
+ * present and the corresponding merged shuffle does not exist, initializes
the metadata.
*/
private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
AppShuffleInfo appShuffleInfo,
int shuffleId,
- int reduceId) {
- File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId,
reduceId);
- ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
- appShuffleInfo.partitions;
- Map<Integer, AppShufflePartitionInfo> shufflePartitions =
- partitions.compute(shuffleId, (id, map) -> {
- if (map == null) {
+ int shuffleMergeId,
+ int reduceId) throws StaleBlockPushException {
+ ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> partitions =
appShuffleInfo.partitions;
+ AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
+ partitions.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
+ if (appShuffleMergePartitionsInfo == null) {
+ File dataFile =
+ appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId,
reduceId);
// If this partition is already finalized then the partitions map
will not contain the
- // shuffleId but the data file would exist. In that case the block
is considered late.
+ // shuffleId for determinate stages but the data file would exist.
+ // In that case the block is considered late. In the case of
indeterminate stages, most
+ // recent shuffleMergeId finalized would be pointing to
INDETERMINATE_SHUFFLE_FINALIZED
if (dataFile.exists()) {
return null;
+ } else {
+ logger.info("Creating a new attempt for shuffle blocks push
request for shuffle {}"
+ + " with shuffleMergeId {} for application {}_{}", shuffleId,
shuffleMergeId,
+ appShuffleInfo.appId, appShuffleInfo.attemptId);
+ return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
}
- return new ConcurrentHashMap<>();
} else {
- return map;
+ // Reject the request as we have already seen a higher
shuffleMergeId than the
+ // current incoming one
+ int latestShuffleMergeId =
appShuffleMergePartitionsInfo.shuffleMergeId;
+ if (latestShuffleMergeId > shuffleMergeId) {
+ throw new StaleBlockPushException(String.format("Rejecting shuffle
blocks push request"
+ + " for shuffle %s with shuffleMergeId %s for application %s_%s
as a higher"
+ + " shuffleMergeId %s request is already seen", shuffleId,
shuffleMergeId,
+ appShuffleInfo.appId, appShuffleInfo.attemptId,
latestShuffleMergeId));
Review comment:
Nit: These two continuation lines should have the same indent as the first `+` line.
Example:
https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L70
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -128,58 +138,88 @@ protected AppShuffleInfo
validateAndGetAppShuffleInfo(String appId) {
}
/**
- * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies
a given shuffle
- * partition of an application, retrieves the associated metadata. If not
present and the
- * corresponding merged shuffle does not exist, initializes the metadata.
+ * Given the appShuffleInfo, shuffleId, shuffleMergeId and reduceId that
uniquely identifies
+ * a given shuffle partition of an application, retrieves the associated
metadata. If not
+ * present and the corresponding merged shuffle does not exist, initializes
the metadata.
*/
private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
AppShuffleInfo appShuffleInfo,
int shuffleId,
- int reduceId) {
- File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId,
reduceId);
- ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
- appShuffleInfo.partitions;
- Map<Integer, AppShufflePartitionInfo> shufflePartitions =
- partitions.compute(shuffleId, (id, map) -> {
- if (map == null) {
+ int shuffleMergeId,
+ int reduceId) throws StaleBlockPushException {
+ ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> partitions =
appShuffleInfo.partitions;
+ AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
+ partitions.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
+ if (appShuffleMergePartitionsInfo == null) {
+ File dataFile =
+ appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId,
reduceId);
// If this partition is already finalized then the partitions map
will not contain the
- // shuffleId but the data file would exist. In that case the block
is considered late.
+ // shuffleId for determinate stages but the data file would exist.
+ // In that case the block is considered late. In the case of
indeterminate stages, most
+ // recent shuffleMergeId finalized would be pointing to
INDETERMINATE_SHUFFLE_FINALIZED
if (dataFile.exists()) {
return null;
+ } else {
+ logger.info("Creating a new attempt for shuffle blocks push
request for shuffle {}"
+ + " with shuffleMergeId {} for application {}_{}", shuffleId,
shuffleMergeId,
+ appShuffleInfo.appId, appShuffleInfo.attemptId);
+ return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
}
- return new ConcurrentHashMap<>();
} else {
- return map;
+ // Reject the request as we have already seen a higher
shuffleMergeId than the
+ // current incoming one
+ int latestShuffleMergeId =
appShuffleMergePartitionsInfo.shuffleMergeId;
+ if (latestShuffleMergeId > shuffleMergeId) {
+ throw new StaleBlockPushException(String.format("Rejecting shuffle
blocks push request"
+ + " for shuffle %s with shuffleMergeId %s for application %s_%s
as a higher"
+ + " shuffleMergeId %s request is already seen", shuffleId,
shuffleMergeId,
+ appShuffleInfo.appId, appShuffleInfo.attemptId,
latestShuffleMergeId));
+ } else if (latestShuffleMergeId == shuffleMergeId) {
+ return appShuffleMergePartitionsInfo;
+ } else {
+ // Higher shuffleMergeId seen for the shuffle ID meaning new stage
attempt is being
+ // run for the shuffle ID. Close and clean up old shuffleMergeId
files,
+ // happens in the indeterminate stage retries
+ logger.info("Creating a new attempt for shuffle blocks push
request for shuffle {}"
+ + " with shuffleMergeId {} for application {}_{} since it is
higher than the"
+ + " latest shuffleMergeId {} already seen", shuffleId,
shuffleMergeId,
Review comment:
Nit: Fix the indentation here as well — same issue as noted above.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -270,18 +333,28 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
void closeAndDeletePartitionFilesIfNeeded(
AppShuffleInfo appShuffleInfo,
boolean cleanupLocalDirs) {
- for (Map<Integer, AppShufflePartitionInfo> partitionMap :
appShuffleInfo.partitions.values()) {
- for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
- synchronized (partitionInfo) {
- partitionInfo.closeAllFiles();
- }
- }
- }
+ appShuffleInfo.partitions.values().stream()
+ .flatMap(x -> x.shuffleMergePartitions.values().stream())
+ .forEach(partitionInfo ->
partitionInfo.closeAllFilesAndDeleteIfNeeded(false));
Review comment:
The `synchronized` block around `partitionInfo` has been removed here. Will that cause any thread-safety issues?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -563,9 +675,10 @@ public String getID() {
private void writeBuf(ByteBuffer buf) throws IOException {
while (buf.hasRemaining()) {
long updatedPos = partitionInfo.getDataFilePos() + length;
- logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos
{}",
- partitionInfo.appId, partitionInfo.shuffleId,
- partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos);
+ logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current
pos"
+ + " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId,
Review comment:
Nit: The indent should be the same as the preceding `+` line. There are also some
other places further down where the long `logger` calls do not need an extra level of
indent, since the format string and the subsequent variables are all arguments to the
debug method and should sit at the same level. Please double-check the other occurrences.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -270,18 +333,28 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
void closeAndDeletePartitionFilesIfNeeded(
AppShuffleInfo appShuffleInfo,
boolean cleanupLocalDirs) {
- for (Map<Integer, AppShufflePartitionInfo> partitionMap :
appShuffleInfo.partitions.values()) {
- for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
- synchronized (partitionInfo) {
- partitionInfo.closeAllFiles();
- }
- }
- }
+ appShuffleInfo.partitions.values().stream()
+ .flatMap(x -> x.shuffleMergePartitions.values().stream())
+ .forEach(partitionInfo ->
partitionInfo.closeAllFilesAndDeleteIfNeeded(false));
Review comment:
Should we give the newly added closeAndDeletePartitionFiles method a boolean marker
parameter that controls deletion, so that we can call it here? For example:
for (Map<Integer, AppShufflePartitionInfo> partitionMap :
appShuffleInfo.partitions.values()) {
closeAndDeletePartitionFiles(partitionMap, false)
}
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -188,49 +228,72 @@ private AppShufflePartitionInfo
getOrCreateAppShufflePartitionInfo(
AppShufflePartitionInfo newAppShufflePartitionInfo(
String appId,
int shuffleId,
+ int shuffleMergeId,
int reduceId,
File dataFile,
File indexFile,
File metaFile) throws IOException {
- return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile,
+ return new AppShufflePartitionInfo(appId, shuffleId, shuffleMergeId,
reduceId, dataFile,
new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
}
@Override
- public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int
reduceId) {
+ public MergedBlockMeta getMergedBlockMeta(
+ String appId,
+ int shuffleId,
+ int shuffleMergeId,
+ int reduceId) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+ Map<Integer, AppShuffleMergePartitionsInfo> partitions =
appShuffleInfo.partitions;
+ if (partitions.containsKey(shuffleId) &&
+ partitions.get(shuffleId).shuffleMergeId > shuffleMergeId) {
+ throw new RuntimeException(String.format(
+ "MergedBlockMeta fetch for shuffle %s with shuffleMergeId %s reduceId
%s is %s",
+ shuffleId, shuffleMergeId, reduceId,
+ ErrorHandler.BlockFetchErrorHandler.STALE_SHUFFLE_BLOCK_FETCH));
+ }
File indexFile =
- appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
+ appShuffleInfo.getMergedShuffleIndexFile(shuffleId, shuffleMergeId,
reduceId);
if (!indexFile.exists()) {
throw new RuntimeException(String.format(
"Merged shuffle index file %s not found", indexFile.getPath()));
}
int size = (int) indexFile.length();
// First entry is the zero offset
int numChunks = (size / Long.BYTES) - 1;
- File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId,
reduceId);
+ File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId,
shuffleMergeId, reduceId);
if (!metaFile.exists()) {
throw new RuntimeException(String.format("Merged shuffle meta file %s
not found",
metaFile.getPath()));
}
FileSegmentManagedBuffer chunkBitMaps =
new FileSegmentManagedBuffer(conf, metaFile, 0L, metaFile.length());
logger.trace(
- "{} shuffleId {} reduceId {} num chunks {}", appId, shuffleId, reduceId,
numChunks);
+ "{} shuffleId {} shuffleMergeId {} reduceId {} num chunks {}",
+ appId, shuffleId, shuffleMergeId, reduceId, numChunks);
return new MergedBlockMeta(numChunks, chunkBitMaps);
}
@SuppressWarnings("UnstableApiUsage")
@Override
- public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int
reduceId, int chunkId) {
+ public ManagedBuffer getMergedBlockData(
+ String appId, int shuffleId, int shuffleMergeId, int reduceId, int
chunkId) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
- File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId,
reduceId);
+ Map<Integer, AppShuffleMergePartitionsInfo> partitions =
appShuffleInfo.partitions;
+ if (partitions.containsKey(shuffleId) &&
+ partitions.get(shuffleId).shuffleMergeId > shuffleMergeId) {
+ throw new RuntimeException(String.format(
+ "MergedBlockData fetch for shuffle %s with shuffleMergeId %s
reduceId %s is %s",
+ shuffleId, shuffleMergeId, reduceId,
+ ErrorHandler.BlockFetchErrorHandler.STALE_SHUFFLE_BLOCK_FETCH));
Review comment:
Nit: Indent is off
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -304,9 +377,9 @@ void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
- final String streamId = String.format("%s_%d_%d_%d",
- OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId,
msg.mapIndex,
- msg.reduceId);
+ final String streamId = String.format("%s_%d_%d_%d_%d",
+ OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId,
msg.shuffleMergeId,
+ msg.mapIndex, msg.reduceId);
Review comment:
Nit: The indent should be the same as the line above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]