Repository: ignite Updated Branches: refs/heads/master 7b9526b3c -> 6dc5804af
http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java new file mode 100644 index 0000000..c08128b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.PartitionLossPolicy; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +/** + * + */ +public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private static final int PARTITIONS_CNT = 32; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + cfg.setDataStorageConfiguration( + new 
DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(100L * 1024 * 1024)) + ); + + cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME) + .setBackups(2) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setIndexedTypes(Integer.class, Integer.class) + .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE) + .setAffinity(new RendezvousAffinityFunction(false, PARTITIONS_CNT)) + ); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + System.clearProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + System.clearProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * Test that partition clearing doesn't block partitions map exchange. + * + * @throws Exception If failed. + */ + public void testPartitionClearingNotBlockExchange() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, "1"); + + IgniteEx ig = (IgniteEx) startGrids(3); + ig.cluster().active(true); + + // High number of keys triggers long partition eviction. 
+ final int keysCount = 300_000; + + try (IgniteDataStreamer ds = ig.dataStreamer(CACHE_NAME)) { + log.info("Writing initial data..."); + + ds.allowOverwrite(true); + for (int k = 1; k <= keysCount; k++) { + ds.addData(k, k); + + if (k % 50_000 == 0) + log.info("Written " + k + " entities."); + } + + log.info("Writing initial data finished."); + } + + stopGrid(2); + + awaitPartitionMapExchange(); + + try (IgniteDataStreamer ds = ig.dataStreamer(CACHE_NAME)) { + log.info("Writing external data..."); + + ds.allowOverwrite(true); + for (int k = 1; k <= keysCount; k++) { + ds.addData(k, 2 * k); + + if (k % 50_000 == 0) + log.info("Written " + k + " entities."); + } + + log.info("Writing external data finished."); + } + + IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); + + forceCheckpoint(); + + GridCachePartitionExchangeManager exchangeManager = ig.cachex(CACHE_NAME).context().shared().exchange(); + + long topVer = exchangeManager.lastTopologyFuture().topologyVersion().topologyVersion(); + + startGrid(2); + + // Check that exchange future is completed and version is incremented + GridDhtPartitionsExchangeFuture fut1 = exchangeManager.lastTopologyFuture(); + + fut1.get(); + + Assert.assertEquals(topVer + 1, fut1.topologyVersion().topologyVersion()); + + // Check that additional exchange didn't influence on asynchronous partitions eviction. + boolean asyncClearingIsRunning = false; + for (int p = 0; p < PARTITIONS_CNT; p++) { + GridDhtLocalPartition part = grid(2).cachex(CACHE_NAME).context().topology().localPartition(p); + if (part != null && part.state() == GridDhtPartitionState.MOVING && part.isClearing()) { + asyncClearingIsRunning = true; + break; + } + } + + Assert.assertTrue("Async clearing is not running at the moment", asyncClearingIsRunning); + + // Check that stopping & starting node didn't break rebalance process. + stopGrid(1); + + startGrid(1); + + // Wait for rebalance on all nodes. 
+ for (Ignite ignite : G.allGrids()) + ignite.cache(CACHE_NAME).rebalance().get(); + + // Check no data loss. + for (int k = 1; k <= keysCount; k++) { + Integer value = cache.get(k); + Assert.assertNotNull("Value for " + k + " is null", value); + Assert.assertEquals("Check failed for " + k + " " + value, 2 * k, (int) value); + } + } + + /** + * Test that partitions belong to affinity in state RENTING or EVICTED are correctly rebalanced. + * + * @throws Exception If failed. + */ + public void testCorrectRebalancingCurrentlyRentingPartitions() throws Exception { + IgniteEx ignite = (IgniteEx) startGrids(3); + ignite.cluster().active(true); + + // High number of keys triggers long partition eviction. + final int keysCount = 500_000; + + try (IgniteDataStreamer ds = ignite.dataStreamer(CACHE_NAME)) { + log.info("Writing initial data..."); + + ds.allowOverwrite(true); + for (int k = 1; k <= keysCount; k++) { + ds.addData(k, k); + + if (k % 50_000 == 0) + log.info("Written " + k + " entities."); + } + + log.info("Writing initial data finished."); + } + + startGrid(3); + + // Trigger partition eviction from other nodes. + resetBaselineTopology(); + + stopGrid(3); + + // Trigger evicting partitions rebalancing. + resetBaselineTopology(); + + // Emulate stopping grid during partition eviction. + stopGrid(1); + + // Started node should have partition in RENTING or EVICTED state. + startGrid(1); + + awaitPartitionMapExchange(); + + // Check no data loss. 
+ for (int k = 1; k <= keysCount; k++) { + Integer value = (Integer) ignite.cache(CACHE_NAME).get(k); + Assert.assertNotNull("Value for " + k + " is null", value); + Assert.assertEquals("Check failed for " + k + " = " + value, k, (int) value); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java index 8139a36..89847b6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java @@ -31,10 +31,13 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -73,10 +76,15 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb @Override protected IgniteConfiguration getConfiguration(String gridName) throws 
Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - CacheConfiguration ccfg1 = cacheConfiguration(cacheName); - ccfg1.setPartitionLossPolicy(PartitionLossPolicy.READ_ONLY_SAFE); - ccfg1.setBackups(1); - ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cfg.setConsistentId(gridName); + + CacheConfiguration ccfg1 = cacheConfiguration(cacheName) + .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE) + .setBackups(2) + .setRebalanceMode(CacheRebalanceMode.ASYNC) + .setIndexedTypes(Integer.class, Integer.class) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); CacheConfiguration ccfg2 = cacheConfiguration("indexed"); ccfg2.setBackups(1); @@ -165,6 +173,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb stopAllGrids(); cleanPersistenceDir(); + + System.clearProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE); } /** @@ -401,7 +411,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb Ignite ignite = startGrid(0); - ignite.active(true); + ignite.cluster().active(true); IgniteCache<Integer, TestValue> cache = ignite.cache(cacheName); @@ -565,34 +575,55 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb * @throws Exception If failed */ public void testPartitionCounterConsistencyOnUnstableTopology() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, "1"); + final Ignite ig = startGrids(4); - ig.active(true); + ig.cluster().active(true); int k = 0; try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) { ds.allowOverwrite(true); - for (int k0 = k; k < k0 + 10_000; k++) + for (int k0 = k; k < k0 + 50_000; k++) ds.addData(k, k); } - for (int t = 0; t < 10; t++) { - IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() { - @Override public void 
run() { - try { - stopGrid(3); + for (int t = 0; t < 5; t++) { + int t0 = t; + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + try { + stopGrid(3); + + // Clear checkpoint history to avoid rebalance from WAL. + forceCheckpoint(); + forceCheckpoint(); + U.sleep(500); // Wait for data load. + + IgniteEx ig0 = startGrid(3); + + U.sleep(2000); // Wait for node join. - IgniteEx ig0 = startGrid(3); + if (t0 % 2 == 1) { + stopGrid(2); awaitPartitionMapExchange(); - ig0.cache(cacheName).rebalance().get(); - } - catch (Exception e) { - throw new RuntimeException(e); + // Clear checkpoint history to avoid rebalance from WAL. + forceCheckpoint(); + forceCheckpoint(); + + startGrid(2); + + awaitPartitionMapExchange(); } + + ig0.cache(cacheName).rebalance().get(); + } + catch (Exception e) { + error("Unable to start/stop grid", e); + throw new RuntimeException(e); } }); @@ -600,16 +631,22 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb ds.allowOverwrite(true); while (!fut.isDone()) { - ds.addData(k, k); + int k0 = k; - k++; + for (;k < k0 + 3; k++) + ds.addData(k, k); U.sleep(1); } } + catch (Exception e) { + log.error("Unable to write data", e); + } fut.get(); + log.info("Checking data..."); + Map<Integer, Long> cntrs = new HashMap<>(); for (int g = 0; g < 4; g++) { @@ -623,7 +660,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb } for (int k0 = 0; k0 < k; k0++) - assertEquals(String.valueOf(k0), k0, ig0.cache(cacheName).get(k0)); + assertEquals(String.valueOf(k0) + " " + g, k0, ig0.cache(cacheName).get(k0)); } assertEquals(ig.affinity(cacheName).partitions(), cntrs.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java index d396f52..656e1e2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java @@ -279,21 +279,6 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { /** * */ - private void forceCheckpoint() throws Exception { - for (Ignite ignite : G.allGrids()) { - if (ignite.cluster().localNode().isClient()) - continue; - - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() - .cache().context().database(); - - dbMgr.waitForCheckpoint("test"); - } - } - - /** - * - */ private static class LimitedSizeFileIO extends FileIODecorator { /** */ private final AtomicLong availableSpaceBytes; http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java index 7d899ba..5ae16d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java @@ -247,8 +247,6 @@ public 
class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testRemovesArePreloadedIfHistoryIsAvailable() throws Exception { - System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); - int entryCnt = 10_000; IgniteEx ig0 = (IgniteEx) startGrids(2); @@ -260,15 +258,11 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { for (int k = 0; k < entryCnt; k++) cache.put(k, k); - forceCheckpoint(); - stopGrid(1); for (int k = 0; k < entryCnt; k += 2) cache.remove(k); - forceCheckpoint(); - IgniteEx ig1 = startGrid(1); IgniteCache<Integer, Integer> cache1 = ig1.cache("cache1"); @@ -448,19 +442,4 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { awaitPartitionMapExchange(); } - - /** - * - */ - private void forceCheckpoint() throws Exception { - for (Ignite ignite : G.allGrids()) { - if (ignite.cluster().localNode().isClient()) - continue; - - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() - .cache().context().database(); - - dbMgr.waitForCheckpoint("test"); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java index 287f8d0..7a50c0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java @@ -29,6 +29,7 @@ import 
java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheRebalanceMode; @@ -45,10 +46,12 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; @@ -147,7 +150,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { public void testWalTxSimple() throws Exception { Ignite ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); try { GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() @@ -185,7 +188,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); cache = ignite.cache(CACHE_NAME); @@ -222,7 +225,7 @@ public class WalRecoveryTxLogicalRecordsTest extends 
GridCommonAbstractTest { public void testWalRecoveryRemoves() throws Exception { Ignite ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); try { GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() @@ -273,7 +276,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); cache = ignite.cache(CACHE_NAME); @@ -307,15 +310,17 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { /** * @throws Exception if failed. */ - public void testRebalanceIterator() throws Exception { - extraCcfg = new CacheConfiguration(CACHE2_NAME); + public void testHistoricalRebalanceIterator() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); + + extraCcfg = new CacheConfiguration(CACHE_NAME + "2"); extraCcfg.setAffinity(new RendezvousAffinityFunction(false, PARTS)); Ignite ignite = startGrid(); - ignite.active(true); - try { + ignite.cluster().active(true); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() .cache().context().database(); @@ -325,7 +330,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { int entries = 25; IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME); - IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2_NAME); + IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE_NAME + "2"); for (int i = 0; i < entries; i++) { // Put to partition 0. 
@@ -346,40 +351,48 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { assertEquals((Integer)(i), cache2.get(i)); } - GridCacheContext<Object, Object> cctx = ((IgniteEx)ignite).context().cache().cache(CACHE_NAME).context(); - IgniteCacheOffheapManager offh = cctx.offheap(); - AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + CacheGroupContext grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME)); + IgniteCacheOffheapManager offh = grp.offheap(); + AffinityTopologyVersion topVer = grp.affinity().lastVersion(); + + IgniteDhtDemandedPartitionsMap map; for (int i = 0; i < entries; i++) { - try (IgniteRebalanceIterator it = offh.rebalanceIterator(0, topVer, (long)i)) { - assertTrue("Not historical for iteration: " + i, it.historical()); + map = new IgniteDhtDemandedPartitionsMap(); + map.addHistorical(0, i, Long.MAX_VALUE, entries); + try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) { assertNotNull(it); + assertTrue("Not historical for iteration: " + i, it.historical(0)); + for (int j = i; j < entries; j++) { assertTrue("i=" + i + ", j=" + j, it.hasNextX()); CacheDataRow row = it.next(); - assertEquals(j * PARTS, (int)row.key().value(cctx.cacheObjectContext(), false)); - assertEquals(j * PARTS, (int)row.value().value(cctx.cacheObjectContext(), false)); + assertEquals(j * PARTS, (int)row.key().value(grp.cacheObjectContext(), false)); + assertEquals(j * PARTS, (int)row.value().value(grp.cacheObjectContext(), false)); } assertFalse(it.hasNext()); } - try (IgniteRebalanceIterator it = offh.rebalanceIterator(1, topVer, (long)i)) { + map = new IgniteDhtDemandedPartitionsMap(); + map.addHistorical(1, i, Long.MAX_VALUE, entries); + + try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) { assertNotNull(it); - assertTrue("Not historical for iteration: " + i, it.historical()); + assertTrue("Not historical for iteration: " + i, it.historical(1)); for (int j = i; 
j < entries; j++) { assertTrue(it.hasNextX()); CacheDataRow row = it.next(); - assertEquals(j * PARTS + 1, (int)row.key().value(cctx.cacheObjectContext(), false)); - assertEquals(j * PARTS + 1, (int)row.value().value(cctx.cacheObjectContext(), false)); + assertEquals(j * PARTS + 1, (int)row.key().value(grp.cacheObjectContext(), false)); + assertEquals(j * PARTS + 1, (int)row.value().value(grp.cacheObjectContext(), false)); } assertFalse(it.hasNext()); @@ -391,21 +404,24 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { // Check that iterator is valid after restart. ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); - cctx = ((IgniteEx)ignite).context().cache().cache(CACHE_NAME).context(); - offh = cctx.offheap(); - topVer = cctx.affinity().affinityTopologyVersion(); + grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME)); + offh = grp.offheap(); + topVer = grp.affinity().lastVersion(); for (int i = 0; i < entries; i++) { long start = System.currentTimeMillis(); - try (IgniteRebalanceIterator it = offh.rebalanceIterator(0, topVer, (long)i)) { + map = new IgniteDhtDemandedPartitionsMap(); + map.addHistorical(0, i, Long.MAX_VALUE, entries); + + try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) { long end = System.currentTimeMillis(); info("Time to get iterator: " + (end - start)); - assertTrue("Not historical for iteration: " + i, it.historical()); + assertTrue("Not historical for iteration: " + i, it.historical(0)); assertNotNull(it); @@ -416,8 +432,8 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { CacheDataRow row = it.next(); - assertEquals(j * PARTS, (int)row.key().value(cctx.cacheObjectContext(), false)); - assertEquals(j * PARTS, (int)row.value().value(cctx.cacheObjectContext(), false)); + assertEquals(j * PARTS, (int)row.key().value(grp.cacheObjectContext(), false)); + assertEquals(j * PARTS, 
(int)row.value().value(grp.cacheObjectContext(), false)); } end = System.currentTimeMillis(); @@ -427,18 +443,21 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { assertFalse(it.hasNext()); } - try (IgniteRebalanceIterator it = offh.rebalanceIterator(1, topVer, (long)i)) { + map = new IgniteDhtDemandedPartitionsMap(); + map.addHistorical(1, i, Long.MAX_VALUE, entries); + + try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) { assertNotNull(it); - assertTrue("Not historical for iteration: " + i, it.historical()); + assertTrue("Not historical for iteration: " + i, it.historical(1)); for (int j = i; j < entries; j++) { assertTrue(it.hasNextX()); CacheDataRow row = it.next(); - assertEquals(j * PARTS + 1, (int)row.key().value(cctx.cacheObjectContext(), false)); - assertEquals(j * PARTS + 1, (int)row.value().value(cctx.cacheObjectContext(), false)); + assertEquals(j * PARTS + 1, (int)row.key().value(grp.cacheObjectContext(), false)); + assertEquals(j * PARTS + 1, (int)row.value().value(grp.cacheObjectContext(), false)); } assertFalse(it.hasNext()); @@ -447,6 +466,8 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { } finally { stopAllGrids(); + + System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD); } } @@ -456,7 +477,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { public void testCheckpointHistory() throws Exception { Ignite ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); try { GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() @@ -500,7 +521,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { public void testWalAfterPreloading() throws Exception { Ignite ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); try { GridCacheDatabaseSharedManager dbMgr = 
(GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() @@ -524,7 +545,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); cache = ignite.cache(CACHE_NAME); @@ -548,7 +569,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { Ignite ignite = startGrid(0); - ignite.active(true); + ignite.cluster().active(true); GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() .cache().context().database(); @@ -584,7 +605,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { ignite = startGrid(0); - ignite.active(true); + ignite.cluster().active(true); ignite.cache(CACHE_NAME).put(1, new IndexedValue(0)); } @@ -624,7 +645,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { Ignite ignite = startGrid(0); - ignite.active(true); + ignite.cluster().active(true); if (pages != null) { List<Integer> curPags = allocatedPages(ignite, CACHE2_NAME); @@ -682,7 +703,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { Ignite ignite = startGrid(0); - ignite.active(true); + ignite.cluster().active(true); GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() .cache().context().database(); @@ -752,7 +773,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { Ignite ignite = startGrid(0); - ignite.active(true); + ignite.cluster().active(true); IgniteCache<Integer, IndexedValue> cache1 = ignite.cache(CACHE_NAME); IgniteCache<Object, Object> cache2 = ignite.cache(CACHE2_NAME); @@ -788,7 +809,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest { ignite = startGrid(0); - ignite.active(true); + ignite.cluster().active(true); cache1 = ignite.cache(CACHE_NAME); cache2 = ignite.cache(CACHE2_NAME); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java index 3b9c27e..3b7bb89 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java @@ -463,5 +463,10 @@ public class PlatformCacheWriteMetricsTask extends ComputeTaskAdapter<Long, Obje @Override public long getRebalancingStartTime() { return 63; } + + /** {@inheritDoc} Stub implementation: always returns the constant {@code 64}. */ + @Override public long getRebalanceClearingPartitionsLeft() { + return 64; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 17c2502..71abb95 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; +import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord; @@ -1852,4 +1853,41 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { assertEquals(desc.config().getGroupName(), desc0.config().getGroupName()); assertEquals(desc.caches(), desc0.caches()); } + + /** + * Forces a checkpoint on every node returned by {@code G.allGrids()}; client nodes are skipped. + * + * @throws IgniteCheckedException If a checkpoint failed. + */ + protected void forceCheckpoint() throws IgniteCheckedException { + forceCheckpoint(G.allGrids()); + } + + /** + * Forces a checkpoint on the specified node; nothing is done if it is a client node. + * + * @param node Node to force a checkpoint on. + * @throws IgniteCheckedException If the checkpoint failed. + */ + protected void forceCheckpoint(Ignite node) throws IgniteCheckedException { + forceCheckpoint(Collections.singletonList(node)); + } + + /** + * Forces a checkpoint on each of the specified nodes in iteration order, skipping client nodes. + * + * @param nodes Nodes to force a checkpoint on. + * @throws IgniteCheckedException If a checkpoint failed.
+ */ + protected void forceCheckpoint(Collection<Ignite> nodes) throws IgniteCheckedException { + for (Ignite ignite : nodes) { + if (ignite.cluster().localNode().isClient()) + continue; + + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() + .cache().context().database(); + + dbMgr.waitForCheckpoint("test"); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 055f610..e9c5e76 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTransfor import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodeChangingTopologyTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodePartitionsExchangeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheServerNodeConcurrentStart; +import org.apache.ignite.internal.processors.cache.distributed.dht.CachePartitionPartialCountersMapSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedOptimisticTransactionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedPreloadRestartSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedPrimarySyncSelfTest; @@ -298,6 +299,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(CacheComparatorTest.class)); + suite.addTest(new
TestSuite(CachePartitionPartialCountersMapSelfTest.class)); + suite.addTest(new TestSuite(IgniteReflectionFactorySelfTest.class)); return suite; http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index 503f39a..bfdf2fd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridC import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest; @@ -150,6 +151,7 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class); suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class); suite.addTestSuite(GridCacheRebalancingPartitionCountersTest.class); + 
suite.addTestSuite(GridCacheRebalancingWithAsyncClearingTest.class); // Test for byte array value special case. suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java index c94cbd2..a112969 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -113,8 +114,8 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT startGrid(1); - //This timeout value was chosen experimentally. Grid rebalancing should already be finished before time is out. - Thread.sleep(5000); + for (Ignite ig : G.allGrids()) + ig.cache(PERSON_CACHE).rebalance().get(); while (it.hasNext()) { it.next(); @@ -159,8 +160,8 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT // Close cursor. Partitions should be released now. 
qryCursor.close(); - //This timeout value was chosen experimentally. Grid rebalancing should already be finished before time is out. - Thread.sleep(5000); + for (Ignite ig : G.allGrids()) + ig.cache(PERSON_CACHE).rebalance().get(); assertEquals("Wrong result set size", partsFilled, cache.query(qry).getAll().size()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java index f9ecb0b..dc0e160 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetTest.java @@ -289,9 +289,7 @@ public class CacheBasedDatasetTest extends GridCommonAbstractTest { builder.append("\t\t") .append(String.format("| %3d |", partition.id())) .append(String.format(" %7s |", partition.state())) - .append(String.format(" %7s |", partition.reload())) .append(String.format(" %13s |", partition.reservations())) - .append(String.format(" %18s |", partition.shouldBeRenting())) .append(String.format(" %8s |", partition.primary(instanceState.topVer))) .append(String.format(" %16d |", partition.dataStore().fullSize())) .append("\n");