[ 
https://issues.apache.org/jira/browse/YARN-11469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714320#comment-17714320
 ] 

ASF GitHub Bot commented on YARN-11469:
---------------------------------------

goiri commented on code in PR #5570:
URL: https://github.com/apache/hadoop/pull/5570#discussion_r1171881329


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class FederationCache {
+
+  protected static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
+  protected static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
+      "getPoliciesConfigurations";
+  protected static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
+      "getApplicationHomeSubCluster";
+
+  public abstract boolean isCachingEnabled();
+
+  public abstract void initCache(Configuration conf, FederationStateStore 
stateStore);
+
+  public abstract void clearCache();
+
+  protected String buildCacheKey(String typeName, String methodName, String 
argName) {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append(typeName).append(".").append(methodName);
+    if (argName != null) {
+      buffer.append("::");

Review Comment:
   Make the "::" separator a constant.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+

Review Comment:
   The Apache license header is missing from this file.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
+
+  private Cache<Object, Object> cache;
+
+  private int cacheTimeToLive;
+
+  private boolean isCachingEnabled = false;
+
+  private FederationStateStore stateStore;
+
+  private String className = getClass().getSimpleName();
+
+  @Override
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    // Picking the JCache provider from classpath, need to make sure there's
+    // no conflict or pick up a specific one in the future
+    cacheTimeToLive = 
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+    if (cacheTimeToLive > 0) {
+      CachingProvider jcacheProvider = Caching.getCachingProvider();
+      CacheManager jcacheManager = jcacheProvider.getCacheManager();
+      this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
+      if (this.cache == null) {
+        String className = this.getClass().getSimpleName();
+        LOG.info("Creating a JCache Manager with name {}.", className);
+        Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+        FactoryBuilder.SingletonFactory<ExpiryPolicy> 
expiryPolicySingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new 
CreatedExpiryPolicy(cacheExpiry));
+        FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> 
cacheLoaderSingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>());
+        CompleteConfiguration<Object, Object> configuration =
+            new MutableConfiguration<>().setStoreByValue(false)

Review Comment:
   Split the chained calls so that each one is on its own line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
+
+  private Cache<Object, Object> cache;
+
+  private int cacheTimeToLive;
+
+  private boolean isCachingEnabled = false;
+
+  private FederationStateStore stateStore;
+
+  private String className = getClass().getSimpleName();
+
+  @Override
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    // Picking the JCache provider from classpath, need to make sure there's
+    // no conflict or pick up a specific one in the future
+    cacheTimeToLive = 
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+    if (cacheTimeToLive > 0) {
+      CachingProvider jcacheProvider = Caching.getCachingProvider();
+      CacheManager jcacheManager = jcacheProvider.getCacheManager();
+      this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
+      if (this.cache == null) {
+        String className = this.getClass().getSimpleName();
+        LOG.info("Creating a JCache Manager with name {}.", className);
+        Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+        FactoryBuilder.SingletonFactory<ExpiryPolicy> 
expiryPolicySingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new 
CreatedExpiryPolicy(cacheExpiry));
+        FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> 
cacheLoaderSingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>());
+        CompleteConfiguration<Object, Object> configuration =
+            new MutableConfiguration<>().setStoreByValue(false)
+            .setReadThrough(true)
+            .setExpiryPolicyFactory(expiryPolicySingletonFactory)
+            .setCacheLoaderFactory(cacheLoaderSingletonFactory);
+        this.cache = jcacheManager.createCache(className, configuration);
+      }
+      isCachingEnabled = true;
+      return;
+    }
+    isCachingEnabled = false;
+  }
+
+  /**
+   * Internal class that implements the CacheLoader interface that can be
+   * plugged into the CacheManager to load objects into the cache for specified
+   * keys.
+   */
+  private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public V load(K key) throws CacheLoaderException {
+      try {
+        CacheRequest<K, V> query = (CacheRequest<K, V>) key;
+        assert query != null;
+        return query.getValue();
+      } catch (Throwable ex) {
+        throw new CacheLoaderException(ex);
+      }
+    }
+
+    @Override
+    public Map<K, V> loadAll(Iterable<? extends K> keys)
+        throws CacheLoaderException {
+      // The FACADE does not use the Cache's getAll API. Hence this is not
+      // required to be implemented
+      throw new NotImplementedException("Code is not implemented");
+    }
+  }
+
+  /**
+   * Internal class that encapsulates the cache key and a function that returns
+   * the value for the specified key.
+   */
+  private class CacheRequest<K, V> {
+    private K key;
+    private Func<K, V> func;
+
+    CacheRequest(K key, Func<K, V> func) {
+      this.key = key;
+      this.func = func;
+    }
+
+    public V getValue() throws Exception {
+      return func.invoke(key);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((key == null) ? 0 : key.hashCode());
+      return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
+      if (key == null) {
+        if (other.key != null) {
+          return false;
+        }
+      } else if (!key.equals(other.key)) {
+        return false;
+      }
+
+      return true;
+    }
+  }
+
+  /**
+   * Encapsulates a method that has one parameter and returns a value of the
+   * type specified by the TResult parameter.
+   */
+  protected interface Func<T, R> {
+    R invoke(T input) throws Exception;
+  }
+
+  @Override
+  public void clearCache() {
+    CachingProvider jcacheProvider = Caching.getCachingProvider();
+    CacheManager jcacheManager = jcacheProvider.getCacheManager();
+
+    jcacheManager.destroyCache(this.getClass().getSimpleName());
+    this.cache = null;
+  }
+
+  @Override
+  public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean 
filterInactiveSubClusters)
+      throws YarnException {
+    return (Map<SubClusterId, SubClusterInfo>) cache
+        .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
+  }
+
+  @Override
+  public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
+      throws YarnException {
+    return (Map<String, SubClusterPolicyConfiguration>) cache
+        .get(buildGetPoliciesConfigurationsCacheRequest());
+  }
+
+  @Override
+  public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) {
+    Object key = buildGetApplicationHomeSubClusterRequest(appId);
+    return (SubClusterId) cache.get(key);
+  }
+
+  @Override
+  public void removeSubCluster(boolean flushCache) {
+    cache.remove(buildGetSubClustersCacheRequest(flushCache));
+  }
+
+  private Object buildGetSubClustersCacheRequest(final boolean 
filterInactiveSubClusters) {
+    final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID,
+       Boolean.toString(filterInactiveSubClusters));
+    CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
+        new CacheRequest<>(cacheKey,
+        key -> {

Review Comment:
   The lambda never uses its `key` argument. Is that intentional?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class FederationCache {
+
+  protected static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
+  protected static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
+      "getPoliciesConfigurations";
+  protected static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
+      "getApplicationHomeSubCluster";
+
+  public abstract boolean isCachingEnabled();
+
+  public abstract void initCache(Configuration conf, FederationStateStore 
stateStore);
+
+  public abstract void clearCache();
+
+  protected String buildCacheKey(String typeName, String methodName, String 
argName) {

Review Comment:
   Add Javadoc for buildCacheKey with examples of inputs and outputs.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
+
+  private Cache<Object, Object> cache;
+
+  private int cacheTimeToLive;
+
+  private boolean isCachingEnabled = false;
+
+  private FederationStateStore stateStore;
+
+  private String className = getClass().getSimpleName();
+
+  @Override
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    // Picking the JCache provider from classpath, need to make sure there's
+    // no conflict or pick up a specific one in the future
+    cacheTimeToLive = 
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+    if (cacheTimeToLive > 0) {

Review Comment:
   Invert the condition and return early.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
+
+  private Cache<Object, Object> cache;
+
+  private int cacheTimeToLive;
+
+  private boolean isCachingEnabled = false;
+
+  private FederationStateStore stateStore;
+
+  private String className = getClass().getSimpleName();
+
+  @Override
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    // Picking the JCache provider from classpath, need to make sure there's
+    // no conflict or pick up a specific one in the future
+    cacheTimeToLive = 
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+    if (cacheTimeToLive > 0) {
+      CachingProvider jcacheProvider = Caching.getCachingProvider();
+      CacheManager jcacheManager = jcacheProvider.getCacheManager();
+      this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
+      if (this.cache == null) {
+        String className = this.getClass().getSimpleName();
+        LOG.info("Creating a JCache Manager with name {}.", className);
+        Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+        FactoryBuilder.SingletonFactory<ExpiryPolicy> 
expiryPolicySingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new 
CreatedExpiryPolicy(cacheExpiry));
+        FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> 
cacheLoaderSingletonFactory =
+            new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>());
+        CompleteConfiguration<Object, Object> configuration =
+            new MutableConfiguration<>().setStoreByValue(false)
+            .setReadThrough(true)
+            .setExpiryPolicyFactory(expiryPolicySingletonFactory)
+            .setCacheLoaderFactory(cacheLoaderSingletonFactory);
+        this.cache = jcacheManager.createCache(className, configuration);
+      }
+      isCachingEnabled = true;
+      return;
+    }
+    isCachingEnabled = false;
+  }
+
+  /**
+   * Internal class that implements the CacheLoader interface that can be
+   * plugged into the CacheManager to load objects into the cache for specified
+   * keys.
+   */
+  private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public V load(K key) throws CacheLoaderException {
+      try {
+        CacheRequest<K, V> query = (CacheRequest<K, V>) key;
+        assert query != null;
+        return query.getValue();
+      } catch (Throwable ex) {
+        throw new CacheLoaderException(ex);
+      }
+    }
+
+    @Override
+    public Map<K, V> loadAll(Iterable<? extends K> keys)
+        throws CacheLoaderException {
+      // The FACADE does not use the Cache's getAll API. Hence this is not
+      // required to be implemented
+      throw new NotImplementedException("Code is not implemented");
+    }
+  }
+
+  /**
+   * Internal class that encapsulates the cache key and a function that returns
+   * the value for the specified key.
+   */
+  private class CacheRequest<K, V> {
+    private K key;
+    private Func<K, V> func;
+
+    CacheRequest(K key, Func<K, V> func) {
+      this.key = key;
+      this.func = func;
+    }
+
+    public V getValue() throws Exception {
+      return func.invoke(key);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((key == null) ? 0 : key.hashCode());
+      return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
+      if (key == null) {
+        if (other.key != null) {
+          return false;
+        }
+      } else if (!key.equals(other.key)) {
+        return false;
+      }
+
+      return true;
+    }
+  }
+
+  /**
+   * Encapsulates a method that has one parameter and returns a value of the
+   * type specified by the TResult parameter.
+   */
+  protected interface Func<T, R> {
+    R invoke(T input) throws Exception;
+  }
+
+  @Override
+  public void clearCache() {
+    CachingProvider jcacheProvider = Caching.getCachingProvider();
+    CacheManager jcacheManager = jcacheProvider.getCacheManager();
+
+    jcacheManager.destroyCache(this.getClass().getSimpleName());
+    this.cache = null;
+  }
+
+  @Override
+  public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean 
filterInactiveSubClusters)
+      throws YarnException {
+    return (Map<SubClusterId, SubClusterInfo>) cache
+        .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
+  }
+
+  @Override
+  public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
+      throws YarnException {
+    return (Map<String, SubClusterPolicyConfiguration>) cache
+        .get(buildGetPoliciesConfigurationsCacheRequest());
+  }
+
+  @Override
+  public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) {
+    Object key = buildGetApplicationHomeSubClusterRequest(appId);
+    return (SubClusterId) cache.get(key);
+  }
+
+  @Override
+  public void removeSubCluster(boolean flushCache) {
+    cache.remove(buildGetSubClustersCacheRequest(flushCache));
+  }
+
+  private Object buildGetSubClustersCacheRequest(final boolean 
filterInactiveSubClusters) {
+    final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID,
+       Boolean.toString(filterInactiveSubClusters));
+    CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
+        new CacheRequest<>(cacheKey,
+        key -> {
+          GetSubClustersInfoRequest request =
+              GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters);
+          GetSubClustersInfoResponse subClusters = 
stateStore.getSubClusters(request);
+          return buildSubClusterInfoMap(subClusters);
+        });
+    return cacheRequest;
+  }
+
+  private Object buildGetPoliciesConfigurationsCacheRequest() {
+    final String cacheKey = buildCacheKey(className,
+       GET_POLICIES_CONFIGURATIONS_CACHEID, null);
+    CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> 
cacheRequest =
+        new CacheRequest<>(cacheKey,
+        key -> {
+          GetSubClusterPoliciesConfigurationsRequest request =
+              GetSubClusterPoliciesConfigurationsRequest.newInstance();
+          GetSubClusterPoliciesConfigurationsResponse policyConfigs =
+              stateStore.getPoliciesConfigurations(request);
+          return buildPolicyConfigMap(policyConfigs);
+        });
+    return cacheRequest;
+  }
+
+  private Object buildGetApplicationHomeSubClusterRequest(ApplicationId 
applicationId) {
+    final String cacheKey = buildCacheKey(getClass().getSimpleName(),
+       GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
+    CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
+        cacheKey,
+            input -> {

Review Comment:
   Is the indentation correct?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
+
+  private Cache<Object, Object> cache; // set by initCache when the TTL is positive
+
+  private int cacheTimeToLive; // seconds; read from FEDERATION_CACHE_TIME_TO_LIVE_SECS
+
+  private boolean isCachingEnabled = false; // true once initCache sees a positive TTL
+
+  private FederationStateStore stateStore; // assigned from initCache's pStateStore
+
+  private String className = getClass().getSimpleName(); // simple name of the runtime class
+
+  /**
+   * Reports whether caching is enabled for this instance.
+   *
+   * @return the current value of the isCachingEnabled flag.
+   */
+  @Override
+  public boolean isCachingEnabled() {
+    return this.isCachingEnabled;
+  }
+
+  /**
+   * Initializes the cache from the given configuration.
+   *
+   * Reads FEDERATION_CACHE_TIME_TO_LIVE_SECS. When the value is not positive,
+   * caching is disabled and no JCache instance is looked up or created.
+   * Otherwise the cache named after this class is fetched from the provider's
+   * CacheManager or, if absent, created as a read-through, store-by-reference
+   * cache whose entries expire cacheTimeToLive seconds after creation.
+   *
+   * @param conf configuration supplying the cache time-to-live in seconds.
+   * @param pStateStore state store assigned to the stateStore field; it is
+   *          assigned even when caching ends up disabled.
+   */
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    // Picking the JCache provider from classpath, need to make sure there's
+    // no conflict or pick up a specific one in the future
+    cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+    if (cacheTimeToLive <= 0) {
+      // A non-positive TTL turns caching off entirely.
+      isCachingEnabled = false;
+      return;
+    }
+    CachingProvider jcacheProvider = Caching.getCachingProvider();
+    CacheManager jcacheManager = jcacheProvider.getCacheManager();
+    // Use the className field for both lookup and creation rather than a
+    // local that hides it, so the two names cannot drift apart.
+    this.cache = jcacheManager.getCache(className);
+    if (this.cache == null) {
+      LOG.info("Creating a JCache Manager with name {}.", className);
+      Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+      FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicyFactory =
+          new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry));
+      FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> cacheLoaderFactory =
+          new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>());
+      CompleteConfiguration<Object, Object> configuration =
+          new MutableConfiguration<>().setStoreByValue(false)
+          .setReadThrough(true)
+          .setExpiryPolicyFactory(expiryPolicyFactory)
+          .setCacheLoaderFactory(cacheLoaderFactory);
+      this.cache = jcacheManager.createCache(className, configuration);
+    }
+    isCachingEnabled = true;
+  }
+
+  /**
+   * CacheLoader implementation supplied to the cache configuration. Each key
+   * is expected to be a CacheRequest; loading a key means asking that request
+   * for its value.
+   */
+  private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
+
+    /**
+     * Loads the value for a key by casting it to CacheRequest and calling
+     * getValue() on it.
+     *
+     * @param key the cache key, cast unchecked to CacheRequest.
+     * @return the value returned by the request.
+     * @throws CacheLoaderException wrapping any Throwable raised while
+     *           casting the key or computing the value.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public V load(K key) throws CacheLoaderException {
+      try {
+        CacheRequest<K, V> request = (CacheRequest<K, V>) key;
+        assert request != null;
+        return request.getValue();
+      } catch (Throwable cause) {
+        throw new CacheLoaderException(cause);
+      }
+    }
+
+    /**
+     * Bulk loading is not supported by this loader.
+     *
+     * @param keys ignored.
+     * @return never returns normally.
+     * @throws NotImplementedException always.
+     */
+    @Override
+    public Map<K, V> loadAll(Iterable<? extends K> keys)
+        throws CacheLoaderException {
+      // The FACADE does not use the Cache's getAll API. Hence this is not
+      // required to be implemented
+      throw new NotImplementedException("Code is not implemented");
+    }
+  }
+
+  /**
+   * Internal class that encapsulates the cache key and a function that returns
+   * the value for the specified key.
+   */
+  private class CacheRequest<K, V> {
+    private K key;
+    private Func<K, V> func;
+
+    CacheRequest(K key, Func<K, V> func) {
+      this.key = key;
+      this.func = func;
+    }
+
+    public V getValue() throws Exception {
+      return func.invoke(key);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;

Review Comment:
   Can we use the HashCodeBuilder and the EqualsBuilder?





> Refactor FederationStateStoreFacade Cache Code
> ----------------------------------------------
>
>                 Key: YARN-11469
>                 URL: https://issues.apache.org/jira/browse/YARN-11469
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: federation
>    Affects Versions: 3.4.0
>            Reporter: Shilun Fan
>            Assignee: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>
> The cache in FederationStateStoreFacade uses JCache. Because JCache is not a
> general cache implementation (its latest version was released in 2014), this
> part of the code is refactored so that multiple cache implementations can be
> supported in the future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to