Repository: ignite Updated Branches: refs/heads/master 52712ff0f -> b3b5395c8
Minor refactoring in GridCacheProcessor descriptor handling. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eac7861d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eac7861d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eac7861d Branch: refs/heads/master Commit: eac7861d41106a3daf09f353bfd18e905278858c Parents: 9e0e7f3 Author: devozerov <voze...@gridgain.com> Authored: Fri Apr 14 12:49:57 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Apr 14 12:49:57 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 88 +++++++++++--------- 1 file changed, 50 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eac7861d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b81be12..7c7d45f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -73,7 +73,6 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; @@ -719,7 +718,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { String masked = maskNull(cfg.getName()); - if (registeredCaches.containsKey(masked)) { + if (cacheDescriptor(cfg.getName()) != null) { String cacheName = cfg.getName(); if (cacheName != null) @@ -758,7 +757,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { desc.receivedFrom(ctx.localNodeId()); if (!template) { - registeredCaches.put(masked, desc); + cacheDescriptor(cfg.getName(), desc); ctx.discovery().setCacheFilter( cfg.getName(), @@ -831,7 +830,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { List<CacheConfiguration> tmpCacheCfg = new ArrayList<>(); for (CacheConfiguration conf : ctx.config().getCacheConfiguration()) { - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : cacheDescriptors()) { CacheConfiguration c = desc.cacheConfiguration(); IgnitePredicate filter = c.getNodeFilter(); @@ -864,7 +863,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.database().onKernalStart(false); // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : cacheDescriptors()) { if (ctx.config().isDaemon()) continue; @@ -926,7 +925,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.service().onUtilityCacheStarted(); // Wait for caches in SYNC preload mode. - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : cacheDescriptors()) { CacheConfiguration cfg = desc.cacheConfiguration(); IgnitePredicate filter = cfg.getNodeFilter(); @@ -976,7 +975,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { log, null, n.id(), "deploymentMode", "Deployment mode", locDepMode, rmtDepMode, true); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : cacheDescriptors()) { CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); if (rmtCfg != null) { @@ -1153,7 +1152,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert oldDesc != null : "No descriptor for cache: " + name; - DynamicCacheDescriptor newDesc = registeredCaches.get(maskNull(name)); + DynamicCacheDescriptor newDesc = cacheDescriptor(name); stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId()); } @@ -1720,7 +1719,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Collection of started cache names. */ public Collection<String> cacheNames() { - return F.viewReadOnly(registeredCaches.values(), + return F.viewReadOnly(cacheDescriptors(), new IgniteClosure<DynamicCacheDescriptor, String>() { @Override public String apply(DynamicCacheDescriptor desc) { return desc.cacheConfiguration().getName(); @@ -1775,7 +1774,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Collection of currently started public cache names */ public Collection<String> publicCacheNames() { - return F.viewReadOnly(registeredCaches.values(), + return F.viewReadOnly(cacheDescriptors(), new IgniteClosure<DynamicCacheDescriptor, String>() { @Override public String apply(DynamicCacheDescriptor desc) { return desc.cacheConfiguration().getName(); @@ -1796,7 +1795,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache mode. */ public CacheMode cacheMode(String cacheName) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); + DynamicCacheDescriptor desc = cacheDescriptor(cacheName); return desc != null ? desc.cacheConfiguration().getCacheMode() : null; } @@ -1821,7 +1820,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { topVer ); - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); if (desc != null) desc.onStart(); @@ -1838,7 +1837,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { throws IgniteCheckedException { List<DynamicCacheDescriptor> started = null; - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : cacheDescriptors()) { if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) { if (desc.receivedFrom() != null) { AffinityTopologyVersion startVer = desc.receivedFromStartVersion(); @@ -2158,7 +2157,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param reqs requests. */ private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) { - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : cacheDescriptors()) { // RequestId must be null because on different node will be different byte [] and // we get duplicate discovery data, for more details see // TcpDiscoveryNodeAddedMessage#addDiscoveryData. @@ -2166,11 +2165,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { null); req.startCacheConfiguration(desc.cacheConfiguration()); - req.cacheType(desc.cacheType()); - req.deploymentId(desc.deploymentId()); - req.receivedFrom(desc.receivedFrom()); reqs.add(req); @@ -2210,11 +2206,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null); req.startCacheConfiguration(desc.cacheConfiguration()); - req.cacheType(desc.cacheType()); - req.deploymentId(desc.deploymentId()); - req.receivedFrom(desc.receivedFrom()); reqs.add(req); @@ -2299,7 +2292,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { continue; } - DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); + DynamicCacheDescriptor existing = cacheDescriptor(req.cacheName()); if (req.start() && !req.clientStartOnly()) { CacheConfiguration ccfg = req.startCacheConfiguration(); @@ -2307,7 +2300,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (existing != null) { if (joiningNodeId.equals(ctx.localNodeId())) { existing.receivedFrom(req.receivedFrom()); - existing.deploymentId(req.deploymentId()); } @@ -2340,7 +2332,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { desc.receivedFrom(req.receivedFrom()); - DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc); + DynamicCacheDescriptor old = cacheDescriptor(req.cacheName(), desc); assert old == null : old; @@ -2381,7 +2373,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name); if (!sysCache) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); if (desc != null && desc.deploymentId().equals(req.deploymentId())) { Map<UUID, Boolean> nodes = batch.clientNodes().get(name); @@ -2727,7 +2719,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size()); for (String cacheName : cacheNames) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); + DynamicCacheDescriptor desc = cacheDescriptor(cacheName); if (desc == null) { log.warning("Reset lost partition will not be executed, " + @@ -2795,7 +2787,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( UUID.randomUUID(), cacheName, ctx.localNodeId()); - DynamicCacheDescriptor desc = registeredCaches.get(cacheName); + DynamicCacheDescriptor desc = cacheDescriptor(cacheName); req.deploymentId(desc.deploymentId()); req.stop(true); @@ -2866,7 +2858,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { if (req.stop() || req.close()) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); if (desc == null) // No-op. @@ -2946,7 +2938,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED) { - for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) { + for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) { if (node.id().equals(cacheDesc.receivedFrom())) cacheDesc.receivedFromStartVersion(topVer); } @@ -3021,7 +3013,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { continue; } - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); DynamicCacheStartFuture fut = null; @@ -3057,7 +3049,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { startDesc.startTopologyVersion(newTopVer); - DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc); + DynamicCacheDescriptor old = cacheDescriptor(ccfg.getName(), startDesc); assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; @@ -3209,7 +3201,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) { if (!node.isClient()) { - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + for (DynamicCacheDescriptor desc : cacheDescriptors()) { CacheConfiguration cfg = desc.cacheConfiguration(); if (cfg.getAffinity() instanceof RendezvousAffinityFunction) { @@ -3530,7 +3522,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting public cache for name: " + name); - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); + DynamicCacheDescriptor desc = cacheDescriptor(name); if (desc == null) throw new IllegalArgumentException("Cache is not started: " + name); @@ -3576,7 +3568,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked); - DynamicCacheDescriptor desc = registeredCaches.get(masked); + DynamicCacheDescriptor desc = cacheDescriptor(cacheName); if (desc != null && !desc.cacheType().userCache()) throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName); @@ -3597,7 +3589,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache configuration. */ public CacheConfiguration cacheConfiguration(String name) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); + DynamicCacheDescriptor desc = cacheDescriptor(name); if (desc == null) throw new IllegalStateException("Cache doesn't exist: " + name); @@ -3606,6 +3598,27 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Get registered cache descriptor. + * + * @param name Name. + * @return Descriptor. + */ + public DynamicCacheDescriptor cacheDescriptor(String name) { + return registeredCaches.get(maskNull(name)); + } + + /** + * Put registered cache descriptor. + * + * @param name Name. + * @param desc Descriptor. + * @return Old descriptor (if any). + */ + private DynamicCacheDescriptor cacheDescriptor(String name, DynamicCacheDescriptor desc) { + return registeredCaches.put(maskNull(name), desc); + } + + /** * @return Cache descriptors. */ public Collection<DynamicCacheDescriptor> cacheDescriptors() { @@ -3617,7 +3630,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache descriptor. */ @Nullable public DynamicCacheDescriptor cacheDescriptor(int cacheId) { - for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) { + for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) { CacheConfiguration ccfg = cacheDesc.cacheConfiguration(); assert ccfg != null : cacheDesc; @@ -3761,7 +3774,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return {@code True} if specified cache is system, {@code false} otherwise. */ public boolean systemCache(@Nullable String name) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); + DynamicCacheDescriptor desc = cacheDescriptor(name); return desc != null && !desc.cacheType().userCache(); } @@ -4015,7 +4028,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean failIfExists, boolean failIfNotStarted ) throws IgniteCheckedException { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); + DynamicCacheDescriptor desc = cacheDescriptor(cacheName); DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); @@ -4050,7 +4063,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.clientStartOnly(true); req.deploymentId(desc.deploymentId()); - req.startCacheConfiguration(descCfg); } }