This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d2afc5cf6f8 YARN-8862. [BackPort] [GPG] Add Yarn Registry cleanup in ApplicationCleaner. (#6083) Contributed by Shilun Fan.
1d2afc5cf6f8 is described below

commit 1d2afc5cf6f816461b76ec2bdbab8209052cd129
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Fri Sep 29 07:15:53 2023 +0800

    YARN-8862. [BackPort] [GPG] Add Yarn Registry cleanup in ApplicationCleaner. (#6083) Contributed by Shilun Fan.
    
    Reviewed-by: Inigo Goiri <inigo...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../federation/utils/FederationRegistryClient.java | 60 +++++++++++++---------
 .../utils/TestFederationRegistryClient.java        | 27 ++++++++++
 .../server/globalpolicygenerator/GPGContext.java   |  5 ++
 .../globalpolicygenerator/GPGContextImpl.java      | 12 +++++
 .../GlobalPolicyGenerator.java                     | 21 ++++++++
 .../applicationcleaner/ApplicationCleaner.java     | 30 ++++++++---
 .../DefaultApplicationCleaner.java                 |  2 +
 .../TestDefaultApplicationCleaner.java             | 34 ++++++++++++
 8 files changed, 159 insertions(+), 32 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
index fa64188a608b..9e4d1e6ed0e8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.registry.client.api.BindFlags;
@@ -142,9 +143,7 @@ public class FederationRegistryClient {
       // Then update the subClusterTokenMap
       subClusterTokenMap.put(subClusterId, token);
     } catch (YarnException | IOException e) {
-      LOG.error(
-          "Failed writing AMRMToken to registry for subcluster " + 
subClusterId,
-          e);
+      LOG.error("Failed writing AMRMToken to registry for subcluster {}.", 
subClusterId, e);
     }
     return update;
   }
@@ -189,8 +188,7 @@ public class FederationRegistryClient {
 
         retMap.put(scId, amrmToken);
       } catch (Exception e) {
-        LOG.error("Failed reading registry key " + key
-            + ", skipping subcluster " + scId, e);
+        LOG.error("Failed reading registry key {}, skipping subcluster {}.",  
key, scId, e);
       }
     }
 
@@ -202,24 +200,39 @@ public class FederationRegistryClient {
   /**
    * Remove an application from registry.
    *
-   * @param appId application id
+   * @param appId application id.
    */
   public synchronized void removeAppFromRegistry(ApplicationId appId) {
+    removeAppFromRegistry(appId, false);
+  }
+
+  /**
+   * Remove an application from registry.
+   *
+   * @param appId application id
+   * @param ignoreMemoryState whether to ignore the memory data in terms of
+   *      known application
+   */
+  public synchronized void removeAppFromRegistry(ApplicationId appId,
+      boolean ignoreMemoryState) {
     Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
         this.appSubClusterTokenMap.get(appId);
-    LOG.info("Removing all registry entries for {}", appId);
-
-    if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) {
-      return;
+    if (!ignoreMemoryState) {
+      if (MapUtils.isEmpty(subClusterTokenMap)) {
+        return;
+      }
     }
+    LOG.info("Removing all registry entries for {}.", appId);
 
     // Lastly remove the application directory
     String key = getRegistryKey(appId, null);
     try {
       removeKeyRegistry(this.registry, this.user, key, true, true);
-      subClusterTokenMap.clear();
+      if (subClusterTokenMap != null) {
+        subClusterTokenMap.clear();
+      }
     } catch (YarnException e) {
-      LOG.error("Failed removing registry directory key " + key, e);
+      LOG.error("Failed removing registry directory key {}.", key, e);
     }
   }
 
@@ -247,7 +260,7 @@ public class FederationRegistryClient {
           }
         } catch (Throwable e) {
           if (throwIfFails) {
-            LOG.error("Registry resolve key " + key + " failed", e);
+            LOG.error("Registry resolve key {} failed.", key, e);
           }
         }
         return null;
@@ -271,7 +284,7 @@ public class FederationRegistryClient {
           return true;
         } catch (Throwable e) {
           if (throwIfFails) {
-            LOG.error("Registry remove key " + key + " failed", e);
+            LOG.error("Registry remove key {} failed.", key, e);
           }
         }
         return false;
@@ -300,7 +313,7 @@ public class FederationRegistryClient {
           return true;
         } catch (Throwable e) {
           if (throwIfFails) {
-            LOG.error("Registry write key " + key + " failed", e);
+            LOG.error("Registry write key {} failed.", key, e);
           }
         }
         return false;
