otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r638114426
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -116,29 +115,35 @@ public ShuffleIndexInformation load(File file) throws
IOException {
* application, retrieves the associated metadata. If not present and the
corresponding merged
* shuffle does not exist, initializes the metadata.
*/
- private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
- AppShuffleId appShuffleId,
+ private AppAttemptShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
+ AppAttemptShuffleId appAttemptShuffleId,
int reduceId) {
- File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
- if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+ if (!appsPathsInfo.containsKey(appAttemptShuffleId.appId)
Review comment:
We are checking `appsPathsInfo` for the attempt, so this check can be pulled
out of here and done before calling this method.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -761,28 +818,31 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
- AppShuffleId that = (AppShuffleId) o;
- return shuffleId == that.shuffleId && Objects.equal(appId, that.appId);
+ AppAttemptShuffleId that = (AppAttemptShuffleId) o;
+ return Objects.equal(appId, that.appId)
+ && attemptId == that.attemptId
+ && shuffleId == that.shuffleId;
}
@Override
public int hashCode() {
- return Objects.hashCode(appId, shuffleId);
+ return Objects.hashCode(appId, attemptId, shuffleId);
}
@Override
public String toString() {
- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
- .append("appId", appId)
- .append("shuffleId", shuffleId)
+ return Objects.toStringHelper(this)
+ .add("appId", appId)
+ .add("attemptId", attemptId)
+ .add("shuffleId", shuffleId)
.toString();
}
}
/** Metadata tracked for an actively merged shuffle partition */
- public static class AppShufflePartitionInfo {
+ public static class AppAttemptShufflePartitionInfo {
Review comment:
We only create a partition info for the latest attempt. I don't really
think renaming this class is necessary.
The Javadoc also says this. Here `attempt` has no significance, since it is
just part of the `AppAttemptShuffleId`, which has been renamed.
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,32 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int,
reduceId: Int) exte
override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex +
"_" + reduceId
}
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int)
extends BlockId {
+ override def name: String = "shuffleMerged_" + appId + "_" + shuffleId + "_"
+ reduceId + ".data"
+}
+
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+ appId: String,
+ shuffleId: Int,
+ reduceId: Int) extends BlockId {
+ override def name: String =
+ "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".index"
+}
+
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedMetaBlockId(
+ appId: String,
+ shuffleId: Int,
+ reduceId: Int) extends BlockId {
+ override def name: String =
+ "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".meta"
Review comment:
So the file names that the client expects now start with
`shuffleMerged`. Has this been changed on the server side, which writes these
files? IIRC it was writing to files starting with `mergedShuffle`.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -415,24 +429,65 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
bitmaps.toArray(new RoaringBitmap[bitmaps.size()]),
Ints.toArray(reduceIds),
Longs.toArray(sizes));
}
- partitions.remove(appShuffleId);
- logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId,
msg.appId);
+ partitions.remove(appAttemptShuffleId);
+ logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId,
msg.appId, msg.attemptId);
return mergeStatuses;
}
@Override
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo)
{
if (logger.isDebugEnabled()) {
logger.debug("register executor with RemoteBlockPushResolver {}
local-dirs {} "
- + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
- executorInfo.subDirsPerLocalDir);
+ + "num sub-dirs {} shuffleManager {}", appId,
Arrays.toString(executorInfo.localDirs),
+ executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+ }
+ String shuffleManagerMeta = executorInfo.shuffleManager;
+ if (shuffleManagerMeta.contains(":")) {
+ String mergeDirInfo =
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo,
MergeDirectoryMeta.class);
+ if (mergeDirectoryMeta.attemptId == -1) {
+ // When attemptId is 0, there is no attemptId stored in the
ExecutorShuffleInfo.
Review comment:
Are there UTs added for these cases?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -59,7 +59,7 @@ default boolean shouldLogError(Throwable t) {
* will not retry pushing the block nor log the exception on the client
side.
*/
public static final String TOO_LATE_MESSAGE_SUFFIX =
- "received after merged shuffle is finalized";
+ "received after merged shuffle is finalized or newer attempt has
started";
Review comment:
Why don't we make "newer attempt has started" a separate message?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -92,7 +91,7 @@
public RemoteBlockPushResolver(TransportConf conf) {
this.conf = conf;
this.partitions = Maps.newConcurrentMap();
- this.appsPathInfo = Maps.newConcurrentMap();
+ this.appsPathsInfo = Maps.newConcurrentMap();
Review comment:
Nit: rename to appAttemptPathsInfo
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -293,9 +307,9 @@ void deleteExecutorDirs(Path[] dirs) {
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
// Retrieve merged shuffle file metadata
- AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
- AppShufflePartitionInfo partitionInfoBeforeCheck =
- getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId);
+ AppAttemptShuffleId appAttemptShuffleId = new
AppAttemptShuffleId(msg.appId, msg.attemptId, msg.shuffleId);
+ AppAttemptShufflePartitionInfo partitionInfoBeforeCheck =
Review comment:
Before creating the partitionInfo, we can just check whether the
attemptId is the latest. If it is not, throw the relevant exception here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]