Mmuzaf commented on a change in pull request #8767:
URL: https://github.com/apache/ignite/pull/8767#discussion_r584881974
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /** */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /** Pages which must be skipped at the second iteration. */
+ private final AtomicBitSet skipPages;
+
+ /** Batch of rows read through iteration. */
+ private final Deque<CacheDataRow> rows = new LinkedList<>();
+
+ /** {@code true} if the iteration reached its end. */
+ private boolean finished;
+
+ /**
+ * Current partition page index for read. Since we read the partition
twice, it
+ * can't be greater than 2 * store.size().
+ */
+ private int currIdx = -1;
+
+ /**
+ * @param coctx Cache object context.
+ * @param store Page store to read.
+ * @param partId Partition id.
+ * @throws IgniteCheckedException If fails.
+ */
+ public PageScanIterator(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ PageStore store,
+ int partId
+ ) throws IgniteCheckedException {
+ // CacheGroupContext may be null. It means that this cache group
is under eviction for now.
+ // Since the persisted cache groups and their partitions are
guarded by external machinery we
+ // can avoid it here.
+ this.store = store;
+ this.partId = partId;
+ this.coctx = coctx;
+ this.sctx = sctx;
+
+ store.sync();
+ pages = store.pages();
+ skipPages = new AtomicBitSet(pages);
+
+ locBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheDataRow onNext() throws
IgniteCheckedException {
+ if (finished && rows.isEmpty())
+ throw new NoSuchElementException("[partId=" + partId + ",
store=" + store + ", skipPages=" + skipPages + ']');
+
+ return rows.poll();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ if (finished && rows.isEmpty())
+ return false;
+
+ try {
+ while (rows.isEmpty() && ++currIdx < 2 * pages) {
+ int pageIdx = currIdx % pages;
+ boolean firstScan = currIdx < pages;
+
+ if (skipPages.check(pageIdx))
+ continue;
+
+ locBuff.clear();
+
+ long pageId = PageIdUtils.pageId(partId,
PageIdAllocator.FLAG_DATA, pageIdx);
+ boolean success = store.read(pageId, locBuff, true);
+
+ assert success : PageIdUtils.toDetailString(pageId);
+
+ // Skip not data pages.
+ if (firstScan && PageIO.getType(locBuff) != T_DATA) {
+ skipPages.touch(pageIdx);
+
+ continue;
+ }
+
+ long pageAddr = GridUnsafe.bufferAddress(locBuff);
+
+ DataPageIO io = PageIO.getPageIO(T_DATA,
PageIO.getVersion(locBuff));
+ int freeSpace = io.getFreeSpace(pageAddr);
+ int rowsCnt = io.getDirectCount(pageAddr);
+
+ if (firstScan && rowsCnt == 0) {
+ skipPages.touch(pageIdx);
+
+ continue;
+ }
+
+ // For pages which contain only the incomplete fragment
of a data row
+ // the rowsCnt will always be equal to 1. Skip such pages
and read them
+ // on the second iteration.
Review comment:
I've updated the comment; please take a look.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]