[
https://issues.apache.org/jira/browse/YARN-11469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715296#comment-17715296
]
ASF GitHub Bot commented on YARN-11469:
---------------------------------------
slfan1989 commented on code in PR #5570:
URL: https://github.com/apache/hadoop/pull/5570#discussion_r1174424520
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.hadoop.yarn.server.federation.cache;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class FederationJCache extends FederationCache {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FederationJCache.class);
+
+ private Cache<Object, Object> cache;
+
+ private int cacheTimeToLive;
+
+ private boolean isCachingEnabled = false;
+
+ private FederationStateStore stateStore;
+
+ private String className = getClass().getSimpleName();
+
+  /**
+   * Reports whether this cache is currently active.
+   *
+   * @return the flag last set by {@code initCache}; {@code false} until
+   *         {@code initCache} has run with a positive time-to-live.
+   */
+  @Override
+  public boolean isCachingEnabled() {
+    return this.isCachingEnabled;
+  }
+
+  /**
+   * Initializes the JCache-backed cache from the given configuration.
+   *
+   * <p>The time-to-live (in seconds) is read from
+   * {@code YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS}. When it is
+   * not positive, caching is marked disabled and no JCache provider is looked
+   * up. Otherwise the cache registered under this class's simple name is
+   * reused, or created as a read-through, store-by-reference cache whose
+   * entries expire {@code cacheTimeToLive} seconds after creation.
+   *
+   * @param conf configuration supplying the cache time-to-live.
+   * @param pStateStore state store, saved in the {@code stateStore} field.
+   */
+  @Override
+  public void initCache(Configuration conf, FederationStateStore pStateStore) {
+    // Picking the JCache provider from classpath, need to make sure there's
+    // no conflict or pick up a specific one in the future
+    cacheTimeToLive = conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    this.stateStore = pStateStore;
+
+    // A non-positive TTL switches caching off entirely.
+    if (cacheTimeToLive <= 0) {
+      isCachingEnabled = false;
+      return;
+    }
+
+    CacheManager jcacheManager = Caching.getCachingProvider().getCacheManager();
+
+    // The className field (this class's simple name) is the single cache name
+    // used for both lookup and creation; no shadowing local is needed.
+    this.cache = jcacheManager.getCache(className);
+    if (this.cache == null) {
+      LOG.info("Creating a JCache Manager with name {}.", className);
+      Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+      FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory =
+          new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry));
+      FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>> cacheLoaderSingletonFactory =
+          new FactoryBuilder.SingletonFactory<>(new CacheLoaderImpl<>());
+      // Read-through: a miss on get() is resolved by the configured loader.
+      CompleteConfiguration<Object, Object> configuration =
+          new MutableConfiguration<Object, Object>()
+              .setStoreByValue(false)
+              .setReadThrough(true)
+              .setExpiryPolicyFactory(expiryPolicySingletonFactory)
+              .setCacheLoaderFactory(cacheLoaderSingletonFactory);
+      this.cache = jcacheManager.createCache(className, configuration);
+    }
+    isCachingEnabled = true;
+  }
+
+  /**
+   * {@link CacheLoader} plugged into the cache configuration. Every key is
+   * expected to be a {@code CacheRequest}; loading a key simply asks that
+   * request to produce its own value.
+   *
+   * @param <K> key type declared by the cache.
+   * @param <V> value type declared by the cache.
+   */
+  private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
+
+    /**
+     * Produces the value for {@code key} by invoking the request it wraps.
+     *
+     * @param key the cache key, cast to {@code CacheRequest}.
+     * @return the value returned by the request.
+     * @throws CacheLoaderException wrapping anything thrown while casting
+     *         the key or computing the value.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public V load(K key) throws CacheLoaderException {
+      final V loaded;
+      try {
+        CacheRequest<K, V> request = (CacheRequest<K, V>) key;
+        assert request != null;
+        loaded = request.getValue();
+      } catch (Throwable cause) {
+        throw new CacheLoaderException(cause);
+      }
+      return loaded;
+    }
+
+    /**
+     * Bulk loading is deliberately unsupported.
+     *
+     * @param keys ignored.
+     * @return never returns normally.
+     * @throws CacheLoaderException declared by the interface; not thrown here.
+     */
+    @Override
+    public Map<K, V> loadAll(Iterable<? extends K> keys)
+        throws CacheLoaderException {
+      // The facade never calls the cache's getAll API, so there is nothing
+      // to implement here.
+      throw new NotImplementedException("Code is not implemented");
+    }
+  }
+
+  /**
+   * Pairs a cache key with the function that computes the value for that key.
+   * Instances are used as the actual keys stored in the cache, so identity is
+   * defined by {@code key} alone; {@code func} takes no part in
+   * {@link #equals(Object)} or {@link #hashCode()}.
+   *
+   * <p>Declared {@code static} so that a request held by the cache does not
+   * keep a hidden reference to the enclosing cache object.
+   *
+   * @param <K> type of the wrapped key.
+   * @param <V> type of the value produced for the key.
+   */
+  private static class CacheRequest<K, V> {
+    private final K key;
+    private final Func<K, V> func;
+
+    /**
+     * @param key the logical cache key; may be {@code null}.
+     * @param func function invoked with {@code key} to compute the value.
+     */
+    CacheRequest(K key, Func<K, V> func) {
+      this.key = key;
+      this.func = func;
+    }
+
+    /**
+     * Computes the value for this request.
+     *
+     * @return the result of invoking the function on the key.
+     * @throws Exception whatever the function throws.
+     */
+    public V getValue() throws Exception {
+      return func.invoke(key);
+    }
+
+    /** Hash derived from {@code key} only (0 is used for a null key). */
+    @Override
+    public int hashCode() {
+      return 31 + ((key == null) ? 0 : key.hashCode());
+    }
+
+    /**
+     * Two requests are equal when they are of the same class and wrap equal
+     * keys (two null keys count as equal).
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      // Wildcard cast: only the key is compared, so no unchecked cast needed.
+      CacheRequest<?, ?> other = (CacheRequest<?, ?>) obj;
+      if (key == null) {
+        return other.key == null;
+      }
+      return key.equals(other.key);
+    }
+  }
+
+  /**
+   * A function of one argument that may throw a checked exception.
+   *
+   * @param <T> type of the input passed to {@link #invoke(Object)}.
+   * @param <R> type of the value returned by {@link #invoke(Object)}.
+   */
+  @FunctionalInterface
+  protected interface Func<T, R> {
+    /**
+     * Applies this function to the given input.
+     *
+     * @param input the argument.
+     * @return the computed result.
+     * @throws Exception if the computation fails.
+     */
+    R invoke(T input) throws Exception;
+  }
+
+  /**
+   * Destroys the JCache cache registered under this class's simple name and
+   * drops the local reference to it.
+   */
+  @Override
+  public void clearCache() {
+    Caching.getCachingProvider()
+        .getCacheManager()
+        .destroyCache(this.getClass().getSimpleName());
+    this.cache = null;
+  }
+
+  /**
+   * Fetches the sub-cluster map through the cache.
+   *
+   * @param filterInactiveSubClusters flag used to build the cache key.
+   * @return the cache's value for that key, cast to a map of sub-cluster id
+   *         to sub-cluster info.
+   * @throws YarnException declared by the overridden method.
+   */
+  @Override
+  public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean filterInactiveSubClusters)
+      throws YarnException {
+    Object request = buildGetSubClustersCacheRequest(filterInactiveSubClusters);
+    Object cached = cache.get(request);
+    return (Map<SubClusterId, SubClusterInfo>) cached;
+  }
+
+  /**
+   * Fetches the policy configurations through the cache.
+   *
+   * @return the cache's value for the policies-configurations key, cast to a
+   *         map keyed by {@code String}.
+   * @throws YarnException declared by the overridden method.
+   */
+  @Override
+  public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
+      throws YarnException {
+    Object cached = cache.get(buildGetPoliciesConfigurationsCacheRequest());
+    return (Map<String, SubClusterPolicyConfiguration>) cached;
+  }
+
+  /**
+   * Fetches the home sub-cluster of an application through the cache.
+   *
+   * @param appId application whose home sub-cluster is requested.
+   * @return the cache's value for the application's key, cast to
+   *         {@code SubClusterId}.
+   */
+  @Override
+  public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) {
+    return (SubClusterId) cache.get(buildGetApplicationHomeSubClusterRequest(appId));
+  }
+
+ @Override
+ public void removeSubCluster(boolean flushCache) {
+ cache.remove(buildGetSubClustersCacheRequest(flushCache));
+ }
+
+ private Object buildGetSubClustersCacheRequest(final boolean
filterInactiveSubClusters) {
+ final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID,
+ Boolean.toString(filterInactiveSubClusters));
+ CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
+ new CacheRequest<>(cacheKey,
+ key -> {
Review Comment:
I will refactor this part of the code.
> Refactor FederationStateStoreFacade Cache Code
> ----------------------------------------------
>
> Key: YARN-11469
> URL: https://issues.apache.org/jira/browse/YARN-11469
> Project: Hadoop YARN
> Issue Type: Improvement
> Components: federation
> Affects Versions: 3.4.0
> Reporter: Shilun Fan
> Assignee: Shilun Fan
> Priority: Major
> Labels: pull-request-available
>
> FederationStateStoreFacade's cache is built on JCache. Because JCache is not
> a commonly used cache implementation (its latest version was released in
> 2014), this change refactors the cache code so that multiple cache
> implementations can be supported in the future.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]