[ https://issues.apache.org/jira/browse/YARN-11323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17611280#comment-17611280 ]
ASF GitHub Bot commented on YARN-11323:
---------------------------------------
goiri commented on code in PR #4954:
URL: https://github.com/apache/hadoop/pull/4954#discussion_r984059221
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java:
##########
@@ -1054,4 +1060,24 @@ private void copyPlacementQueueToSubmissionContext(
context.setQueue(placementContext.getQueue());
}
}
+
+ @VisibleForTesting
+ public void setFederationStateStoreService(FederationStateStoreService
stateStoreService) {
+ this.federationStateStoreService = stateStoreService;
+ }
+
+ private void removeApplicationIdFromStateStore(ApplicationId applicationId) {
+ if (HAUtil.isFederationEnabled(conf) && federationStateStoreService !=
null) {
+ try {
+ DeleteApplicationHomeSubClusterResponse response =
+
federationStateStoreService.cleanUpFinishApplicationsWithRetries(applicationId);
+ if (response != null) {
+ LOG.info("applicationId = {} remove from state store success.",
+ applicationId);
Review Comment:
Rename the variable to appId and put this log statement on a single line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java:
##########
@@ -378,4 +382,104 @@ public RouterMasterKeyResponse
getMasterKeyByDelegationKey(RouterMasterKeyReques
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
+
+ /**
+ * Create a thread that cleans up the app.
+ * @param stage rm-start/rm-stop.
+ */
+ public void createCleanUpFinishApplicationThread(String stage) {
+ String threadName = cleanUpThreadNamePrefix + "-" + stage;
+ Thread finishApplicationThread = new
Thread(createCleanUpFinishApplicationThread());
+ finishApplicationThread.setName(threadName);
+ finishApplicationThread.start();
+ }
+
+ /**
+ * Create a thread that cleans up the app.
+ *
+ * @return thread object.
+ */
+ private Runnable createCleanUpFinishApplicationThread() {
+ return () -> {
+
+ try {
+ // Get the current RM's App list based on subClusterId
+ GetApplicationsHomeSubClusterRequest request =
+ GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+ GetApplicationsHomeSubClusterResponse response =
+ getApplicationsHomeSubCluster(request);
+ List<ApplicationHomeSubCluster> applications =
response.getAppsHomeSubClusters();
+
+ // Traverse the app list and clean up the app.
+ long successCleanUpAppCount = 0;
+ for (ApplicationHomeSubCluster application : applications) {
+ ApplicationId applicationId = application.getApplicationId();
+ if (!this.rmContext.getRMApps().containsKey(applicationId)) {
+ try {
+ DeleteApplicationHomeSubClusterResponse deleteResponse =
+ cleanUpFinishApplicationsWithRetries(applicationId);
+ if (deleteResponse != null) {
+ LOG.info("application = {} has been cleaned up successfully.",
applicationId);
+ successCleanUpAppCount++;
+ }
+ } catch (YarnException e) {
+ LOG.error("problem during application = {} cleanup.",
applicationId, e);
+ }
+ }
+ }
+
+ // print app cleanup log
+ LOG.info("cleanup finished applications size = {}, number = {}
successful cleanups.",
+ applications.size(), successCleanUpAppCount);
+ } catch (Exception e) {
+ LOG.error("problem during cleanup applications.", e);
+ }
+ };
+ }
+
+ /**
+ * Clean up the completed Application.
+ *
+ * @param applicationId app id.
+ * @return DeleteApplicationHomeSubClusterResponse.
+ * @throws Exception exception occurs.
+ */
+ public DeleteApplicationHomeSubClusterResponse
+ cleanUpFinishApplicationsWithRetries(ApplicationId applicationId) throws
Exception {
+ DeleteApplicationHomeSubClusterRequest request =
+ DeleteApplicationHomeSubClusterRequest.newInstance(applicationId);
+ return new
FederationStateStoreAction<DeleteApplicationHomeSubClusterResponse>() {
+ @Override
+ public DeleteApplicationHomeSubClusterResponse run() throws Exception {
+ return deleteApplicationHomeSubCluster(request);
+ }
+ }.runWithRetries();
+ }
+
+ /**
+ * Define an abstract class, abstract retry method,
+ * which can be used for other methods later.
+ *
+ * @param <T> abstract parameter
+ */
+ private abstract class FederationStateStoreAction<T> {
+ abstract T run() throws Exception;
+
+ T runWithRetries() throws Exception {
+ int retry = 0;
+ while (true) {
Review Comment:
Doesn't a retry helper like this already exist somewhere that we could reuse?
> [Federation] Improve Router Handler FinishApps
> ----------------------------------------------
>
> Key: YARN-11323
> URL: https://issues.apache.org/jira/browse/YARN-11323
> Project: Hadoop YARN
> Issue Type: Improvement
> Components: federation, router, yarn
> Affects Versions: 3.4.0
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]