anton-vinogradov commented on a change in pull request #8767:
URL: https://github.com/apache/ignite/pull/8767#discussion_r585330088



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1113,6 +1136,58 @@ private static String snapshotMetaFileName(String 
consId) {
         return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
     }
 
+    /**
+     * @param snpName Snapshot name.
+     * @param folderName The node folder name, usually it's the same as the 
U.maskForFileName(consistentId).
+     * @param grpName Cache group name.
+     * @param partId Partition id.
+     * @return Iterator over partition.
+     */
+    public GridCloseableIterator<CacheDataRow> partitionRows(String snpName,
+        String folderName,
+        String grpName,
+        int partId
+    ) throws IgniteCheckedException {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            throw new IgniteCheckedException("Snapshot directory doesn't 
exists: " + snpDir.getAbsolutePath());
+
+        File nodePath = new File(snpDir, databaseRelativePath(folderName));
+
+        if (!nodePath.exists())
+            throw new IgniteCheckedException("Consistent id directory doesn't 
exists: " + nodePath.getAbsolutePath());
+
+        List<File> grps = cacheDirectories(nodePath, name -> 
name.equals(grpName));
+
+        if (F.isEmpty(grps) || grps.size() > 1)
+            throw new IgniteCheckedException("Snapshot cache group not found 
[dir=" + snpDir.getAbsolutePath() + ", grpName=" + grpName + ']');

Review comment:
       `Snapshot cache group not found` is an incorrect statement when 
`grps.size() > 1`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File 
to, long length) {
         }
     }
 
+    /**
+     * Ves pokrit assertami absolutely ves,
+     * PageScan iterator in the ignite core est.
+     */
+    private static class PageScanIterator extends 
GridCloseableIteratorAdapter<CacheDataRow> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Page store to iterate over. */
+        @GridToStringExclude
+        private final PageStore store;
+
+        /** Page store partition id. */
+        private final int partId;
+
+        /** Grid cache shared context. */
+        private final GridCacheSharedContext<?, ?> sctx;
+
+        /** Cache object context for key/value deserialization. */
+        private final CacheObjectContext coctx;
+
+        /** Buffer to read pages. */
+        private final ByteBuffer locBuff;
+
+        /** Buffer to read the rest part of fragmented rows. */
+        private final ByteBuffer fragmentBuff;
+
+        /** Total pages in the page store. */
+        private final int pages;
+
+        /** Pages which already marked and postponed to be read on the second 
iteration. */

Review comment:
       Tail pages, contain only part of the value, to be skipped during the 
second iterations (will be processed via headers processing).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File 
to, long length) {
         }
     }
 
+    /**
+     * Ves pokrit assertami absolutely ves,
+     * PageScan iterator in the ignite core est.
+     */
+    private static class PageScanIterator extends 
GridCloseableIteratorAdapter<CacheDataRow> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Page store to iterate over. */
+        @GridToStringExclude
+        private final PageStore store;
+
+        /** Page store partition id. */
+        private final int partId;
+
+        /** Grid cache shared context. */
+        private final GridCacheSharedContext<?, ?> sctx;
+
+        /** Cache object context for key/value deserialization. */
+        private final CacheObjectContext coctx;
+
+        /** Buffer to read pages. */
+        private final ByteBuffer locBuff;
+
+        /** Buffer to read the rest part of fragmented rows. */
+        private final ByteBuffer fragmentBuff;
+
+        /** Total pages in the page store. */
+        private final int pages;
+
+        /** Pages which already marked and postponed to be read on the second 
iteration. */
+        private final BitSet markedPages;
+
+        /** Pages which already read and must be skipped. */
+        private final BitSet readPages;
+
+        /** Batch of rows read through iteration. */
+        private final Deque<CacheDataRow> rows = new LinkedList<>();
+
+        /** {@code true} if the iteration though partition reached its end. */
+        private boolean secondScanComplete;
+
+        /**
+         * Current partition page index for read. Due to we read the partition 
twice it
+         * can't be greater that 2 * store.size().
+         */
+        private int currIdx;
+
+        /**
+         * During scanning a cache partition presented as {@code PageStore} we 
must guarantee the following:
+         * all the pages of this storage remains unchanged during the Iterator 
remains opened, the stored data
+         * keeps its consistency. We can't read the {@code PageStore} during 
an ongoing checkpoint over it.
+         *
+         * @param coctx Cache object context.
+         * @param store Page store to read.
+         * @param partId Partition id.
+         * @throws IgniteCheckedException If fails.
+         */
+        public PageScanIterator(
+            GridCacheSharedContext<?, ?> sctx,
+            CacheObjectContext coctx,
+            PageStore store,
+            int partId
+        ) throws IgniteCheckedException {
+            this.store = store;
+            this.partId = partId;
+            this.coctx = coctx;
+            this.sctx = sctx;
+
+            store.ensure();
+            pages = store.pages();
+            markedPages = new BitSet(pages);
+            readPages = new BitSet(pages);
+
+            locBuff = ByteBuffer.allocateDirect(store.getPageSize())
+                .order(ByteOrder.nativeOrder());
+            fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize())
+                .order(ByteOrder.nativeOrder());
+        }
+
+        /** {@inheritDoc */
+        @Override protected CacheDataRow onNext() throws 
IgniteCheckedException {
+            if (secondScanComplete && rows.isEmpty())
+                throw new NoSuchElementException("[partId=" + partId + ", 
store=" + store + ", skipPages=" + readPages + ']');
+
+            return rows.poll();
+        }
+
+        /** {@inheritDoc */
+        @Override protected boolean onHasNext() throws IgniteCheckedException {
+            if (secondScanComplete && rows.isEmpty())
+                return false;
+
+            try {
+                for (; currIdx < 2 * pages && rows.isEmpty(); currIdx++) {
+                    boolean first = currIdx < pages;
+                    int pageIdx = currIdx % pages;
+
+                    if (readPages.get(pageIdx) || (!first && 
markedPages.get(pageIdx)))
+                        continue;
+
+                    if (!readPageFromStore(pageId(partId, FLAG_DATA, pageIdx), 
locBuff)) {
+                        // Skip not FLAG_DATA pages.
+                        changeBit(readPages, pageIdx);
+
+                        continue;
+                    }
+
+                    long pageAddr = bufferAddress(locBuff);
+                    DataPageIO io = getPageIO(T_DATA, getVersion(pageAddr));
+                    int freeSpace = io.getFreeSpace(pageAddr);
+                    int rowsCnt = io.getDirectCount(pageAddr);
+
+                    if (first) {
+                        // Skip empty pages.
+                        if (rowsCnt == 0) {
+                            changeBit(readPages, pageIdx);
+
+                            continue;
+                        }
+
+                        // There is no difference between a page containing an 
incomplete DataRow fragment and
+                        // the page where DataRow takes up all the free space. 
There is no a dedicated
+                        // flag for this case in page header.
+                        // During the storage scan we can skip such pages at 
the first iteration over the partition file,
+                        // since all the fragmented pages will be marked by 
BitSet array we will safely read the others
+                        // on the second iteration.
+                        if (freeSpace == 0 && rowsCnt == 1) {
+                            DataPagePayload payload = io.readPayload(pageAddr, 
0, locBuff.capacity());
+
+                            long link = payload.nextLink();
+
+                            if (link != 0)
+                                changeBit(markedPages, 
pageIndex(pageId(link)));
+
+                            continue;
+                        }
+                    }
+
+                    changeBit(readPages, pageIdx);
+
+                    for (int itemId = 0; itemId < rowsCnt; itemId++) {
+                        DataRow row = new DataRow();
+
+                        row.partition(partId);
+
+                        row.initFromPageBuffer(
+                            sctx,
+                            coctx,
+                            new IgniteThrowableFunction<Long, ByteBuffer>() {
+                                @Override public ByteBuffer apply(Long 
nextPageId) throws IgniteCheckedException {
+                                    boolean success = 
readPageFromStore(nextPageId, fragmentBuff);
+
+                                    assert success : "Only FLAG_DATA pages 
allowed: " + toDetailString(nextPageId);
+
+                                    // Fragment of page has been read, might 
be skipped further.
+                                    changeBit(readPages, 
pageIndex(nextPageId));
+
+                                    return fragmentBuff;
+                                }
+                            },
+                            locBuff,
+                            io,
+                            itemId,
+                            false,
+                            CacheDataRowAdapter.RowData.FULL,
+                            false);
+
+                        rows.add(row);
+                    }
+                }
+
+                if (currIdx == 2 * pages) {
+                    secondScanComplete = true;
+
+                    boolean set = true;
+
+                    for (int j = 0; j < pages; j++)
+                        set &= readPages.get(j);
+
+                    assert set : "readPages=" + readPages + ", pages=" + pages;
+                }
+
+                return !rows.isEmpty();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteCheckedException("Error during iteration 
through page store: " + this, e);
+            }
+        }
+
+        /**
+         * @param bitSet BitSet to change bit index.
+         * @param idx Index of bit to change.
+         */
+        private static void changeBit(BitSet bitSet, int idx) {

Review comment:
       Since we "changing", we should pass the value?
   Another case is to replace `change` with `set`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
         doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder, 
readCacheId, rowData, null, skipVer);
     }
 
+    /**
+     * @param io Data page IO.
+     * @param itemId Row item Id.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void initFromPageBuffer(
+        GridCacheSharedContext<?, ?> sctx,
+        CacheObjectContext coctx,
+        IgniteThrowableFunction<Long, ByteBuffer> reader,
+        ByteBuffer pageBuff,
+        DataPageIO io,
+        int itemId,
+        boolean readCacheId,
+        RowData rowData,
+        boolean skipVer
+    ) throws IgniteCheckedException {
+        long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+        IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx, 
pageBuff.capacity(), pageBuff.capacity(),
+            pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+        if (incomplete == null)
+            return;
+
+        long nextLink = incomplete.getNextLink();
+
+        if (nextLink == 0)
+            return;
+
+        do {
+            long pageId = pageId(nextLink);
+
+            try {

Review comment:
       Could we minimize `try` section?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)

Review comment:
       Do we really need a loop here?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+        AtomicBoolean done = new AtomicBoolean();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                if (!done.compareAndSet(false, true))
+                    return;
+
+                GridIterator<CacheDataRow> it = 
cache.context().offheap().partitionIterator(0);
+
+                assertTrue(it.hasNext());
+
+                CacheDataRow row0 = it.nextX();
+
+                AffinityTopologyVersion topVer = 
cctx.exchange().readyAffinityVersion();
+                GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+                byte[] bytes = new byte[2000];
+                new Random().nextBytes(bytes);
+
+                try {
+                    BinaryObjectImpl newVal = toBinary(new Value(bytes), 
binCtx.marshaller());
+
+                    boolean success0 = cached.initialValue(
+                        newVal,
+                        new GridCacheVersion(row0.version().topologyVersion(),
+                            row0.version().nodeOrder(),
+                            row0.version().order() + 1),
+                        null,
+                        null,
+                        TxState.NA,
+                        TxState.NA,
+                        TTL_ETERNAL,
+                        row0.expireTime(),
+                        true,
+                        topVer,
+                        DR_NONE,
+                        false,
+                        null);
+
+                    assertTrue(success0);
+
+                    long newPartCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+                    assertEquals(newPartCtr, partCtr);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
+            @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+
+            @Override public void beforeCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+        });
+
+        db.waitForCheckpoint("test-checkpoint");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Path part0 = 
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+
+        IdleVerifyResultV2 res = 
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "The check procedure has finished, 
found 1 conflict partitions");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCompareHashes() throws Exception {
+        Random rnd = new Random();
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[rnd.nextInt(32768)]), ccfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleHashes = new 
HashMap<>();
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpHashes = new 
HashMap<>();
+
+        IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new 
TestVisorBackupPartitionsTask(idleHashes),
+            new VisorIdleVerifyTaskArg(new 
HashSet<>(Collections.singletonList(ccfg.getName())),
+            new HashSet<>(),
+            false,
+            CacheFilterEnum.USER,
+            true));
+
+        IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(new 
TestSnapshotPartitionsVerifyTask(snpHashes),
+            Collections.singletonMap(ignite.cluster().localNode(),
+                
Collections.singletonList(snp(ignite).readSnapshotMetadata(SNAPSHOT_NAME, 
(String)ignite.configuration().getConsistentId()))));
+
+        assertEquals(idleHashes, snpHashes);
+        assertEquals(idleVerifyRes, snpVerifyRes);
+    }
+
+    /** */
+    private static class TestVisorBackupPartitionsTask extends 
VerifyBackupPartitionsTaskV2 {
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;

Review comment:
       javadoc missed

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1113,6 +1142,58 @@ private static String snapshotMetaFileName(String 
consId) {
         return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
     }
 
+    /**
+     * @param snpName Snapshot name.
+     * @param folderName The node folder name, usually it's the same as the 
U.maskForFileName(consistentId).
+     * @param grpName Cache group name.
+     * @param partId Partition id.
+     * @return Iterator over partition.
+     */
+    public GridCloseableIterator<CacheDataRow> partitionRows(String snpName,

Review comment:
       Not sure, but can we have *iterator name for this method?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws 
Exception {
         snpFut.get(5_000, TimeUnit.MILLISECONDS);
     }
 
-    /**
-     * @param src Source node to calculate.
-     * @param grps Groups to collect owning parts.
-     * @param rmtNodeId Remote node id.
-     * @return Map of collected parts.
-     */
-    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, 
Set<Integer> grps, UUID rmtNodeId) {
-        Map<Integer, Set<Integer>> result = new HashMap<>();
-
-        for (Integer grpId : grps) {
-            Set<Integer> parts = src.context()
-                .cache()
-                .cacheGroup(grpId)
-                .topology()
-                .partitions(rmtNodeId)
-                .entrySet()
-                .stream()
-                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toSet());
-
-            result.put(grpId, parts);
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorRandomizedLoader() throws Exception {
+        Random rnd = new Random();
+        int maxKey = 15_000;
+        int maxValSize = 32_768;
+        int loadingTimeMs = 60_000;
+
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[1024]), ccfg);
+
+        IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+        long startTime = U.currentTimeMillis();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() 
-> {
+            while (!Thread.currentThread().isInterrupted() && startTime + 
loadingTimeMs > U.currentTimeMillis()) {
+                if (rnd.nextBoolean())
+                    cache.put(rnd.nextInt(15_000), new Value(new 
byte[rnd.nextInt(maxValSize)]));

Review comment:
       Should we use maxKey instead of 15_000?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws 
Exception {
         snpFut.get(5_000, TimeUnit.MILLISECONDS);
     }
 
-    /**
-     * @param src Source node to calculate.
-     * @param grps Groups to collect owning parts.
-     * @param rmtNodeId Remote node id.
-     * @return Map of collected parts.
-     */
-    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, 
Set<Integer> grps, UUID rmtNodeId) {
-        Map<Integer, Set<Integer>> result = new HashMap<>();
-
-        for (Integer grpId : grps) {
-            Set<Integer> parts = src.context()
-                .cache()
-                .cacheGroup(grpId)
-                .topology()
-                .partitions(rmtNodeId)
-                .entrySet()
-                .stream()
-                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toSet());
-
-            result.put(grpId, parts);
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorRandomizedLoader() throws Exception {
+        Random rnd = new Random();
+        int maxKey = 15_000;
+        int maxValSize = 32_768;
+        int loadingTimeMs = 60_000;
+
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[1024]), ccfg);
+
+        IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+        long startTime = U.currentTimeMillis();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() 
-> {
+            while (!Thread.currentThread().isInterrupted() && startTime + 
loadingTimeMs > U.currentTimeMillis()) {
+                if (rnd.nextBoolean())
+                    cache.put(rnd.nextInt(15_000), new Value(new 
byte[rnd.nextInt(maxValSize)]));
+                else
+                    cache.remove(rnd.nextInt(maxKey));
+            }
+
+        }, 5, "change-loader-");
+
+        fut.get();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<Integer, Value> iterated = new HashMap<>();
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            ccfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                iterated.put(row.key().value(coctx, true), 
row.value().value(coctx, true));
+            }
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+        assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+        snpCache.forEach(e -> {
+            Value val = iterated.remove(e.getKey());
+
+            assertNotNull(val);
+            assertEquals(val.arr().length, e.getValue().arr().length);
+        });
+
+        assertTrue(iterated.isEmpty());
+    }
+
+    /** @throws Exception If fails */
+    @Test
+    public void testSnapshotIterator() throws Exception {
+        int keys = 127;
+
+        IgniteEx ignite = startGridsWithCache(2,
+            dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 
1)), keys);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                // Invariant for cache: cache key always equals to cache value.
+                assertEquals("Invalid key/value pair [key=" + row.key() + ", 
val=" + row.value() + ']',
+                    row.key().value(coctx, false, 
U.resolveClassLoader(ignite.configuration())),
+                    (Integer)row.value().value(coctx, false));
+
+                rows++;
+            }
+        }
+
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorLargeRows() throws Exception {
+        int keys = 2;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+        forceCheckpoint();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                assertEquals(12008, ((Value)row.value().value(coctx, 
false)).arr().length);
+                assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+                rows++;
+            }
         }
 
