This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 00988d2 IGNITE-15283 Remove duplicated managing of CacheDataStore in offheap manager (#9317) 00988d2 is described below commit 00988d20af19485585e98e885c610a704640c083 Author: Maxim Muzafarov <mmu...@apache.org> AuthorDate: Sat Aug 21 15:30:09 2021 +0300 IGNITE-15283 Remove duplicated managing of CacheDataStore in offheap manager (#9317) --- .../util/GridCommandHandlerClusterByClassTest.java | 2 +- .../util/GridCommandHandlerIndexingUtils.java | 4 +- .../apache/ignite/util/GridCommandHandlerTest.java | 2 +- .../pagemem/store/IgnitePageStoreManager.java | 11 +- .../cache/IgniteCacheOffheapManager.java | 50 ++---- .../cache/IgniteCacheOffheapManagerImpl.java | 191 +++++++++------------ .../dht/preloader/GridDhtPartitionDemander.java | 8 +- .../dht/preloader/GridDhtPartitionSupplier.java | 2 +- .../dht/topology/GridDhtLocalPartition.java | 14 +- .../dht/topology/GridDhtPartitionTopologyImpl.java | 20 --- .../GridCacheDatabaseSharedManager.java | 9 +- .../cache/persistence/GridCacheOffheapManager.java | 94 ++++------ .../processors/cache/persistence/RowStore.java | 19 +- .../cache/persistence/checkpoint/Checkpointer.java | 2 +- .../persistence/file/FilePageStoreManager.java | 7 +- .../cache/persistence/metastorage/MetaStorage.java | 2 +- .../UpgradePendingTreeToPerPartitionTask.java | 2 +- .../cache/persistence/tree/BPlusTree.java | 7 + .../persistence/PendingTreeCorruptionTest.java | 5 +- .../RestorePartitionStateDuringCheckpointTest.java | 7 +- .../cache/persistence/db/IgnitePdsWithTtlTest.java | 6 +- .../persistence/pagemem/NoOpPageStoreManager.java | 7 +- .../snapshot/IgniteClusterSnapshotCheckTest.java | 9 +- .../wal/memtracker/PageMemoryTracker.java | 4 +- .../cache/query/CacheScanQueryFailoverTest.java | 2 +- 25 files changed, 185 insertions(+), 301 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java index cab4a62..762ac0c 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java @@ -914,7 +914,7 @@ public class GridCommandHandlerClusterByClassTest extends GridCommandHandlerClus U.log(log, dumpWithConflicts); // Non-persistent caches do not have counter conflicts - assertContains(log, dumpWithConflicts, "found 3 conflict partitions: [counterConflicts=1, " + + assertContains(log, dumpWithConflicts, "found 4 conflict partitions: [counterConflicts=2, " + "hashConflicts=2]"); } else diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java index 1775dfa..ad1c51f 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java @@ -202,9 +202,7 @@ public class GridCommandHandlerIndexingUtils { GridCacheContext<K, V> cacheCtx = internalCache.context(); - GridDhtLocalPartition dhtLocPart = cacheCtx.dht().topology().localPartition(partId); - - CacheDataStore cacheDataStore = cacheCtx.group().offheap().dataStore(dhtLocPart); + CacheDataStore cacheDataStore = cacheCtx.dht().topology().localPartition(partId).dataStore(); String delegate = "delegate"; if (hasField(cacheDataStore, delegate)) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index 5551d11..22c5fcd 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -2294,7 +2294,7 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb if (fileNameMatcher.find()) { String dumpWithConflicts = new String(Files.readAllBytes(Paths.get(fileNameMatcher.group(1)))); - assertContains(log, dumpWithConflicts, "found 1 conflict partitions: [counterConflicts=0, " + + assertContains(log, dumpWithConflicts, "found 2 conflict partitions: [counterConflicts=1, " + "hashConflicts=1]"); } else diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 957a4a5..0b3b8a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -83,15 +83,6 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh public void shutdownForCacheGroup(CacheGroupContext grp, boolean destroy) throws IgniteCheckedException; /** - * Callback called when a partition is created on the local node. - * - * @param grpId Cache group ID where the partition is being created. - * @param partId ID of the partition being created. - * @throws IgniteCheckedException If failed to handle partition create callback. - */ - public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException; - - /** * Callback called when a partition for the given cache is evicted from the local node. * After this callback is invoked, no data associated with the partition will be stored on disk. * @@ -100,7 +91,7 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh * @param tag Partition tag (growing 1-based partition file version). * @throws IgniteCheckedException If failed to handle partition destroy callback. */ - public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException; + public void truncate(int grpId, int partId, int tag) throws IgniteCheckedException; /** * Checks if partition store exists. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index d3e839b..bc08c15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -101,31 +101,6 @@ public interface IgniteCacheOffheapManager { ) throws IgniteCheckedException; /** - * Partition counter update callback. May be overridden by plugin-provided subclasses. - * - * @param part Partition. - * @param cntr Partition counter. - */ - public void onPartitionCounterUpdated(int part, long cntr); - - /** - * Initial counter will be updated on state restore only - * - * @param part Partition - * @param start Start. - * @param delta Delta. - */ - public void onPartitionInitialCounterUpdated(int part, long start, long delta); - - /** - * Partition counter provider. May be overridden by plugin-provided subclasses. - * - * @param part Partition ID. - * @return Last updated counter. - */ - public long lastUpdatedPartitionCounter(int part); - - /** * @param entry Cache entry. * @return Cached row, if available, null otherwise. * @throws IgniteCheckedException If failed. @@ -153,10 +128,10 @@ public interface IgniteCacheOffheapManager { public Iterable<CacheDataStore> cacheDataStores(); /** - * @param part Partition. - * @return Data store. + * @param part Local partition or {@code null} if a related cache group is <tt>LOCAL</tt>. + * @return Cache data store associated with given partition or the cache data store for a <tt>LOCAL</tt> cache group. */ - public CacheDataStore dataStore(GridDhtLocalPartition part); + public CacheDataStore dataStore(@Nullable GridDhtLocalPartition part); /** * @param store Data store. @@ -513,12 +488,12 @@ public interface IgniteCacheOffheapManager { /** * Store entries. * - * @param partId Partition number. + * @param part Local partition. * @param infos Entry infos. * @param initPred Applied to all created rows. Each row that not matches the predicate is removed. * @throws IgniteCheckedException If failed. */ - public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos, + public void storeEntries(GridDhtLocalPartition part, Iterator<GridCacheEntryInfo> infos, IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException; /** @@ -604,18 +579,12 @@ public interface IgniteCacheOffheapManager { public long cacheEntriesCount(int cacheId); /** - * @param part Partition. - * @return Number of entries. - */ - public long totalPartitionEntriesCount(int part); - - /** * Preload a partition. Must be called under partition reservation for DHT caches. * - * @param part Partition. + * @param pardId Partition id. * @throws IgniteCheckedException If failed. */ - public void preloadPartition(int part) throws IgniteCheckedException; + public void preloadPartition(int pardId) throws IgniteCheckedException; /** * @@ -1088,6 +1057,11 @@ public interface IgniteCacheOffheapManager { public void markDestroyed() throws IgniteCheckedException; /** + * @return {@code true} If marked as destroyed. + */ + public boolean destroyed(); + + /** * Clears all the records associated with logical cache with given ID. * * @param cacheId Cache ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 0283673..19f38a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -28,8 +28,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -120,6 +118,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -165,13 +164,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** */ protected IgniteLogger log; - /** */ + /** Cache data store for <tt>LOCAL</tt> caches only. */ private CacheDataStore locCacheDataStore; /** */ - protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = new ConcurrentHashMap<>(); - - /** */ private PendingEntriesTree pendingEntries; /** */ @@ -311,25 +307,39 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** - * @param part Partition. - * @return Data store for given entry. + * @param cctx Cache context. + * @param key Key. + * @return Data store. */ - @Override public CacheDataStore dataStore(GridDhtLocalPartition part) { + @Nullable private CacheDataStore dataStore(GridCacheContext<?, ?> cctx, KeyCacheObject key) { if (grp.isLocal()) return locCacheDataStore; - else { - assert part != null; - return part.dataStore(); - } + return dataStore(cctx.affinity().partition(key), false); + } + + /** {@inheritDoc} */ + @Override public CacheDataStore dataStore(@Nullable GridDhtLocalPartition part) { + if (grp.isLocal()) + return locCacheDataStore; + + assert part != null; + + return part.dataStore(); } /** - * @param part Partition. - * @return Data store for given entry. + * @param partId Partition id. + * @param includeRenting {@code true} if includeRenting partitions must also be shown. + * @return Related partition cache data store or {@code null} if partition haven't been initialized. */ - public CacheDataStore dataStore(int part) { - return grp.isLocal() ? locCacheDataStore : partDataStores.get(part); + @Nullable private CacheDataStore dataStore(int partId, boolean includeRenting) { + if (grp.isLocal()) + return locCacheDataStore; + + GridDhtLocalPartition part = grp.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, includeRenting); + + return part == null ? null : part.dataStore(); } /** {@inheritDoc} */ @@ -343,35 +353,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public long totalPartitionEntriesCount(int p) { - if (grp.isLocal()) - return locCacheDataStore.fullSize(); - else { - GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true); - - return part != null ? part.dataStore().fullSize() : 0; - } - } - - /** {@inheritDoc} */ - @Override public void preloadPartition(int p) throws IgniteCheckedException { + @Override public void preloadPartition(int partId) throws IgniteCheckedException { throw new IgniteCheckedException("Operation only applicable to caches with enabled persistence"); } - /** - * @param p Partition. - * @return Partition data. - */ - @Nullable private CacheDataStore partitionData(int p) { - if (grp.isLocal()) - return locCacheDataStore; - else { - GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true); - - return part != null ? part.dataStore() : null; - } - } - /** {@inheritDoc} */ @Override public long cacheEntriesCount( int cacheId, @@ -398,14 +383,31 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public long cacheEntriesCount(int cacheId, int part) { - CacheDataStore store = partitionData(part); + CacheDataStore store = dataStore(part, true); return store == null ? 0 : store.cacheSize(cacheId); } + /** {@inheritDoc} */ + @Override public Iterable<CacheDataStore> cacheDataStores() { + return cacheDataStores(F.alwaysTrue()); + } + + /** + * @param filter Filtering predicate. + * @return Iterable over all existing cache data stores except which one is marked as <tt>destroyed</tt>. + */ + private Iterable<CacheDataStore> cacheDataStores( + IgnitePredicate<GridDhtLocalPartition> filter + ) { + return grp.isLocal() ? Collections.singletonList(locCacheDataStore) : + F.iterator(grp.topology().currentLocalPartitions(), GridDhtLocalPartition::dataStore, true, + filter, p -> !p.dataStore().destroyed()); + } + /** * @param primary Primary data flag. - * @param backup Primary data flag. + * @param backup Backup data flag. * @param topVer Topology version. * @return Data stores iterator. */ @@ -414,17 +416,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (grp.isLocal()) return singletonIterator(locCacheDataStore); - else { - Iterator<GridDhtLocalPartition> it = grp.topology().currentLocalPartitions().iterator(); - if (primary && backup) - return F.iterator(it, GridDhtLocalPartition::dataStore, true); + IgnitePredicate<GridDhtLocalPartition> filter; + if (primary && backup) + filter = F.alwaysTrue(); + else { IntSet parts = ImmutableIntSet.wrap(primary ? grp.affinity().primaryPartitions(ctx.localNodeId(), topVer) : grp.affinity().backupPartitions(ctx.localNodeId(), topVer)); - return F.iterator(it, GridDhtLocalPartition::dataStore, true, part -> parts.contains(part.id())); + filter = part -> parts.contains(part.id()); } + + return cacheDataStores(filter).iterator(); } /** {@inheritDoc} */ @@ -664,20 +668,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return dataStore != null ? dataStore.mvccAllVersionsCursor(cctx, key, x) : EMPTY_CURSOR; } - /** - * @param cctx Cache context. - * @param key Key. - * @return Data store. - */ - @Nullable private CacheDataStore dataStore(GridCacheContext cctx, KeyCacheObject key) { - if (grp.isLocal()) - return locCacheDataStore; - - GridDhtLocalPartition part = grp.topology().localPartition(cctx.affinity().partition(key), null, false); - - return part != null ? dataStore(part) : null; - } - /** {@inheritDoc} */ @Override public boolean containsKey(GridCacheMapEntry entry) { try { @@ -690,21 +680,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } } - /** {@inheritDoc} */ - @Override public void onPartitionCounterUpdated(int part, long cntr) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onPartitionInitialCounterUpdated(int part, long start, long delta) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long lastUpdatedPartitionCounter(int part) { - return 0; - } - /** * Clears offheap entries. * @@ -814,7 +789,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, int part) throws IgniteCheckedException { - CacheDataStore data = partitionData(part); + CacheDataStore data = dataStore(part, true); if (data == null) return new GridEmptyCloseableIterator<>(); @@ -864,7 +839,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, int part, @Nullable MvccSnapshot mvccSnapshot, Boolean dataPageScanEnabled) { - CacheDataStore data = partitionData(part); + CacheDataStore data = dataStore(part, true); if (data == null) return new GridEmptyCloseableIterator<>(); @@ -874,7 +849,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public GridIterator<CacheDataRow> partitionIterator(int part) { - CacheDataStore data = partitionData(part); + CacheDataStore data = dataStore(part, true); if (data == null) return new GridEmptyCloseableIterator<>(); @@ -1156,7 +1131,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return null; } - CacheDataStore data = partitionData(part); + CacheDataStore data = dataStore(loc); return new GridCloseableIteratorAdapter<CacheDataRow>() { /** */ @@ -1245,9 +1220,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos, + @Override public void storeEntries(GridDhtLocalPartition part, Iterator<GridCacheEntryInfo> infos, IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException { - CacheDataStore dataStore = dataStore(partId); + CacheDataStore dataStore = dataStore(part); List<DataRowCacheAware> batch = new ArrayList<>(PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK); @@ -1259,7 +1234,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager batch.add(new DataRowCacheAware(info.key(), info.value(), info.version(), - partId, + part.id(), info.expireTime(), info.cacheId(), grp.storeCacheIdInDataPage())); @@ -1280,22 +1255,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException { - CacheDataStore dataStore; - partStoreLock.lock(p); try { - assert !partDataStores.containsKey(p); - - dataStore = createCacheDataStore0(p); - - partDataStores.put(p, dataStore); + return createCacheDataStore0(p); } finally { partStoreLock.unlock(p); } - - return dataStore; } /** @@ -1321,15 +1288,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager FLAG_IDX ); - return new CacheDataStoreImpl(p, rowStore, dataTree, () -> pendingEntries, grp, busyLock, log); - } - - /** {@inheritDoc} */ - @Override public Iterable<CacheDataStore> cacheDataStores() { - if (grp.isLocal()) - return Collections.singleton(locCacheDataStore); - - return () -> partDataStores.values().iterator(); + return new CacheDataStoreImpl(p, rowStore, dataTree, () -> pendingEntries, grp, busyLock, log, null); } /** {@inheritDoc} */ @@ -1339,10 +1298,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager partStoreLock.lock(p); try { - boolean rmv = partDataStores.remove(p, store); - - if (!rmv) - return; // Already destroyed. + if (store.destroyed()) + return; destroyCacheDataStore0(store); } @@ -1512,6 +1469,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** */ private final int updateValSizeThreshold; + /** */ + private volatile GridQueryRowCacheCleaner rowCacheCleaner; + /** * @param partId Partition number. * @param rowStore Row store. @@ -1524,7 +1484,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager Supplier<PendingEntriesTree> pendingEntries, CacheGroupContext grp, GridSpinBusyLock busyLock, - IgniteLogger log + IgniteLogger log, + @Nullable Supplier<GridQueryRowCacheCleaner> cleaner ) { this.partId = partId; this.rowStore = rowStore; @@ -1546,6 +1507,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager mvccUpdateMarker = new MvccMarkUpdatedHandler(grp); mvccUpdateTxStateHint = new MvccUpdateTxStateHintHandler(grp); mvccApplyChanges = new MvccApplyChangesHandler(grp); + + if (cleaner == null) + rowStore.setRowCacheCleaner(() -> rowCacheCleaner); + else + rowStore.setRowCacheCleaner(cleaner); } /** {@inheritDoc} */ @@ -3025,6 +2991,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ + @Override public boolean destroyed() { + return dataTree.destroyed(); + } + + /** {@inheritDoc} */ @Override public void clear(int cacheId) throws IgniteCheckedException { assert cacheId != CU.UNDEFINED_CACHE_ID; @@ -3086,7 +3057,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) { - rowStore().setRowCacheCleaner(rowCacheCleaner); + this.rowCacheCleaner = rowCacheCleaner; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 12caefd..e8ccf4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -650,7 +650,7 @@ public class GridDhtPartitionDemander { if (grp.mvccEnabled()) mvccPreloadEntries(topVer, node, p, infosWrap); else { - preloadEntries(topVer, p, infosWrap); + preloadEntries(topVer, part, infosWrap); rebalanceFut.onReceivedKeys(p, e.getValue().infos().size(), node); } @@ -873,19 +873,19 @@ public class GridDhtPartitionDemander { * Adds entries to partition p. * * @param topVer Topology version. - * @param p Partition id. + * @param part Local partition. * @param infos Entries info for preload. * @throws IgniteCheckedException If failed. */ private void preloadEntries( AffinityTopologyVersion topVer, - int p, + GridDhtLocalPartition part, Iterator<GridCacheEntryInfo> infos ) throws IgniteCheckedException { // Received keys by caches, for statistics. IntHashMap<GridMutableLong> receivedKeys = new IntHashMap<>(); - grp.offheap().storeEntries(p, infos, new IgnitePredicateX<CacheDataRow>() { + grp.offheap().storeEntries(part, infos, new IgnitePredicateX<CacheDataRow>() { /** {@inheritDoc} */ @Override public boolean applyx(CacheDataRow row) throws IgniteCheckedException { receivedKeys.computeIfAbsent(row.cacheId(), cid -> new GridMutableLong()).incrementAndGet(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 437d8d2..d4264d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -302,7 +302,7 @@ public class GridDhtPartitionSupplier { assert loc != null && loc.state() == GridDhtPartitionState.OWNING : "Partition should be in OWNING state: " + loc; - supplyMsg.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part)); + supplyMsg.addEstimatedKeysCount(loc.dataStore().fullSize()); } for (int i = 0; i < histMap.size(); i++) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index 505f935..2728c51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.collection.IntMap; import org.apache.ignite.internal.util.collection.IntRWHashMap; @@ -231,15 +230,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (grp.walEnabled() && !recovery) ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), 0)); - // Inject row cache cleaner on store creation - // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group - if (ctx.kernalContext().query().moduleEnabled()) { - GridQueryRowCacheCleaner cleaner = ctx.kernalContext().indexProcessor() - .rowCacheCleaner(grp.groupId()); - - if (store != null && cleaner != null) - store.setRowCacheCleaner(cleaner); - } + // Inject row cache cleaner on store creation. + // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group. + if (ctx.kernalContext().query().moduleEnabled()) + store.setRowCacheCleaner(ctx.kernalContext().indexProcessor().rowCacheCleaner(grp.groupId())); } catch (IgniteCheckedException e) { // TODO ignite-db diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index 7d71982..994f228 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -925,16 +925,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // Create a partition in lost state. if (lostParts != null && lostParts.contains(p)) loc.markLost(); - - if (ctx.pageStore() != null) { - try { - ctx.pageStore().onPartitionCreated(grp.groupId(), p); - } - catch (IgniteCheckedException e) { - // TODO ignite-db - throw new IgniteException(e); - } - } } return loc; @@ -1055,16 +1045,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ctx.database().checkpointReadUnlock(); } - if (created && ctx.pageStore() != null) { - try { - ctx.pageStore().onPartitionCreated(grp.groupId(), p); - } - catch (IgniteCheckedException e) { - // TODO ignite-db - throw new IgniteException(e); - } - } - return loc; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index d895154..36a077e 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -2699,10 +2699,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CacheGroupContext ctx = cctx.cache().cacheGroup(rbRec.groupId()); if (ctx != null && !ctx.isLocal()) { - ctx.topology().forceCreatePartition(rbRec.partitionId()); + GridDhtLocalPartition part = ctx.topology().forceCreatePartition(rbRec.partitionId()); - ctx.offheap().onPartitionInitialCounterUpdated(rbRec.partitionId(), rbRec.start(), - rbRec.range()); + ctx.offheap().dataStore(part).updateInitialCounter(rbRec.start(), rbRec.range()); } break; @@ -2915,7 +2914,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (dataEntry.partitionCounter() != 0) - cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter() - 1, 1); + cacheCtx.offheap().dataStore(locPart).updateInitialCounter(dataEntry.partitionCounter() - 1, 1); break; @@ -2934,7 +2933,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart); if (dataEntry.partitionCounter() != 0) - cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter() - 1, 1); + cacheCtx.offheap().dataStore(locPart).updateInitialCounter(dataEntry.partitionCounter() - 1, 1); break; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 168c151..fb7e339 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -119,6 +119,7 @@ import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -271,6 +272,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException { + assert F.size(cacheDataStores().iterator(), CacheDataStore::destroyed) == 0; + // Optimization: reducing the holding time of checkpoint write lock. syncMetadata(ctx, ctx.executor(), false); } @@ -354,7 +357,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (execSvc == null) { reuseList.saveMetadata(grp.statisticsHolderData()); - for (CacheDataStore store : partDataStores.values()) + for (CacheDataStore store : cacheDataStores()) saveStoreMetadata(store, ctx, false, needSnapshot); } else { @@ -367,7 +370,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } }); - for (CacheDataStore store : partDataStores.values()) + for (CacheDataStore store : cacheDataStores()) execSvc.execute(() -> { try { saveStoreMetadata(store, ctx, false, needSnapshot); @@ -1012,12 +1015,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * Invalidates page memory for given partition. Destroys partition store. * <b>NOTE:</b> This method can be invoked only within checkpoint lock or checkpointer thread. * - * @param grpId Group ID. * @param partId Partition ID. * * @throws IgniteCheckedException If destroy has failed. */ - public void destroyPartitionStore(int grpId, int partId) throws IgniteCheckedException { + public void destroyPartitionStore(int partId) throws IgniteCheckedException { PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory(); int tag = pageMemory.invalidate(grp.groupId(), partId); @@ -1025,39 +1027,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (grp.walEnabled()) ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId)); - ctx.pageStore().onPartitionDestroyed(grpId, partId, tag); + ctx.pageStore().truncate(grp.groupId(), partId, tag); if (grp.config().isEncryptionEnabled()) ctx.kernalContext().encryption().onDestroyPartitionStore(grp, partId); } /** {@inheritDoc} */ - @Override public void onPartitionCounterUpdated(int part, long cntr) { - CacheDataStore store = partDataStores.get(part); - - assert store != null; - - long oldCnt = store.updateCounter(); - - if (oldCnt < cntr) - store.updateCounter(cntr); - } - - /** {@inheritDoc} */ - @Override public void onPartitionInitialCounterUpdated(int part, long start, long delta) { - CacheDataStore store = partDataStores.get(part); - - assert store != null; - - store.updateInitialCounter(start, delta); - } - - /** {@inheritDoc} */ - @Override public long lastUpdatedPartitionCounter(int part) { - return partDataStores.get(part).updateCounter(); - } - - /** {@inheritDoc} */ @Override public RootPage rootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException { return indexStorage.allocateCacheIndex(cacheId, idxName, segment); } @@ -1290,14 +1266,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void preloadPartition(int part) throws IgniteCheckedException { + @Override public void preloadPartition(int partId) throws IgniteCheckedException { if (grp.isLocal()) { - dataStore(part).preload(); + dataStore(null).preload(); return; } - GridDhtLocalPartition locPart = grp.topology().localPartition(part, AffinityTopologyVersion.NONE, false, false); + GridDhtLocalPartition locPart = grp.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false); assert locPart != null && locPart.reservations() > 0; @@ -1312,7 +1288,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple long freeSpace() { long freeSpace = 0; - for (CacheDataStore store : partDataStores.values()) { + for (CacheDataStore store : cacheDataStores()) { assert store instanceof GridCacheDataStore; AbstractFreeList freeList = ((GridCacheDataStore)store).getCacheStoreFreeList(); @@ -1334,7 +1310,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple long emptyDataPages() { long emptyDataPages = 0; - for (CacheDataStore store : partDataStores.values()) { + for (CacheDataStore store : cacheDataStores()) { assert store instanceof GridCacheDataStore; AbstractFreeList freeList = ((GridCacheDataStore)store).getCacheStoreFreeList(); @@ -1922,13 +1898,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private volatile long nextStoreCleanTimeNanos; /** */ - private GridQueryRowCacheCleaner rowCacheCleaner; - - /** - * Mutex used to synchronise publication of initialized delegate link and actions that should change - * the delegate's state, so the delegate will not be in obsolete state. - */ - private final Object delegatePublicationMux = new Object(); + private volatile GridQueryRowCacheCleaner rowCacheCleaner; /** */ private PartitionMetaStorageImpl<SimpleDataRow> partStorage; @@ -2144,7 +2114,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple () -> pendingTree0, grp, busyLock, - log + log, + () -> rowCacheCleaner ) { /** {@inheritDoc} */ @Override public PendingEntriesTree pendingTree() { @@ -2221,11 +2192,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple pageMem.releasePage(grpId, partMetaId, partMetaPage); } - synchronized (delegatePublicationMux) { - delegate0.setRowCacheCleaner(rowCacheCleaner); - - delegate = delegate0; - } + delegate = delegate0; } catch (Throwable ex) { U.error(log, "Unhandled exception during page store initialization. All further operations will " + @@ -2614,19 +2581,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) { - try { - synchronized (delegatePublicationMux) { - this.rowCacheCleaner = rowCacheCleaner; - } - - CacheDataStore delegate0 = init0(true); - - if (delegate0 != null) - delegate0.setRowCacheCleaner(rowCacheCleaner); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + this.rowCacheCleaner = rowCacheCleaner; } /** {@inheritDoc} */ @@ -2953,6 +2908,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ + @Override public boolean destroyed() { + try { + CacheDataStore delegate = init0(true); + + if (delegate != null) + return delegate.destroyed(); + + return false; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { CacheDataStore delegate = init0(true); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java index ebe5065..422f1fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.util.Collection; +import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -50,7 +51,7 @@ public class RowStore { private final boolean persistenceEnabled; /** Row cache cleaner. */ - private volatile GridQueryRowCacheCleaner rowCacheCleaner; + private volatile Supplier<GridQueryRowCacheCleaner> rowCacheCleaner = () -> null; /** */ protected final CacheGroupContext grp; @@ -80,8 +81,10 @@ public class RowStore { public void removeRow(long link, IoStatisticsHolder statHolder) throws IgniteCheckedException { assert link != 0; - if (rowCacheCleaner != null) - rowCacheCleaner.remove(link); + GridQueryRowCacheCleaner rowCacheCleaner0 = rowCacheCleaner.get(); + + if (rowCacheCleaner0 != null) + rowCacheCleaner0.remove(link); if (!persistenceEnabled) freeList.removeDataRowByLink(link, statHolder); @@ -146,8 +149,10 @@ public class RowStore { public boolean updateRow(long link, CacheDataRow row, IoStatisticsHolder statHolder) throws IgniteCheckedException { assert !persistenceEnabled || ctx.database().checkpointLockIsHeldByThread(); - if (rowCacheCleaner != null) - rowCacheCleaner.remove(link); + GridQueryRowCacheCleaner rowCacheCleaner0 = rowCacheCleaner.get(); + + if (rowCacheCleaner0 != null) + rowCacheCleaner0.remove(link); return freeList.updateDataRow(link, row, statHolder); } @@ -188,7 +193,9 @@ public class RowStore { * * @param rowCacheCleaner Rows cache cleaner. */ - public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) { + public void setRowCacheCleaner(Supplier<GridQueryRowCacheCleaner> rowCacheCleaner) { + assert rowCacheCleaner != null; + this.rowCacheCleaner = rowCacheCleaner; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index 21032a7..051d517 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -699,7 +699,7 @@ public class Checkpointer extends GridWorker { Runnable destroyPartTask = () -> { try { - offheap.destroyPartitionStore(grpId, partId); + offheap.destroyPartitionStore(partId); req.onDone(null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 05376b5..fb60b3e 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -586,12 +586,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void onPartitionCreated(int grpId, int partId) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException { + @Override public void truncate(int grpId, int partId, int tag) throws IgniteCheckedException { assert partId <= MAX_PARTITION_ID; PageStore store = getStore(grpId, partId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java index 85004de..f036546 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java @@ -189,7 +189,7 @@ public class MetaStorage implements CheckpointListener, ReadWriteMetastorage { assert cctx.pageStore() != null; int partTag = ((PageMemoryEx)dataRegion.pageMemory()).invalidate(METASTORAGE_CACHE_ID, OLD_METASTORE_PARTITION); - cctx.pageStore().onPartitionDestroyed(METASTORAGE_CACHE_ID, OLD_METASTORE_PARTITION, partTag); + cctx.pageStore().truncate(METASTORAGE_CACHE_ID, OLD_METASTORE_PARTITION, partTag); int idxTag = ((PageMemoryEx)dataRegion.pageMemory()).invalidate(METASTORAGE_CACHE_ID, PageIdAllocator.INDEX_PARTITION); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java index 2f89696..f743049 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java @@ -305,7 +305,7 @@ public class UpgradePendingTreeToPerPartitionTask implements IgniteCallable<Bool assert PageIO.getVersion(pageAddr) != 0; IgniteCacheOffheapManager.CacheDataStore store = - ((GridCacheOffheapManager)grp.offheap()).dataStore(partition); + grp.offheap().dataStore(grp.isLocal() ? null : grp.topology().localPartition(partition)); if (store == null) { log.warning("Failed to move old-version pending entry " + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index dc6aeec..ed777bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -2682,6 +2682,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** + * @return {@code True} if marked as destroyed. + */ + public boolean destroyed() { + return destroyed.get(); + } + + /** * @param pageAddr Meta page address. * @return First page IDs. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/PendingTreeCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/PendingTreeCorruptionTest.java index 7a748e3..3337fb6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/PendingTreeCorruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/PendingTreeCorruptionTest.java @@ -29,7 +29,6 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; -import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.tree.PendingRow; import org.apache.ignite.internal.util.lang.GridCursor; @@ -105,7 +104,9 @@ public class PendingTreeCorruptionTest extends GridCommonAbstractTest { int expireCacheId = CU.cacheGroupId(expireCacheName, grpName); CacheGroupContext grp = ig.context().cache().cacheGroup(CU.cacheId(grpName)); - IgniteCacheOffheapManager.CacheDataStore store = ((IgniteCacheOffheapManagerImpl)grp.offheap()).dataStore(0); + IgniteCacheOffheapManager.CacheDataStore store = grp.topology().localPartition(0).dataStore(); + + assertNotNull(store); // Get pending tree of expire cache. PendingEntriesTree pendingTree = store.pendingTree(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/RestorePartitionStateDuringCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/RestorePartitionStateDuringCheckpointTest.java index f7a7376..1395161 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/RestorePartitionStateDuringCheckpointTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/RestorePartitionStateDuringCheckpointTest.java @@ -31,7 +31,6 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; -import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.util.typedef.F; @@ -103,8 +102,10 @@ public class RestorePartitionStateDuringCheckpointTest extends GridCommonAbstrac AtomicBoolean checkpointTriggered = new AtomicBoolean(false); doAnswer(invocation -> { - IgniteCacheOffheapManager.CacheDataStore partDataStore = ((IgniteCacheOffheapManagerImpl)cacheProcessor - .cacheGroup(grpId).offheap()).dataStore(partId); + IgniteCacheOffheapManager.CacheDataStore partDataStore = cacheProcessor.cacheGroup(grpId).topology() + .localPartition(partId).dataStore(); + + assertNotNull(partDataStore); if (partDataStore.rowStore() != null && checkpointTriggered.compareAndSet(false, true)) { info("Before write lock will be gotten on the partition meta page [pageId=" + invocation.getArgument(2) + ']'); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java index f502e58..2b89232de 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java @@ -50,7 +50,6 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager; @@ -541,10 +540,7 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest { if (locPart == null) continue; - IgniteCacheOffheapManager.CacheDataStore dataStore = - ctx.cache().cacheGroup(CU.cacheId(CACHE_NAME_ATOMIC)).offheap().dataStore(locPart); - - GridCursor cur = dataStore.cursor(); + GridCursor cur = locPart.dataStore().cursor(); assertFalse(cur.next()); assertEquals(0, locPart.fullSize()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java index c53845f..ffd8c92 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java @@ -78,12 +78,7 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager { } /** {@inheritDoc} */ - @Override public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onPartitionDestroyed(int cacheId, int partId, int tag) throws IgniteCheckedException { + @Override public void truncate(int cacheId, int partId, int tag) throws IgniteCheckedException { // No-op. } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java index c57ce1a..b724e91 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java @@ -82,6 +82,7 @@ import org.junit.Test; import static java.util.Collections.singletonList; import static org.apache.ignite.cluster.ClusterState.ACTIVE; import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName; @@ -370,7 +371,9 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest { BinaryContext binCtx = ((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext(); GridCacheAdapter<?, ?> cache = ignite.context().cache().internalCache(dfltCacheCfg.getName()); - long partCtr = cache.context().offheap().lastUpdatedPartitionCounter(PART_ID); + long partCtr = cache.context().topology().localPartition(PART_ID, NONE, false) + .dataStore() + .updateCounter(); AtomicBoolean done = new AtomicBoolean(); db.addCheckpointListener(new CheckpointListener() { @@ -414,7 +417,9 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest { assertTrue(success); - long newPartCtr = cache.context().offheap().lastUpdatedPartitionCounter(PART_ID); + long newPartCtr = cache.context().topology().localPartition(PART_ID, NONE, false) + .dataStore() + .updateCounter(); assertEquals(newPartCtr, partCtr); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java index e0b35be..1b5933b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java @@ -209,8 +209,8 @@ public class PageMemoryTracker implements IgnitePlugin { cleanupPages(fullPageId -> fullPageId.groupId() == grp.groupId()); } - @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException { - super.onPartitionDestroyed(grpId, partId, tag); + @Override public void truncate(int grpId, int partId, int tag) throws IgniteCheckedException { + super.truncate(grpId, partId, tag); cleanupPages(fullPageId -> fullPageId.groupId() == grpId && PageIdUtils.partId(fullPageId.pageId()) == partId); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java index 114b807..f2fc215 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java @@ -172,7 +172,7 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest { // Force checkpoint to destroy evicted partitions store. forceCheckpoint(grid0); - GridTestUtils.assertThrowsAnyCause(log, iter1::next, IgniteException.class, "Failed to get next data row"); + assertFalse(iter1.hasNext()); GridTestUtils.assertThrowsAnyCause(log, () -> { while (iter2.hasNext())