@@ -317,18 +330,15 @@ public class FederationRegistryClient {
   private List<String> listDirRegistry(final RegistryOperations registryImpl,
       UserGroupInformation ugi, final String key, final boolean throwIfFails)
       throws YarnException {
-    List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() {
-      @Override
-      public List<String> run() {
-        try {
-          return registryImpl.list(key);
-        } catch (Throwable e) {
-          if (throwIfFails) {
-            LOG.error("Registry list key " + key + " failed", e);
-          }
+    List<String> result = ugi.doAs((PrivilegedAction<List<String>>) () -> {
+      try {
+        return registryImpl.list(key);
+      } catch (Throwable e) {
+        if (throwIfFails) {
+          LOG.error("Registry list key {} failed.", key, e);
         }
-        return null;
       }
+      return null;
     });
     if (result == null && throwIfFails) {
       throw new YarnException("Registry list key " + key + " failed");
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
index 42be851512af..cccfbb4613c0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
@@ -87,4 +87,31 @@ public class TestFederationRegistryClient {
         this.registryClient.loadStateFromRegistry(appId).size());
   }
 
+  @Test
+  public void testRemoveWithMemoryState() {
+    ApplicationId appId1 = ApplicationId.newInstance(0, 0);
+    ApplicationId appId2 = ApplicationId.newInstance(0, 1);
+    String scId0 = "subcluster0";
+
+    this.registryClient.writeAMRMTokenForUAM(appId1, scId0, new Token<>());
+    this.registryClient.writeAMRMTokenForUAM(appId2, scId0, new Token<>());
+    Assert.assertEquals(2, this.registryClient.getAllApplications().size());
+
+    // Create a new client instance
+    this.registryClient =
+        new FederationRegistryClient(this.conf, this.registry, this.user);
+
+    this.registryClient.loadStateFromRegistry(appId2);
+    // Should remove app2
+    this.registryClient.removeAppFromRegistry(appId2, false);
+    Assert.assertEquals(1, this.registryClient.getAllApplications().size());
+
+    // Should not remove app1 since memory state don't have it
+    this.registryClient.removeAppFromRegistry(appId1, false);
+    Assert.assertEquals(1, this.registryClient.getAllApplications().size());
+
+    // Should remove app1
+    this.registryClient.removeAppFromRegistry(appId1, true);
+    Assert.assertEquals(0, this.registryClient.getAllApplications().size());
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
index 6b0a5a43112b..e54244d7133d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.globalpolicygenerator;
 
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 
 /**
@@ -32,4 +33,8 @@ public interface GPGContext {
   GPGPolicyFacade getPolicyFacade();
 
   void setPolicyFacade(GPGPolicyFacade facade);
+
+  FederationRegistryClient getRegistryClient();
+
+  void setRegistryClient(FederationRegistryClient client);
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
index bb498448fae8..b14f50299018 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.globalpolicygenerator;
 
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 
 /**
@@ -27,6 +28,7 @@ public class GPGContextImpl implements GPGContext {
 
   private FederationStateStoreFacade facade;
   private GPGPolicyFacade policyFacade;
+  private FederationRegistryClient registryClient;
 
   @Override
   public FederationStateStoreFacade getStateStoreFacade() {
@@ -48,4 +50,14 @@ public class GPGContextImpl implements GPGContext {
   public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
     policyFacade = gpgPolicyfacade;
   }
+
+  @Override
+  public FederationRegistryClient getRegistryClient() {
+    return registryClient;
+  }
+
+  @Override
+  public void setRegistryClient(FederationRegistryClient client) {
+    registryClient = client;
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index ba8ce856cdaa..7ea2f5f27277 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -37,6 +37,7 @@ import 
org.apache.hadoop.security.AuthenticationFilterInitializer;
 import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.JvmPauseMonitor;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import 
org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
 import 
org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
 import 
org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
@@ -81,6 +83,7 @@ public class GlobalPolicyGenerator extends CompositeService {
 
   // Federation Variables
   private GPGContext gpgContext;
+  private RegistryOperations registry;
 
   // Scheduler service that runs tasks periodically
   private ScheduledThreadPoolExecutor scheduledExecutorService;
@@ -123,6 +126,17 @@ public class GlobalPolicyGenerator extends 
CompositeService {
         new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf);
     this.gpgContext.setPolicyFacade(gpgPolicyFacade);
 
+    this.registry = FederationStateStoreFacade.createInstance(conf,
+        YarnConfiguration.YARN_REGISTRY_CLASS,
+        YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
+        RegistryOperations.class);
+    this.registry.init(conf);
+
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+    FederationRegistryClient registryClient =
+        new FederationRegistryClient(conf, this.registry, user);
+    this.gpgContext.setRegistryClient(registryClient);
+
     this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
         conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
             YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
@@ -157,6 +171,8 @@ public class GlobalPolicyGenerator extends CompositeService 
{
 
     super.serviceStart();
 
+    this.registry.start();
+
     // Schedule SubClusterCleaner service
     Configuration config = getConfig();
     long scCleanerIntervalMs = config.getTimeDuration(
@@ -214,6 +230,11 @@ public class GlobalPolicyGenerator extends 
CompositeService {
 
   @Override
   protected void serviceStop() throws Exception {
+    if (this.registry != null) {
+      this.registry.stop();
+      this.registry = null;
+    }
+
     try {
       if (this.scheduledExecutorService != null
           && !this.scheduledExecutorService.isShutdown()) {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
index cd3f7618558e..af0bd6184b79 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -27,9 +28,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -46,6 +49,7 @@ public abstract class ApplicationCleaner implements Runnable {
 
   private Configuration conf;
   private GPGContext gpgContext;
+  private FederationRegistryClient registryClient;
 
   private int minRouterSuccessCount;
   private int maxRouterRetry;
@@ -56,6 +60,7 @@ public abstract class ApplicationCleaner implements Runnable {
 
     this.gpgContext = context;
     this.conf = config;
+    this.registryClient = context.getRegistryClient();
 
     String routerSpecString =
         this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
@@ -80,10 +85,9 @@ public abstract class ApplicationCleaner implements Runnable 
{
           + this.minRouterSuccessCount + " should be positive");
     }
 
-    LOG.info(
-        "Initialized AppCleaner with Router query with min success {}, "
-            + "max retry {}, retry interval {}",
-        this.minRouterSuccessCount, this.maxRouterRetry,
+    LOG.info("Initialized AppCleaner with Router query with min success {}, " +
+        "max retry {}, retry interval {}.", this.minRouterSuccessCount,
+        this.maxRouterRetry,
         DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis));
   }
 
@@ -100,9 +104,9 @@ public abstract class ApplicationCleaner implements 
Runnable {
   public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
     String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
 
-    LOG.info(String.format("Contacting router at: %s", webAppAddress));
-    AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", 
AppsInfo.class, conf,
-        DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
+    LOG.info("Contacting router at: {}.", webAppAddress);
+    AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, 
RMWSConsts.APPS,
+        AppsInfo.class, conf, 
DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
 
     Set<ApplicationId> appSet = new HashSet<>();
     for (AppInfo appInfo : appsInfo.getApps()) {
@@ -148,6 +152,18 @@ public abstract class ApplicationCleaner implements 
Runnable {
         + " success Router queries after " + totalAttemptCount + " retries");
   }
 
+  protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
+    List<String> allApps = this.registryClient.getAllApplications();
+    LOG.info("Got {} existing apps in registry.", allApps.size());
+    for (String app : allApps) {
+      ApplicationId appId = ApplicationId.fromString(app);
+      if (!knownApps.contains(appId)) {
+        LOG.info("removing finished application entry for {}", app);
+        this.registryClient.removeAppFromRegistry(appId, true);
+      }
+    }
+  }
+
   @Override
   public abstract void run();
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
index 857d2e645d4c..5b2ff26fcfb4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -70,6 +70,8 @@ public class DefaultApplicationCleaner extends 
ApplicationCleaner {
           LOG.error("deleteApplicationHomeSubCluster failed at application 
{}.", appId, e);
         }
       }
+      // Clean up registry entries
+      cleanupAppRecordInRegistry(routerApps);
     } catch (Throwable e) {
       LOG.error("Application cleaner started at time {} fails. ", now, e);
     }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
index 2d63c48236fb..1e703b51960e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -24,15 +24,21 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
@@ -50,6 +56,8 @@ public class TestDefaultApplicationCleaner {
   private FederationStateStoreFacade facade;
   private ApplicationCleaner appCleaner;
   private GPGContext gpgContext;
+  private RegistryOperations registry;
+  private FederationRegistryClient registryClient;
 
   private List<ApplicationId> appIds;
   // The list of applications returned by mocked router
@@ -68,8 +76,18 @@ public class TestDefaultApplicationCleaner {
     facade = FederationStateStoreFacade.getInstance();
     facade.reinitialize(stateStore, conf);
 
+    registry = new FSRegistryOperationsService();
+    registry.init(conf);
+    registry.start();
+
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+    registryClient = new FederationRegistryClient(conf, registry, user);
+    registryClient.cleanAllApplications();
+    Assert.assertEquals(0, registryClient.getAllApplications().size());
+
     gpgContext = new GPGContextImpl();
     gpgContext.setStateStoreFacade(facade);
+    gpgContext.setRegistryClient(registryClient);
 
     appCleaner = new TestableDefaultApplicationCleaner();
     appCleaner.init(conf, gpgContext);
@@ -87,7 +105,12 @@ public class TestDefaultApplicationCleaner {
       stateStore.addApplicationHomeSubCluster(
           AddApplicationHomeSubClusterRequest.newInstance(
               ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+
+      // Write some registry entries for the app
+      registryClient.writeAMRMTokenForUAM(appId, subClusterId.toString(),
+          new Token<AMRMTokenIdentifier>());
     }
+    Assert.assertEquals(3, registryClient.getAllApplications().size());
   }
 
   @After
@@ -96,6 +119,14 @@ public class TestDefaultApplicationCleaner {
       stateStore.close();
       stateStore = null;
     }
+    if (registryClient != null) {
+      registryClient.cleanAllApplications();
+      registryClient = null;
+    }
+    if (registry != null) {
+      registry.stop();
+      registry = null;
+    }
   }
 
   @Test
@@ -116,6 +147,9 @@ public class TestDefaultApplicationCleaner {
             .getApplicationsHomeSubCluster(
                 GetApplicationsHomeSubClusterRequest.newInstance())
             .getAppsHomeSubClusters().size());
+
+    // The known app should not be cleaned in registry
+    Assert.assertEquals(1, registryClient.getAllApplications().size());
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to