-        return result;
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorDirectIndirectCountersWithCacheGroup() 
throws Exception {
+        int keysPerCache = 3700;
+        int cacheValSize = 33;
+        int rowsOnPage = 37;
+
+        CacheConfiguration<Integer, Value> ccfg1 = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+        CacheConfiguration<Integer, Value> ccfg2 = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx2"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        ccfg1.setGroupName(DEFAULT_CACHE_NAME);
+        ccfg2.setGroupName(DEFAULT_CACHE_NAME);
+
+        IgniteEx ignite = startGridsWithCache(1, keysPerCache, k -> new 
Value(new byte[cacheValSize]), ccfg1, ccfg2);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        AtomicReference<ByteBuffer> buffRef = new AtomicReference<>();
+        AtomicReference<Long> pageIdRef = new AtomicReference<>();
+
+        snapshotStoreFactory(ignite, new BiFunction<Integer, Boolean, 
FileVersionCheckingFactory>() {
+            @Override public FileVersionCheckingFactory apply(Integer grpId, 
Boolean encrypted) {
+                FilePageStoreManager storeMgr = 
(FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+                return new 
FileVersionCheckingFactory(storeMgr.getPageStoreFileIoFactory(), 
storeMgr.getPageStoreFileIoFactory(), storeMgr::pageSize) {
+                    @Override public PageStore createPageStore(
+                        byte type,
+                        IgniteOutClosure<Path> pathProvider,
+                        LongConsumer allocatedTracker
+                    ) {
+                        return new FilePageStoreV2(
+                            type,
+                            pathProvider,
+                            storeMgr.getPageStoreFileIoFactory(),
+                            storeMgr.pageSize(),
+                            allocatedTracker
+                        ) {
+                            @Override public boolean read(
+                                long pageId,
+                                ByteBuffer pageBuf,
+                                boolean keepCrc
+                            ) throws IgniteCheckedException {
+                                buffRef.set(pageBuf);
+                                pageIdRef.set(pageId);
+
+                                return super.read(pageId, pageBuf, keepCrc);
+                            }
+                        };
+                    }
+                };
+            }
+        });
+
+        int currRows = 0;
+        int rowsCnt;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex("tx1").context().cacheObjectContext();

Review comment:
       tx1 duplicate

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws 
Exception {
         snpFut.get(5_000, TimeUnit.MILLISECONDS);
     }
 
-    /**
-     * @param src Source node to calculate.
-     * @param grps Groups to collect owning parts.
-     * @param rmtNodeId Remote node id.
-     * @return Map of collected parts.
-     */
-    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, 
Set<Integer> grps, UUID rmtNodeId) {
-        Map<Integer, Set<Integer>> result = new HashMap<>();
-
-        for (Integer grpId : grps) {
-            Set<Integer> parts = src.context()
-                .cache()
-                .cacheGroup(grpId)
-                .topology()
-                .partitions(rmtNodeId)
-                .entrySet()
-                .stream()
-                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toSet());
-
-            result.put(grpId, parts);
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorRandomizedLoader() throws Exception {
+        Random rnd = new Random();
+        int maxKey = 15_000;
+        int maxValSize = 32_768;
+        int loadingTimeMs = 60_000;
+
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[1024]), ccfg);
+
+        IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+        long startTime = U.currentTimeMillis();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() 
-> {
+            while (!Thread.currentThread().isInterrupted() && startTime + 
loadingTimeMs > U.currentTimeMillis()) {
+                if (rnd.nextBoolean())
+                    cache.put(rnd.nextInt(15_000), new Value(new 
byte[rnd.nextInt(maxValSize)]));
+                else
+                    cache.remove(rnd.nextInt(maxKey));
+            }
+
+        }, 5, "change-loader-");
+
+        fut.get();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<Integer, Value> iterated = new HashMap<>();
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            ccfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                iterated.put(row.key().value(coctx, true), 
row.value().value(coctx, true));
+            }
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+        assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+        snpCache.forEach(e -> {
+            Value val = iterated.remove(e.getKey());
+
+            assertNotNull(val);
+            assertEquals(val.arr().length, e.getValue().arr().length);
+        });
+
+        assertTrue(iterated.isEmpty());
+    }
+
+    /** @throws Exception If fails */
+    @Test
+    public void testSnapshotIterator() throws Exception {
+        int keys = 127;
+
+        IgniteEx ignite = startGridsWithCache(2,
+            dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 
1)), keys);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                // Invariant for cache: cache key always equals to cache value.
+                assertEquals("Invalid key/value pair [key=" + row.key() + ", 
val=" + row.value() + ']',
+                    row.key().value(coctx, false, 
U.resolveClassLoader(ignite.configuration())),
+                    (Integer)row.value().value(coctx, false));
+
+                rows++;
+            }
+        }
+
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorLargeRows() throws Exception {
+        int keys = 2;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+        forceCheckpoint();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                assertEquals(12008, ((Value)row.value().value(coctx, 
false)).arr().length);
+                assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+                rows++;
+            }
         }
 
