mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990751884
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java:
##########
@@ -224,6 +224,12 @@ protected void handleMessage(
} finally {
responseDelayContext.stop();
}
+ } else if (msgObj instanceof RemoveShuffleMerge) {
+ RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
+ checkAuth(client, msg.appId);
+      logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}",
+        msg.appId, msg.shuffleId, msg.shuffleMergeId);
+      mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId, msg.shuffleMergeId);
Review Comment:
We need to pass in `appAttemptId` as well for `RemoveShuffleMerge` - I had left that comment earlier.
Take a look at `FinalizeShuffleMerge` and its processing, and handle it similarly? (We can pass the `RemoveShuffleMerge` message itself to `mergeManager.removeShuffleMerge` and look up the fields from it there.)
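For illustration, a minimal sketch of handing the whole message to the merge manager, mirroring how `FinalizeShuffleMerge` is processed. All types here are simplified stand-ins for the Spark classes, and the returned string is only for demonstration:

```java
public class HandlerSketch {
    // Simplified stand-in for the RemoveShuffleMerge RPC message, now carrying appAttemptId.
    static class RemoveShuffleMerge {
        final String appId;
        final int appAttemptId;
        final int shuffleId;
        final int shuffleMergeId;

        RemoveShuffleMerge(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
            this.appId = appId;
            this.appAttemptId = appAttemptId;
            this.shuffleId = shuffleId;
            this.shuffleMergeId = shuffleMergeId;
        }
    }

    // Stand-in for mergeManager.removeShuffleMerge: taking the whole message means the
    // manager can look up appAttemptId (and any future fields) itself, instead of the
    // handler unpacking individual fields at the call site.
    static String removeShuffleMerge(RemoveShuffleMerge msg) {
        return "remove:" + msg.appId + ":" + msg.appAttemptId + ":" + msg.shuffleId
            + ":" + msg.shuffleMergeId;
    }

    public static void main(String[] args) {
        RemoveShuffleMerge msg = new RemoveShuffleMerge("app-1", 2, 3, 4);
        System.out.println(removeShuffleMerge(msg));
    }
}
```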
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
Review Comment:
Add validation for `attemptId` here.
Take a look at `finalizeShuffleMerge` for an example.
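A rough, self-contained sketch of the attempt-id validation being asked for. The classes and the exception message are simplified stand-ins modeled on `finalizeShuffleMerge`, not the actual Spark code:

```java
public class AttemptValidationSketch {
    // Simplified stand-in for the per-app state tracked by the resolver.
    static class AppShuffleInfo {
        final int attemptId;
        AppShuffleInfo(int attemptId) { this.attemptId = attemptId; }
    }

    // Reject requests coming from an attempt other than the one currently registered,
    // so a stale attempt cannot delete a newer attempt's merged data.
    static void validateAttempt(AppShuffleInfo info, int msgAppAttemptId, String appId) {
        if (info.attemptId != msgAppAttemptId) {
            throw new IllegalArgumentException(
                "Attempt id " + msgAppAttemptId + " in this RemoveShuffleMerge message does not "
                + "match the current attempt id " + info.attemptId + " stored in the shuffle "
                + "service for application " + appId);
        }
    }

    public static void main(String[] args) {
        AppShuffleInfo info = new AppShuffleInfo(2);
        validateAttempt(info, 2, "app-1");  // current attempt: accepted
        try {
            validateAttempt(info, 1, "app-1");  // stale attempt: rejected
            throw new AssertionError("expected IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            System.out.println("stale attempt rejected");
        }
    }
}
```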
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+ AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId);
Review Comment:
We should not remove it directly - the value within the map could be for a different `shuffleMergeId` (a newer one, for example).
Take a look at `finalizeShuffleMerge` for how to handle the corner cases.
Rough sketch is:
```
public void removeShuffleMerge(RemoveShuffleMerge msg) {
  AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
  if (appShuffleInfo.attemptId != msg.appAttemptId) {
    // incoming request is for an older app attempt - reject it
    throw new IllegalArgumentException("appropriate msg for invalid app attempt");
  }
  appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
    if (null != mergePartitionsInfo) {
      // where DELETE_CURRENT == -1
      // The merge id will be set to -1 when we are cleaning up the shuffle and there is
      // no chance of its reuse - else it will be set to an explicit value.
      boolean deleteAny = msg.shuffleMergeId == DELETE_CURRENT;
      // Looks like there is a bug in finalizeShuffleMerge; let us fix it here anyway
      // and handle it for finalizeShuffleMerge in a different PR.
      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
          msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
      if (!deleteAny && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
        // throw exception - request is for an older shuffle merge id
        throw new RuntimeException("appropriate msg for delete of old merge id");
      } else if (!deleteAny && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
        // cleanup request for a newer shuffle - remove the outdated data we have.
        submitCleanupTask(() ->
            closeAndDeleteOutdatedPartitions(
                currentAppAttemptShuffleMergeId,
                mergePartitionsInfo.shuffleMergePartitions));
      } else {
        // request to clean up the shuffle we are currently hosting.
        // Not yet finalized - use the existing cleanup mechanism.
        if (!mergePartitionsInfo.isFinalized()) {
          submitCleanupTask(() ->
              closeAndDeleteOutdatedPartitions(
                  currentAppAttemptShuffleMergeId,
                  mergePartitionsInfo.shuffleMergePartitions));
        } else {
          if (!mergePartitionsInfo.getReduceIds().isEmpty()) {
            // Introduce a new method which deletes the files for shuffleMergeId
            submitCleanupTask(() ->
                deleteMergedFiles(msg.appId, msg.appAttemptId,
                    msg.shuffleId, msg.shuffleMergeId,
                    // To be introduced - see below
                    mergePartitionsInfo.getReduceIds()));
          }
          // Simply return the existing entry immediately - the db does not need updating.
          // We can actually drop the reduceIds here as an optimization.
          return mergePartitionsInfo;
        }
      }
    }
    // Keep track of the latest merge id - and mark it as finalized and immutable,
    // as it is already marked for deletion/deleted.
    AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
        msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
    writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
    // no reduceIds
    return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
  });
}
```
To write up `deleteMergedFiles`, the only thing missing is the set of valid reduce ids (`getReduceIds` above).
We can keep track of that by modifying `finalizeShuffleMerge` as follows:
a) keep a reference to the entry returned from `appShuffleInfo.shuffles.compute()`
b) before returning `mergeStatuses`, update this variable with the `reduceIds`
c) for efficiency, we can convert it to a bitmap and save space - but that is an impl detail.
Thoughts?
+CC @otterc, @zhouyejoe - please take a look at the bug that should be there in `finalizeShuffleMerge`, which I have sketched a fix for above.
Unless I am missing something, we should fix `finalizeShuffleMerge` in a separate PR.
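For point (c), a toy illustration of tracking the valid reduce ids as a bitmap rather than a collection of boxed integers. It uses the JDK's `java.util.BitSet` purely for illustration; the bitmap implementation, and the `deleteMergedFiles`-style consumer it feeds, are assumptions for this sketch, not Spark code:

```java
import java.util.BitSet;
import java.util.List;

public class ReduceIdBitmapSketch {
    // Pack the valid reduce ids into a bitmap instead of a collection of boxed Integers.
    static BitSet toBitmap(List<Integer> reduceIds) {
        BitSet bitmap = new BitSet();
        reduceIds.forEach(bitmap::set);
        return bitmap;
    }

    public static void main(String[] args) {
        BitSet bitmap = toBitmap(List.of(0, 3, 7, 1023));
        // A deleteMergedFiles-style consumer would iterate the set bits to find
        // which merged shuffle files to delete.
        for (int r = bitmap.nextSetBit(0); r >= 0; r = bitmap.nextSetBit(r + 1)) {
            System.out.println("delete merged files for reduceId " + r);
        }
        System.out.println("tracked " + bitmap.cardinality() + " reduce ids");
    }
}
```

Since Spark already uses RoaringBitmap elsewhere (e.g. in `MergeStatuses`), that would likely be the natural choice in the real implementation.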
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -702,7 +722,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
            "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
            msg.shuffleMergeId, partition.reduceId);
} finally {
- partition.closeAllFilesAndDeleteIfNeeded(false);
+ Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
+ partition.closeAllFilesAndDeleteIfNeeded(deleteFile);
Review Comment:
Revert ?
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
        } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
          // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
          // empty MergeStatuses but cleanup the older shuffleMergeId files.
+         Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
+           mergePartitionsInfo.shuffleMergePartitions;
          submitCleanupTask(() ->
-           closeAndDeleteOutdatedPartitions(
-             appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+           closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
} else {
Review Comment:
revert this ?
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
        } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
          // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
          // empty MergeStatuses but cleanup the older shuffleMergeId files.
+         Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
+           mergePartitionsInfo.shuffleMergePartitions;
          submitCleanupTask(() ->
-           closeAndDeleteOutdatedPartitions(
-             appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+           closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
        } else {
          // This block covers:
          // 1. finalization of determinate stage
          // 2. finalization of indeterminate stage if the shuffleMergeId related to it is the one
          //    for which the message is received.
          shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
        }
+     } else {
+       mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, true);
      }
+     mergePartitionsInfo.setFinalized(true);
      // Update the DB for the finalized shuffle
      writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
      // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
      // sent to the driver will be empty. This can happen when the service didn't receive any
      // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
      // shuffle.
-     return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+     return mergePartitionsInfo;
Review Comment:
revert changes to this method ?
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2543,16 +2541,13 @@ private[spark] class DAGScheduler(
shuffleIdToMapStage.filter { case (_, stage) =>
stage.shuffleDep.shuffleMergeAllowed &&
stage.shuffleDep.getMergerLocs.isEmpty &&
runningStages.contains(stage)
-        }.foreach { case(_, stage: ShuffleMapStage) =>
-          if (getAndSetShufflePushMergerLocations(stage).nonEmpty) {
-            logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +
-              s" ${stage.shuffleDep.shuffleId} and shuffle merge" +
-              s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" +
-              s" merger locations")
-            mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId,
-              stage.shuffleDep.getMergerLocs)
-          }
-        }
+        }.foreach { case (_, stage: ShuffleMapStage) =>
+          configureShufflePushMergerLocations(stage)
+          logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +
Review Comment:
Surround the `logInfo` with `if (stage.shuffleDep.getMergerLocs.nonEmpty)` - else we will print that log line even if mergers were not set.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1639,9 +1661,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) {
try {
if (dataChannel.isOpen()) {
dataChannel.close();
- if (delete) {
- dataFile.delete();
- }
+ }
+ if (delete) {
+ dataFile.delete();
Review Comment:
This looks like an orthogonal bug fix.
+CC @otterc
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##########
@@ -256,6 +256,20 @@ public void onFailure(Throwable e) {
}
}
+ @Override
+ public boolean removeShuffleMerge(String host, int port, int shuffleId) {
+ checkInit();
+ try {
+ TransportClient client = clientFactory.createClient(host, port);
Review Comment:
`send` should be fine here - there is no response from the ESS to the driver when sending `RemoveShuffleMerge`.
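A minimal sketch of the one-way pattern being suggested. The client class below is a stand-in, not Spark's `TransportClient` (whose one-way `send` likewise takes an encoded `ByteBuffer` and expects no response):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SendSketch {
    // Stand-in capturing the one-way style: no RpcResponseCallback, no response expected.
    static class FakeTransportClient {
        final List<ByteBuffer> sent = new ArrayList<>();

        // Fire-and-forget: the message is handed to the transport and the caller moves on.
        void send(ByteBuffer message) {
            sent.add(message);
        }
    }

    public static void main(String[] args) {
        FakeTransportClient client = new FakeTransportClient();
        // RemoveShuffleMerge needs no reply from the ESS, so a one-way send suffices
        // and there is no callback to wire up or response to wait on.
        ByteBuffer encoded = ByteBuffer.wrap(new byte[] {1, 2, 3});  // stands in for the serialized message
        client.send(encoded);
        System.out.println("messages sent: " + client.sent.size());
    }
}
```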
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1410,26 +1431,27 @@ public String toString() {
* required for the shuffles of indeterminate stages.
*/
public static class AppShuffleMergePartitionsInfo {
Review Comment:
Revert changes to this class ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]