ibessonov commented on code in PR #4437:
URL: https://github.com/apache/ignite-3/pull/4437#discussion_r1772756057


##########
modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.tree.persistence;
+
+import static 
org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
+import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.LongStream;
+import org.apache.ignite.internal.components.LogSyncer;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
+import org.apache.ignite.internal.pagememory.DataRegion;
+import 
org.apache.ignite.internal.pagememory.TestPageIoModule.TestSimpleValuePageIo;
+import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
+import org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import 
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Integration tests for testing page replacement. */
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+public class ItPageReplacementTest extends BaseIgniteAbstractTest {
+    private static final String NODE_NAME = "test";
+
+    private static final int GROUP_ID = 1;
+
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_COUNT = 1;
+
+    private static final int PAGE_SIZE = 512;
+
+    private static final int PAGE_COUNT = 1024;
+
+    private static final int MAX_MEMORY_SIZE = PAGE_COUNT * PAGE_SIZE;
+
+    private static final int CPUS = Math.min(4, 
Runtime.getRuntime().availableProcessors());
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private PageMemoryCheckpointConfiguration checkpointConfig;
+
+    @InjectConfiguration(
+            polymorphicExtensions = 
PersistentPageMemoryProfileConfigurationSchema.class,
+            value = "mock = {"
+                    + "engine=aipersist, "
+                    + "size=" + MAX_MEMORY_SIZE
+                    + "}"
+    )
+    private StorageProfileConfiguration storageProfileCfg;
+
+    private FilePageStoreManager filePageStoreManager;
+
+    private PartitionMetaManager partitionMetaManager;
+
+    private CheckpointManager checkpointManager;
+
+    private PersistentPageMemory pageMemory;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        FailureManager failureManager = mock(FailureManager.class);
+
+        var ioRegistry = new TestPageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        filePageStoreManager = new FilePageStoreManager(
+                NODE_NAME,
+                workDir,
+                new RandomAccessFileIoFactory(),
+                PAGE_SIZE,
+                failureManager
+        );
+
+        partitionMetaManager = new PartitionMetaManager(ioRegistry, PAGE_SIZE, 
FakePartitionMeta.FACTORY);
+
+        var dataRegionList = new ArrayList<DataRegion<PersistentPageMemory>>();
+
+        checkpointManager = new CheckpointManager(
+                NODE_NAME,
+                null,
+                null,
+                failureManager,
+                checkpointConfig,
+                filePageStoreManager,
+                partitionMetaManager,
+                dataRegionList,
+                ioRegistry,
+                mock(LogSyncer.class),
+                PAGE_SIZE
+        );
+
+        pageMemory = new PersistentPageMemory(
+                (PersistentPageMemoryProfileConfiguration) 
fixConfiguration(storageProfileCfg),
+                ioRegistry,
+                LongStream.range(0, CPUS).map(i -> MAX_MEMORY_SIZE / 
CPUS).toArray(),
+                10 * MiB,
+                filePageStoreManager,
+                null,
+                (pageMemory0, fullPageId, buf) -> 
checkpointManager.writePageToDeltaFilePageStore(pageMemory0, fullPageId, buf, 
true),
+                checkpointManager.checkpointTimeoutLock(),
+                PAGE_SIZE
+        );
+
+        dataRegionList.add(() -> pageMemory);
+
+        filePageStoreManager.start();
+        checkpointManager.start();
+        pageMemory.start();
+
+        createPartitionFilePageStoresIfMissing();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                checkpointManager == null ? null : checkpointManager::stop,
+                pageMemory == null ? null : () -> pageMemory.stop(true),
+                filePageStoreManager == null ? null : 
filePageStoreManager::stop
+        );
+    }
+
+    @Test
+    void testPageReplacement() throws Exception {

Review Comment:
   Please run this test several thousand times, if possible. I have a feeling 
that it might be unstable.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java:
##########
@@ -18,22 +18,20 @@
 package org.apache.ignite.internal.pagememory.persistence;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 
-/**
- * Interface for write page to {@link PageStore}.
- */
+/** Interface for IO writing dirty pages in {@link PageStore} on checkpoint. */
 public interface PageStoreWriter {
     /**
-     * Callback for write page. {@link PersistentPageMemory} will copy page 
content to buffer before call.
+     * Writes page to {@link PageStore}. {@link PersistentPageMemory} will 
copy page content to buffer before call.
      *
      * @param fullPageId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by the {@link
-     * PersistentPageMemory#beginCheckpoint(CompletableFuture)} method call.
+     * PersistentPageMemory#beginCheckpoint} method call.

Review Comment:
   Was this necessary? :)



##########
modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.tree.persistence;
+
+import static 
org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
+import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.LongStream;
+import org.apache.ignite.internal.components.LogSyncer;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
+import org.apache.ignite.internal.pagememory.DataRegion;
+import 
org.apache.ignite.internal.pagememory.TestPageIoModule.TestSimpleValuePageIo;
+import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
+import org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import 
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Integration tests for testing page replacement. */
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+public class ItPageReplacementTest extends BaseIgniteAbstractTest {
+    private static final String NODE_NAME = "test";
+
+    private static final int GROUP_ID = 1;
+
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_COUNT = 1;
+
+    private static final int PAGE_SIZE = 512;
+
+    private static final int PAGE_COUNT = 1024;
+
+    private static final int MAX_MEMORY_SIZE = PAGE_COUNT * PAGE_SIZE;
+
+    private static final int CPUS = Math.min(4, 
Runtime.getRuntime().availableProcessors());
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private PageMemoryCheckpointConfiguration checkpointConfig;
+
+    @InjectConfiguration(
+            polymorphicExtensions = 
PersistentPageMemoryProfileConfigurationSchema.class,
+            value = "mock = {"
+                    + "engine=aipersist, "
+                    + "size=" + MAX_MEMORY_SIZE
+                    + "}"
+    )
+    private StorageProfileConfiguration storageProfileCfg;
+
+    private FilePageStoreManager filePageStoreManager;
+
+    private PartitionMetaManager partitionMetaManager;
+
+    private CheckpointManager checkpointManager;
+
+    private PersistentPageMemory pageMemory;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        FailureManager failureManager = mock(FailureManager.class);
+
+        var ioRegistry = new TestPageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        filePageStoreManager = new FilePageStoreManager(
+                NODE_NAME,
+                workDir,
+                new RandomAccessFileIoFactory(),
+                PAGE_SIZE,
+                failureManager
+        );
+
+        partitionMetaManager = new PartitionMetaManager(ioRegistry, PAGE_SIZE, 
FakePartitionMeta.FACTORY);
+
+        var dataRegionList = new ArrayList<DataRegion<PersistentPageMemory>>();
+
+        checkpointManager = new CheckpointManager(
+                NODE_NAME,
+                null,
+                null,
+                failureManager,
+                checkpointConfig,
+                filePageStoreManager,
+                partitionMetaManager,
+                dataRegionList,
+                ioRegistry,
+                mock(LogSyncer.class),
+                PAGE_SIZE
+        );
+
+        pageMemory = new PersistentPageMemory(
+                (PersistentPageMemoryProfileConfiguration) 
fixConfiguration(storageProfileCfg),
+                ioRegistry,
+                LongStream.range(0, CPUS).map(i -> MAX_MEMORY_SIZE / 
CPUS).toArray(),
+                10 * MiB,
+                filePageStoreManager,
+                null,
+                (pageMemory0, fullPageId, buf) -> 
checkpointManager.writePageToDeltaFilePageStore(pageMemory0, fullPageId, buf, 
true),
+                checkpointManager.checkpointTimeoutLock(),
+                PAGE_SIZE
+        );
+
+        dataRegionList.add(() -> pageMemory);
+
+        filePageStoreManager.start();
+        checkpointManager.start();
+        pageMemory.start();
+
+        createPartitionFilePageStoresIfMissing();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                checkpointManager == null ? null : checkpointManager::stop,
+                pageMemory == null ? null : () -> pageMemory.stop(true),
+                filePageStoreManager == null ? null : 
filePageStoreManager::stop
+        );
+    }
+
+    @Test
+    void testPageReplacement() throws Exception {
+        int pageCountToCreate = 128 * PAGE_COUNT;
+        int createPageThreadCount = 4;
+
+        CompletableFuture<Long> createPagesFuture = runMultiThreadedAsync(
+                () -> {
+                    // The values are taken empirically, if make the batch 
size more than 10, then sometimes it falls with IOOM.
+                    createAndFillTestSimpleValuePages(pageCountToCreate, 10);
+
+                    return null;
+                },
+                createPageThreadCount,
+                "allocate-page-thread"
+        );
+
+        assertThat(createPagesFuture, willCompleteSuccessfully());
+
+        assertTrue(pageMemory.isPageReplacementOccurs());

Review Comment:
   Writing "is ... occurs" is grammatically incorrect: there are two "s" here, 
and there should only be one. Can we call it `pageReplacementOccurred`, as is done in 
the javadoc? It would look much better.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1495,14 +1494,30 @@ private void resetDirtyPages() {
         }
 
         /**
-         * Prepares a page removal for page replacement, if needed.
+         * Tries to replace the page.
+         *
+         * <p>The replacement will be successful if the following conditions 
are met:</p>
+         * <ul>
+         *     <li>Page is not dirty - just replace it.</li>
+         *     <li>page is dirty, then there should be a checkpoint in the 
process and the following sub-conditions are met:</li>
+         *     <ul>

Review Comment:
   ```suggestion
            *     <ul>
            *         <li>Page belongs to current checkpoint.</li>
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1495,14 +1494,30 @@ private void resetDirtyPages() {
         }
 
         /**
-         * Prepares a page removal for page replacement, if needed.
+         * Tries to replace the page.
+         *
+         * <p>The replacement will be successful if the following conditions 
are met:</p>
+         * <ul>
+         *     <li>Page is not dirty - just replace it.</li>
+         *     <li>page is dirty, then there should be a checkpoint in the 
process and the following sub-conditions are met:</li>
+         *     <ul>
+         *         <li>If the dirty page sorting phase is complete, otherwise 
we wait for it. This is necessary so that we can safely
+         *         create partition delta files in which the dirty page order 
must be preserved.</li>
+         *         <li>If the checkpoint dirty page writer has not started 
writing the page or has already written it.</li>
+         *         <li>If the delta file fsync phase is not ready to start or 
is not in progress. This is necessary so that the data
+         *         remains consistent after the fsync phase is complete. If 
the phase has not yet begun, we will block it until we complete
+         *         the replacement.</li>

Review Comment:
   > or is not in progress.
   
   How is it possible for the fsync phase to start _before_ all pages have been 
written to the delta file? This seems like a mistake.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1511,25 +1526,28 @@ public boolean tryToRemovePage(FullPageId fullPageId, 
long absPtr) throws Ignite
 
             if (isDirty(absPtr)) {
                 CheckpointPages checkpointPages = this.checkpointPages;
-                // Can evict a dirty page only if should be written by a 
checkpoint.
-                // These pages does not have tmp buffer.
-                if (checkpointPages != null && 
checkpointPages.allowToSave(fullPageId)) {
-                    WriteDirtyPage writeDirtyPage = 
delayedPageReplacementTracker.delayedPageWrite();
-
-                    writeDirtyPage.write(PersistentPageMemory.this, 
fullPageId, wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()));
-
-                    setDirty(fullPageId, absPtr, false, true);
+                // Can replace a dirty page only if should be written by a 
checkpoint.

Review Comment:
   ```suggestion
                   // Can replace a dirty page only if it should be written by a checkpoint.
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for synchronizing page replacement and the beginning of the 
fsync phase at a checkpoint.
+ *
+ * <p>For data consistency, it is important for us that page replacement 
occurs strictly before the beginning of the fsync phase.</p>

Review Comment:
   You may think that I'm nitpicking, but I'd rather say that "fsync should occur 
after page replacement", because that's what we see in the implementation - 
active waiting for the replacement to finish.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1974,16 +2013,14 @@ public void checkpointWritePage(
         seg.readLock().lock();
 
         try {
-            if (!isInCheckpoint(fullId)) {
+            if (!removeFromCheckpoint(seg, fullId)) {
                 return;
             }
 
             relPtr = resolveRelativePointer(seg, fullId, tag = 
generationTag(seg, fullId));
 
             // Page may have been cleared during eviction. We have nothing to 
do in this case.
-            if (relPtr == INVALID_REL_PTR) {
-                return;
-            }
+            assert relPtr != INVALID_REL_PTR : "Page was removed by page 
replacing, which should not have happened: " + fullId;

Review Comment:
   I believe that this assertion will turn back into a condition once you 
revert the unnecessary changes.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for synchronizing page replacement and the beginning of the 
fsync phase at a checkpoint.
+ *
+ * <p>For data consistency, it is important for us that page replacement 
occurs strictly before the beginning of the fsync phase.</p>
+ *
+ * <p>Usage:</p>
+ * <ul>
+ *     <li>{@link #tryBlock(FullPageId)} - before you need to perform a page 
replacement.</li>
+ *     <li>{@link #unblock(FullPageId, Throwable)} - after the page 
replacement has finished and written to disk. The method must be
+ *     invoked even if any error occurred, so as not to hang a checkpoint.</li>
+ *     <li>{@link #stopBlocking()} - must be invoked before the start of the 
fsync phase on the checkpoint and wait for the future to
+ *     complete in order to safely perform the phase.</li>
+ * </ul>
+ *
+ * <p>Thread safe.</p>
+ */
+class CheckpointPageReplacement {
+    /** IDs of pages for which page replacement is in progress. */
+    private final Set<FullPageId> pageIds = ConcurrentHashMap.newKeySet();
+
+    private final CompletableFuture<Void> stopBlockingFuture = new 
CompletableFuture<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Tries to block the start of the fsync phase at a checkpoint before 
replacing the page.
+     *
+     * <p>It is expected that if the method returns {@code true}, it will not 
be invoked a second time with the same page ID.</p>
+     *
+     * @param pageId Page ID for which page replacement is expected to begin.
+     * @return {@code True} if the blocking was successful, {@code false} if 
the fsync phase is about to begin.
+     * @see #unblock(FullPageId, Throwable)
+     * @see #stopBlocking()
+     */
+    boolean tryBlock(FullPageId pageId) {
+        if (!busyLock.enterBusy()) {
+            return false;
+        }
+
+        try {
+            boolean added = pageIds.add(pageId);
+
+            assert added : "Page is already in the process of being replaced: 
" + pageId;
+
+            return true;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Unblocks the start of the fsync phase at a checkpoint after the page 
replacement is completed.
+     *
+     * <p>It is expected that the method will be invoked once and after the 
{@link #tryBlock(FullPageId)} returns {@code true} for same
+     * page ID.</p>
+     *
+     * <p>The fsync phase will only be started after page replacement has been 
completed for all pages for which
+     * {@link #tryBlock(FullPageId)} was invoked before {@link 
#stopBlocking()} was invoked, or no page replacement occurred at all.</p>
+     *
+     * <p>If any error occurred during page replacement, then the future from 
{@link #stopBlocking()} will be completed with the first
+     * error.</p>
+     *
+     * @param pageId Page ID for which the page replacement has ended.
+     * @param error Error on page replacement, {@code null} if missing.
+     * @see #tryBlock(FullPageId)
+     * @see #stopBlocking()
+     */
+    void unblock(FullPageId pageId, @Nullable Throwable error) {
+        boolean removed = pageIds.remove(pageId);
+
+        assert removed : "Replacement for the page either did not start or 
ended: " + pageId;
+
+        if (error != null) {
+            stopBlockingFuture.completeExceptionally(error);
+
+            return;
+        }
+
+        if (!busyLock.enterBusy()) {
+            if (pageIds.isEmpty()) {
+                stopBlockingFuture.complete(null);
+            }
+        } else {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops new blocks before the fsync phase starts at a checkpoint.
+     *
+     * @return Future that will be completed successfully if all blocks are 
completed before the current method is invoked, either if there
+     *      were none, or with an error from the first unlock.
+     * @see #tryBlock(FullPageId)
+     * @see #unblock(FullPageId, Throwable)
+     */
+    CompletableFuture<Void> stopBlocking() {
+        if (stopGuard.compareAndSet(false, true)) {

Review Comment:
   I wonder whether we ever actually try to stop it twice. Why do we need this 
protection?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for synchronizing page replacement and the beginning of the 
fsync phase at a checkpoint.
+ *
+ * <p>For data consistency, it is important for us that page replacement 
occurs strictly before the beginning of the fsync phase.</p>
+ *
+ * <p>Usage:</p>
+ * <ul>
+ *     <li>{@link #tryBlock(FullPageId)} - before you need to perform a page 
replacement.</li>
+ *     <li>{@link #unblock(FullPageId, Throwable)} - after the page 
replacement has finished and written to disk. The method must be
+ *     invoked even if any error occurred, so as not to hang a checkpoint.</li>
+ *     <li>{@link #stopBlocking()} - must be invoked before the start of the 
fsync phase on the checkpoint and wait for the future to
+ *     complete in order to safely perform the phase.</li>
+ * </ul>
+ *
+ * <p>Thread safe.</p>
+ */
+class CheckpointPageReplacement {
+    /** IDs of pages for which page replacement is in progress. */
+    private final Set<FullPageId> pageIds = ConcurrentHashMap.newKeySet();
+
+    private final CompletableFuture<Void> stopBlockingFuture = new 
CompletableFuture<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Tries to block the start of the fsync phase at a checkpoint before 
replacing the page.
+     *
+     * <p>It is expected that if the method returns {@code true}, it will not 
be invoked a second time with the same page ID.</p>
+     *
+     * @param pageId Page ID for which page replacement is expected to begin.
+     * @return {@code True} if the blocking was successful, {@code false} if 
the fsync phase is about to begin.
+     * @see #unblock(FullPageId, Throwable)
+     * @see #stopBlocking()
+     */
+    boolean tryBlock(FullPageId pageId) {
+        if (!busyLock.enterBusy()) {
+            return false;
+        }

Review Comment:
   I don't think it's necessary: it's impossible to find a dirty page after all dirty pages have been checkpointed/replaced. Or is it?
   
   Returning `true`/`false` also seems redundant. What do you think?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java:
##########
@@ -17,75 +17,174 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 
 import java.util.Set;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.storage.StorageException;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * View of pages which should be stored during current checkpoint.
+ * Class contains dirty pages of the segment that will need to be written at a 
checkpoint or page replacement. It also contains helper
+ * methods before writing pages.
+ *
+ * <p>For correct parallel operation of the checkpoint writer and page 
replacement, external synchronization must be used, for example,
+ * segment read and write locks.</p>
  */
 public class CheckpointPages {
-    private final Set<FullPageId> segmentPages;
+    private final Set<FullPageId> pageIds;
 
-    private final CompletableFuture<?> allowToReplace;
+    private final CheckpointProgressImpl checkpointProgress;
 
     /**
      * Constructor.
      *
-     * @param pages Pages which would be stored to disk in current checkpoint, 
does not copy the set.
-     * @param replaceFuture The sign which allows replacing pages from a 
checkpoint by page replacer.
+     * @param pageIds Dirty page IDs in the segment that should be written at 
a checkpoint or page replacement.
+     * @param checkpointProgress Progress of the current checkpoint at which 
the object was created.
      */
-    public CheckpointPages(Set<FullPageId> pages, CompletableFuture<?> 
replaceFuture) {
-        segmentPages = pages;
-        allowToReplace = replaceFuture;
+    public CheckpointPages(Set<FullPageId> pageIds, CheckpointProgress 
checkpointProgress) {
+        this.pageIds = pageIds;
+        this.checkpointProgress = (CheckpointProgressImpl) checkpointProgress;
     }
 
     /**
-     * Returns {@code true} If fullPageId is allowable to store to disk.
+     * Checks if the page is available for replacement, without removing the 
page.
+     *
+     * <p>Page is available for replacement if the following conditions are 
met:</p>
+     * <ul>
+     *     <li>If the dirty page sorting phase is complete, otherwise we wait 
for it. This is necessary so that we can safely create
+     *     partition delta files in which the dirty page order must be 
preserved.</li>
+     *     <li>If the checkpoint dirty page writer has not started writing the 
page or has already written it.</li>
+     *     <li>If the delta file fsync phase is not ready to start or is not 
in progress. This is necessary so that the data remains
+     *     consistent after the fsync phase is complete. If the phase has not 
yet begun, we will block it until we complete the
+     *     replacement.</li>
+     * </ul>
      *
-     * @param fullPageId Page id for checking.
-     * @throws IgniteInternalCheckedException If the waiting sign which allows 
replacing pages from a checkpoint by page replacer fails.
+     * <p>It is expected that if the method returns {@code true}, it will not 
be invoked again for the same page ID, then
+     * {@link #finishReplace} will be invoked later.</p>
+     *
+     * @param pageId Page ID of the replacement candidate.
+     * @return {@code True} if the page is available for replacement, {@code 
false} if not.
+     * @throws StorageException If any error occurred while waiting for the 
dirty page sorting phase to complete at a checkpoint.
+     * @see #finishReplace(FullPageId, Throwable)
      */
-    public boolean allowToSave(FullPageId fullPageId) throws 
IgniteInternalCheckedException {
+    public boolean allowToReplace(FullPageId pageId) throws StorageException {

Review Comment:
   Why don't we combine this check and `remove` into a single method? They should be called together anyway.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java:
##########
@@ -17,75 +17,174 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 
 import java.util.Set;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.storage.StorageException;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * View of pages which should be stored during current checkpoint.
+ * Class contains dirty pages of the segment that will need to be written at a 
checkpoint or page replacement. It also contains helper
+ * methods before writing pages.
+ *
+ * <p>For correct parallel operation of the checkpoint writer and page 
replacement, external synchronization must be used, for example,
+ * segment read and write locks.</p>
  */
 public class CheckpointPages {
-    private final Set<FullPageId> segmentPages;
+    private final Set<FullPageId> pageIds;
 
-    private final CompletableFuture<?> allowToReplace;
+    private final CheckpointProgressImpl checkpointProgress;
 
     /**
      * Constructor.
      *
-     * @param pages Pages which would be stored to disk in current checkpoint, 
does not copy the set.
-     * @param replaceFuture The sign which allows replacing pages from a 
checkpoint by page replacer.
+     * @param pageIds Dirty page IDs in the segment that should be written at 
a checkpoint or page replacement.
+     * @param checkpointProgress Progress of the current checkpoint at which 
the object was created.
      */
-    public CheckpointPages(Set<FullPageId> pages, CompletableFuture<?> 
replaceFuture) {
-        segmentPages = pages;
-        allowToReplace = replaceFuture;
+    public CheckpointPages(Set<FullPageId> pageIds, CheckpointProgress 
checkpointProgress) {
+        this.pageIds = pageIds;
+        this.checkpointProgress = (CheckpointProgressImpl) checkpointProgress;
     }
 
     /**
-     * Returns {@code true} If fullPageId is allowable to store to disk.
+     * Checks if the page is available for replacement, without removing the 
page.
+     *
+     * <p>Page is available for replacement if the following conditions are 
met:</p>
+     * <ul>
+     *     <li>If the dirty page sorting phase is complete, otherwise we wait 
for it. This is necessary so that we can safely create
+     *     partition delta files in which the dirty page order must be 
preserved.</li>
+     *     <li>If the checkpoint dirty page writer has not started writing the 
page or has already written it.</li>
+     *     <li>If the delta file fsync phase is not ready to start or is not 
in progress. This is necessary so that the data remains
+     *     consistent after the fsync phase is complete. If the phase has not 
yet begun, we will block it until we complete the
+     *     replacement.</li>
+     * </ul>
      *
-     * @param fullPageId Page id for checking.
-     * @throws IgniteInternalCheckedException If the waiting sign which allows 
replacing pages from a checkpoint by page replacer fails.
+     * <p>It is expected that if the method returns {@code true}, it will not 
be invoked again for the same page ID, then
+     * {@link #finishReplace} will be invoked later.</p>
+     *
+     * @param pageId Page ID of the replacement candidate.
+     * @return {@code True} if the page is available for replacement, {@code 
false} if not.
+     * @throws StorageException If any error occurred while waiting for the 
dirty page sorting phase to complete at a checkpoint.
+     * @see #finishReplace(FullPageId, Throwable)
      */
-    public boolean allowToSave(FullPageId fullPageId) throws 
IgniteInternalCheckedException {
+    public boolean allowToReplace(FullPageId pageId) throws StorageException {

Review Comment:
   Please rename this method. The word `allow` implies no side effects, i.e. that it's just a check. But here we add `pageId` to some internal collection.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -587,10 +587,9 @@ public long allocatePageNoReuse(int grpId, int partId, 
byte flags) throws Ignite
             throw e;
         } finally {
             seg.writeLock().unlock();
-        }
 
-        // Finish replacement only when an exception wasn't thrown otherwise 
it possible to corrupt B+Tree.

Review Comment:
   Why did you remove the comment? Is it no longer relevant?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1495,14 +1494,30 @@ private void resetDirtyPages() {
         }
 
         /**
-         * Prepares a page removal for page replacement, if needed.
+         * Tries to replace the page.
+         *
+         * <p>The replacement will be successful if the following conditions 
are met:</p>
+         * <ul>
+         *     <li>Page is not dirty - just replace it.</li>
+         *     <li>page is dirty, then there should be a checkpoint in the 
process and the following sub-conditions are met:</li>

Review Comment:
   ```suggestion
            *     <li>Page is dirty, there is a checkpoint in the process and 
the following sub-conditions are met:</li>
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1495,14 +1494,30 @@ private void resetDirtyPages() {
         }
 
         /**
-         * Prepares a page removal for page replacement, if needed.
+         * Tries to replace the page.
+         *
+         * <p>The replacement will be successful if the following conditions 
are met:</p>
+         * <ul>
+         *     <li>Page is not dirty - just replace it.</li>
+         *     <li>page is dirty, then there should be a checkpoint in the 
process and the following sub-conditions are met:</li>
+         *     <ul>
+         *         <li>If the dirty page sorting phase is complete, otherwise 
we wait for it. This is necessary so that we can safely
+         *         create partition delta files in which the dirty page order 
must be preserved.</li>
+         *         <li>If the checkpoint dirty page writer has not started 
writing the page or has already written it.</li>
+         *         <li>If the delta file fsync phase is not ready to start or 
is not in progress. This is necessary so that the data
+         *         remains consistent after the fsync phase is complete. If 
the phase has not yet begun, we will block it until we complete
+         *         the replacement.</li>
+         *     </ul>
+         * </ul>
+         *
+         * <p>It is expected that if the method returns {@code true}, it will 
not be invoked again for the same page ID.</p>
          *
-         * @param fullPageId Candidate page full ID.
-         * @param absPtr Absolute pointer of the page to evict.
-         * @return {@code True} if it is ok to replace this page, {@code 
false} if another page should be selected.
-         * @throws IgniteInternalCheckedException If failed to write page to 
the underlying store during eviction.
+         * @param pageId Candidate page ID.
+         * @param absPtr Absolute pointer to the candidate page.
+         * @return {@code True} if the page replacement was successful, 
otherwise need to try another one.
+         * @throws StorageException If any error occurred while waiting for 
the dirty page sorting phase to complete at a checkpoint.
          */
-        public boolean tryToRemovePage(FullPageId fullPageId, long absPtr) 
throws IgniteInternalCheckedException {
+        public boolean tryToReplacePage(FullPageId pageId, long absPtr) {

Review Comment:
   What was wrong with the old name of the method? Technically speaking, it doesn't replace the page with anything; it actually just removes it, right? Nothing is written on top of the old data here.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java:
##########
@@ -18,22 +18,20 @@
 package org.apache.ignite.internal.pagememory.persistence;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 
-/**
- * Interface for write page to {@link PageStore}.
- */
+/** Interface for IO writing dirty pages in {@link PageStore} on checkpoint. */

Review Comment:
   ```suggestion
   /** Interface for writing dirty pages to {@link PageStore} on checkpoint. */
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1495,14 +1494,30 @@ private void resetDirtyPages() {
         }
 
         /**
-         * Prepares a page removal for page replacement, if needed.
+         * Tries to replace the page.
+         *
+         * <p>The replacement will be successful if the following conditions 
are met:</p>
+         * <ul>
+         *     <li>Page is not dirty - just replace it.</li>
+         *     <li>page is dirty, then there should be a checkpoint in the 
process and the following sub-conditions are met:</li>
+         *     <ul>
+         *         <li>If the dirty page sorting phase is complete, otherwise 
we wait for it. This is necessary so that we can safely
+         *         create partition delta files in which the dirty page order 
must be preserved.</li>
+         *         <li>If the checkpoint dirty page writer has not started 
writing the page or has already written it.</li>
+         *         <li>If the delta file fsync phase is not ready to start or 
is not in progress. This is necessary so that the data
+         *         remains consistent after the fsync phase is complete. If 
the phase has not yet begun, we will block it until we complete
+         *         the replacement.</li>
+         *     </ul>
+         * </ul>
+         *
+         * <p>It is expected that if the method returns {@code true}, it will 
not be invoked again for the same page ID.</p>
          *
-         * @param fullPageId Candidate page full ID.
-         * @param absPtr Absolute pointer of the page to evict.
-         * @return {@code True} if it is ok to replace this page, {@code 
false} if another page should be selected.
-         * @throws IgniteInternalCheckedException If failed to write page to 
the underlying store during eviction.
+         * @param pageId Candidate page ID.
+         * @param absPtr Absolute pointer to the candidate page.
+         * @return {@code True} if the page replacement was successful, 
otherwise need to try another one.
+         * @throws StorageException If any error occurred while waiting for 
the dirty page sorting phase to complete at a checkpoint.
          */
-        public boolean tryToRemovePage(FullPageId fullPageId, long absPtr) 
throws IgniteInternalCheckedException {
+        public boolean tryToReplacePage(FullPageId pageId, long absPtr) {

Review Comment:
   Why did you rename this parameter? It was correct; I don't get it.
   Please don't make the code more complicated for no reason. `pageId` is usually a `long` value, while a `FullPageId` object can be called `fullPageId`; that's totally fine.
   
   Also, even if it were a good rename, I'm still not a fan of mixing random refactorings into a 1000+ line PR.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1865,32 +1908,28 @@ private void copyPageForCheckpoint(
         // No need to write if exception occurred.
         boolean canWrite = false;
 
-        boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS);
-
-        if (!locked) {
-            // We release the page only once here because this page will be 
copied sometime later and
-            // will be released properly then.
+        if (!rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS)) {
+            // We release the page only once here because this page will be 
copied sometime later and will be released properly then.

Review Comment:
   Why did you change this perfectly normal code here?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1511,25 +1526,28 @@ public boolean tryToRemovePage(FullPageId fullPageId, 
long absPtr) throws Ignite
 
             if (isDirty(absPtr)) {
                 CheckpointPages checkpointPages = this.checkpointPages;
-                // Can evict a dirty page only if should be written by a 
checkpoint.
-                // These pages does not have tmp buffer.
-                if (checkpointPages != null && 
checkpointPages.allowToSave(fullPageId)) {
-                    WriteDirtyPage writeDirtyPage = 
delayedPageReplacementTracker.delayedPageWrite();
-
-                    writeDirtyPage.write(PersistentPageMemory.this, 
fullPageId, wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()));
-
-                    setDirty(fullPageId, absPtr, false, true);
+                // Can replace a dirty page only if should be written by a 
checkpoint.
+                // Safe to invoke because we keep segment write lock and the 
checkpoint writer must remove pages on the segment read lock.
+                if (checkpointPages != null && 
checkpointPages.allowToReplace(pageId) && checkpointPages.remove(pageId)) {

Review Comment:
   In the future, we might want to call `allowToReplace` outside of segment 
write lock. Blocking it here seems dangerous to me. What do you think?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1865,32 +1908,28 @@ private void copyPageForCheckpoint(
         // No need to write if exception occurred.
         boolean canWrite = false;
 
-        boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS);
-
-        if (!locked) {
-            // We release the page only once here because this page will be 
copied sometime later and
-            // will be released properly then.
+        if (!rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS)) {
+            // We release the page only once here because this page will be 
copied sometime later and will be released properly then.
             if (!pageSingleAcquire) {
                 PageHeader.releasePage(absPtr);
             }
 
+            // Since we failed to take the write lock, we will try to do it 
again later, for this we will need to invoke the writer with a
+            // special tag TRY_AGAIN_TAG and also return the page in the dirty 
pages of the segment.
+
             buf.clear();
 
-            if (isInCheckpoint(fullId)) {
-                pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
-            }
+            pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
 
-            return;
-        }
+            segment.readLock().lock();
 
-        if (!clearCheckpoint(fullId)) {
-            rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+            try {
+                addToCheckpoint(segment, fullId);
 
-            if (!pageSingleAcquire) {
-                PageHeader.releasePage(absPtr);
+                return;
+            } finally {
+                segment.readLock().unlock();
             }

Review Comment:
   I don't think this change was necessary. As far as I can tell (from what you explained to me), you're trying to avoid a data race between the checkpointer and the replacer. But there is no race:
   - in the replacer, we only replace pages that are not acquired, and we check this while holding the segment write lock.
   - we only acquire pages while holding the segment read lock.
   - right here, the page is obviously acquired, meaning that it cannot be 
replaced.
   
   Please revert it to the way it was and document why it works.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1974,16 +2013,14 @@ public void checkpointWritePage(
         seg.readLock().lock();
 
         try {
-            if (!isInCheckpoint(fullId)) {
+            if (!removeFromCheckpoint(seg, fullId)) {

Review Comment:
   Please roll back this change as well, obviously.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1495,14 +1494,30 @@ private void resetDirtyPages() {
         }
 
         /**
-         * Prepares a page removal for page replacement, if needed.
+         * Tries to replace the page.
+         *
+         * <p>The replacement will be successful if the following conditions 
are met:</p>
+         * <ul>
+         *     <li>Page is not dirty - just replace it.</li>
+         *     <li>page is dirty, then there should be a checkpoint in the 
process and the following sub-conditions are met:</li>
+         *     <ul>
+         *         <li>If the dirty page sorting phase is complete, otherwise 
we wait for it. This is necessary so that we can safely
+         *         create partition delta files in which the dirty page order 
must be preserved.</li>
+         *         <li>If the checkpoint dirty page writer has not started 
writing the page or has already written it.</li>
+         *         <li>If the delta file fsync phase is not ready to start or 
is not in progress. This is necessary so that the data
+         *         remains consistent after the fsync phase is complete. If 
the phase has not yet begun, we will block it until we complete
+         *         the replacement.</li>
+         *     </ul>
+         * </ul>
+         *
+         * <p>It is expected that if the method returns {@code true}, it will 
not be invoked again for the same page ID.</p>
          *
-         * @param fullPageId Candidate page full ID.
-         * @param absPtr Absolute pointer of the page to evict.
-         * @return {@code True} if it is ok to replace this page, {@code 
false} if another page should be selected.
-         * @throws IgniteInternalCheckedException If failed to write page to 
the underlying store during eviction.
+         * @param pageId Candidate page ID.
+         * @param absPtr Absolute pointer to the candidate page.
+         * @return {@code True} if the page replacement was successful, 
otherwise need to try another one.
+         * @throws StorageException If any error occurred while waiting for 
the dirty page sorting phase to complete at a checkpoint.
          */
-        public boolean tryToRemovePage(FullPageId fullPageId, long absPtr) 
throws IgniteInternalCheckedException {
+        public boolean tryToReplacePage(FullPageId pageId, long absPtr) {
             assert writeLock().isHeldByCurrentThread();
 
             if (isAcquired(absPtr)) {

Review Comment:
   Please document this part too; it's not obvious what this check does and why it works.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java:
##########
@@ -45,21 +55,26 @@ public class DelayedDirtyPageWrite implements 
WriteDirtyPage {
     /** Replacing pages tracker, used to register & unregister pages being 
written. */
     private final DelayedPageReplacementTracker tracker;
 
-    /** Full page id to be written on {@link #finishReplacement} or {@code 
null} if nothing to write. */
-    private @Nullable FullPageId fullPageId;
+    /** Full page id to be written on {@link #flushCopiedPageIfExists}, {@code 
null} if nothing to write. */
+    private @Nullable FullPageId pageId;

Review Comment:
   Why did you rename a totally normal field?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java:
##########
@@ -17,75 +17,174 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 
 import java.util.Set;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.storage.StorageException;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * View of pages which should be stored during current checkpoint.
+ * Class contains dirty pages of the segment that will need to be written at a 
checkpoint or page replacement. It also contains helper
+ * methods before writing pages.
+ *
+ * <p>For correct parallel operation of the checkpoint writer and page 
replacement, external synchronization must be used, for example,

Review Comment:
   > for example, segment read and write locks.
    
   Please be specific. There's more synchronization here than just the segment locks.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -484,8 +484,21 @@ boolean writePages(
         // Wait and check for errors.
         CompletableFuture.allOf(futures).join();
 
-        // Must re-check shutdown flag here because threads may have skipped 
some pages.
-        // If so, we should not put finish checkpoint mark.
+        // Must re-check shutdown flag here because threads may have skipped 
some pages because of it.
+        // If so, we should not finish checkpoint.
+        if (shutdownNow.getAsBoolean()) {
+            currentCheckpointProgress.fail(new NodeStoppingException("Node is 
stopping."));
+
+            return false;
+        }
+
+        // Stops new blockings on page replacement and wait for all those 
started up to this point.
+        // Will complete normally or with the first error on one of the page 
replacements.
+        // join() is used intentionally as above.
+        currentCheckpointProgress.stopBlockingFsyncOnPageReplacement().join();

Review Comment:
   Should we add one more step to the checkpoint metrics? Given that there's another join here, I'd love to know how much time it takes.



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java:
##########
@@ -17,103 +17,179 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.fullPageId;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Collections;
 import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.junit.jupiter.api.Test;
 
-/**
- * For {@link CheckpointPages} testing.
- */
+/** For {@link CheckpointPages} testing. */
 public class CheckpointPagesTest {
     @Test
     void testContains() {
-        CheckpointPages checkpointPages = new CheckpointPages(
-                Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
-                nullCompletedFuture()
-        );
+        CheckpointPages checkpointPages = createCheckpointPages(fullPageId(0, 
0), fullPageId(1, 0));
 
-        assertTrue(checkpointPages.contains(new FullPageId(0, 0)));
-        assertTrue(checkpointPages.contains(new FullPageId(1, 0)));
+        assertTrue(checkpointPages.contains(fullPageId(0, 0)));
+        assertTrue(checkpointPages.contains(fullPageId(1, 0)));

Review Comment:
   What is the point of this change? This PR is already big enough as it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to