-        return result;
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorDirectIndirectCountersWithCacheGroup() 
throws Exception {

Review comment:
       As decided, this test checks internals already checked by values read on 
recovery.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File 
to, long length) {
         }
     }
 
+    /**
+     * Ves pokrit assertami absolutely ves,
+     * PageScan iterator in the ignite core est.
+     */
+    private static class PageScanIterator extends 
GridCloseableIteratorAdapter<CacheDataRow> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Page store to iterate over. */
+        @GridToStringExclude
+        private final PageStore store;
+
+        /** Page store partition id. */
+        private final int partId;
+
+        /** Grid cache shared context. */
+        private final GridCacheSharedContext<?, ?> sctx;
+
+        /** Cache object context for key/value deserialization. */
+        private final CacheObjectContext coctx;
+
+        /** Buffer to read pages. */
+        private final ByteBuffer locBuff;
+
+        /** Buffer to read the rest part of fragmented rows. */
+        private final ByteBuffer fragmentBuff;
+
+        /** Total pages in the page store. */
+        private final int pages;
+
+        /** Pages which already marked and postponed to be read on the second 
iteration. */
+        private final BitSet markedPages;

Review comment:
       Let's rename them to tailPages or something like this.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
##########
@@ -223,6 +228,66 @@ public static String formatUpdateCountersDiff(IgniteEx ig, 
List<Integer> diff) {
         return diff;
     }
 
