[ 
https://issues.apache.org/jira/browse/YARN-11469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715296#comment-17715296
 ] 

ASF GitHub Bot commented on YARN-11469:
---------------------------------------

slfan1989 commented on code in PR #5570:
URL: https://github.com/apache/hadoop/pull/5570#discussion_r1174424520


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+  // Logger for this class.
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
+
+  // JCache instance obtained or created in initCache; set to null by clearCache.
+  private Cache<Object, Object> cache;
+
+  // Time-to-live read from FEDERATION_CACHE_TIME_TO_LIVE_SECS in initCache.
+  private int cacheTimeToLive;
+
+  // Set to true by initCache only when the configured time-to-live is positive.
+  private boolean isCachingEnabled = false;
+
+  // State store handed to initCache.
+  private FederationStateStore stateStore;
+
+  // Simple name of the runtime class; used as the first component of cache keys.
+  private String className = getClass().getSimpleName();
+
+  /**
+   * Reports whether caching is enabled for this instance.
+   *
+   * @return the current value of the isCachingEnabled flag, which initCache
+   *         sets to true only when the configured time-to-live is positive.
+   */
+  @Override
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  /**
+   * Reads the cache time-to-live from the configuration and, when it is
+   * positive, obtains the JCache cache named after this class, creating it
+   * if the cache manager does not have one yet. A non-positive time-to-live
+   * leaves caching disabled.
+   *
+   * @param conf configuration supplying the cache time-to-live in seconds.
+   * @param pStateStore state store retained by this instance.
+   */
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+
+    if (cacheTimeToLive <= 0) {
+      isCachingEnabled = false;
+      return;
+    }
+
+    // The JCache provider is picked from the classpath; need to make sure
+    // there's no conflict, or pick a specific one in the future.
+    final CacheManager manager = Caching.getCachingProvider().getCacheManager();
+    final String cacheName = this.getClass().getSimpleName();
+    this.cache = manager.getCache(cacheName);
+    if (this.cache == null) {
+      LOG.info("Creating a JCache Manager with name {}.", cacheName);
+      final Duration ttl = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+      final FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryFactory =
+          new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(ttl));
+      final FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> loaderFactory =
+          new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>());
+      // Entries are stored by reference, loaded on a miss through the
+      // loader, and expire a fixed time after creation.
+      final CompleteConfiguration<Object, Object> configuration =
+          new MutableConfiguration<>().setStoreByValue(false)
+          .setReadThrough(true)
+          .setExpiryPolicyFactory(expiryFactory)
+          .setCacheLoaderFactory(loaderFactory);
+      this.cache = manager.createCache(cacheName, configuration);
+    }
+    isCachingEnabled = true;
+  }
+
+  /**
+   * CacheLoader implementation plugged into the cache configuration. It
+   * treats every key as a CacheRequest and obtains the value by asking the
+   * request itself.
+   */
+  private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
+    /**
+     * Loads the value for a key by invoking the request's own value function.
+     *
+     * @param key the cache key; cast to CacheRequest.
+     * @return the value produced by the request.
+     * @throws CacheLoaderException wrapping anything thrown while casting
+     *         the key or computing the value.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public V load(K key) throws CacheLoaderException {
+      try {
+        final CacheRequest<K, V> request = (CacheRequest<K, V>) key;
+        assert request != null;
+        return request.getValue();
+      } catch (Throwable cause) {
+        throw new CacheLoaderException(cause);
+      }
+    }
+
+    /**
+     * Bulk loading is not supported: the facade does not use the cache's
+     * getAll API, so this always throws.
+     */
+    @Override
+    public Map<K, V> loadAll(Iterable<? extends K> keys)
+        throws CacheLoaderException {
+      throw new NotImplementedException("Code is not implemented");
+    }
+  }
+
+  /**
+   * Cache key that pairs the actual key with the function able to compute
+   * the value for it. Equality and hashing depend on the key only, so two
+   * requests for the same key address the same cache entry regardless of
+   * the function they carry.
+   *
+   * Declared static because it uses no state of the enclosing instance; a
+   * non-static inner class would add a hidden reference to the enclosing
+   * object to every key stored in the cache.
+   */
+  private static class CacheRequest<K, V> {
+    private final K key;
+    private final Func<K, V> func;
+
+    /**
+     * @param key the key identifying the cache entry; may be null.
+     * @param func function invoked with the key to produce the value.
+     */
+    CacheRequest(K key, Func<K, V> func) {
+      this.key = key;
+      this.func = func;
+    }
+
+    /**
+     * Computes the value for this request's key.
+     *
+     * @return the result of invoking the function with the key.
+     * @throws Exception whatever the function throws.
+     */
+    public V getValue() throws Exception {
+      return func.invoke(key);
+    }
+
+    @Override
+    public int hashCode() {
+      // Same value as the conventional "prime * 1 + hash(key)" form.
+      return 31 + ((key == null) ? 0 : key.hashCode());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
+      // Null-safe comparison of the keys; the function is ignored on purpose.
+      return (key == null) ? other.key == null : key.equals(other.key);
+    }
+  }
+
+  /**
+   * Encapsulates a method that takes one parameter and returns a value.
+   * Unlike java.util.function.Function, the method may throw a checked
+   * exception.
+   *
+   * @param <T> type of the input.
+   * @param <R> type of the result.
+   */
+  @FunctionalInterface
+  protected interface Func<T, R> {
+    /**
+     * Computes the result for the given input.
+     *
+     * @param input the input value.
+     * @return the computed result.
+     * @throws Exception if the computation fails.
+     */
+    R invoke(T input) throws Exception;
+  }
+
+  /**
+   * Destroys the JCache cache named after this class and drops the local
+   * reference to it.
+   *
+   * Caching is also marked as disabled so that isCachingEnabled() does not
+   * report true while the cache reference is null; a later call to initCache
+   * enables it again.
+   */
+  @Override
+  public void clearCache() {
+    CachingProvider jcacheProvider = Caching.getCachingProvider();
+    CacheManager jcacheManager = jcacheProvider.getCacheManager();
+
+    jcacheManager.destroyCache(this.getClass().getSimpleName());
+    this.cache = null;
+    // Keep the flag consistent with the null reference: the lookup methods
+    // dereference the cache without a null check.
+    this.isCachingEnabled = false;
+  }
+
+  /**
+   * Fetches the sub-cluster map from the cache, using a cache request keyed
+   * by the filter flag.
+   *
+   * @param filterInactiveSubClusters flag that becomes part of the cache key.
+   * @return the value the cache returns for that request, cast to a map of
+   *         sub-cluster id to sub-cluster info.
+   * @throws YarnException declared by the overridden method.
+   */
+  @Override
+  public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean filterInactiveSubClusters)
+      throws YarnException {
+    final Object request = buildGetSubClustersCacheRequest(filterInactiveSubClusters);
+    final Object cached = cache.get(request);
+    return (Map<SubClusterId, SubClusterInfo>) cached;
+  }
+
+  /**
+   * Fetches the policy configurations from the cache.
+   *
+   * @return the value the cache returns for the policies-configurations
+   *         request, cast to a map keyed by String.
+   * @throws YarnException declared by the overridden method.
+   */
+  @Override
+  public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
+      throws YarnException {
+    final Object request = buildGetPoliciesConfigurationsCacheRequest();
+    final Object cached = cache.get(request);
+    return (Map<String, SubClusterPolicyConfiguration>) cached;
+  }
+
+  /**
+   * Fetches the home sub-cluster of an application from the cache.
+   *
+   * @param appId application whose home sub-cluster is requested.
+   * @return the value the cache returns for that application's request,
+   *         cast to SubClusterId.
+   */
+  @Override
+  public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) {
+    return (SubClusterId) cache.get(buildGetApplicationHomeSubClusterRequest(appId));
+  }
+
+  /**
+   * Removes one sub-clusters entry from the cache.
+   *
+   * @param flushCache passed to buildGetSubClustersCacheRequest as the
+   *        filter flag, so it selects which of the two sub-clusters
+   *        entries (filtered or unfiltered) is removed.
+   */
+  @Override
+  public void removeSubCluster(boolean flushCache) {
+    final Object request = buildGetSubClustersCacheRequest(flushCache);
+    cache.remove(request);
+  }
+
+  private Object buildGetSubClustersCacheRequest(final boolean 
filterInactiveSubClusters) {
+    final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID,
+       Boolean.toString(filterInactiveSubClusters));
+    CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
+        new CacheRequest<>(cacheKey,
+        key -> {

Review Comment:
   I will refactor this part of the code.





> Refactor FederationStateStoreFacade Cache Code
> ----------------------------------------------
>
>                 Key: YARN-11469
>                 URL: https://issues.apache.org/jira/browse/YARN-11469
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: federation
>    Affects Versions: 3.4.0
>            Reporter: Shilun Fan
>            Assignee: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>
> The cache of FederationStateStoreFacade uses JCache. Since JCache is not a 
> commonly used cache implementation (its latest version was released in 
> 2014), this part of the code is being refactored to support multiple cache 
> implementations in the future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to