anton-vinogradov commented on a change in pull request #8767:
URL: https://github.com/apache/ignite/pull/8767#discussion_r585330088
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1113,6 +1136,58 @@ private static String snapshotMetaFileName(String
consId) {
return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
}
+ /**
+ * @param snpName Snapshot name.
+ * @param folderName The node folder name, usually it's the same as the
U.maskForFileName(consistentId).
+ * @param grpName Cache group name.
+ * @param partId Partition id.
+ * @return Iterator over partition.
+ */
+ public GridCloseableIterator<CacheDataRow> partitionRows(String snpName,
+ String folderName,
+ String grpName,
+ int partId
+ ) throws IgniteCheckedException {
+ File snpDir = snapshotLocalDir(snpName);
+
+ if (!snpDir.exists())
+ throw new IgniteCheckedException("Snapshot directory doesn't
exists: " + snpDir.getAbsolutePath());
+
+ File nodePath = new File(snpDir, databaseRelativePath(folderName));
+
+ if (!nodePath.exists())
+ throw new IgniteCheckedException("Consistent id directory doesn't
exists: " + nodePath.getAbsolutePath());
+
+ List<File> grps = cacheDirectories(nodePath, name ->
name.equals(grpName));
+
+ if (F.isEmpty(grps) || grps.size() > 1)
+ throw new IgniteCheckedException("Snapshot cache group not found
[dir=" + snpDir.getAbsolutePath() + ", grpName=" + grpName + ']');
Review comment:
`Snapshot cache group not found` is an incorrect statement when
`grps.size() > 1`
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /** Pages which already marked and postponed to be read on the second
iteration. */
Review comment:
Tail pages, contain only part of the value, to be skipped during the
second iterations (will be processed via headers processing).
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /** Pages which already marked and postponed to be read on the second
iteration. */
+ private final BitSet markedPages;
+
+ /** Pages which already read and must be skipped. */
+ private final BitSet readPages;
+
+ /** Batch of rows read through iteration. */
+ private final Deque<CacheDataRow> rows = new LinkedList<>();
+
+ /** {@code true} if the iteration though partition reached its end. */
+ private boolean secondScanComplete;
+
+ /**
+ * Current partition page index for read. Due to we read the partition
twice it
+ * can't be greater that 2 * store.size().
+ */
+ private int currIdx;
+
+ /**
+ * During scanning a cache partition presented as {@code PageStore} we
must guarantee the following:
+ * all the pages of this storage remains unchanged during the Iterator
remains opened, the stored data
+ * keeps its consistency. We can't read the {@code PageStore} during
an ongoing checkpoint over it.
+ *
+ * @param coctx Cache object context.
+ * @param store Page store to read.
+ * @param partId Partition id.
+ * @throws IgniteCheckedException If fails.
+ */
+ public PageScanIterator(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ PageStore store,
+ int partId
+ ) throws IgniteCheckedException {
+ this.store = store;
+ this.partId = partId;
+ this.coctx = coctx;
+ this.sctx = sctx;
+
+ store.ensure();
+ pages = store.pages();
+ markedPages = new BitSet(pages);
+ readPages = new BitSet(pages);
+
+ locBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ }
+
+ /** {@inheritDoc */
+ @Override protected CacheDataRow onNext() throws
IgniteCheckedException {
+ if (secondScanComplete && rows.isEmpty())
+ throw new NoSuchElementException("[partId=" + partId + ",
store=" + store + ", skipPages=" + readPages + ']');
+
+ return rows.poll();
+ }
+
+ /** {@inheritDoc */
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ if (secondScanComplete && rows.isEmpty())
+ return false;
+
+ try {
+ for (; currIdx < 2 * pages && rows.isEmpty(); currIdx++) {
+ boolean first = currIdx < pages;
+ int pageIdx = currIdx % pages;
+
+ if (readPages.get(pageIdx) || (!first &&
markedPages.get(pageIdx)))
+ continue;
+
+ if (!readPageFromStore(pageId(partId, FLAG_DATA, pageIdx),
locBuff)) {
+ // Skip not FLAG_DATA pages.
+ changeBit(readPages, pageIdx);
+
+ continue;
+ }
+
+ long pageAddr = bufferAddress(locBuff);
+ DataPageIO io = getPageIO(T_DATA, getVersion(pageAddr));
+ int freeSpace = io.getFreeSpace(pageAddr);
+ int rowsCnt = io.getDirectCount(pageAddr);
+
+ if (first) {
+ // Skip empty pages.
+ if (rowsCnt == 0) {
+ changeBit(readPages, pageIdx);
+
+ continue;
+ }
+
+ // There is no difference between a page containing an
incomplete DataRow fragment and
+ // the page where DataRow takes up all the free space.
There is no a dedicated
+ // flag for this case in page header.
+ // During the storage scan we can skip such pages at
the first iteration over the partition file,
+ // since all the fragmented pages will be marked by
BitSet array we will safely read the others
+ // on the second iteration.
+ if (freeSpace == 0 && rowsCnt == 1) {
+ DataPagePayload payload = io.readPayload(pageAddr,
0, locBuff.capacity());
+
+ long link = payload.nextLink();
+
+ if (link != 0)
+ changeBit(markedPages,
pageIndex(pageId(link)));
+
+ continue;
+ }
+ }
+
+ changeBit(readPages, pageIdx);
+
+ for (int itemId = 0; itemId < rowsCnt; itemId++) {
+ DataRow row = new DataRow();
+
+ row.partition(partId);
+
+ row.initFromPageBuffer(
+ sctx,
+ coctx,
+ new IgniteThrowableFunction<Long, ByteBuffer>() {
+ @Override public ByteBuffer apply(Long
nextPageId) throws IgniteCheckedException {
+ boolean success =
readPageFromStore(nextPageId, fragmentBuff);
+
+ assert success : "Only FLAG_DATA pages
allowed: " + toDetailString(nextPageId);
+
+ // Fragment of page has been read, might
be skipped further.
+ changeBit(readPages,
pageIndex(nextPageId));
+
+ return fragmentBuff;
+ }
+ },
+ locBuff,
+ io,
+ itemId,
+ false,
+ CacheDataRowAdapter.RowData.FULL,
+ false);
+
+ rows.add(row);
+ }
+ }
+
+ if (currIdx == 2 * pages) {
+ secondScanComplete = true;
+
+ boolean set = true;
+
+ for (int j = 0; j < pages; j++)
+ set &= readPages.get(j);
+
+ assert set : "readPages=" + readPages + ", pages=" + pages;
+ }
+
+ return !rows.isEmpty();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Error during iteration
through page store: " + this, e);
+ }
+ }
+
+ /**
+ * @param bitSet BitSet to change bit index.
+ * @param idx Index of bit to change.
+ */
+ private static void changeBit(BitSet bitSet, int idx) {
Review comment:
Since we "changing", we should pass the value?
Another case is to replace `change` with `set`.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder,
readCacheId, rowData, null, skipVer);
}
+ /**
+ * @param io Data page IO.
+ * @param itemId Row item Id.
+ * @throws IgniteCheckedException If failed.
+ */
+ public final void initFromPageBuffer(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ IgniteThrowableFunction<Long, ByteBuffer> reader,
+ ByteBuffer pageBuff,
+ DataPageIO io,
+ int itemId,
+ boolean readCacheId,
+ RowData rowData,
+ boolean skipVer
+ ) throws IgniteCheckedException {
+ long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+ IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx,
pageBuff.capacity(), pageBuff.capacity(),
+ pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+ if (incomplete == null)
+ return;
+
+ long nextLink = incomplete.getNextLink();
+
+ if (nextLink == 0)
+ return;
+
+ do {
+ long pageId = pageId(nextLink);
+
+ try {
Review comment:
Could we minimize `try` section?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
Review comment:
Do we really need a loop here?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ if (!done.compareAndSet(false, true))
+ return;
+
+ GridIterator<CacheDataRow> it =
cache.context().offheap().partitionIterator(0);
+
+ assertTrue(it.hasNext());
+
+ CacheDataRow row0 = it.nextX();
+
+ AffinityTopologyVersion topVer =
cctx.exchange().readyAffinityVersion();
+ GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+ byte[] bytes = new byte[2000];
+ new Random().nextBytes(bytes);
+
+ try {
+ BinaryObjectImpl newVal = toBinary(new Value(bytes),
binCtx.marshaller());
+
+ boolean success0 = cached.initialValue(
+ newVal,
+ new GridCacheVersion(row0.version().topologyVersion(),
+ row0.version().nodeOrder(),
+ row0.version().order() + 1),
+ null,
+ null,
+ TxState.NA,
+ TxState.NA,
+ TTL_ETERNAL,
+ row0.expireTime(),
+ true,
+ topVer,
+ DR_NONE,
+ false,
+ null);
+
+ assertTrue(success0);
+
+ long newPartCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+ assertEquals(newPartCtr, partCtr);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+ });
+
+ db.waitForCheckpoint("test-checkpoint");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Path part0 =
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+ getPartitionFileName(0));
+
+ assertNotNull(part0);
+ assertTrue(part0.toString(), part0.toFile().exists());
+
+ IdleVerifyResultV2 res =
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue(F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(), "The check procedure has finished,
found 1 conflict partitions");
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCompareHashes() throws Exception {
+ Random rnd = new Random();
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[rnd.nextInt(32768)]), ccfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleHashes = new
HashMap<>();
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpHashes = new
HashMap<>();
+
+ IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new
TestVisorBackupPartitionsTask(idleHashes),
+ new VisorIdleVerifyTaskArg(new
HashSet<>(Collections.singletonList(ccfg.getName())),
+ new HashSet<>(),
+ false,
+ CacheFilterEnum.USER,
+ true));
+
+ IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(new
TestSnapshotPartitionsVerifyTask(snpHashes),
+ Collections.singletonMap(ignite.cluster().localNode(),
+
Collections.singletonList(snp(ignite).readSnapshotMetadata(SNAPSHOT_NAME,
(String)ignite.configuration().getConsistentId()))));
+
+ assertEquals(idleHashes, snpHashes);
+ assertEquals(idleVerifyRes, snpVerifyRes);
+ }
+
+ /** */
+ private static class TestVisorBackupPartitionsTask extends
VerifyBackupPartitionsTaskV2 {
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;
Review comment:
javadoc missed
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1113,6 +1142,58 @@ private static String snapshotMetaFileName(String
consId) {
return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
}
+ /**
+ * @param snpName Snapshot name.
+ * @param folderName The node folder name, usually it's the same as the
U.maskForFileName(consistentId).
+ * @param grpName Cache group name.
+ * @param partId Partition id.
+ * @return Iterator over partition.
+ */
+ public GridCloseableIterator<CacheDataRow> partitionRows(String snpName,
Review comment:
Not sure, but can we have *iterator name for this method?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws
Exception {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src,
Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 60_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (!Thread.currentThread().isInterrupted() && startTime +
loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(15_000), new Value(new
byte[rnd.nextInt(maxValSize)]));
Review comment:
Should we use maxKey instead of 15_000?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws
Exception {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src,
Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 60_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (!Thread.currentThread().isInterrupted() && startTime +
loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(15_000), new Value(new
byte[rnd.nextInt(maxValSize)]));
+ else
+ cache.remove(rnd.nextInt(maxKey));
+ }
+
+ }, 5, "change-loader-");
+
+ fut.get();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<Integer, Value> iterated = new HashMap<>();
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ ccfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ iterated.put(row.key().value(coctx, true),
row.value().value(coctx, true));
+ }
+ }
+
+ stopAllGrids();
+
+ IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+ assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+ snpCache.forEach(e -> {
+ Value val = iterated.remove(e.getKey());
+
+ assertNotNull(val);
+ assertEquals(val.arr().length, e.getValue().arr().length);
+ });
+
+ assertTrue(iterated.isEmpty());
+ }
+
+ /** @throws Exception If fails */
+ @Test
+ public void testSnapshotIterator() throws Exception {
+ int keys = 127;
+
+ IgniteEx ignite = startGridsWithCache(2,
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false,
1)), keys);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ // Invariant for cache: cache key always equals to cache value.
+ assertEquals("Invalid key/value pair [key=" + row.key() + ",
val=" + row.value() + ']',
+ row.key().value(coctx, false,
U.resolveClassLoader(ignite.configuration())),
+ (Integer)row.value().value(coctx, false));
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorLargeRows() throws Exception {
+ int keys = 2;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+ forceCheckpoint();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ assertEquals(12008, ((Value)row.value().value(coctx,
false)).arr().length);
+ assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+ rows++;
+ }
}
- return result;
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorDirectIndirectCountersWithCacheGroup()
throws Exception {
+ int keysPerCache = 3700;
+ int cacheValSize = 33;
+ int rowsOnPage = 37;
+
+ CacheConfiguration<Integer, Value> ccfg1 = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+ CacheConfiguration<Integer, Value> ccfg2 = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx2"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ ccfg1.setGroupName(DEFAULT_CACHE_NAME);
+ ccfg2.setGroupName(DEFAULT_CACHE_NAME);
+
+ IgniteEx ignite = startGridsWithCache(1, keysPerCache, k -> new
Value(new byte[cacheValSize]), ccfg1, ccfg2);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ AtomicReference<ByteBuffer> buffRef = new AtomicReference<>();
+ AtomicReference<Long> pageIdRef = new AtomicReference<>();
+
+ snapshotStoreFactory(ignite, new BiFunction<Integer, Boolean,
FileVersionCheckingFactory>() {
+ @Override public FileVersionCheckingFactory apply(Integer grpId,
Boolean encrypted) {
+ FilePageStoreManager storeMgr =
(FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+ return new
FileVersionCheckingFactory(storeMgr.getPageStoreFileIoFactory(),
storeMgr.getPageStoreFileIoFactory(), storeMgr::pageSize) {
+ @Override public PageStore createPageStore(
+ byte type,
+ IgniteOutClosure<Path> pathProvider,
+ LongConsumer allocatedTracker
+ ) {
+ return new FilePageStoreV2(
+ type,
+ pathProvider,
+ storeMgr.getPageStoreFileIoFactory(),
+ storeMgr.pageSize(),
+ allocatedTracker
+ ) {
+ @Override public boolean read(
+ long pageId,
+ ByteBuffer pageBuf,
+ boolean keepCrc
+ ) throws IgniteCheckedException {
+ buffRef.set(pageBuf);
+ pageIdRef.set(pageId);
+
+ return super.read(pageId, pageBuf, keepCrc);
+ }
+ };
+ }
+ };
+ }
+ });
+
+ int currRows = 0;
+ int rowsCnt;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex("tx1").context().cacheObjectContext();
Review comment:
tx1 duplicate
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws
Exception {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src,
Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 60_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (!Thread.currentThread().isInterrupted() && startTime +
loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(15_000), new Value(new
byte[rnd.nextInt(maxValSize)]));
+ else
+ cache.remove(rnd.nextInt(maxKey));
+ }
+
+ }, 5, "change-loader-");
+
+ fut.get();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<Integer, Value> iterated = new HashMap<>();
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ ccfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ iterated.put(row.key().value(coctx, true),
row.value().value(coctx, true));
+ }
+ }
+
+ stopAllGrids();
+
+ IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+ assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+ snpCache.forEach(e -> {
+ Value val = iterated.remove(e.getKey());
+
+ assertNotNull(val);
+ assertEquals(val.arr().length, e.getValue().arr().length);
+ });
+
+ assertTrue(iterated.isEmpty());
+ }
+
+ /** @throws Exception If fails */
+ @Test
+ public void testSnapshotIterator() throws Exception {
+ int keys = 127;
+
+ IgniteEx ignite = startGridsWithCache(2,
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false,
1)), keys);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ // Invariant for cache: cache key always equals to cache value.
+ assertEquals("Invalid key/value pair [key=" + row.key() + ",
val=" + row.value() + ']',
+ row.key().value(coctx, false,
U.resolveClassLoader(ignite.configuration())),
+ (Integer)row.value().value(coctx, false));
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorLargeRows() throws Exception {
+ int keys = 2;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+ forceCheckpoint();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ assertEquals(12008, ((Value)row.value().value(coctx,
false)).arr().length);
+ assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+ rows++;
+ }
}
- return result;
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorDirectIndirectCountersWithCacheGroup()
throws Exception {
Review comment:
As decided, this test checks internals already checked by values read on
recovery.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /** Pages which already marked and postponed to be read on the second
iteration. */
+ private final BitSet markedPages;
Review comment:
Let's rename them to tailPages or something like this.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
##########
@@ -223,6 +228,66 @@ public static String formatUpdateCountersDiff(IgniteEx ig,
List<Integer> diff) {
return diff;
}
+ /**
+ * @param updCntr Partition update counter prior check.
+ * @param grpId Group id.
+ * @param partId Partition id.
+ * @param grpName Group name.
+ * @param consId Local node consistent id.
+ * @param state Partition state to check.
+ * @param isPrimary {@code true} if partition is primary.
+ * @param partSize Partition size on disk.
+ * @param it Iterator though partition data rows.
+ * @throws IgniteCheckedException If fails.
+ * @return Map of calculated partition.
+ */
+ public static Map<PartitionKeyV2, PartitionHashRecordV2>
calculatePartitionHash(
Review comment:
Always returns singletonMap, is there any reason to return the map in
this case?
Also, because of the method name, we expect not a map, but something related
to the partition.
Also, this looks odd when we have the same (partId, grpId) at the input and
at the output as well.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder,
readCacheId, rowData, null, skipVer);
}
+ /**
+ * @param io Data page IO.
+ * @param itemId Row item Id.
+ * @throws IgniteCheckedException If failed.
+ */
+ public final void initFromPageBuffer(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ IgniteThrowableFunction<Long, ByteBuffer> reader,
+ ByteBuffer pageBuff,
+ DataPageIO io,
+ int itemId,
+ boolean readCacheId,
+ RowData rowData,
+ boolean skipVer
+ ) throws IgniteCheckedException {
+ long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+ IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx,
pageBuff.capacity(), pageBuff.capacity(),
+ pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+ if (incomplete == null)
+ return;
+
+ long nextLink = incomplete.getNextLink();
+
+ if (nextLink == 0)
+ return;
+
+ do {
+ long pageId = pageId(nextLink);
+
+ try {
+ ByteBuffer fragmentBuff = reader.apply(pageId);
+
+ long fragmentAddr = GridUnsafe.bufferAddress(fragmentBuff);
+ DataPageIO io2 = PageIO.getPageIO(T_DATA,
PageIO.getVersion(fragmentBuff));
+
+ incomplete = readIncomplete(incomplete, sctx, coctx,
fragmentBuff.capacity(), fragmentBuff.capacity(),
+ fragmentAddr, itemId(nextLink), io2, rowData, readCacheId,
skipVer);
+
+ if (incomplete == null)
+ return;
+
+ nextLink = incomplete.getNextLink();
+ }
+ catch (Exception e) {
+ throw new IgniteException("Error during reading DataRow
[pageId=" + pageId + ']', e);
+ }
+ }
+ while (nextLink != 0);
+
+ assert isReady() : "ready";
Review comment:
String after the assert should represent a "fail description".
`ready` seems to be a "good case" :)
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
##########
@@ -504,105 +500,63 @@ private boolean
isCacheMatchFilter(DynamicCacheDescriptor desc) {
}
/**
- * @param grpCtx Group context.
+ * @param gctx Group context.
* @param part Local partition.
*/
private Future<Map<PartitionKeyV2, PartitionHashRecordV2>>
calculatePartitionHashAsync(
Review comment:
should we replace Map with Tuple here?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ if (!done.compareAndSet(false, true))
Review comment:
Let's provide a comment that explains that we're changing the value at
one of the nodes to get a hash check issue.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldExtractionSelfTest.java
##########
@@ -276,7 +276,7 @@ public void testDecimalFieldMarshalling() throws Exception {
* @param marsh Binary marshaller.
* @return Binary object.
*/
- protected BinaryObjectImpl toBinary(Object obj, BinaryMarshaller marsh)
throws Exception {
+ public static BinaryObjectImpl toBinary(Object obj, BinaryMarshaller
marsh) throws Exception {
Review comment:
Seems to be not a proper solution to make this static to use at
different test.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ if (!done.compareAndSet(false, true))
+ return;
+
+ GridIterator<CacheDataRow> it =
cache.context().offheap().partitionIterator(0);
+
+ assertTrue(it.hasNext());
+
+ CacheDataRow row0 = it.nextX();
+
+ AffinityTopologyVersion topVer =
cctx.exchange().readyAffinityVersion();
+ GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+ byte[] bytes = new byte[2000];
+ new Random().nextBytes(bytes);
+
+ try {
+ BinaryObjectImpl newVal = toBinary(new Value(bytes),
binCtx.marshaller());
+
+ boolean success0 = cached.initialValue(
Review comment:
Do we have another `success` to call this `*0`?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ if (!done.compareAndSet(false, true))
+ return;
+
+ GridIterator<CacheDataRow> it =
cache.context().offheap().partitionIterator(0);
+
+ assertTrue(it.hasNext());
+
+ CacheDataRow row0 = it.nextX();
+
+ AffinityTopologyVersion topVer =
cctx.exchange().readyAffinityVersion();
+ GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+ byte[] bytes = new byte[2000];
+ new Random().nextBytes(bytes);
+
+ try {
+ BinaryObjectImpl newVal = toBinary(new Value(bytes),
binCtx.marshaller());
+
+ boolean success0 = cached.initialValue(
+ newVal,
+ new GridCacheVersion(row0.version().topologyVersion(),
+ row0.version().nodeOrder(),
+ row0.version().order() + 1),
+ null,
+ null,
+ TxState.NA,
+ TxState.NA,
+ TTL_ETERNAL,
+ row0.expireTime(),
+ true,
+ topVer,
+ DR_NONE,
+ false,
+ null);
+
+ assertTrue(success0);
+
+ long newPartCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+ assertEquals(newPartCtr, partCtr);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+ });
+
+ db.waitForCheckpoint("test-checkpoint");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Path part0 =
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+ getPartitionFileName(0));
+
+ assertNotNull(part0);
+ assertTrue(part0.toString(), part0.toFile().exists());
+
+ IdleVerifyResultV2 res =
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue(F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(), "The check procedure has finished,
found 1 conflict partitions");
Review comment:
Should we check that hash verification fail is the failure reason?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
Review comment:
Let's think about naming. Now it tells that iterator will scan the page.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder,
readCacheId, rowData, null, skipVer);
}
+ /**
+ * @param io Data page IO.
+ * @param itemId Row item Id.
+ * @throws IgniteCheckedException If failed.
+ */
+ public final void initFromPageBuffer(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ IgniteThrowableFunction<Long, ByteBuffer> reader,
+ ByteBuffer pageBuff,
+ DataPageIO io,
+ int itemId,
+ boolean readCacheId,
+ RowData rowData,
+ boolean skipVer
+ ) throws IgniteCheckedException {
+ long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+ IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx,
pageBuff.capacity(), pageBuff.capacity(),
+ pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+ if (incomplete == null)
+ return;
+
+ long nextLink = incomplete.getNextLink();
+
+ if (nextLink == 0)
+ return;
+
+ do {
+ long pageId = pageId(nextLink);
+
+ try {
+ ByteBuffer fragmentBuff = reader.apply(pageId);
+
+ long fragmentAddr = GridUnsafe.bufferAddress(fragmentBuff);
+ DataPageIO io2 = PageIO.getPageIO(T_DATA,
PageIO.getVersion(fragmentBuff));
+
+ incomplete = readIncomplete(incomplete, sctx, coctx,
fragmentBuff.capacity(), fragmentBuff.capacity(),
+ fragmentAddr, itemId(nextLink), io2, rowData, readCacheId,
skipVer);
+
+ if (incomplete == null)
+ return;
+
+ nextLink = incomplete.getNextLink();
+ }
+ catch (Exception e) {
Review comment:
Coul we set strict type here, eg. IgniteCheckedException thrown by
`readIncomplete`?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
Review comment:
Let's replace 0 by constant referencing to partition id at the whole
test.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
Review comment:
According to its name, this test should check hashes misses, but it
checks hashes validation fail on different content.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ if (!done.compareAndSet(false, true))
+ return;
+
+ GridIterator<CacheDataRow> it =
cache.context().offheap().partitionIterator(0);
+
+ assertTrue(it.hasNext());
+
+ CacheDataRow row0 = it.nextX();
+
+ AffinityTopologyVersion topVer =
cctx.exchange().readyAffinityVersion();
+ GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+ byte[] bytes = new byte[2000];
+ new Random().nextBytes(bytes);
+
+ try {
+ BinaryObjectImpl newVal = toBinary(new Value(bytes),
binCtx.marshaller());
+
+ boolean success0 = cached.initialValue(
+ newVal,
+ new GridCacheVersion(row0.version().topologyVersion(),
+ row0.version().nodeOrder(),
+ row0.version().order() + 1),
+ null,
+ null,
+ TxState.NA,
+ TxState.NA,
+ TTL_ETERNAL,
+ row0.expireTime(),
+ true,
+ topVer,
+ DR_NONE,
+ false,
+ null);
+
+ assertTrue(success0);
+
+ long newPartCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+ assertEquals(newPartCtr, partCtr);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+ });
+
+ db.waitForCheckpoint("test-checkpoint");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Path part0 =
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+ getPartitionFileName(0));
+
+ assertNotNull(part0);
+ assertTrue(part0.toString(), part0.toFile().exists());
+
+ IdleVerifyResultV2 res =
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue(F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(), "The check procedure has finished,
found 1 conflict partitions");
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCompareHashes() throws Exception {
Review comment:
testClusterSnapshotHashesTheSameAsIdleVerifyHashes? %)
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ if (!done.compareAndSet(false, true))
+ return;
+
+ GridIterator<CacheDataRow> it =
cache.context().offheap().partitionIterator(0);
+
+ assertTrue(it.hasNext());
+
+ CacheDataRow row0 = it.nextX();
+
+ AffinityTopologyVersion topVer =
cctx.exchange().readyAffinityVersion();
+ GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+ byte[] bytes = new byte[2000];
+ new Random().nextBytes(bytes);
+
+ try {
+ BinaryObjectImpl newVal = toBinary(new Value(bytes),
binCtx.marshaller());
+
+ boolean success0 = cached.initialValue(
+ newVal,
+ new GridCacheVersion(row0.version().topologyVersion(),
+ row0.version().nodeOrder(),
+ row0.version().order() + 1),
+ null,
+ null,
+ TxState.NA,
+ TxState.NA,
+ TTL_ETERNAL,
+ row0.expireTime(),
+ true,
+ topVer,
+ DR_NONE,
+ false,
+ null);
+
+ assertTrue(success0);
+
+ long newPartCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+ assertEquals(newPartCtr, partCtr);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+ });
+
+ db.waitForCheckpoint("test-checkpoint");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Path part0 =
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+ getPartitionFileName(0));
+
+ assertNotNull(part0);
+ assertTrue(part0.toString(), part0.toFile().exists());
+
+ IdleVerifyResultV2 res =
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue(F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(), "The check procedure has finished,
found 1 conflict partitions");
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCompareHashes() throws Exception {
+ Random rnd = new Random();
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[rnd.nextInt(32768)]), ccfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleHashes = new
HashMap<>();
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpHashes = new
HashMap<>();
+
+ IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new
TestVisorBackupPartitionsTask(idleHashes),
+ new VisorIdleVerifyTaskArg(new
HashSet<>(Collections.singletonList(ccfg.getName())),
+ new HashSet<>(),
+ false,
+ CacheFilterEnum.USER,
+ true));
+
+ IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(new
TestSnapshotPartitionsVerifyTask(snpHashes),
+ Collections.singletonMap(ignite.cluster().localNode(),
+
Collections.singletonList(snp(ignite).readSnapshotMetadata(SNAPSHOT_NAME,
(String)ignite.configuration().getConsistentId()))));
+
+ assertEquals(idleHashes, snpHashes);
+ assertEquals(idleVerifyRes, snpVerifyRes);
+ }
+
+ /** */
+ private static class TestVisorBackupPartitionsTask extends
VerifyBackupPartitionsTaskV2 {
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;
+
+ /**
+ * @param hashes Map of calculated partition hashes.
+ */
+ public TestVisorBackupPartitionsTask(Map<PartitionKeyV2,
List<PartitionHashRecordV2>> hashes) {
Review comment:
Looks like TestSnapshotPartitionsVerifyTask code equals this code.
Can we resolve the duplication issue?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
##########
@@ -315,4 +347,183 @@ public void testClusterSnapshotCheckCRCFail() throws
Exception {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex,
IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckMissedHashes() throws Exception {
+ int keys = 1;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx =
((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache =
ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ if (!done.compareAndSet(false, true))
+ return;
+
+ GridIterator<CacheDataRow> it =
cache.context().offheap().partitionIterator(0);
+
+ assertTrue(it.hasNext());
+
+ CacheDataRow row0 = it.nextX();
+
+ AffinityTopologyVersion topVer =
cctx.exchange().readyAffinityVersion();
+ GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+ byte[] bytes = new byte[2000];
+ new Random().nextBytes(bytes);
+
+ try {
+ BinaryObjectImpl newVal = toBinary(new Value(bytes),
binCtx.marshaller());
+
+ boolean success0 = cached.initialValue(
+ newVal,
+ new GridCacheVersion(row0.version().topologyVersion(),
+ row0.version().nodeOrder(),
+ row0.version().order() + 1),
+ null,
+ null,
+ TxState.NA,
+ TxState.NA,
+ TTL_ETERNAL,
+ row0.expireTime(),
+ true,
+ topVer,
+ DR_NONE,
+ false,
+ null);
+
+ assertTrue(success0);
+
+ long newPartCtr =
cache.context().offheap().lastUpdatedPartitionCounter(0);
+
+ assertEquals(newPartCtr, partCtr);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+
+ }
+ });
+
+ db.waitForCheckpoint("test-checkpoint");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Path part0 =
U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+ getPartitionFileName(0));
+
+ assertNotNull(part0);
+ assertTrue(part0.toString(), part0.toFile().exists());
+
+ IdleVerifyResultV2 res =
snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue(F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(), "The check procedure has finished,
found 1 conflict partitions");
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCompareHashes() throws Exception {
+ Random rnd = new Random();
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[rnd.nextInt(32768)]), ccfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleHashes = new
HashMap<>();
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpHashes = new
HashMap<>();
+
+ IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new
TestVisorBackupPartitionsTask(idleHashes),
+ new VisorIdleVerifyTaskArg(new
HashSet<>(Collections.singletonList(ccfg.getName())),
+ new HashSet<>(),
+ false,
+ CacheFilterEnum.USER,
+ true));
+
+ IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(new
TestSnapshotPartitionsVerifyTask(snpHashes),
+ Collections.singletonMap(ignite.cluster().localNode(),
+
Collections.singletonList(snp(ignite).readSnapshotMetadata(SNAPSHOT_NAME,
(String)ignite.configuration().getConsistentId()))));
+
+ assertEquals(idleHashes, snpHashes);
+ assertEquals(idleVerifyRes, snpVerifyRes);
+ }
+
+ /** */
+ private static class TestVisorBackupPartitionsTask extends
VerifyBackupPartitionsTaskV2 {
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;
+
+ /**
+ * @param hashes Map of calculated partition hashes.
+ */
+ public TestVisorBackupPartitionsTask(Map<PartitionKeyV2,
List<PartitionHashRecordV2>> hashes) {
+ this.hashes = hashes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable IdleVerifyResultV2
reduce(List<ComputeJobResult> results) throws IgniteException {
+ IdleVerifyResultV2 res = super.reduce(results);
+
+ for (ComputeJobResult job : results) {
+ if (job.getException() != null)
+ continue;
+
+ job.<Map<PartitionKeyV2,
PartitionHashRecordV2>>getData().forEach((k, v) ->
+ hashes.computeIfAbsent(k, k0 -> new ArrayList<>()).add(v));
+ }
+
+ return res;
+ }
+ }
+
+ /** */
+ private static class TestSnapshotPartitionsVerifyTask extends
SnapshotPartitionsVerifyTask {
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes;
Review comment:
javadoc missed
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws
Exception {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src,
Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 60_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (!Thread.currentThread().isInterrupted() && startTime +
loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(15_000), new Value(new
byte[rnd.nextInt(maxValSize)]));
+ else
+ cache.remove(rnd.nextInt(maxKey));
+ }
+
+ }, 5, "change-loader-");
+
+ fut.get();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<Integer, Value> iterated = new HashMap<>();
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ ccfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ iterated.put(row.key().value(coctx, true),
row.value().value(coctx, true));
+ }
+ }
+
+ stopAllGrids();
+
+ IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+ assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+ snpCache.forEach(e -> {
+ Value val = iterated.remove(e.getKey());
+
+ assertNotNull(val);
+ assertEquals(val.arr().length, e.getValue().arr().length);
+ });
+
+ assertTrue(iterated.isEmpty());
+ }
+
+ /** @throws Exception If fails */
+ @Test
+ public void testSnapshotIterator() throws Exception {
+ int keys = 127;
+
+ IgniteEx ignite = startGridsWithCache(2,
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false,
1)), keys);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ // Invariant for cache: cache key always equals to cache value.
+ assertEquals("Invalid key/value pair [key=" + row.key() + ",
val=" + row.value() + ']',
+ row.key().value(coctx, false,
U.resolveClassLoader(ignite.configuration())),
+ (Integer)row.value().value(coctx, false));
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorLargeRows() throws Exception {
+ int keys = 2;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+ forceCheckpoint();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ assertEquals(12008, ((Value)row.value().value(coctx,
false)).arr().length);
+ assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+ rows++;
+ }
}
- return result;
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorDirectIndirectCountersWithCacheGroup()
throws Exception {
+ int keysPerCache = 3700;
+ int cacheValSize = 33;
+ int rowsOnPage = 37;
+
+ CacheConfiguration<Integer, Value> ccfg1 = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+ CacheConfiguration<Integer, Value> ccfg2 = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx2"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ ccfg1.setGroupName(DEFAULT_CACHE_NAME);
+ ccfg2.setGroupName(DEFAULT_CACHE_NAME);
+
+ IgniteEx ignite = startGridsWithCache(1, keysPerCache, k -> new
Value(new byte[cacheValSize]), ccfg1, ccfg2);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ AtomicReference<ByteBuffer> buffRef = new AtomicReference<>();
+ AtomicReference<Long> pageIdRef = new AtomicReference<>();
+
+ snapshotStoreFactory(ignite, new BiFunction<Integer, Boolean,
FileVersionCheckingFactory>() {
+ @Override public FileVersionCheckingFactory apply(Integer grpId,
Boolean encrypted) {
+ FilePageStoreManager storeMgr =
(FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+ return new
FileVersionCheckingFactory(storeMgr.getPageStoreFileIoFactory(),
storeMgr.getPageStoreFileIoFactory(), storeMgr::pageSize) {
+ @Override public PageStore createPageStore(
+ byte type,
+ IgniteOutClosure<Path> pathProvider,
+ LongConsumer allocatedTracker
+ ) {
+ return new FilePageStoreV2(
+ type,
+ pathProvider,
+ storeMgr.getPageStoreFileIoFactory(),
+ storeMgr.pageSize(),
+ allocatedTracker
+ ) {
+ @Override public boolean read(
+ long pageId,
+ ByteBuffer pageBuf,
+ boolean keepCrc
+ ) throws IgniteCheckedException {
+ buffRef.set(pageBuf);
+ pageIdRef.set(pageId);
+
+ return super.read(pageId, pageBuf, keepCrc);
+ }
+ };
+ }
+ };
+ }
+ });
+
+ int currRows = 0;
+ int rowsCnt;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex("tx1").context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ DataPageIO io = PageIO.getPageIO(T_DATA,
PageIO.getVersion(buffRef.get()));
+
+ long pageAddr = GridUnsafe.bufferAddress(buffRef.get());
+ rowsCnt = io.getDirectCount(pageAddr);
+
+ assertEquals(cacheValSize, ((Value)row.value().value(coctx,
false)).arr().length);
+ assertEquals(rowsOnPage, rowsCnt);
+
+ currRows++;
+ }
+ }
+
+ assertEquals(keysPerCache, currRows / 2);
+
+ // Remove all rows in the middle of the page.
+ for (int k = 0; k < keysPerCache; k++) {
+ if (k % rowsOnPage == 1 || k % rowsOnPage == (rowsOnPage - 1))
+ continue;
+
+ ignite.getOrCreateCache(ccfg1).remove(k);
+ ignite.getOrCreateCache(ccfg2).remove(k);
+ }
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME + 2).get();
+
+ int indRowsCnt;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME + 2,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex("tx1").context().cacheObjectContext();
Review comment:
tx1 duplicate
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws
Exception {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src,
Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 60_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (!Thread.currentThread().isInterrupted() && startTime +
loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(15_000), new Value(new
byte[rnd.nextInt(maxValSize)]));
+ else
+ cache.remove(rnd.nextInt(maxKey));
+ }
+
+ }, 5, "change-loader-");
+
+ fut.get();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<Integer, Value> iterated = new HashMap<>();
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ ccfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ iterated.put(row.key().value(coctx, true),
row.value().value(coctx, true));
+ }
+ }
+
+ stopAllGrids();
+
+ IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+ assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+ snpCache.forEach(e -> {
+ Value val = iterated.remove(e.getKey());
+
+ assertNotNull(val);
+ assertEquals(val.arr().length, e.getValue().arr().length);
+ });
+
+ assertTrue(iterated.isEmpty());
+ }
+
+ /** @throws Exception If fails */
+ @Test
+ public void testSnapshotIterator() throws Exception {
+ int keys = 127;
+
+ IgniteEx ignite = startGridsWithCache(2,
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false,
1)), keys);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ // Invariant for cache: cache key always equals to cache value.
+ assertEquals("Invalid key/value pair [key=" + row.key() + ",
val=" + row.value() + ']',
+ row.key().value(coctx, false,
U.resolveClassLoader(ignite.configuration())),
+ (Integer)row.value().value(coctx, false));
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorLargeRows() throws Exception {
+ int keys = 2;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
+
+ forceCheckpoint();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ assertEquals(12008, ((Value)row.value().value(coctx,
false)).arr().length);
Review comment:
Should be const.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
##########
@@ -388,31 +403,266 @@ public void testLocalSnapshotOnCacheStopped() throws
Exception {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src,
Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 60_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new
Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (!Thread.currentThread().isInterrupted() && startTime +
loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(15_000), new Value(new
byte[rnd.nextInt(maxValSize)]));
+ else
+ cache.remove(rnd.nextInt(maxKey));
+ }
+
+ }, 5, "change-loader-");
+
+ fut.get();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<Integer, Value> iterated = new HashMap<>();
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ ccfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ iterated.put(row.key().value(coctx, true),
row.value().value(coctx, true));
+ }
+ }
+
+ stopAllGrids();
+
+ IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+ assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+ snpCache.forEach(e -> {
+ Value val = iterated.remove(e.getKey());
+
+ assertNotNull(val);
+ assertEquals(val.arr().length, e.getValue().arr().length);
+ });
+
+ assertTrue(iterated.isEmpty());
+ }
+
+ /** @throws Exception If fails */
+ @Test
+ public void testSnapshotIterator() throws Exception {
+ int keys = 127;
+
+ IgniteEx ignite = startGridsWithCache(2,
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false,
1)), keys);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter =
snp(ignite).partitionRows(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx =
ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ // Invariant for cache: cache key always equals to cache value.
+ assertEquals("Invalid key/value pair [key=" + row.key() + ",
val=" + row.value() + ']',
+ row.key().value(coctx, false,
U.resolveClassLoader(ignite.configuration())),
+ (Integer)row.value().value(coctx, false));
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorLargeRows() throws Exception {
+ int keys = 2;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new
CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[12008]));
Review comment:
12008 should be const with an explanation of why.
Should be calculated using page size, since default may be changed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]