+    /**
+     * @param updCntr Partition update counter prior check.
+     * @param grpId Group id.
+     * @param partId Partition id.
+     * @param grpName Group name.
+     * @param consId Local node consistent id.
+     * @param state Partition state to check.
+     * @param isPrimary {@code true} if partition is primary.
+     * @param partSize Partition size on disk.
+     * @param it Iterator though partition data rows.
+     * @throws IgniteCheckedException If fails.
+     * @return Map of calculated partition.
+     */
+    public static Map<PartitionKeyV2, PartitionHashRecordV2> 
calculatePartitionHash(

Review comment:
       Always returns singletonMap, is there any reason to return the map in 
this case?
   Also, because of the method name, we expect not a map, but something related 
to the partition.
   Also, this looks odd when we have the same (partId, grpId) at the input and 
at the output as well.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
         doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder, 
readCacheId, rowData, null, skipVer);
     }
 
+    /**
+     * @param io Data page IO.
+     * @param itemId Row item Id.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void initFromPageBuffer(
+        GridCacheSharedContext<?, ?> sctx,
+        CacheObjectContext coctx,
+        IgniteThrowableFunction<Long, ByteBuffer> reader,
+        ByteBuffer pageBuff,
+        DataPageIO io,
+        int itemId,
+        boolean readCacheId,
+        RowData rowData,
+        boolean skipVer
+    ) throws IgniteCheckedException {
+        long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+        IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx, 
pageBuff.capacity(), pageBuff.capacity(),
+            pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+        if (incomplete == null)
+            return;
+
+        long nextLink = incomplete.getNextLink();
+
+        if (nextLink == 0)
+            return;
+
+        do {
+            long pageId = pageId(nextLink);
+
+            try {
+                ByteBuffer fragmentBuff = reader.apply(pageId);
+
+                long fragmentAddr = GridUnsafe.bufferAddress(fragmentBuff);
+                DataPageIO io2 = PageIO.getPageIO(T_DATA, 
PageIO.getVersion(fragmentBuff));
+
+                incomplete = readIncomplete(incomplete, sctx, coctx, 
fragmentBuff.capacity(), fragmentBuff.capacity(),
+                    fragmentAddr, itemId(nextLink), io2, rowData, readCacheId, 
skipVer);
+
+                if (incomplete == null)
+                    return;
+
+                nextLink = incomplete.getNextLink();
+            }
+            catch (Exception e) {
+                throw new IgniteException("Error during reading DataRow 
[pageId=" + pageId + ']', e);
+            }
+        }
+        while (nextLink != 0);
+
+        assert isReady() : "ready";

Review comment:
       String after the assert should represent a "fail description".
   `ready` seems to be a "good case" :)

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
##########
@@ -504,105 +500,63 @@ private boolean 
isCacheMatchFilter(DynamicCacheDescriptor desc) {
         }
 
         /**
-         * @param grpCtx Group context.
+         * @param gctx Group context.
          * @param part Local partition.
          */
         private Future<Map<PartitionKeyV2, PartitionHashRecordV2>> 
