mridulm commented on a change in pull request #35325:
URL: https://github.com/apache/spark/pull/35325#discussion_r793202348
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -534,35 +515,33 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>>
shuffleMergePartitionsRef =
new AtomicReference<>(null);
- // Metadata of the determinate stage shuffle can be safely removed as part
of finalizing
- // shuffle merge. Currently once the shuffle is finalized for a
determinate stages, retry
- // stages of the same shuffle will have shuffle push disabled.
- if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
- AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
- appShuffleInfo.shuffles.remove(msg.shuffleId);
- if (appShuffleMergePartitionsInfo != null) {
-
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (null == mergePartitionsInfo) {
+ //If the mergePartitions was never created then it means that there
weren't any push
+ //blocks that were ever received for this shuffle. This could be the
case when the driver
+ //doesn't wait for enough time to start the stage which reads this
shuffle data.
Review comment:
nit: add a space between `//` and the comment text.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -534,35 +515,33 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>>
shuffleMergePartitionsRef =
new AtomicReference<>(null);
- // Metadata of the determinate stage shuffle can be safely removed as part
of finalizing
- // shuffle merge. Currently once the shuffle is finalized for a
determinate stages, retry
- // stages of the same shuffle will have shuffle push disabled.
- if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
- AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
- appShuffleInfo.shuffles.remove(msg.shuffleId);
- if (appShuffleMergePartitionsInfo != null) {
-
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (null == mergePartitionsInfo) {
+ //If the mergePartitions was never created then it means that there
weren't any push
+ //blocks that were ever received for this shuffle. This could be the
case when the driver
+ //doesn't wait for enough time to start the stage which reads this
shuffle data.
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ } else if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId
+ || mergePartitionsInfo.isFinalized()) {
+ throw new RuntimeException(
+ String.format("Shuffle merge finalize request for shuffle %s with"
+
+ " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId,
+
ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+ } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+ // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId
then return
+ // empty MergeStatuses but cleanup the older shuffleMergeId files.
+ mergedShuffleCleaner.execute(() ->
+
closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ } else {
+ // This block covers:
+ // 1. finalization of determinate stage
+ // 2. finalization of indeterminate stage if the shuffleMergeId
related to it is the one
+ // for which the message is received.
+
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
Review comment:
nit: All paths in this lambda that do not throw an exception return the same
thing - `new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true)`.
Can we make that return common?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -576,14 +555,25 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
for (AppShufflePartitionInfo partition: shuffleMergePartitions.values())
{
synchronized (partition) {
try {
+ logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalizing
shuffle partition "
+ + "{} ", msg.appId, msg.appAttemptId, msg.shuffleId,
+ msg.shuffleMergeId, partition.reduceId);
// This can throw IOException which will marks this shuffle
partition as not merged.
partition.finalizePartition();
- bitmaps.add(partition.mapTracker);
- reduceIds.add(partition.reduceId);
- sizes.add(partition.getLastChunkOffset());
+ if (partition.mapTracker.getCardinality() > 0) {
+ bitmaps.add(partition.mapTracker);
+ reduceIds.add(partition.reduceId);
+ sizes.add(partition.getLastChunkOffset());
+ logger.debug("{} attempt {} shuffle {} shuffleMerge {}:
finalization results added"
+ + " for partition {} data size {} index size {} meta size {}",
Review comment:
nit: move the `+` to the end of the previous line (here and elsewhere in
string concatenation for log messages).
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -534,35 +515,33 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>>
shuffleMergePartitionsRef =
new AtomicReference<>(null);
- // Metadata of the determinate stage shuffle can be safely removed as part
of finalizing
- // shuffle merge. Currently once the shuffle is finalized for a
determinate stages, retry
- // stages of the same shuffle will have shuffle push disabled.
- if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
- AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
- appShuffleInfo.shuffles.remove(msg.shuffleId);
- if (appShuffleMergePartitionsInfo != null) {
-
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (null == mergePartitionsInfo) {
+ //If the mergePartitions was never created then it means that there
weren't any push
+ //blocks that were ever received for this shuffle. This could be the
case when the driver
+ //doesn't wait for enough time to start the stage which reads this
shuffle data.
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ } else if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId
+ || mergePartitionsInfo.isFinalized()) {
Review comment:
nit: move `||` to the end of the previous line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]