nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336984067
 
 

 ##########
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##########
 @@ -5728,6 +5746,332 @@ protected abstract Result run0(long pageId, long page, 
long pageAddr, BPlusIO<L>
         }
     }
 
+    /**
+     * Cursor that filters and removes rows.
+     *
+     * Most rows are expected to be deleted "in place" on a leaf page.
+     * The last row and the rightmost row on a page are deleted starting from 
root.
+     */
+    private class PurgeCursor implements GridCursor<Void> {
+        /** Row filter. */
+        private final TreeRowClosure<L, T> clo;
+
+        /** Current page id. */
+        private long curPageId = 0L;
+
+        /** Last row to get back to if current page turns out removed. */
+        private L lastRow = null;
+
+        /** Row to remove starting "from root". */
+        private L removeRow = null;
+
+        /**
+         * Constructor.
+         *
+         * @param clo Row filter.
+         */
+        PurgeCursor(TreeRowClosure clo) {
+            this.clo = clo;
+        }
+
+        /**
+         * Initializes the cursor.
+         *
+         * @throws IgniteCheckedException If failed.
+         */
+        void start() throws IgniteCheckedException {
+            checkDestroyed();
+
+            long metaPage = acquirePage(metaPageId);
+            try {
+                curPageId = getFirstPageId(metaPageId, metaPage, 0);
+            }
+            finally {
+                releasePage(metaPageId, metaPage);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() throws IgniteCheckedException {
+            checkDestroyed();
+
+            if (curPageId == 0L && lastRow == null)
+                return false;
+
+            return next0(curPageId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void get() throws IgniteCheckedException {
+            checkDestroyed();
+
+            try {
+                doPurge(curPageId);
+
+                if (removeRow != null) {
+                    doRemove(removeRow, false);
+
+                    lastRow = removeRow;
+                    removeRow = null;
+                }
+            }
+            catch (RuntimeException | AssertionError e) {
+                throw new BPlusTreeRuntimeException(e, grpId);
+            }
+
+            return null;
+        }
+
+        /**
+         * Continues scanning the leaf pages of the tree.
+         *
+         * @param pageId Page id to start with.
+         * @return {@code true} if there is a row(s) to remove.
+         * @throws IgniteCheckedException If failed.
+         */
+        private boolean next0(long pageId) throws IgniteCheckedException {
+            for (;;) {
+                if (pageId == 0L && lastRow != null) {
+                    ReinitHelper reinit = new ReinitHelper(lastRow);
+
+                    doFind(reinit);
+
+                    pageId = reinit.pageId();
+                }
+
+                long page = acquirePage(pageId);
+                try {
+                    long pageAddr = readLock(pageId, page);
+
+                    if (pageAddr == 0L) { // this page is deleted. Find from 
root using lastRow.
+                        pageId = 0L;
+
+                        continue;
+                    }
+
+                    try {
+                        for (;;) {
+                            BPlusIO<L> io = io(pageAddr);
+
+                            assert io.isLeaf();
+
+                            long nextPageId = io.getForward(pageAddr);
+
+                            int cnt = io.getCount(pageAddr);
+
+                            if (cnt == 0) {
+                                assert nextPageId == 0L;
+
+                                return false;
+                            }
+
+                            for (int idx = 0; idx < cnt; idx++) {
+                                if (clo.apply(BPlusTree.this, io, pageAddr, 
idx)) {
+                                    lastRow = io.getLookupRow(BPlusTree.this, 
pageAddr, cnt - 1);
+                                    curPageId = pageId;
+
+                                    return true;
+                                }
+                            }
+                            lastRow = null;
+
+                            if (nextPageId == 0L)
+                                return false;
+
+                            long nextPage = acquirePage(nextPageId);
+                            try {
+                                long nextPageAddr = readLock(nextPageId, 
nextPage);
+
+                                assert nextPageAddr != 0L : "next page removed 
while back page is being locked";
+
+                                try {
+                                    long pa = pageAddr;
+                                    pageAddr = 0L;
+                                    readUnlock(pageId, page, pa);
+
+                                    long p = page;
+                                    page = 0L;
+                                    releasePage(pageId, p);
+
+                                    pageId = nextPageId;
+                                    page = nextPage;
+                                    pageAddr = nextPageAddr;
+
+                                    nextPageId = 0L;
+                                    nextPageAddr = 0L;
+                                    nextPage = 0L;
+                                }
+                                finally {
+                                    if (nextPageAddr != 0L)
+                                        readUnlock(nextPageId, nextPage, 
nextPageAddr);
+                                }
+                            }
+                            finally {
+                                if (nextPage != 0L)
+                                    releasePage(nextPageId, nextPage);
+                            }
+                        }
+                    }
+                    finally {
+                        if (pageAddr != 0L)
+                            readUnlock(pageId, page, pageAddr);
+                    }
+                }
+                finally {
+                    if (page != 0L)
+                        releasePage(pageId, page);
+                }
+            }
+        }
+
+        /**
+         * Removes the filtered rows from the page.
+         * Indicates the necessity to remove some row from "root".
+         *
+         * @param pageId Page id to start with.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void doPurge(long pageId) throws IgniteCheckedException {
+            boolean dirty = false;
+
+            long page = acquirePage(pageId);
+            try {
+                long pageAddr = writeLock(pageId, page);
+
+                // If concurrent merge occurred we have to reinitialize cursor 
from the last returned row.
+                if (pageAddr == 0L) {
+                    assert lastRow != null;
+
+                    curPageId = 0L;
+
+                    return; // Retry.
+                }
+
+                try {
+                    BPlusIO<L> io = io(pageAddr);
+
+                    assert io.isLeaf();
+
+                    int cnt = io.getCount(pageAddr);
+
+                    assert cnt > 0 : cnt; // Empty leaf is nonsensical.
+
+                    dirty = modifyPage(pageId, page, null, io, pageAddr, cnt);
+                }
+                finally {
+                    writeUnlock(pageId, page, pageAddr, null, dirty);
+                }
+            }
+            finally {
+                releasePage(pageId, page);
+            }
+        }
+
+        /**
+         * Does actual modifications on the page.
+         *
+         * @param io Page IO.
+         * @param pageAddr Page address.
+         * @param cnt Item count.
+         * @return {@code true} if page was modified.
+         */
+        private boolean modifyPage(long pageId, long page, Boolean walPlc, 
BPlusIO<L> io, long pageAddr, int cnt)
 
 Review comment:
   Why do we need `walPlc` parameter? 
   We call this method in a single place and pass `null` constant in it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to