calculatePartitionHashAsync(

Review comment:
       should we replace Map with Tuple here?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+        AtomicBoolean done = new AtomicBoolean();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                if (!done.compareAndSet(false, true))

Review comment:
       Let's provide a comment that explains that we're changing the value at 
one of the nodes to get a hash check issue.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldExtractionSelfTest.java
##########
@@ -276,7 +276,7 @@ public void testDecimalFieldMarshalling() throws Exception {
      * @param marsh Binary marshaller.
      * @return Binary object.
      */
-    protected BinaryObjectImpl toBinary(Object obj, BinaryMarshaller marsh) 
throws Exception {
+    public static BinaryObjectImpl toBinary(Object obj, BinaryMarshaller 
marsh) throws Exception {

Review comment:
       Seems to be not a proper solution to make this static to use at 
different test.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+        AtomicBoolean done = new AtomicBoolean();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                if (!done.compareAndSet(false, true))
+                    return;
+
+                GridIterator<CacheDataRow> it = 
cache.context().offheap().partitionIterator(0);
+
+                assertTrue(it.hasNext());
+
+                CacheDataRow row0 = it.nextX();
+
+                AffinityTopologyVersion topVer = 
cctx.exchange().readyAffinityVersion();
+                GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+                byte[] bytes = new byte[2000];
+                new Random().nextBytes(bytes);
+
+                try {
+                    BinaryObjectImpl newVal = toBinary(new Value(bytes), 
binCtx.marshaller());
+
+                    boolean success0 = cached.initialValue(

Review comment:
       Do we have another `success` to call this `*0`?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+        AtomicBoolean done = new AtomicBoolean();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                if (!done.compareAndSet(false, true))
+                    return;
+
+                GridIterator<CacheDataRow> it = 
cache.context().offheap().partitionIterator(0);
+
+                assertTrue(it.hasNext());
+
+                CacheDataRow row0 = it.nextX();
+
+                AffinityTopologyVersion topVer = 
cctx.exchange().readyAffinityVersion();
+                GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+                byte[] bytes = new byte[2000];
+                new Random().nextBytes(bytes);
+
+                try {
+                    BinaryObjectImpl newVal = toBinary(new Value(bytes), 
binCtx.marshaller());
+
+                    boolean success0 = cached.initialValue(
+                        newVal,
+                        new GridCacheVersion(row0.version().topologyVersion(),
+                            row0.version().nodeOrder(),
+                            row0.version().order() + 1),
+                        null,
+                        null,
+                        TxState.NA,
+                        TxState.NA,
+                        TTL_ETERNAL,
+                        row0.expireTime(),
+                        true,
+                        topVer,
+                        DR_NONE,
+                        false,
+                        null);
+
+                    assertTrue(success0);
+
+                    long newPartCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+                    assertEquals(newPartCtr, partCtr);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
+            @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+
+            @Override public void beforeCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+        });
+
+        db.waitForCheckpoint("test-checkpoint");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Path part0 = 
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+
+        IdleVerifyResultV2 res = 
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "The check procedure has finished, 
found 1 conflict partitions");

Review comment:
       Should we check that hash verification fail is the failure reason?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File 
to, long length) {
         }
     }
 
+    /**
+     * Ves pokrit assertami absolutely ves,
+     * PageScan iterator in the ignite core est.
+     */
+    private static class PageScanIterator extends 
GridCloseableIteratorAdapter<CacheDataRow> {

Review comment:
       Let's think about naming. Now it tells that iterator will scan the page.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
         doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder, 
readCacheId, rowData, null, skipVer);
     }
 
+    /**
+     * @param io Data page IO.
+     * @param itemId Row item Id.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void initFromPageBuffer(
+        GridCacheSharedContext<?, ?> sctx,
+        CacheObjectContext coctx,
+        IgniteThrowableFunction<Long, ByteBuffer> reader,
+        ByteBuffer pageBuff,
+        DataPageIO io,
+        int itemId,
+        boolean readCacheId,
+        RowData rowData,
+        boolean skipVer
+    ) throws IgniteCheckedException {
+        long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+        IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx, 
pageBuff.capacity(), pageBuff.capacity(),
+            pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+        if (incomplete == null)
+            return;
+
+        long nextLink = incomplete.getNextLink();
+
+        if (nextLink == 0)
+            return;
+
+        do {
+            long pageId = pageId(nextLink);
+
+            try {
+                ByteBuffer fragmentBuff = reader.apply(pageId);
+
+                long fragmentAddr = GridUnsafe.bufferAddress(fragmentBuff);
+                DataPageIO io2 = PageIO.getPageIO(T_DATA, 
PageIO.getVersion(fragmentBuff));
+
+                incomplete = readIncomplete(incomplete, sctx, coctx, 
fragmentBuff.capacity(), fragmentBuff.capacity(),
+                    fragmentAddr, itemId(nextLink), io2, rowData, readCacheId, 
skipVer);
+
+                if (incomplete == null)
+                    return;
+
+                nextLink = incomplete.getNextLink();
+            }
+            catch (Exception e) {

Review comment:
       Coul we set strict type here, eg. IgniteCheckedException thrown by 
`readIncomplete`?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);

Review comment:
       Let's replace 0 by constant referencing to partition id at the whole 
test.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {

Review comment:
       According to its name, this test should check hashes misses, but it 
checks hashes validation fail on different content.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+        AtomicBoolean done = new AtomicBoolean();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                if (!done.compareAndSet(false, true))
+                    return;
+
+                GridIterator<CacheDataRow> it = 
cache.context().offheap().partitionIterator(0);
+
+                assertTrue(it.hasNext());
+
+                CacheDataRow row0 = it.nextX();
+
+                AffinityTopologyVersion topVer = 
cctx.exchange().readyAffinityVersion();
+                GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+                byte[] bytes = new byte[2000];
+                new Random().nextBytes(bytes);
+
+                try {
+                    BinaryObjectImpl newVal = toBinary(new Value(bytes), 
binCtx.marshaller());
+
+                    boolean success0 = cached.initialValue(
+                        newVal,
+                        new GridCacheVersion(row0.version().topologyVersion(),
+                            row0.version().nodeOrder(),
+                            row0.version().order() + 1),
+                        null,
+                        null,
+                        TxState.NA,
+                        TxState.NA,
+                        TTL_ETERNAL,
+                        row0.expireTime(),
+                        true,
+                        topVer,
+                        DR_NONE,
+                        false,
+                        null);
+
+                    assertTrue(success0);
+
+                    long newPartCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+                    assertEquals(newPartCtr, partCtr);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
+            @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+
+            @Override public void beforeCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+        });
+
+        db.waitForCheckpoint("test-checkpoint");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Path part0 = 
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+
+        IdleVerifyResultV2 res = 
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "The check procedure has finished, 
found 1 conflict partitions");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCompareHashes() throws Exception {

Review comment:
       testClusterSnapshotHashesTheSameAsIdleVerifyHashes? %) 

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+        AtomicBoolean done = new AtomicBoolean();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                if (!done.compareAndSet(false, true))
+                    return;
+
+                GridIterator<CacheDataRow> it = 
cache.context().offheap().partitionIterator(0);
+
+                assertTrue(it.hasNext());
+
+                CacheDataRow row0 = it.nextX();
+
+                AffinityTopologyVersion topVer = 
cctx.exchange().readyAffinityVersion();
+                GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+                byte[] bytes = new byte[2000];
+                new Random().nextBytes(bytes);
+
+                try {
+                    BinaryObjectImpl newVal = toBinary(new Value(bytes), 
binCtx.marshaller());
+
+                    boolean success0 = cached.initialValue(
+                        newVal,
+                        new GridCacheVersion(row0.version().topologyVersion(),
+                            row0.version().nodeOrder(),
+                            row0.version().order() + 1),
+                        null,
+                        null,
+                        TxState.NA,
+                        TxState.NA,
+                        TTL_ETERNAL,
+                        row0.expireTime(),
+                        true,
+                        topVer,
+                        DR_NONE,
+                        false,
+                        null);
+
+                    assertTrue(success0);
+
+                    long newPartCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+                    assertEquals(newPartCtr, partCtr);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
+            @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+
+            @Override public void beforeCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+        });
+
+        db.waitForCheckpoint("test-checkpoint");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Path part0 = 
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+
+        IdleVerifyResultV2 res = 
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "The check procedure has finished, 
found 1 conflict partitions");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCompareHashes() throws Exception {
+        Random rnd = new Random();
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[rnd.nextInt(32768)]), ccfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleHashes = new 
HashMap<>();
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpHashes = new 
HashMap<>();
+
+        IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new 
TestVisorBackupPartitionsTask(idleHashes),
+            new VisorIdleVerifyTaskArg(new 
HashSet<>(Collections.singletonList(ccfg.getName())),
+            new HashSet<>(),
+            false,
+            CacheFilterEnum.USER,
+            true));
+
+        IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(new 
TestSnapshotPartitionsVerifyTask(snpHashes),
+            Collections.singletonMap(ignite.cluster().localNode(),
+                
Collections.singletonList(snp(ignite).readSnapshotMetadata(SNAPSHOT_NAME, 
(String)ignite.configuration().getConsistentId()))));
+
+        assertEquals(idleHashes, snpHashes);
+        assertEquals(idleVerifyRes, snpVerifyRes);
+    }
+
+    /** */
+    private static class TestVisorBackupPartitionsTask extends 
VerifyBackupPartitionsTaskV2 {
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;
+
+        /**
+         * @param hashes Map of calculated partition hashes.
+         */
+        public TestVisorBackupPartitionsTask(Map<PartitionKeyV2, 
List<PartitionHashRecordV2>> hashes) {

Review comment:
       Looks like TestSnapshotPartitionsVerifyTask code equals this code.
   Can we resolve the duplication issue?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws 
Exception {
         Exception ex = res.exceptions().values().iterator().next();
         assertTrue(X.hasCause(ex, 
IgniteDataIntegrityViolationException.class));
     }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedHashes() throws Exception {
+        int keys = 1;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+        forceCheckpoint(ignite);
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        BinaryContext binCtx = 
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+        GridCacheAdapter<?, ?> cache = 
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+        long partCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+        AtomicBoolean done = new AtomicBoolean();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                if (!done.compareAndSet(false, true))
+                    return;
+
+                GridIterator<CacheDataRow> it = 
cache.context().offheap().partitionIterator(0);
+
+                assertTrue(it.hasNext());
+
+                CacheDataRow row0 = it.nextX();
+
+                AffinityTopologyVersion topVer = 
cctx.exchange().readyAffinityVersion();
+                GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+                byte[] bytes = new byte[2000];
+                new Random().nextBytes(bytes);
+
+                try {
+                    BinaryObjectImpl newVal = toBinary(new Value(bytes), 
binCtx.marshaller());
+
+                    boolean success0 = cached.initialValue(
+                        newVal,
+                        new GridCacheVersion(row0.version().topologyVersion(),
+                            row0.version().nodeOrder(),
+                            row0.version().order() + 1),
+                        null,
+                        null,
+                        TxState.NA,
+                        TxState.NA,
+                        TTL_ETERNAL,
+                        row0.expireTime(),
+                        true,
+                        topVer,
+                        DR_NONE,
+                        false,
+                        null);
+
+                    assertTrue(success0);
+
+                    long newPartCtr = 
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+                    assertEquals(newPartCtr, partCtr);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
+            @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+
+            @Override public void beforeCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+
+            }
+        });
+
+        db.waitForCheckpoint("test-checkpoint");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Path part0 = 
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+
+        IdleVerifyResultV2 res = 
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "The check procedure has finished, 
found 1 conflict partitions");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCompareHashes() throws Exception {
+        Random rnd = new Random();
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[rnd.nextInt(32768)]), ccfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleHashes = new 
HashMap<>();
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpHashes = new 
HashMap<>();
+
+        IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new 
TestVisorBackupPartitionsTask(idleHashes),
+            new VisorIdleVerifyTaskArg(new 
HashSet<>(Collections.singletonList(ccfg.getName())),
+            new HashSet<>(),
+            false,
+            CacheFilterEnum.USER,
+            true));
+
+        IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(new 
TestSnapshotPartitionsVerifyTask(snpHashes),
+            Collections.singletonMap(ignite.cluster().localNode(),
+                
Collections.singletonList(snp(ignite).readSnapshotMetadata(SNAPSHOT_NAME, 
(String)ignite.configuration().getConsistentId()))));
+
+        assertEquals(idleHashes, snpHashes);
+        assertEquals(idleVerifyRes, snpVerifyRes);
+    }
+
+    /** */
+    private static class TestVisorBackupPartitionsTask extends 
VerifyBackupPartitionsTaskV2 {
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;
+
+        /**
+         * @param hashes Map of calculated partition hashes.
+         */
+        public TestVisorBackupPartitionsTask(Map<PartitionKeyV2, 
List<PartitionHashRecordV2>> hashes) {
+            this.hashes = hashes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable IdleVerifyResultV2 
reduce(List<ComputeJobResult> results) throws IgniteException {
+            IdleVerifyResultV2 res = super.reduce(results);
+
+            for (ComputeJobResult job : results) {
+                if (job.getException() != null)
+                    continue;
+
+                job.<Map<PartitionKeyV2, 
PartitionHashRecordV2>>getData().forEach((k, v) ->
+                    hashes.computeIfAbsent(k, k0 -> new ArrayList<>()).add(v));
+            }
+
+            return res;
+        }
+    }
+
+    /** */
+    private static class TestSnapshotPartitionsVerifyTask extends 
SnapshotPartitionsVerifyTask {
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;

Review comment:
       javadoc missed

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws 
Exception {
         snpFut.get(5_000, TimeUnit.MILLISECONDS);
     }
 
-    /**
-     * @param src Source node to calculate.
-     * @param grps Groups to collect owning parts.
-     * @param rmtNodeId Remote node id.
-     * @return Map of collected parts.
-     */
-    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, 
Set<Integer> grps, UUID rmtNodeId) {
-        Map<Integer, Set<Integer>> result = new HashMap<>();
-
-        for (Integer grpId : grps) {
-            Set<Integer> parts = src.context()
-                .cache()
-                .cacheGroup(grpId)
-                .topology()
-                .partitions(rmtNodeId)
-                .entrySet()
-                .stream()
-                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toSet());
-
-            result.put(grpId, parts);
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorRandomizedLoader() throws Exception {
+        Random rnd = new Random();
+        int maxKey = 15_000;
+        int maxValSize = 32_768;
+        int loadingTimeMs = 60_000;
+
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[1024]), ccfg);
+
+        IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+        long startTime = U.currentTimeMillis();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() 
-> {
+            while (!Thread.currentThread().isInterrupted() && startTime + 
loadingTimeMs > U.currentTimeMillis()) {
+                if (rnd.nextBoolean())
+                    cache.put(rnd.nextInt(15_000), new Value(new 
byte[rnd.nextInt(maxValSize)]));
+                else
+                    cache.remove(rnd.nextInt(maxKey));
+            }
+
+        }, 5, "change-loader-");
+
+        fut.get();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<Integer, Value> iterated = new HashMap<>();
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            ccfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                iterated.put(row.key().value(coctx, true), 
row.value().value(coctx, true));
+            }
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+        assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+        snpCache.forEach(e -> {
+            Value val = iterated.remove(e.getKey());
+
+            assertNotNull(val);
+            assertEquals(val.arr().length, e.getValue().arr().length);
+        });
+
+        assertTrue(iterated.isEmpty());
+    }
+
+    /** @throws Exception If fails */
+    @Test
+    public void testSnapshotIterator() throws Exception {
+        int keys = 127;
+
+        IgniteEx ignite = startGridsWithCache(2,
+            dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 
1)), keys);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                // Invariant for cache: cache key always equals to cache value.
+                assertEquals("Invalid key/value pair [key=" + row.key() + ", 
val=" + row.value() + ']',
+                    row.key().value(coctx, false, 
U.resolveClassLoader(ignite.configuration())),
+                    (Integer)row.value().value(coctx, false));
+
+                rows++;
+            }
+        }
+
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorLargeRows() throws Exception {
+        int keys = 2;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+        forceCheckpoint();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                assertEquals(12008, ((Value)row.value().value(coctx, 
false)).arr().length);
+                assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+                rows++;
+            }
         }
 
