tkalkirill commented on code in PR #791:
URL: https://github.com/apache/ignite-3/pull/791#discussion_r860952204


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java:
##########
@@ -1598,4 +1727,327 @@ public interface PageChangeTracker {
          */
         void apply(long page, FullPageId fullPageId, PageMemoryEx 
pageMemoryEx);
     }
+
+    /**
+     * Heuristic check that lets a thread find out whether it is safe to start modifying memory structures with regard to checkpointing.
+     * May return a false-negative result during or after partition eviction.
+     *
+     * <p>Returns the current value of the {@code safeToUpdate} flag when {@code segments} is not {@code null}; otherwise returns
+     * {@code true}.
+     *
+     * @return {@code false} if there are too many dirty pages and the thread should wait for a checkpoint to begin.
+     */
+    public boolean safeToUpdate() {
+        if (segments != null) {
+            return safeToUpdate.get();
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns the number of pages used in the checkpoint buffer, or {@code 0} if the checkpoint pool is {@code null}.
+     */
+    public int usedCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool; // Read the field once so the null check and the call below use the same reference.
+
+        return checkpointPool == null ? 0 : checkpointPool.size();
+    }
+
+    /**
+     * Returns the maximum number of pages in the checkpoint buffer, or {@code 0} if the checkpoint pool is {@code null}.
+     */
+    public int maxCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool; // Read the field once so the null check and the call below use the same reference.
+
+        return checkpointPool == null ? 0 : checkpointPool.pages();
+    }
+
+    private void releaseCheckpointBufferPage(long tmpBufPtr) { // Hands a checkpoint buffer page back to the checkpoint pool.
+        checkpointPool.releaseFreePage(tmpBufPtr); // The caller in copyPageForCheckpoint passes the relative pointer of the temp page.
+    }
+
+    /**
+     * Checks whether the page is contained in the checkpoint pages of its segment.
+     *
+     * @param pageId Page ID to check.
+     * @return {@code true} if the segment has checkpoint pages (the field is not {@code null}) and they contain the given page ID,
+     *      {@code false} otherwise.
+     */
+    boolean isInCheckpoint(FullPageId pageId) {
+        Segment seg = segment(pageId.groupId(), pageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        return pages0 != null && pages0.contains(pageId);
+    }
+
+    /**
+     * Marks the page as saved in the checkpoint pages of its segment. The segment's checkpoint pages are asserted to be non-null.
+     *
+     * @param fullPageId Page ID to clear.
+     * @return Result of {@code CheckpointPages#markAsSaved}: {@code true} if the page was removed successfully.
+     */
+    boolean clearCheckpoint(FullPageId fullPageId) {
+        Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        assert pages0 != null;
+
+        return pages0.markAsSaved(fullPageId);
+    }
+
+    private void copyPageForCheckpoint( // Copies the page (or its checkpoint buffer copy) into buf under the page write lock and passes it to pageStoreWriter.
+            long absPtr,
+            FullPageId fullId,
+            ByteBuffer buf,
+            Integer tag,
+            boolean pageSingleAcquire,
+            PageStoreWriter pageStoreWriter
+    ) throws IgniteInternalCheckedException {
+        assert absPtr != 0;
+        assert isAcquired(absPtr) || !isInCheckpoint(fullId);
+
+        // Exception protection flag.
+        // No need to write if exception occurred.
+        boolean canWrite = false;
+
+        boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS); // If the lock is not taken, the page is not copied by this call.
+
+        if (!locked) {
+            // We release the page only once here because this page will be 
copied sometime later and
+            // will be released properly then.
+            if (!pageSingleAcquire) {
+                PageHeader.releasePage(absPtr);
+            }
+
+            buf.clear();
+
+            if (isInCheckpoint(fullId)) {
+                pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG); // Page is still in the checkpoint: the writer gets TRY_AGAIN_TAG instead of the real tag.
+            }
+
+            return;
+        }
+
+        if (!clearCheckpoint(fullId)) { // clearCheckpoint returned false: unlock, drop the extra pin if one was taken, and skip the write.
+            rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+            if (!pageSingleAcquire) {
+                PageHeader.releasePage(absPtr);
+            }
+
+            return;
+        }
+
+        try {
+            long tmpRelPtr = tempBufferPointer(absPtr);
+
+            if (tmpRelPtr != INVALID_REL_PTR) { // The page has a temporary copy in the checkpoint pool: that copy is what gets written.
+                tempBufferPointer(absPtr, INVALID_REL_PTR);
+
+                long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
+
+                copyInBuffer(tmpAbsPtr, buf);
+
+                fullPageId(tmpAbsPtr, NULL_PAGE);
+
+                setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte) 0); // Zero the temporary page contents before releasing it to the pool.
+
+                releaseCheckpointBufferPage(tmpRelPtr);
+
+                // Need release again because we pin page when resolve abs 
pointer,
+                // and page did not have tmp buffer page.
+                if (!pageSingleAcquire) {
+                    PageHeader.releasePage(absPtr);
+                }
+            } else { // No temporary copy: the page itself is copied into buf and then marked as not dirty.
+                copyInBuffer(absPtr, buf);
+
+                dirty(absPtr, false);
+            }
+
+            assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + 
hexLong(fullId.pageId());
+            assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId 
= " + hexLong(fullId.pageId());
+
+            canWrite = true;
+        } finally {
+            rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+            if (canWrite) { // True only when the copy and the assertions above completed without an exception.
+                buf.rewind();
+
+                pageStoreWriter.writePage(fullId, buf, tag);
+
+                buf.rewind();
+            }
+
+            // We pinned the page either when allocated the temp buffer, or 
when resolved abs pointer.
+            // Must release the page only after write unlock.
+            PageHeader.releasePage(absPtr);
+        }
+    }
+
+    /**
+     * Prepares a page for writing during a checkpoint. The {@link PageStoreWriter} is called once the page is ready to be written.
+     *
+     * <p>Returns early, without copying the page, if the page is not in the current checkpoint, if its relative pointer resolves to
+     * {@code INVALID_REL_PTR}, or if it resolves to {@code OUTDATED_REL_PTR} under the segment read lock. The outdated case is handled
+     * under the segment write lock and always returns from there.
+     *
+     * @param fullId Page ID to get byte buffer for. The page ID must be present in the collection returned by the
+     *      {@link #beginCheckpoint(CompletableFuture)} method call.
+     * @param buf Temporary buffer to write changes into; {@code buf.remaining()} is asserted to be equal to the page size.
+     * @param pageStoreWriter Checkpoint page write context.
+     * @throws IgniteInternalCheckedException If failed to obtain page data.
+     */
+    public void checkpointWritePage(
+            FullPageId fullId,
+            ByteBuffer buf,
+            PageStoreWriter pageStoreWriter
+    ) throws IgniteInternalCheckedException {
+        assert buf.remaining() == pageSize();
+
+        Segment seg = segment(fullId.groupId(), fullId.pageId());
+
+        long absPtr = 0;
+
+        long relPtr;
+
+        int tag;
+
+        boolean pageSingleAcquire = false;
+
+        seg.readLock().lock();
+
+        try {
+            if (!isInCheckpoint(fullId)) {
+                return;
+            }
+
+            relPtr = resolveRelativePointer(seg, fullId, tag = 
generationTag(seg, fullId)); // tag is assigned here and later passed to copyPageForCheckpoint.
+
+            // Page may have been cleared during eviction. We have nothing to 
do in this case.
+            if (relPtr == INVALID_REL_PTR) {
+                return;
+            }
+
+            if (relPtr != OUTDATED_REL_PTR) {
+                absPtr = seg.absolute(relPtr);
+
+                // Pin the page until page will not be copied. This helpful to 
prevent page replacement of this page.
+                if (tempBufferPointer(absPtr) == INVALID_REL_PTR) {
+                    PageHeader.acquirePage(absPtr);
+                } else {
+                    pageSingleAcquire = true; // No extra pin is taken here when the page already has a temp buffer pointer.
+                }
+            }
+        } finally {
+            seg.readLock().unlock();
+        }
+
+        if (relPtr == OUTDATED_REL_PTR) {
+            seg.writeLock().lock();
+
+            try {
+                // Double-check.
+                relPtr = resolveRelativePointer(seg, fullId, 
generationTag(seg, fullId));
+
+                if (relPtr == INVALID_REL_PTR) {
+                    return;
+                }
+
+                if (relPtr == OUTDATED_REL_PTR) {
+                    relPtr = seg.refreshOutdatedPage(
+                            fullId.groupId(),
+                            fullId.effectivePageId(),
+                            true
+                    );
+
+                    seg.pageReplacementPolicy.onRemove(relPtr);
+
+                    seg.pool.releaseFreePage(relPtr);
+                }
+
+                return; // Nothing is written on this path, whatever the double-check found.
+            } finally {
+                seg.writeLock().unlock();
+            }
+        }
+
+        copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, 
pageStoreWriter);
+    }
+
+    /**
+     * Gets an arbitrary page from the checkpoint buffer that is still part of the current checkpoint.
+     *
+     * <p>Picks a random start index in the range {@code [idx / 2, idx)}, where {@code idx} is the value read from
+     * {@code checkpointPool.lastAllocatedIdxPtr}, and scans downwards while the decremented index is greater than {@code 1}. Slots whose
+     * stored page ID or group ID equals that of {@code NULL_PAGE}, and pages that are not in the checkpoint, are skipped.
+     *
+     * @return Full page ID of the first matching page, or {@code NULL_PAGE} if the scan finds none.
+     */
+    public FullPageId pullPageFromCpBuffer() {
+        long idx = getLong(checkpointPool.lastAllocatedIdxPtr);
+
+        long lastIdx = ThreadLocalRandom.current().nextLong(idx / 2, idx);
+
+        while (--lastIdx > 1) {
+            assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+            long relative = checkpointPool.relative(lastIdx);
+
+            long freePageAbsPtr = checkpointPool.absolute(relative);
+
+            FullPageId fullPageId = fullPageId(freePageAbsPtr);
+
+            if (fullPageId.pageId() == NULL_PAGE.pageId() || 
fullPageId.groupId() == NULL_PAGE.groupId()) {
+                continue;
+            }
+
+            if (!isInCheckpoint(fullPageId)) {
+                continue;
+            }
+
+            return fullPageId;
+        }
+
+        return NULL_PAGE;
+    }
+
+    /**
+     * Gets a collection of dirty page IDs since the last checkpoint. If a 
dirty page is being written after the checkpointing operation
+     * begun, the modifications will be written to a temporary buffer which 
will be flushed to the main memory after the checkpointing
+     * finished. This method must be called when no concurrent operations on 
pages are performed.
+     *
+     * @param allowToReplace The sign which allows replacing pages from a 
checkpoint by page replacer.
+     * @return Collection view of dirty page IDs.
+     * @throws IgniteInternalException If checkpoint has been already started 
and was not finished.
+     */
+    public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> 
allowToReplace) throws IgniteInternalException {
+        if (segments == null) {
+            return List.of();
+        }
+
+        Collection<FullPageId>[] collections = new Collection[segments.length];
+
+        for (int i = 0; i < segments.length; i++) {
+            Segment seg = segments[i];
+
+            if (seg.checkpointPages != null) {
+                throw new IgniteInternalException("Failed to begin checkpoint 
(it is already in progress).");
+            }
+
+            Collection<FullPageId> dirtyPages = seg.dirtyPages;
+            collections[i] = dirtyPages;
+
+            seg.checkpointPages = new CheckpointPages(dirtyPages, 
allowToReplace);
+
+            seg.resetDirtyPages();
+        }
+
+        safeToUpdate.set(true);
+
+        return CollectionUtils.union(collections);

Review Comment:
   This is an analogue of 
**org.apache.ignite.internal.util.GridMultiCollectionWrapper** from 2.0, only 
without the ability to remove elements. We will not need removal here; we only 
need iteration and size.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to