IGNITE-9188 Fixed invalid partition eviction if rebalance was cancelled by a topology event - Fixes #4578.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa112a32 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa112a32 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa112a32 Branch: refs/heads/ignite-9273 Commit: aa112a3255e5bcdc652ff2ca42e03deffb765dff Parents: dde936a Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Authored: Wed Aug 22 17:33:39 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Wed Aug 22 17:35:22 2018 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 17 +- .../GridCachePartitionExchangeManager.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 5 +- .../CacheDataLossOnPartitionMoveTest.java | 296 +++++++++++++++++++ .../CacheRentingStateRepairTest.java | 5 - .../junits/common/GridCommonAbstractTest.java | 20 ++ .../testsuites/IgniteCacheTestSuite7.java | 2 + 7 files changed, 333 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aa112a32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 41f7c63..0b448e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -2250,10 +2250,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (!owners.isEmpty() && !owners.contains(curPrimary)) curPrimary = 
owners.get(0); - if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) { - if (aliveNodes.contains(curPrimary)) { - GridDhtPartitionState state = top.partitionState(newPrimary.id(), p); + // If new assignment is empty preserve current ownership for alive nodes. + if (curPrimary != null && newPrimary == null) { + newNodes0 = new ArrayList<>(curNodes.size()); + + for (ClusterNode node : curNodes) { + if (aliveNodes.contains(node)) + newNodes0.add(node); + } + } + else if (curPrimary != null && !curPrimary.equals(newPrimary)) { + GridDhtPartitionState state = top.partitionState(newPrimary.id(), p); + if (aliveNodes.contains(curPrimary)) { if (state != GridDhtPartitionState.OWNING) { newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, @@ -2263,8 +2272,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } else { - GridDhtPartitionState state = top.partitionState(newPrimary.id(), p); - if (state != GridDhtPartitionState.OWNING) { for (int i = 1; i < curNodes.size(); i++) { ClusterNode curNode = curNodes.get(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/aa112a32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d5eeaeb..9d88152 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -310,7 +310,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param cache Discovery data cache. 
*/ private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) { - // Clean local join caches context. + // Clear local join caches context. cctx.cache().localJoinCachesContext(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/aa112a32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index fbd3264..516dfb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -505,7 +505,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchId.equals(this.exchId); this.exchId.discoveryEvent(discoEvt); - this.firstDiscoEvt= discoEvt; + this.firstDiscoEvt = discoEvt; this.firstEvtDiscoCache = discoCache; evtLatch.countDown(); @@ -3840,9 +3840,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte crd0 = crd; - if (crd0 == null) { + if (crd0 == null) finishState = new FinishState(null, initialVersion(), null); - } } if (crd0 == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/aa112a32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDataLossOnPartitionMoveTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDataLossOnPartitionMoveTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDataLossOnPartitionMoveTest.java new file mode 100644 index 0000000..2a99271 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDataLossOnPartitionMoveTest.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.testframework.GridTestNode; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; + +/** + * + */ +public class CacheDataLossOnPartitionMoveTest extends GridCommonAbstractTest { + /** */ + public static final long MB = 1024 * 1024L; + + /** */ + public static final String GRP_ATTR 
= "grp"; + + /** */ + public static final int GRIDS_CNT = 2; + + /** */ + public static final String EVEN_GRP = "event"; + + /** */ + public static final String ODD_GRP = "odd"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setPeerClassLoadingEnabled(true); + + Map<String, Object> attrs = new HashMap<>(); + + attrs.put(GRP_ATTR, grp(getTestIgniteInstanceIndex(igniteInstanceName))); + + cfg.setUserAttributes(attrs); + + DataStorageConfiguration memCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(50 * MB).setMaxSize(50 * MB)) + .setWalMode(WALMode.LOG_ONLY); + + cfg.setDataStorageConfiguration(memCfg); + + cfg.setCacheConfiguration(configuration(DEFAULT_CACHE_NAME)); + + return cfg; + } + + /** + * @param name Name. + */ + private CacheConfiguration configuration(String name) { + return new CacheConfiguration(name). + setCacheMode(CacheMode.PARTITIONED). + setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL). + setBackups(1). + setRebalanceBatchSize(1). + setAffinity(new TestAffinityFunction().setPartitions(32)); + } + + /** + * @param idx Index. + */ + private String grp(int idx) { + return idx < GRIDS_CNT / 2 ? EVEN_GRP : ODD_GRP; + } + + /** + * @throws Exception if failed. 
+ */ + public void testDataLossOnPartitionMove() throws Exception { + try { + Ignite ignite = startGridsMultiThreaded(GRIDS_CNT / 2, false); + + ignite.cluster().active(true); + + List<Integer> toCp = movingKeysAfterJoin(ignite, DEFAULT_CACHE_NAME, 1, + node -> ((GridTestNode)node).setAttribute(GRP_ATTR, ODD_GRP)); + + int blockPartId = ignite.affinity(DEFAULT_CACHE_NAME).partition(toCp.get(0)); + + awaitPartitionMapExchange(); + + int c = 0; + + for (int i = 0; i < 1000; i++) { + if (ignite.affinity(DEFAULT_CACHE_NAME).partition(i) == blockPartId) { + ignite.cache(DEFAULT_CACHE_NAME).put(i, i); + + c++; + } + } + + assertEquals(c, ignite.cache(DEFAULT_CACHE_NAME).size()); + + startGridsMultiThreaded(GRIDS_CNT / 2, GRIDS_CNT / 2); + + // Prevent rebalancing to new nodes. + for (Ignite ig0 : G.allGrids()) { + TestRecordingCommunicationSpi.spi(ig0).blockMessages((node, message) -> { + if (message instanceof GridDhtPartitionDemandMessage) { + assertTrue(node.order() <= GRIDS_CNT / 2); + + GridDhtPartitionDemandMessage msg = (GridDhtPartitionDemandMessage)message; + + return msg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME); + } + + return false; + }); + } + + ignite.cluster().setBaselineTopology(GRIDS_CNT); + + for (Ignite ig0 : G.allGrids()) { + if (ig0.cluster().localNode().order() <= GRIDS_CNT / 2) + continue; + + TestRecordingCommunicationSpi.spi(ig0).waitForBlocked(); + } + + assertEquals(c, ignite.cache(DEFAULT_CACHE_NAME).size()); + + int i = 0; + + while(i < GRIDS_CNT / 2) { + stopGrid(GRIDS_CNT / 2 + i); + + i++; + } + + awaitPartitionMapExchange(); + + for (Ignite ig : G.allGrids()) { + GridDhtLocalPartition locPart = dht(ig.cache(DEFAULT_CACHE_NAME)).topology().localPartition(blockPartId); + + assertNotNull(locPart); + + assertEquals("Unexpected state", OWNING, locPart.state()); + } + + startGridsMultiThreaded(GRIDS_CNT / 2, GRIDS_CNT / 2); + + awaitPartitionMapExchange(true, true, null); + + for (Ignite ig : G.allGrids()) { + GridDhtLocalPartition locPart = 
dht(ig.cache(DEFAULT_CACHE_NAME)).topology().localPartition(blockPartId); + + assertNotNull(locPart); + + switch ((String)ig.cluster().localNode().attribute(GRP_ATTR)) { + case EVEN_GRP: + assertEquals("Unexpected state", EVICTED, locPart.state()); + + break; + + case ODD_GRP: + assertEquals("Unexpected state", OWNING, locPart.state()); + + break; + + default: + fail(); + } + } + } + finally { + stopAllGrids(); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + cleanPersistenceDir(); + } + + /** */ + public static class TestAffinityFunction extends RendezvousAffinityFunction { + /** */ + public TestAffinityFunction() { + } + + /** */ + public TestAffinityFunction(boolean exclNeighbors) { + super(exclNeighbors); + } + + /** */ + public TestAffinityFunction(boolean exclNeighbors, int parts) { + super(exclNeighbors, parts); + } + + /** */ + public TestAffinityFunction(int parts, + @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + super(parts, backupFilter); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + int parts = partitions(); + + List<List<ClusterNode>> assignments = new ArrayList<>(parts); + + Map<UUID, Collection<ClusterNode>> neighborhoodCache = isExcludeNeighbors() ? 
+ GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; + + List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + + Map<Object, List<ClusterNode>> nodesByGrp = U.newHashMap(2); + + for (ClusterNode node : nodes) { + Object grp = node.attribute(GRP_ATTR); + + List<ClusterNode> grpNodes = nodesByGrp.get(grp); + + if (grpNodes == null) + nodesByGrp.put(grp, (grpNodes = new ArrayList<>())); + + grpNodes.add(node); + } + + boolean split = nodesByGrp.size() == 2; + + for (int i = 0; i < parts; i++) { + List<ClusterNode> partAssignment = assignPartition(i, split ? + nodesByGrp.get(i % 2 == 0 ? EVEN_GRP : ODD_GRP) : nodes, + affCtx.backups(), neighborhoodCache); + + assignments.add(partAssignment); + } + + return assignments; + } + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Integer.MAX_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/aa112a32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java index dd117cd..83a590a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java @@ -31,7 +31,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import 
org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -181,14 +180,10 @@ public class CacheRentingStateRepairTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { cleanPersistenceDir(); - - U.IGNITE_TEST_FEATURES_ENABLED = true; } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { cleanPersistenceDir(); - - U.IGNITE_TEST_FEATURES_ENABLED = false; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/aa112a32/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index b373be9..8591f0c 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord; import org.apache.ignite.internal.processors.cache.verify.PartitionKey; import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTask; +import org.apache.ignite.internal.util.lang.GridAbsClosure; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -109,6 +110,7 @@ import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg; 
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskResult; import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.resources.IgniteInstanceResource; @@ -1178,6 +1180,21 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @return List of keys. */ protected final List<Integer> movingKeysAfterJoin(Ignite ign, String cacheName, int size) { + return movingKeysAfterJoin(ign, cacheName, size, null); + } + + /** + * Return list of keys that are primary for given node on current topology, + * but primary node will change after new node will be added. + * + * @param ign Ignite. + * @param cacheName Cache name. + * @param size Number of keys. + * @param nodeInitializer Node initializer closure. + * @return List of keys. + */ + protected final List<Integer> movingKeysAfterJoin(Ignite ign, String cacheName, int size, + @Nullable IgniteInClosure<ClusterNode> nodeInitializer) { assertEquals("Expected consistentId is set to node name", ign.name(), ign.cluster().localNode().consistentId()); ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes()); @@ -1186,6 +1203,9 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { GridTestNode fakeNode = new GridTestNode(UUID.randomUUID(), null); + if (nodeInitializer != null) + nodeInitializer.apply(fakeNode); + fakeNode.consistentId(getTestIgniteInstanceName(nodes.size())); nodes.add(fakeNode); http://git-wip-us.apache.org/repos/asf/ignite/blob/aa112a32/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java index 2bd0861..3a35950 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAf import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest; import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest; import org.apache.ignite.internal.processors.cache.eviction.paged.PageEvictionMultinodeMixedRegionsTest; @@ -96,6 +97,7 @@ public class IgniteCacheTestSuite7 extends TestSuite { suite.addTestSuite(CacheRentingStateRepairTest.class); suite.addTestSuite(TransactionIntegrityWithPrimaryIndexCorruptionTest.class); + suite.addTestSuite(CacheDataLossOnPartitionMoveTest.class); return suite; }