-        return result;
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorDirectIndirectCountersWithCacheGroup() 
throws Exception {
+        int keysPerCache = 3700;
+        int cacheValSize = 33;
+        int rowsOnPage = 37;
+
+        CacheConfiguration<Integer, Value> ccfg1 = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+        CacheConfiguration<Integer, Value> ccfg2 = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx2"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        ccfg1.setGroupName(DEFAULT_CACHE_NAME);
+        ccfg2.setGroupName(DEFAULT_CACHE_NAME);
+
+        IgniteEx ignite = startGridsWithCache(1, keysPerCache, k -> new 
Value(new byte[cacheValSize]), ccfg1, ccfg2);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        AtomicReference<ByteBuffer> buffRef = new AtomicReference<>();
+        AtomicReference<Long> pageIdRef = new AtomicReference<>();
+
+        snapshotStoreFactory(ignite, new BiFunction<Integer, Boolean, 
FileVersionCheckingFactory>() {
+            @Override public FileVersionCheckingFactory apply(Integer grpId, 
Boolean encrypted) {
+                FilePageStoreManager storeMgr = 
(FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+                return new 
FileVersionCheckingFactory(storeMgr.getPageStoreFileIoFactory(), 
storeMgr.getPageStoreFileIoFactory(), storeMgr::pageSize) {
+                    @Override public PageStore createPageStore(
+                        byte type,
+                        IgniteOutClosure<Path> pathProvider,
+                        LongConsumer allocatedTracker
+                    ) {
+                        return new FilePageStoreV2(
+                            type,
+                            pathProvider,
+                            storeMgr.getPageStoreFileIoFactory(),
+                            storeMgr.pageSize(),
+                            allocatedTracker
+                        ) {
+                            @Override public boolean read(
+                                long pageId,
+                                ByteBuffer pageBuf,
+                                boolean keepCrc
+                            ) throws IgniteCheckedException {
+                                buffRef.set(pageBuf);
+                                pageIdRef.set(pageId);
+
+                                return super.read(pageId, pageBuf, keepCrc);
+                            }
+                        };
+                    }
+                };
+            }
+        });
+
+        int currRows = 0;
+        int rowsCnt;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex("tx1").context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                DataPageIO io = PageIO.getPageIO(T_DATA, 
PageIO.getVersion(buffRef.get()));
+
+                long pageAddr = GridUnsafe.bufferAddress(buffRef.get());
+                rowsCnt = io.getDirectCount(pageAddr);
+
+                assertEquals(cacheValSize, ((Value)row.value().value(coctx, 
false)).arr().length);
+                assertEquals(rowsOnPage, rowsCnt);
+
+                currRows++;
+            }
+        }
+
+        assertEquals(keysPerCache, currRows / 2);
+
+        // Remove all rows in the middle of the page.
+        for (int k = 0; k < keysPerCache; k++) {
+            if (k % rowsOnPage == 1 || k % rowsOnPage == (rowsOnPage - 1))
+                continue;
+
+            ignite.getOrCreateCache(ccfg1).remove(k);
+            ignite.getOrCreateCache(ccfg2).remove(k);
+        }
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME + 2).get();
+
+        int indRowsCnt;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME + 2,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex("tx1").context().cacheObjectContext();

