zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r898238727
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -343,15 +391,52 @@ void closeAndDeletePartitionFilesIfNeeded(
if (cleanupLocalDirs) {
deleteExecutorDirs(appShuffleInfo);
}
+ if (removeFromDb){
+ removeAppShuffleInfoFromDB(appShuffleInfo);
+ }
+ }
+
+ /**
+ * Remove the application attempt local paths information from the DB.
+ * @param appAttemptId
+ */
+ private void removeAppAttemptPathInfoFromDB(AppAttemptId appAttemptId) {
+ if (db != null) {
+ try {
+ db.delete(getDbAppAttemptPathsKey(appAttemptId));
+ } catch (Exception e) {
+ logger.error("Error deleting {} from application paths info in DB", appAttemptId, e);
+ }
+ }
+ }
+
+ /**
+ * Remove the finalized shuffle partitions information for an application attempt from the DB
+ * @param appShuffleInfo
+ */
+ private void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) {
+ if (db != null) {
+ appShuffleInfo.shuffles
+ .forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions
+ .forEach((shuffleMergeId, partitionInfo) ->
+ removeAppShufflePartitionInfoFromDB(
+ new AppAttemptShuffleMergeId(
+ appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId))
Review Comment:
Nice catch, updated.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -656,6 +771,206 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
}
}
+ /**
+ * Close the DB during shutdown
+ */
+ @Override
+ public void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered app paths info and "
+ + "shuffle partition info", e);
+ }
+ }
+ }
+
+ /**
+ * Write the application attempt's local path information to the DB
+ */
+ private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
+ if (db != null) {
+ try {
+ byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, attemptId));
+ String valueStr = mapper.writeValueAsString(appPathsInfo);
+ byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+ db.put(key, value);
+ } catch (Exception e) {
+ logger.error("Error saving registered app paths info", e);
+ }
+ }
+ }
+
+ /**
+ * Write the finalized shuffle merge partition information into the DB
+ */
+ private void writeAppAttemptShuffleMergeInfoToDB(
+ AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+ if (db != null) {
+ // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+ try{
+ byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+ db.put(dbKey, new byte[0]);
+ } catch (Exception e) {
+ logger.error("Error saving active app shuffle partition", e);
+ }
+ }
+
+ }
+
+ /**
+ * Parse the DB key with the prefix and the expected return value type
+ */
+ private <T> T parseDbKey(String key, String prefix, Class<T> valueType) throws IOException {
+ String json = key.substring(prefix.length() + 1);
+ return mapper.readValue(json, valueType);
+ }
+
+ /**
+ * Generate AppAttemptId from the DB key
+ */
+ private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException {
+ return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+ }
+
+ /**
+ * Generate AppAttemptShuffleMergeId from the DB key
+ */
+ private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+ String key) throws IOException {
+ return parseDbKey(
+ key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, AppAttemptShuffleMergeId.class);
+ }
+
+ /**
+ * Generate the DB key with the key object and the specified string prefix
+ */
+ private byte[] getDbKey(Object key, String prefix) {
+ // We add a common prefix on all the keys so we can find them in the DB
+ try {
+ String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key);
+ return keyJsonString.getBytes(StandardCharsets.UTF_8);
+ } catch (Exception exception) {
+ logger.error("Exception while generating the DB key {}", key);
+ return null;
+ }
Review Comment:
Refactored
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]