mridulm commented on a change in pull request #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r836934153
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info
and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ AppShufflePartitionId id,
+ String prefix) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+ String key = prefix + ";" + appAttemptShufflePartitionJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppShufflePartitionId
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ String s,
+ String prefix) throws IOException {
+ if (!s.startsWith(prefix)) {
Review comment:
nit: check for the prefix followed by ';' ?
Will also prevent a potential `StringIndexOutOfBoundsException` in the
`substring` call below.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info
and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ AppShufflePartitionId id,
+ String prefix) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+ String key = prefix + ";" + appAttemptShufflePartitionJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppShufflePartitionId
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ String s,
+ String prefix) throws IOException {
+ if (!s.startsWith(prefix)) {
+ throw new IllegalArgumentException("expected a string starting with " +
prefix);
+ }
+ String json = s.substring(prefix.length() + 1);
+ return mapper.readValue(json, AppShufflePartitionId.class);
+ }
+
+ private static byte[] dbAppAttemptPathsKey(String appId, int attemptId)
throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptIdJson = mapper.writeValueAsString(new
AppAttemptId(appId, attemptId));
+ String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws
IOException {
+ if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+ throw new IllegalArgumentException("expected a string starting with "
+ + APP_ATTEMPT_PATH_KEY_PREFIX);
+ }
+ String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+ return mapper.readValue(json, AppAttemptId.class);
+ }
+
+ public void reloadAppShuffleInfo(DB db) throws IOException {
+ logger.debug("Reload applications merged shuffle information from DB");
+ reloadActiveAppAttemptsPathInfo(db);
+ reloadFinalizedAppAttemptsShuffleInfo(db);
+ reloadActiveAppAttemptsShufflePartitionInfo(db);
Review comment:
Only `reloadAppShuffleInfo` is used for testing, not the others.
Make `reloadAppShuffleInfo` package private with `@VisibleForTesting` and
make the other methods `private` ?
If we need to open these up for testing in the future, we can do so at that time.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info
and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ AppShufflePartitionId id,
+ String prefix) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+ String key = prefix + ";" + appAttemptShufflePartitionJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppShufflePartitionId
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ String s,
+ String prefix) throws IOException {
+ if (!s.startsWith(prefix)) {
+ throw new IllegalArgumentException("expected a string starting with " +
prefix);
+ }
+ String json = s.substring(prefix.length() + 1);
+ return mapper.readValue(json, AppShufflePartitionId.class);
+ }
+
+ private static byte[] dbAppAttemptPathsKey(String appId, int attemptId)
throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptIdJson = mapper.writeValueAsString(new
AppAttemptId(appId, attemptId));
+ String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws
IOException {
+ if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+ throw new IllegalArgumentException("expected a string starting with "
+ + APP_ATTEMPT_PATH_KEY_PREFIX);
+ }
+ String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+ return mapper.readValue(json, AppAttemptId.class);
+ }
+
+ public void reloadAppShuffleInfo(DB db) throws IOException {
+ logger.debug("Reload applications merged shuffle information from DB");
+ reloadActiveAppAttemptsPathInfo(db);
+ reloadFinalizedAppAttemptsShuffleInfo(db);
+ reloadActiveAppAttemptsShufflePartitionInfo(db);
+ }
+
+ @VisibleForTesting
+ public void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+ if (db != null) {
+ DBIterator itr = db.iterator();
+ itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+ break;
+ }
+ AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+ try{
+ AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(),
AppPathsInfo.class);
+ logger.debug("Reloading active application {}_{} merged shuffle
files paths",
+ appAttemptId.appId, appAttemptId.attemptId);
+ appsShuffleInfo.computeIfAbsent(appAttemptId.appId, id ->
+ new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId,
appPathsInfo));
+ } catch (Exception exception) {
+ logger.error("Parsing exception is {}", exception);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void reloadFinalizedAppAttemptsShuffleInfo(DB db) throws IOException {
+ if (db != null) {
+ DBIterator itr = db.iterator();
+
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+ break;
+ }
+ AppShufflePartitionId partitionId =
+ parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+ if (partitionId.reduceId != -1) {
Review comment:
Why would `reduceId` not be -1 for
`APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX` keys?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -536,9 +645,20 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
}
}
// Even when the mergePartitionsInfo is null, we mark the shuffle as
finalized but the results
- // sent to the driver will be empty. This cam happen when the service
didn't receive any
+ // sent to the driver will be empty. This can happen when the service
didn't receive any
// blocks for the shuffle yet and the driver didn't wait for enough time
to finalize the
// shuffle.
+ if (db != null) {
Review comment:
Do this only for the last `else` block above.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -342,6 +404,30 @@ void closeAndDeletePartitionFilesIfNeeded(
if (cleanupLocalDirs) {
deleteExecutorDirs(appShuffleInfo);
}
+ cleanUpAppShuffleInfoInDB(appShuffleInfo);
+ }
+
+ void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) {
Review comment:
nit: limit visibility to private if we don't need to expose it for
testing/functionality.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -342,6 +404,30 @@ void closeAndDeletePartitionFilesIfNeeded(
if (cleanupLocalDirs) {
deleteExecutorDirs(appShuffleInfo);
}
+ cleanUpAppShuffleInfoInDB(appShuffleInfo);
+ }
+
+ void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) {
+ if (db != null) {
+ try {
+ db.delete(dbAppAttemptPathsKey(appShuffleInfo.appId,
appShuffleInfo.attemptId));
+ appShuffleInfo.shuffles
+ .forEach((shuffleId, shuffleInfo) ->
shuffleInfo.shuffleMergePartitions
+ .forEach((shuffleMergeId, partitionInfo) -> {
+ synchronized (partitionInfo) {
+ if (shuffleInfo.isFinalized()) {
+ cleanUpAppShufflePartitionInfoInDB(new AppShufflePartitionId(
+ appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
+ shuffleMergeId, -1));
+ }
Review comment:
nit: this `if isFinalized` can be moved into
`closeAndDeletePartitionFilesIfNeeded` and we can remove
`cleanUpAppShuffleInfoInDB` ?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info
and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ AppShufflePartitionId id,
+ String prefix) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+ String key = prefix + ";" + appAttemptShufflePartitionJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppShufflePartitionId
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ String s,
Review comment:
nit: `s` -> `key` ?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -261,7 +321,8 @@ public MergedBlockMeta getMergedBlockMeta(
int size = (int) indexFile.length();
// First entry is the zero offset
int numChunks = (size / Long.BYTES) - 1;
- File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId,
shuffleMergeId, reduceId);
+ File metaFile =
+ appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId,
reduceId);
Review comment:
revert this and other whitespace changes to minimize diff ?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -617,8 +738,8 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
appsShuffleInfo.computeIfAbsent(appId, id ->
new AppShuffleInfo(
appId, UNDEFINED_ATTEMPT_ID,
- new AppPathsInfo(appId, executorInfo.localDirs,
- mergeDir, executorInfo.subDirsPerLocalDir)
+ new AppPathsInfo(appId, attemptId, executorInfo.localDirs,
+ mergeDir, executorInfo.subDirsPerLocalDir, db)
Review comment:
Move the db update out of the constructor, and do it explicitly ?
##########
File path:
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
##########
@@ -120,6 +120,7 @@
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
private static final String SECRETS_RECOVERY_FILE_NAME =
"sparkShuffleRecovery.ldb";
+ private static final String MERGE_MANAGER_FILE_NAME = "mergeManager.ldb";
Review comment:
nit: rename to `sparkShuffleMergeRecovery` or some such to be more clear
about the intent of the file ?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info
and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ AppShufflePartitionId id,
+ String prefix) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+ String key = prefix + ";" + appAttemptShufflePartitionJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppShufflePartitionId
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ String s,
+ String prefix) throws IOException {
+ if (!s.startsWith(prefix)) {
+ throw new IllegalArgumentException("expected a string starting with " +
prefix);
+ }
+ String json = s.substring(prefix.length() + 1);
+ return mapper.readValue(json, AppShufflePartitionId.class);
+ }
+
+ private static byte[] dbAppAttemptPathsKey(String appId, int attemptId)
throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptIdJson = mapper.writeValueAsString(new
AppAttemptId(appId, attemptId));
+ String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws
IOException {
+ if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+ throw new IllegalArgumentException("expected a string starting with "
+ + APP_ATTEMPT_PATH_KEY_PREFIX);
+ }
+ String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+ return mapper.readValue(json, AppAttemptId.class);
+ }
+
+ public void reloadAppShuffleInfo(DB db) throws IOException {
+ logger.debug("Reload applications merged shuffle information from DB");
+ reloadActiveAppAttemptsPathInfo(db);
+ reloadFinalizedAppAttemptsShuffleInfo(db);
+ reloadActiveAppAttemptsShufflePartitionInfo(db);
+ }
+
+ @VisibleForTesting
+ public void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+ if (db != null) {
+ DBIterator itr = db.iterator();
+ itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+ break;
+ }
+ AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+ try{
+ AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(),
AppPathsInfo.class);
+ logger.debug("Reloading active application {}_{} merged shuffle
files paths",
+ appAttemptId.appId, appAttemptId.attemptId);
+ appsShuffleInfo.computeIfAbsent(appAttemptId.appId, id ->
+ new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId,
appPathsInfo));
Review comment:
Instead of `computeIfAbsent`, use `compute` and ensure that the newer
attempt id gets added (in case an earlier deletion failed, etc.).
Same for `reloadFinalizedAppAttemptsShuffleInfo` below as well (w.r.t
`shuffleMergeId`: there can be a race between `closeAndDeletePartitionFiles`,
`finalizeShuffleMerge` and NM shutdown).
I am assuming `reloadActiveAppAttemptsShufflePartitionInfo`,
`reloadPartitionInfo`, etc will get deleted if we are only tracking finalized
shuffles.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,8 +252,23 @@ private AppShufflePartitionInfo
getOrCreateAppShufflePartitionInfo(
File metaFile =
appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId,
reduceId);
try {
- return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId,
shuffleMergeId,
- reduceId, dataFile, indexFile, metaFile);
+ AppShufflePartitionInfo appShufflePartitionInfo =
+ newAppShufflePartitionInfo(appShuffleInfo.appId,
appShuffleInfo.attemptId,
+ shuffleId, shuffleMergeId, reduceId, dataFile, indexFile,
metaFile);
+ if (db != null) {
Review comment:
Why not track only those which have been successfully finalized ?
That is, we don't track in-progress pushes ... this should simplify the code,
remove all changes to `AppShufflePartitionInfo`, remove
`APP_ATTEMPT_SHUFFLE_PARTITION_KEY_PREFIX`, recovery, etc.
(Some of my other comments in this PR might not be relevant once this is
done btw).
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,8 +252,23 @@ private AppShufflePartitionInfo
getOrCreateAppShufflePartitionInfo(
File metaFile =
appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId,
reduceId);
try {
- return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId,
shuffleMergeId,
- reduceId, dataFile, indexFile, metaFile);
+ AppShufflePartitionInfo appShufflePartitionInfo =
+ newAppShufflePartitionInfo(appShuffleInfo.appId,
appShuffleInfo.attemptId,
+ shuffleId, shuffleMergeId, reduceId, dataFile, indexFile,
metaFile);
+ if (db != null) {
+ try{
+ byte[] dbKey =
+ dbAppAttemptShufflePartitionKeyWithCustomPrefix(
Review comment:
nit: Add `getShufflePartitionDbKey` to be more clear ? (which delegates
to `dbAppAttemptShufflePartitionKeyWithCustomPrefix` if required). The
read/parse logic seems to be doing it with `parseDbApp*` methods already.
Same for other uses of `dbAppAttemptShufflePartitionKeyWithCustomPrefix` as
well.
Localize all string/serde manipulations to a small set of methods
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info
and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ AppShufflePartitionId id,
+ String prefix) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+ String key = prefix + ";" + appAttemptShufflePartitionJson;
Review comment:
nit: pull the `;` as a constant
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,8 +252,23 @@ private AppShufflePartitionInfo
getOrCreateAppShufflePartitionInfo(
File metaFile =
appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId,
reduceId);
try {
- return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId,
shuffleMergeId,
- reduceId, dataFile, indexFile, metaFile);
+ AppShufflePartitionInfo appShufflePartitionInfo =
+ newAppShufflePartitionInfo(appShuffleInfo.appId,
appShuffleInfo.attemptId,
+ shuffleId, shuffleMergeId, reduceId, dataFile, indexFile,
metaFile);
+ if (db != null) {
+ try{
+ byte[] dbKey =
+ dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ appShufflePartitionInfo.appShufflePartitionId,
+ APP_ATTEMPT_SHUFFLE_PARTITION_KEY_PREFIX);
+ // We can reload a partition's state solely from the partition ID.
Thus only the
+ // partition Ids are stored in levelDB.
+ db.put(dbKey, new byte[0]);
+ } catch (Exception e) {
+ logger.error("Error saving active app shuffle partition", e);
+ }
+ }
Review comment:
We have a bunch of
```
if (db != null) {
try {
byte[] key = generate key
db.put(key, value)
} catch (Exception ex) {
log message
}
}
```
sprinkled across this class.
Refactor these and move them to util methods ? So that the main flow is
clearer ?
```suggestion
writeShufflePartitionToDb(appShufflePartitionInfo.appShufflePartitionId)
```
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -355,8 +441,31 @@ void closeAndDeletePartitionFiles(Map<Integer,
AppShufflePartitionInfo> partitio
.forEach((partitionId, partitionInfo) -> {
synchronized (partitionInfo) {
partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+
cleanUpAppShufflePartitionInfoInDB(partitionInfo.appShufflePartitionId);
}
});
+
+ }
+
+ void cleanUpAppShufflePartitionInfoInDB(AppShufflePartitionId
appShufflePartitionId) {
+ if (db != null) {
+ try {
+ if (appShufflePartitionId.reduceId == -1) {
+ db.delete(dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ appShufflePartitionId,
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX));
+ } else {
+ db.delete(dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ appShufflePartitionId, APP_ATTEMPT_SHUFFLE_PARTITION_KEY_PREFIX));
+ }
+ } catch (IOException e) {
+ logger.error("Error deleting {}_{} shuffleMergeId {} shuffle {}"
+ + "reduce {} from application shuffle merged partition info
db",
+ appShufflePartitionId.appId, appShufflePartitionId.attemptId,
+ appShufflePartitionId.shuffleMergeId,
+ appShufflePartitionId.shuffleId,
Review comment:
`shuffleId` before `shuffleMergeId` (here and in `updateChunkInfo` - not
sure if elsewhere as well).
Also, use `toString` in `AppShufflePartitionId` instead (here and elsewhere
we are logging) ?
##########
File path:
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
##########
@@ -296,7 +307,8 @@ static MergedShuffleFileManager
newMergedShuffleFileManagerInstance(TransportCon
mergeManagerImplClazz.asSubclass(MergedShuffleFileManager.class);
// The assumption is that all the custom implementations just like the
RemoteBlockPushResolver
// will also need the transport configuration.
- return
mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
+ return mergeManagerSubClazz.getConstructor(TransportConf.class,
File.class)
+ .newInstance(conf, mergeManagerFile);
Review comment:
Modify `NoOpMergedShuffleFileManager` to also support this constructor -
check other implementations as well if relevant.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId,
ExecutorShuffleInfo executorInfo) {
}
}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info
and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ AppShufflePartitionId id,
+ String prefix) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+ String key = prefix + ";" + appAttemptShufflePartitionJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppShufflePartitionId
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ String s,
+ String prefix) throws IOException {
+ if (!s.startsWith(prefix)) {
+ throw new IllegalArgumentException("expected a string starting with " +
prefix);
+ }
+ String json = s.substring(prefix.length() + 1);
+ return mapper.readValue(json, AppShufflePartitionId.class);
+ }
+
+ private static byte[] dbAppAttemptPathsKey(String appId, int attemptId)
throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appAttemptIdJson = mapper.writeValueAsString(new
AppAttemptId(appId, attemptId));
+ String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws
IOException {
+ if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+ throw new IllegalArgumentException("expected a string starting with "
+ + APP_ATTEMPT_PATH_KEY_PREFIX);
+ }
+ String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+ return mapper.readValue(json, AppAttemptId.class);
+ }
+
+ public void reloadAppShuffleInfo(DB db) throws IOException {
+ logger.debug("Reload applications merged shuffle information from DB");
+ reloadActiveAppAttemptsPathInfo(db);
+ reloadFinalizedAppAttemptsShuffleInfo(db);
+ reloadActiveAppAttemptsShufflePartitionInfo(db);
+ }
+
+ @VisibleForTesting
+ public void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+ if (db != null) {
+ DBIterator itr = db.iterator();
+ itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+ break;
+ }
+ AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+ try{
+ AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(),
AppPathsInfo.class);
+ logger.debug("Reloading active application {}_{} merged shuffle
files paths",
+ appAttemptId.appId, appAttemptId.attemptId);
+ appsShuffleInfo.computeIfAbsent(appAttemptId.appId, id ->
+ new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId,
appPathsInfo));
+ } catch (Exception exception) {
+ logger.error("Parsing exception is {}", exception);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void reloadFinalizedAppAttemptsShuffleInfo(DB db) throws IOException {
+ if (db != null) {
+ DBIterator itr = db.iterator();
+
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+ break;
+ }
+ AppShufflePartitionId partitionId =
+ parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+ key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+ if (partitionId.reduceId != -1) {
+ break;
+ }
+ AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
+ if (appShuffleInfo != null) {
+ appShuffleInfo.shuffles.putIfAbsent(partitionId.shuffleId,
+ new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId,
true));
Review comment:
See comment in `reloadActiveAppAttemptsPathInfo` above.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1172,47 +1634,58 @@ private void finalizePartition() throws IOException {
// Get rid of any partial block data at the end of the file. This could
either
// be due to failure, or a request still being processed when the shuffle
// merge gets finalized, or any exceptions while updating index/meta
files.
- dataChannel.truncate(lastChunkOffset);
+ logger.trace("{}_{} shuffleId {} shuffleMergeId {} reduceId {} "
+ + "truncating files data {} index {} meta {}",
appShufflePartitionId.appId,
+ appShufflePartitionId.attemptId, appShufflePartitionId.shuffleId,
+ appShufflePartitionId.shuffleMergeId, appShufflePartitionId.reduceId,
+ lastChunkOffset, indexFile.getPos(), metaFile.getPos());
+ dataFile.getChannel().truncate(lastChunkOffset);
indexFile.getChannel().truncate(indexFile.getPos());
metaFile.getChannel().truncate(metaFile.getPos());
}
void closeAllFilesAndDeleteIfNeeded(boolean delete) {
try {
- if (dataChannel.isOpen()) {
- dataChannel.close();
- if (delete) {
- dataFile.delete();
- }
+ dataFile.close();
+ if (delete) {
+ dataFile.delete();
Review comment:
There is a change in behavior here - we are deleting even if the file was
not open.
If we are simplifying to only tracking finalized merges, we remove the need
for this change as well and preserve earlier behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]