[ 
https://issues.apache.org/jira/browse/YARN-11323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17611790#comment-17611790
 ] 

ASF GitHub Bot commented on YARN-11323:
---------------------------------------

goiri commented on code in PR #4954:
URL: https://github.com/apache/hadoop/pull/4954#discussion_r984998385


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java:
##########
@@ -378,4 +397,137 @@ public RouterMasterKeyResponse 
getMasterKeyByDelegationKey(RouterMasterKeyReques
       throws YarnException, IOException {
     throw new NotImplementedException("Code is not implemented");
   }
+
+  /**
+   * Create a thread that cleans up the app.
+   * @param stage rm-start/rm-stop.
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    Thread finishApplicationThread = new 
Thread(createCleanUpFinishApplicationThread());
+    finishApplicationThread.setName(threadName);
+    finishApplicationThread.start();
+    LOG.info("CleanUpFinishApplicationThread has been started {}.", 
threadName);
+  }
+
+  /**
+   * Create a thread that cleans up the apps.
+   *
+   * @return thread object.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return () -> {
+      createCleanUpFinishApplication();
+    };
+  }
+
+  /**
+   * cleans up the apps.
+   */
+  private void createCleanUpFinishApplication() {
+    try {
+      // Get the current RM's App list based on subClusterId
+      GetApplicationsHomeSubClusterRequest request =
+              GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+      GetApplicationsHomeSubClusterResponse response =
+              getApplicationsHomeSubCluster(request);
+      List<ApplicationHomeSubCluster> applicationHomeSCs = 
response.getAppsHomeSubClusters();
+
+      // Traverse the app list and clean up the app.
+      long successCleanUpAppCount = 0;
+
+      // Save a local copy of the map so that it won't change with the map
+      Map<ApplicationId, RMApp> rmApps = new 
HashMap<>(this.rmContext.getRMApps());
+
+      // Need to make sure there is app list in RM memory.
+      if (rmApps != null && !rmApps.isEmpty()) {
+        for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) 
{
+          ApplicationId applicationId = applicationHomeSC.getApplicationId();
+          if (!rmApps.containsKey(applicationId)) {
+            try {
+              Boolean cleanUpSuccess =
+                      cleanUpFinishApplicationsWithRetries(applicationId, 
false);
+              if (cleanUpSuccess) {
+                LOG.info("application = {} has been cleaned up successfully.", 
applicationId);
+                successCleanUpAppCount++;
+              }
+            } catch (Exception e) {
+              LOG.error("problem during application = {} cleanup.", 
applicationId, e);
+            }
+          }
+        }
+      }
+
+      // print app cleanup log
+      LOG.info("cleanup finished applications size = {}, number = {} 
successful cleanup.",
+              applicationHomeSCs.size(), successCleanUpAppCount);
+    } catch (Exception e) {
+      LOG.error("problem during cleanup applications.", e);
+    }
+  }
+
+  /**
+   * Clean up the federation completed Application.
+   *
+   * @param applicationId app id.
+   * @param isQuery true, need to query from statestore ; false not query.
+   * @throws Exception exception occurs.

Review Comment:
   Javadoc warning: this method returns a value, but the Javadoc is missing an `@return` tag.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java:
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.retry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FederationActionRetry<T> {
+
+  public static final Logger LOG =

Review Comment:
   This declaration fits on a single line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java:
##########
@@ -4061,6 +4061,16 @@ public static boolean isAclEnabled(Configuration conf) {
 
   public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 
1000;
 
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-count";
+
+  public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 
1;
+
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time";
+
+  public static final long 
DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = 1000;

Review Comment:
   Use `TimeUnit.SECONDS.toMillis(1)` instead of the literal `1000`.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml:
##########
@@ -3727,6 +3727,26 @@
     <value>yarnfederation/</value>
   </property>
 
+  <property>
+    <description>
+      The number of retries to clear the app in the FederationStateStore,
+      the default value is 1, that is, after the app fails to clean up, it 
will retry the cleanup again.
+    </description>
+    <name>yarn.federation.state-store.clean-up-retry-count</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      Clear the sleep time of App retry in FederationStateStore.
+      When the app fails to clean up,
+      it will sleep for a period of time and then try to clean up.
+      The default value is 1000ms.
+    </description>
+    <name>yarn.federation.state-store.clean-up-retry-sleep-time</name>
+    <value>1000ms</value>

Review Comment:
   Use `1s` here instead of `1000ms`.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java:
##########
@@ -207,4 +233,254 @@ public void 
testFederationStateStoreServiceInitialHeartbeatDelay() throws Except
         "Started federation membership heartbeat with interval: 300 and 
initial delay: 10"));
     rm.stop();
   }
+
+  @Test
+  public void testCleanUpApplication() throws Exception {
+
+    // set yarn configuration
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 
10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+
+    // set up MockRM
+    final MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+    rm.start();
+
+    // init subCluster Heartbeat,
+    // and check that the subCluster is in a running state
+    FederationStateStoreService stateStoreService =
+        rm.getFederationStateStoreService();
+    FederationStateStoreHeartbeat storeHeartbeat =
+        stateStoreService.getStateStoreHeartbeatThread();
+    storeHeartbeat.run();
+    checkSubClusterInfo(SubClusterState.SC_RUNNING);
+
+    // generate an application and join the [SC-1] cluster
+    ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+    addApplication2StateStore(appId, stateStore);
+
+    // make sure the app can be queried in the stateStore
+    GetApplicationHomeSubClusterRequest appRequest =
+         GetApplicationHomeSubClusterRequest.newInstance(appId);
+    GetApplicationHomeSubClusterResponse response =
+         stateStore.getApplicationHomeSubCluster(appRequest);
+    Assert.assertNotNull(response);
+    ApplicationHomeSubCluster appHomeSubCluster = 
response.getApplicationHomeSubCluster();
+    Assert.assertNotNull(appHomeSubCluster);
+    Assert.assertNotNull(appHomeSubCluster.getApplicationId());
+    Assert.assertEquals(appId, appHomeSubCluster.getApplicationId());
+
+    // clean up the app.
+    boolean cleanUpResult =
+        stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true);
+    Assert.assertTrue(cleanUpResult);
+
+    // after clean, the app can no longer be queried from the stateStore.
+    LambdaTestUtils.intercept(FederationStateStoreException.class,
+        "Application " + appId + " does not exist",
+        () -> stateStore.getApplicationHomeSubCluster(appRequest));
+
+  }
+
+  @Test
+  public void testCleanUpApplicationWhenRMStart() throws Exception {
+
+    // We design such a test case.
+    // Step1. We add app01, app02, app03 to the stateStore,
+    // But these apps are not in RM's RMContext, they are finished apps
+    // Step2. We simulate RM startup, there is only app04 in RMContext.
+    // Step3. We wait for 5 seconds, the automatic cleanup thread should clean 
up finished apps.
+
+    // set yarn configuration.
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 
10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+
+    // set up MockRM.
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+
+    // generate an [app01] and join the [SC-1] cluster.
+    List<ApplicationId> appIds = new ArrayList();

Review Comment:
   Raw type: add the generic type argument, e.g. `new ArrayList<>()`.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java:
##########
@@ -378,4 +397,137 @@ public RouterMasterKeyResponse 
getMasterKeyByDelegationKey(RouterMasterKeyReques
       throws YarnException, IOException {
     throw new NotImplementedException("Code is not implemented");
   }
+
+  /**
+   * Create a thread that cleans up the app.
+   * @param stage rm-start/rm-stop.
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    Thread finishApplicationThread = new 
Thread(createCleanUpFinishApplicationThread());
+    finishApplicationThread.setName(threadName);
+    finishApplicationThread.start();
+    LOG.info("CleanUpFinishApplicationThread has been started {}.", 
threadName);
+  }
+
+  /**
+   * Create a thread that cleans up the apps.
+   *
+   * @return thread object.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return () -> {
+      createCleanUpFinishApplication();
+    };
+  }
+
+  /**
+   * cleans up the apps.
+   */
+  private void createCleanUpFinishApplication() {
+    try {
+      // Get the current RM's App list based on subClusterId
+      GetApplicationsHomeSubClusterRequest request =
+              GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+      GetApplicationsHomeSubClusterResponse response =
+              getApplicationsHomeSubCluster(request);
+      List<ApplicationHomeSubCluster> applicationHomeSCs = 
response.getAppsHomeSubClusters();
+
+      // Traverse the app list and clean up the app.
+      long successCleanUpAppCount = 0;
+
+      // Save a local copy of the map so that it won't change with the map
+      Map<ApplicationId, RMApp> rmApps = new 
HashMap<>(this.rmContext.getRMApps());
+
+      // Need to make sure there is app list in RM memory.
+      if (rmApps != null && !rmApps.isEmpty()) {
+        for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) 
{
+          ApplicationId applicationId = applicationHomeSC.getApplicationId();
+          if (!rmApps.containsKey(applicationId)) {
+            try {
+              Boolean cleanUpSuccess =
+                      cleanUpFinishApplicationsWithRetries(applicationId, 
false);
+              if (cleanUpSuccess) {
+                LOG.info("application = {} has been cleaned up successfully.", 
applicationId);
+                successCleanUpAppCount++;
+              }
+            } catch (Exception e) {
+              LOG.error("problem during application = {} cleanup.", 
applicationId, e);
+            }
+          }
+        }
+      }
+
+      // print app cleanup log
+      LOG.info("cleanup finished applications size = {}, number = {} 
successful cleanup.",
+              applicationHomeSCs.size(), successCleanUpAppCount);
+    } catch (Exception e) {
+      LOG.error("problem during cleanup applications.", e);
+    }
+  }
+
+  /**
+   * Clean up the federation completed Application.
+   *
+   * @param applicationId app id.
+   * @param isQuery true, need to query from statestore ; false not query.
+   * @throws Exception exception occurs.
+   */
+  public boolean cleanUpFinishApplicationsWithRetries(ApplicationId 
applicationId, boolean isQuery)
+      throws Exception {
+
+    // Generate a request to delete data
+    DeleteApplicationHomeSubClusterRequest delRequest =
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId);
+
+    return new FederationActionRetry<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {

Review Comment:
   Can this be written as a lambda?





> [Federation] Improve Router Handler FinishApps
> ----------------------------------------------
>
>                 Key: YARN-11323
>                 URL: https://issues.apache.org/jira/browse/YARN-11323
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: federation, router, yarn
>    Affects Versions: 3.4.0
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to