[ 
https://issues.apache.org/jira/browse/YARN-11323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17611280#comment-17611280
 ] 

ASF GitHub Bot commented on YARN-11323:
---------------------------------------

goiri commented on code in PR #4954:
URL: https://github.com/apache/hadoop/pull/4954#discussion_r984059221


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java:
##########
@@ -1054,4 +1060,24 @@ private void copyPlacementQueueToSubmissionContext(
       context.setQueue(placementContext.getQueue());
     }
   }
+
+  @VisibleForTesting
+  public void setFederationStateStoreService(FederationStateStoreService stateStoreService) {
+    this.federationStateStoreService = stateStoreService;
+  }
+
+  private void removeApplicationIdFromStateStore(ApplicationId applicationId) {
+    if (HAUtil.isFederationEnabled(conf) && federationStateStoreService != null) {
+      try {
+        DeleteApplicationHomeSubClusterResponse response =
+            federationStateStoreService.cleanUpFinishApplicationsWithRetries(applicationId);
+        if (response != null) {
+          LOG.info("applicationId = {} remove from state store success.",
+              applicationId);

Review Comment:
   Rename the variable to `appId` and put the log statement on a single line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java:
##########
@@ -378,4 +382,104 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques
       throws YarnException, IOException {
     throw new NotImplementedException("Code is not implemented");
   }
+
+  /**
+   * Create a thread that cleans up the app.
+   * @param stage rm-start/rm-stop.
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread());
+    finishApplicationThread.setName(threadName);
+    finishApplicationThread.start();
+  }
+
+  /**
+   * Create a thread that cleans up the app.
+   *
+   * @return thread object.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return () -> {
+
+      try {
+        // Get the current RM's App list based on subClusterId
+        GetApplicationsHomeSubClusterRequest request =
+            GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+        GetApplicationsHomeSubClusterResponse response =
+            getApplicationsHomeSubCluster(request);
+        List<ApplicationHomeSubCluster> applications = response.getAppsHomeSubClusters();
+
+        // Traverse the app list and clean up the app.
+        long successCleanUpAppCount = 0;
+        for (ApplicationHomeSubCluster application : applications) {
+          ApplicationId applicationId = application.getApplicationId();
+          if (!this.rmContext.getRMApps().containsKey(applicationId)) {
+            try {
+              DeleteApplicationHomeSubClusterResponse deleteResponse =
+                  cleanUpFinishApplicationsWithRetries(applicationId);
+              if (deleteResponse != null) {
+                LOG.info("application = {} has been cleaned up successfully.", applicationId);
+                successCleanUpAppCount++;
+              }
+            } catch (YarnException e) {
+              LOG.error("problem during application = {} cleanup.", applicationId, e);
+            }
+          }
+        }
+
+        // print app cleanup log
+        LOG.info("cleanup finished applications size = {}, number = {} successful cleanups.",
+            applications.size(), successCleanUpAppCount);
+      } catch (Exception e) {
+        LOG.error("problem during cleanup applications.", e);
+      }
+    };
+  }
+
+  /**
+   * Clean up the completed Application.
+   *
+   * @param applicationId app id.
+   * @return DeleteApplicationHomeSubClusterResponse.
+   * @throws Exception exception occurs.
+   */
+  public DeleteApplicationHomeSubClusterResponse
+      cleanUpFinishApplicationsWithRetries(ApplicationId applicationId) throws Exception {
+    DeleteApplicationHomeSubClusterRequest request =
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId);
+    return new FederationStateStoreAction<DeleteApplicationHomeSubClusterResponse>() {
+      @Override
+      public DeleteApplicationHomeSubClusterResponse run() throws Exception {
+        return deleteApplicationHomeSubCluster(request);
+      }
+    }.runWithRetries();
+  }
+
+  /**
+   * Define an abstract class, abstract retry method,
+   * which can be used for other methods later.
+   *
+   * @param <T> abstract parameter
+   */
+  private abstract class FederationStateStoreAction<T> {
+    abstract T run() throws Exception;
+
+    T runWithRetries() throws Exception {
+      int retry = 0;
+      while (true) {

Review Comment:
   Doesn't a retry helper like this already exist somewhere we could reuse instead of hand-rolling the loop?





> [Federation] Improve Router Handler FinishApps
> ----------------------------------------------
>
>                 Key: YARN-11323
>                 URL: https://issues.apache.org/jira/browse/YARN-11323
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: federation, router, yarn
>    Affects Versions: 3.4.0
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to