Review comment:
       tx1 duplicate

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws 
Exception {
         snpFut.get(5_000, TimeUnit.MILLISECONDS);
     }
 
-    /**
-     * @param src Source node to calculate.
-     * @param grps Groups to collect owning parts.
-     * @param rmtNodeId Remote node id.
-     * @return Map of collected parts.
-     */
-    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, 
Set<Integer> grps, UUID rmtNodeId) {
-        Map<Integer, Set<Integer>> result = new HashMap<>();
-
-        for (Integer grpId : grps) {
-            Set<Integer> parts = src.context()
-                .cache()
-                .cacheGroup(grpId)
-                .topology()
-                .partitions(rmtNodeId)
-                .entrySet()
-                .stream()
-                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toSet());
-
-            result.put(grpId, parts);
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorRandomizedLoader() throws Exception {
+        Random rnd = new Random();
+        int maxKey = 15_000;
+        int maxValSize = 32_768;
+        int loadingTimeMs = 60_000;
+
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[1024]), ccfg);
+
+        IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+        long startTime = U.currentTimeMillis();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() 
-> {
+            while (!Thread.currentThread().isInterrupted() && startTime + 
loadingTimeMs > U.currentTimeMillis()) {
+                if (rnd.nextBoolean())
+                    cache.put(rnd.nextInt(15_000), new Value(new 
byte[rnd.nextInt(maxValSize)]));
+                else
+                    cache.remove(rnd.nextInt(maxKey));
+            }
+
+        }, 5, "change-loader-");
+
+        fut.get();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<Integer, Value> iterated = new HashMap<>();
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            ccfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                iterated.put(row.key().value(coctx, true), 
row.value().value(coctx, true));
+            }
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+        assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+        snpCache.forEach(e -> {
+            Value val = iterated.remove(e.getKey());
+
+            assertNotNull(val);
+            assertEquals(val.arr().length, e.getValue().arr().length);
+        });
+
+        assertTrue(iterated.isEmpty());
+    }
+
+    /** @throws Exception If fails */
+    @Test
+    public void testSnapshotIterator() throws Exception {
+        int keys = 127;
+
+        IgniteEx ignite = startGridsWithCache(2,
+            dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 
1)), keys);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                // Invariant for cache: cache key always equals to cache value.
+                assertEquals("Invalid key/value pair [key=" + row.key() + ", 
val=" + row.value() + ']',
+                    row.key().value(coctx, false, 
U.resolveClassLoader(ignite.configuration())),
+                    (Integer)row.value().value(coctx, false));
+
+                rows++;
+            }
+        }
+
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorLargeRows() throws Exception {
+        int keys = 2;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+        forceCheckpoint();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                assertEquals(12008, ((Value)row.value().value(coctx, 
false)).arr().length);

Review comment:
       Should be const.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws 
Exception {
         snpFut.get(5_000, TimeUnit.MILLISECONDS);
     }
 
-    /**
-     * @param src Source node to calculate.
-     * @param grps Groups to collect owning parts.
-     * @param rmtNodeId Remote node id.
-     * @return Map of collected parts.
-     */
-    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, 
Set<Integer> grps, UUID rmtNodeId) {
-        Map<Integer, Set<Integer>> result = new HashMap<>();
-
-        for (Integer grpId : grps) {
-            Set<Integer> parts = src.context()
-                .cache()
-                .cacheGroup(grpId)
-                .topology()
-                .partitions(rmtNodeId)
-                .entrySet()
-                .stream()
-                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toSet());
-
-            result.put(grpId, parts);
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorRandomizedLoader() throws Exception {
+        Random rnd = new Random();
+        int maxKey = 15_000;
+        int maxValSize = 32_768;
+        int loadingTimeMs = 60_000;
+
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>("tx1"))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new 
Value(new byte[1024]), ccfg);
+
+        IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+        long startTime = U.currentTimeMillis();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() 
-> {
+            while (!Thread.currentThread().isInterrupted() && startTime + 
loadingTimeMs > U.currentTimeMillis()) {
+                if (rnd.nextBoolean())
+                    cache.put(rnd.nextInt(15_000), new Value(new 
byte[rnd.nextInt(maxValSize)]));
+                else
+                    cache.remove(rnd.nextInt(maxKey));
+            }
+
+        }, 5, "change-loader-");
+
+        fut.get();
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        Map<Integer, Value> iterated = new HashMap<>();
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            ccfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                iterated.put(row.key().value(coctx, true), 
row.value().value(coctx, true));
+            }
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+        assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+        snpCache.forEach(e -> {
+            Value val = iterated.remove(e.getKey());
+
+            assertNotNull(val);
+            assertEquals(val.arr().length, e.getValue().arr().length);
+        });
+
+        assertTrue(iterated.isEmpty());
+    }
+
+    /** @throws Exception If fails */
+    @Test
+    public void testSnapshotIterator() throws Exception {
+        int keys = 127;
+
+        IgniteEx ignite = startGridsWithCache(2,
+            dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 
1)), keys);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        int rows = 0;
+
+        try (GridCloseableIterator<CacheDataRow> iter = 
snp(ignite).partitionRows(SNAPSHOT_NAME,
+            ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+            dfltCacheCfg.getName(),
+            0)
+        ) {
+            CacheObjectContext coctx = 
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+            while (iter.hasNext()) {
+                CacheDataRow row = iter.next();
+
+                // Invariant for cache: cache key always equals to cache value.
+                assertEquals("Invalid key/value pair [key=" + row.key() + ", 
val=" + row.value() + ']',
+                    row.key().value(coctx, false, 
U.resolveClassLoader(ignite.configuration())),
+                    (Integer)row.value().value(coctx, false));
+
+                rows++;
+            }
+        }
+
+        assertEquals("Invalid number of rows: " + rows, keys, rows);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotIteratorLargeRows() throws Exception {
+        int keys = 2;
+        CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new 
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+            .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+        IgniteEx ignite = startGridsWithoutCache(2);
+
+        for (int i = 0; i < keys; i++)
+            ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));

Review comment:
       12008 should be const with an explanation of why.
   Should be calculated using page size, since default may be changed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to