Mmuzaf commented on a change in pull request #8767:
URL: https://github.com/apache/ignite/pull/8767#discussion_r591776895
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws Exception {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src,
Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 60_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (!Thread.currentThread().isInterrupted() && startTime +
loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(15_000), new Value(new
byte[rnd.nextInt(maxValSize)]));
+ else
+ cache.remove(rnd.nextInt(maxKey));
+ }
+
+ }, 5, "change-loader-");
+
+ fut.get();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<Integer, Value> iterated = new HashMap<>();
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ ccfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ iterated.put(row.key().value(coctx, true),
row.value().value(coctx, true));
+ }
+ }
+
+ stopAllGrids();
+
+ IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+ assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+ snpCache.forEach(e -> {
+ Value val = iterated.remove(e.getKey());
+
+ assertNotNull(val);
+ assertEquals(val.arr().length, e.getValue().arr().length);
+ });
+
+ assertTrue(iterated.isEmpty());
+ }
+
+ /** @throws Exception If fails */
+ @Test
+ public void testSnapshotIterator() throws Exception {
+ int keys = 127;
+
+ IgniteEx ignite = startGridsWithCache(2,
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false,
1)), keys);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ // Invariant for cache: cache key always equals to cache value.
+ assertEquals("Invalid key/value pair [key=" + row.key() + ", val=" + row.value() + ']',
+ row.key().value(coctx, false,
U.resolveClassLoader(ignite.configuration())),
+ (Integer)row.value().value(coctx, false));
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorLargeRows() throws Exception {
+ int keys = 2;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+ forceCheckpoint();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ assertEquals(12008, ((Value)row.value().value(coctx,
false)).arr().length);
+ assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+ rows++;
+ }
}
- return result;
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorDirectIndirectCountersWithCacheGroup()
throws Exception {
+ int keysPerCache = 3700;
+ int cacheValSize = 33;
+ int rowsOnPage = 37;
+
+ CacheConfiguration<Integer, Value> ccfg1 = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+ CacheConfiguration<Integer, Value> ccfg2 = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx2"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ ccfg1.setGroupName(DEFAULT_CACHE_NAME);
+ ccfg2.setGroupName(DEFAULT_CACHE_NAME);
+
+ IgniteEx ignite = startGridsWithCache(1, keysPerCache, k -> new
Value(new byte[cacheValSize]), ccfg1, ccfg2);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ AtomicReference<ByteBuffer> buffRef = new AtomicReference<>();
+ AtomicReference<Long> pageIdRef = new AtomicReference<>();
+
+ snapshotStoreFactory(ignite, new BiFunction<Integer, Boolean,
FileVersionCheckingFactory>() {
+ @Override public FileVersionCheckingFactory apply(Integer grpId,
Boolean encrypted) {
+ FilePageStoreManager storeMgr =
(FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+ return new
FileVersionCheckingFactory(storeMgr.getPageStoreFileIoFactory(),
storeMgr.getPageStoreFileIoFactory(), storeMgr::pageSize) {
+ @Override public PageStore createPageStore(
+ byte type,
+ IgniteOutClosure<Path> pathProvider,
+ LongConsumer allocatedTracker
+ ) {
+ return new FilePageStoreV2(
+ type,
+ pathProvider,
+ storeMgr.getPageStoreFileIoFactory(),
+ storeMgr.pageSize(),
+ allocatedTracker
+ ) {
+ @Override public boolean read(
+ long pageId,
+ ByteBuffer pageBuf,
+ boolean keepCrc
+ ) throws IgniteCheckedException {
+ buffRef.set(pageBuf);
+ pageIdRef.set(pageId);
+
+ return super.read(pageId, pageBuf, keepCrc);
+ }
+ };
+ }
+ };
+ }
+ });
+
+ int currRows = 0;
+ int rowsCnt;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex("tx1").context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ DataPageIO io = PageIO.getPageIO(T_DATA,
PageIO.getVersion(buffRef.get()));
+
+ long pageAddr = GridUnsafe.bufferAddress(buffRef.get());
+ rowsCnt = io.getDirectCount(pageAddr);
+
+ assertEquals(cacheValSize, ((Value)row.value().value(coctx,
false)).arr().length);
+ assertEquals(rowsOnPage, rowsCnt);
+
+ currRows++;
+ }
+ }
+
+ assertEquals(keysPerCache, currRows / 2);
+
+ // Remove all rows in the middle of the page.
+ for (int k = 0; k < keysPerCache; k++) {
+ if (k % rowsOnPage == 1 || k % rowsOnPage == (rowsOnPage - 1))
+ continue;
+
+ ignite.getOrCreateCache(ccfg1).remove(k);
+ ignite.getOrCreateCache(ccfg2).remove(k);
+ }
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME + 2).get();
+
+ int indRowsCnt;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME + 2,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex("tx1").context().cacheObjectContext();
Review comment:
Fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]