Mmuzaf commented on a change in pull request #8767:
URL: https://github.com/apache/ignite/pull/8767#discussion_r591748020



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
         doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder, 
readCacheId, rowData, null, skipVer);
     }
 
+    /**
+     * @param io Data page IO.
+     * @param itemId Row item Id.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void initFromPageBuffer(
+        GridCacheSharedContext<?, ?> sctx,
+        CacheObjectContext coctx,
+        IgniteThrowableFunction<Long, ByteBuffer> reader,
+        ByteBuffer pageBuff,
+        DataPageIO io,
+        int itemId,
+        boolean readCacheId,
+        RowData rowData,
+        boolean skipVer
+    ) throws IgniteCheckedException {
+        long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+        IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx, 
pageBuff.capacity(), pageBuff.capacity(),
+            pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+        if (incomplete == null)
+            return;
+
+        long nextLink = incomplete.getNextLink();
+
+        if (nextLink == 0)
+            return;
+
+        do {
+            long pageId = pageId(nextLink);
+
+            try {
+                ByteBuffer fragmentBuff = reader.apply(pageId);
+
+                long fragmentAddr = GridUnsafe.bufferAddress(fragmentBuff);
+                DataPageIO io2 = PageIO.getPageIO(T_DATA, 
PageIO.getVersion(fragmentBuff));
+
+                incomplete = readIncomplete(incomplete, sctx, coctx, 
fragmentBuff.capacity(), fragmentBuff.capacity(),
+                    fragmentAddr, itemId(nextLink), io2, rowData, readCacheId, 
skipVer);
+
+                if (incomplete == null)
+                    return;
+
+                nextLink = incomplete.getNextLink();
+            }
+            catch (Exception e) {

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
##########
@@ -160,6 +166,61 @@ public final void initFromLink(
         doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder, 
readCacheId, rowData, null, skipVer);
     }
 
+    /**
+     * @param io Data page IO.
+     * @param itemId Row item Id.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void initFromPageBuffer(
+        GridCacheSharedContext<?, ?> sctx,
+        CacheObjectContext coctx,
+        IgniteThrowableFunction<Long, ByteBuffer> reader,
+        ByteBuffer pageBuff,
+        DataPageIO io,
+        int itemId,
+        boolean readCacheId,
+        RowData rowData,
+        boolean skipVer
+    ) throws IgniteCheckedException {
+        long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+        IncompleteObject<?> incomplete = readIncomplete(null, sctx, coctx, 
pageBuff.capacity(), pageBuff.capacity(),
+            pageAddr, itemId, io, rowData, readCacheId, skipVer);
+
+        if (incomplete == null)
+            return;
+
+        long nextLink = incomplete.getNextLink();
+
+        if (nextLink == 0)
+            return;
+
+        do {
+            long pageId = pageId(nextLink);
+
+            try {
+                ByteBuffer fragmentBuff = reader.apply(pageId);
+
+                long fragmentAddr = GridUnsafe.bufferAddress(fragmentBuff);
+                DataPageIO io2 = PageIO.getPageIO(T_DATA, 
PageIO.getVersion(fragmentBuff));
+
+                incomplete = readIncomplete(incomplete, sctx, coctx, 
fragmentBuff.capacity(), fragmentBuff.capacity(),
+                    fragmentAddr, itemId(nextLink), io2, rowData, readCacheId, 
skipVer);
+
+                if (incomplete == null)
+                    return;
+
+                nextLink = incomplete.getNextLink();
+            }
+            catch (Exception e) {
+                throw new IgniteException("Error during reading DataRow 
[pageId=" + pageId + ']', e);
+            }
+        }
+        while (nextLink != 0);
+
+        assert isReady() : "ready";

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File 
to, long length) {
         }
     }
 
+    /**
+     * Page-scan iterator in the Ignite core,
+     * covered with assertions throughout.
+     */
+    private static class PageScanIterator extends 
GridCloseableIteratorAdapter<CacheDataRow> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Page store to iterate over. */
+        @GridToStringExclude
+        private final PageStore store;
+
+        /** Page store partition id. */
+        private final int partId;
+
+        /** Grid cache shared context. */
+        private final GridCacheSharedContext<?, ?> sctx;
+
+        /** Cache object context for key/value deserialization. */
+        private final CacheObjectContext coctx;
+
+        /** Buffer to read pages. */
+        private final ByteBuffer locBuff;
+
+        /** Buffer to read the rest part of fragmented rows. */
+        private final ByteBuffer fragmentBuff;
+
+        /** Total pages in the page store. */
+        private final int pages;
+
+        /** Pages which are already marked and postponed to be read on the second iteration. */
+        private final BitSet markedPages;
+
+        /** Pages which are already read and must be skipped. */
+        private final BitSet readPages;
+
+        /** Batch of rows read through iteration. */
+        private final Deque<CacheDataRow> rows = new LinkedList<>();
+
+        /** {@code true} if the iteration through the partition reached its end. */
+        private boolean secondScanComplete;
+
+        /**
+         * Current partition page index to read. Since the partition is read twice, it
+         * can't be greater than 2 * store.size().
+         */
+        private int currIdx;
+
+        /**
+         * While scanning a cache partition represented as a {@code PageStore} we must guarantee the following:
+         * all the pages of this storage remain unchanged while the iterator remains open, and the stored data
+         * keeps its consistency. We can't read the {@code PageStore} during an ongoing checkpoint over it.
+         *
+         * @param coctx Cache object context.
+         * @param store Page store to read.
+         * @param partId Partition id.
+         * @throws IgniteCheckedException If fails.
+         */
+        public PageScanIterator(
+            GridCacheSharedContext<?, ?> sctx,
+            CacheObjectContext coctx,
+            PageStore store,
+            int partId
+        ) throws IgniteCheckedException {
+            this.store = store;
+            this.partId = partId;
+            this.coctx = coctx;
+            this.sctx = sctx;
+
+            store.ensure();
+            pages = store.pages();
+            markedPages = new BitSet(pages);
+            readPages = new BitSet(pages);
+
+            locBuff = ByteBuffer.allocateDirect(store.getPageSize())
+                .order(ByteOrder.nativeOrder());
+            fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize())
+                .order(ByteOrder.nativeOrder());
+        }
+
+        /** {@inheritDoc} */
+        @Override protected CacheDataRow onNext() throws 
IgniteCheckedException {
+            if (secondScanComplete && rows.isEmpty())
+                throw new NoSuchElementException("[partId=" + partId + ", 
store=" + store + ", skipPages=" + readPages + ']');
+
+            return rows.poll();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean onHasNext() throws IgniteCheckedException {
+            if (secondScanComplete && rows.isEmpty())
+                return false;
+
+            try {
+                for (; currIdx < 2 * pages && rows.isEmpty(); currIdx++) {
+                    boolean first = currIdx < pages;
+                    int pageIdx = currIdx % pages;
+
+                    if (readPages.get(pageIdx) || (!first && 
markedPages.get(pageIdx)))
+                        continue;
+
+                    if (!readPageFromStore(pageId(partId, FLAG_DATA, pageIdx), 
locBuff)) {
+                        // Skip not FLAG_DATA pages.
+                        changeBit(readPages, pageIdx);
+
+                        continue;
+                    }
+
+                    long pageAddr = bufferAddress(locBuff);
+                    DataPageIO io = getPageIO(T_DATA, getVersion(pageAddr));
+                    int freeSpace = io.getFreeSpace(pageAddr);
+                    int rowsCnt = io.getDirectCount(pageAddr);
+
+                    if (first) {
+                        // Skip empty pages.
+                        if (rowsCnt == 0) {
+                            changeBit(readPages, pageIdx);
+
+                            continue;
+                        }
+
+                        // There is no difference between a page containing an 
incomplete DataRow fragment and
+                        // the page where DataRow takes up all the free space. 
There is no dedicated
+                        // flag for this case in page header.
+                        // During the storage scan we can skip such pages at 
the first iteration over the partition file,
+                        // since all the fragmented pages will be marked by 
BitSet array we will safely read the others
+                        // on the second iteration.
+                        if (freeSpace == 0 && rowsCnt == 1) {
+                            DataPagePayload payload = io.readPayload(pageAddr, 
0, locBuff.capacity());
+
+                            long link = payload.nextLink();
+
+                            if (link != 0)
+                                changeBit(markedPages, 
pageIndex(pageId(link)));
+
+                            continue;
+                        }
+                    }
+
+                    changeBit(readPages, pageIdx);
+
+                    for (int itemId = 0; itemId < rowsCnt; itemId++) {
+                        DataRow row = new DataRow();
+
+                        row.partition(partId);
+
+                        row.initFromPageBuffer(
+                            sctx,
+                            coctx,
+                            new IgniteThrowableFunction<Long, ByteBuffer>() {
+                                @Override public ByteBuffer apply(Long 
nextPageId) throws IgniteCheckedException {
+                                    boolean success = 
readPageFromStore(nextPageId, fragmentBuff);
+
+                                    assert success : "Only FLAG_DATA pages 
allowed: " + toDetailString(nextPageId);
+
+                                    // Fragment of page has been read, might 
be skipped further.
+                                    changeBit(readPages, 
pageIndex(nextPageId));
+
+                                    return fragmentBuff;
+                                }
+                            },
+                            locBuff,
+                            io,
+                            itemId,
+                            false,
+                            CacheDataRowAdapter.RowData.FULL,
+                            false);
+
+                        rows.add(row);
+                    }
+                }
+
+                if (currIdx == 2 * pages) {
+                    secondScanComplete = true;
+
+                    boolean set = true;
+
+                    for (int j = 0; j < pages; j++)
+                        set &= readPages.get(j);
+
+                    assert set : "readPages=" + readPages + ", pages=" + pages;
+                }
+
+                return !rows.isEmpty();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteCheckedException("Error during iteration 
through page store: " + this, e);
+            }
+        }
+
+        /**
+         * @param bitSet BitSet to change bit index.
+         * @param idx Index of bit to change.
+         */
+        private static void changeBit(BitSet bitSet, int idx) {

Review comment:
       Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to