ibessonov commented on code in PR #822:
URL: https://github.com/apache/ignite-3/pull/822#discussion_r885335264


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
+ *
+ * <p>Implements default checkpointing algorithm which is sharp checkpoint but 
can be replaced by other implementations if needed.
+ *
+ * <p>Represents only an intermediate step in refactoring of checkpointing 
component and may change in the future.
+ *
+ * <p>This checkpoint ensures that all pages marked as dirty under {@link 
#checkpointTimeoutLock} will be consistently saved to disk.
+ *
+ * <p>Configuration of this checkpoint allows the following:
+ * <ul>
+ *     <li>Collecting all pages from configured dataRegions which was marked 
as dirty under {@link #checkpointTimeoutLock}.</li>
+ *     <li>Marking the start of checkpoint on disk.</li>
+ *     <li>Notifying the subscribers of different checkpoint states through 
{@link CheckpointListener}.</li>
+ *     <li>Synchronizing collected pages with disk using {@link 
FilePageStoreManager}.</li>
+ * </ul>
+ */
+public class CheckpointManager implements IgniteComponent {
+    /** Checkpoint worker. */
+    private final Checkpointer checkpointer;
+
+    /** Main checkpoint steps. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Checkpoint markers storage which mark the start and end of each 
checkpoint. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Timeout checkpoint lock which should be used while write to memory 
happened. */
+    private final CheckpointTimeoutLock checkpointTimeoutLock;
+
+    /** Checkpoint page writer factory. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /**
+     * Constructor.
+     *
+     * @param logger Logger producer.
+     * @param igniteInstanceName Ignite instance name.
+     * @param checkpointConfig Checkpoint configuration.
+     * @param workerListener Listener for life-cycle checkpoint worker events.
+     * @param longJvmPauseDetector Long JVM pause detector.
+     * @param filePageStoreManager File page store manager.
+     * @param dataRegions Data regions.
+     * @param storagePath Storage path.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointManager(
+            Function<Class<?>, IgniteLogger> logger,
+            String igniteInstanceName,
+            @Nullable IgniteWorkerListener workerListener,
+            @Nullable LongJvmPauseDetector longJvmPauseDetector,
+            PageMemoryCheckpointConfiguration checkpointConfig,
+            FilePageStoreManager filePageStoreManager,
+            Collection<PageMemoryDataRegion> dataRegions,
+            Path storagePath,
+            // TODO: IGNITE-17017 Move to common config
+            int pageSize
+    ) throws IgniteInternalCheckedException {
+        PageMemoryCheckpointView checkpointConfigView = 
checkpointConfig.value();
+
+        ReentrantReadWriteLockWithTracking reentrantReadWriteLockWithTracking 
= checkpointConfigView.logReadLockHolders()
+                ? new 
ReentrantReadWriteLockWithTracking(logger.apply(CheckpointReadWriteLock.class), 
5_000)

Review Comment:
   What's 5000?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -214,7 +217,9 @@ public void markCheckpointEnd(Checkpoint chp) throws 
IgniteInternalCheckedExcept
             }
         }
 
-        checkpointMarkersStorage.onCheckpointEnd(chp.progress.id());
+        if (chp.hasDelta()) {

Review Comment:
   Why is this required?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
+ *
+ * <p>Implements default checkpointing algorithm which is sharp checkpoint but 
can be replaced by other implementations if needed.
+ *
+ * <p>Represents only an intermediate step in refactoring of checkpointing 
component and may change in the future.
+ *
+ * <p>This checkpoint ensures that all pages marked as dirty under {@link 
#checkpointTimeoutLock} will be consistently saved to disk.
+ *
+ * <p>Configuration of this checkpoint allows the following:
+ * <ul>
+ *     <li>Collecting all pages from configured dataRegions which was marked 
as dirty under {@link #checkpointTimeoutLock}.</li>
+ *     <li>Marking the start of checkpoint on disk.</li>
+ *     <li>Notifying the subscribers of different checkpoint states through 
{@link CheckpointListener}.</li>
+ *     <li>Synchronizing collected pages with disk using {@link 
FilePageStoreManager}.</li>
+ * </ul>
+ */
+public class CheckpointManager implements IgniteComponent {
+    /** Checkpoint worker. */
+    private final Checkpointer checkpointer;
+
+    /** Main checkpoint steps. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Checkpoint markers storage which mark the start and end of each 
checkpoint. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Timeout checkpoint lock which should be used while write to memory 
happened. */
+    private final CheckpointTimeoutLock checkpointTimeoutLock;
+
+    /** Checkpoint page writer factory. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /**
+     * Constructor.
+     *
+     * @param logger Logger producer.
+     * @param igniteInstanceName Ignite instance name.
+     * @param checkpointConfig Checkpoint configuration.
+     * @param workerListener Listener for life-cycle checkpoint worker events.
+     * @param longJvmPauseDetector Long JVM pause detector.
+     * @param filePageStoreManager File page store manager.
+     * @param dataRegions Data regions.
+     * @param storagePath Storage path.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointManager(
+            Function<Class<?>, IgniteLogger> logger,

Review Comment:
   We should remember to rewrite this part anyway. I believe that we have 
plans to store information like "instance name" in the logger's thread-local 
collection, and it would be convenient to get it from there. The design is not 
yet completed.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
+ *
+ * <p>Implements default checkpointing algorithm which is sharp checkpoint but 
can be replaced by other implementations if needed.
+ *
+ * <p>Represents only an intermediate step in refactoring of checkpointing 
component and may change in the future.
+ *
+ * <p>This checkpoint ensures that all pages marked as dirty under {@link 
#checkpointTimeoutLock} will be consistently saved to disk.
+ *
+ * <p>Configuration of this checkpoint allows the following:
+ * <ul>
+ *     <li>Collecting all pages from configured dataRegions which was marked 
as dirty under {@link #checkpointTimeoutLock}.</li>
+ *     <li>Marking the start of checkpoint on disk.</li>
+ *     <li>Notifying the subscribers of different checkpoint states through 
{@link CheckpointListener}.</li>
+ *     <li>Synchronizing collected pages with disk using {@link 
FilePageStoreManager}.</li>
+ * </ul>
+ */
+public class CheckpointManager implements IgniteComponent {
+    /** Checkpoint worker. */
+    private final Checkpointer checkpointer;
+
+    /** Main checkpoint steps. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Checkpoint markers storage which mark the start and end of each 
checkpoint. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Timeout checkpoint lock which should be used while write to memory 
happened. */
+    private final CheckpointTimeoutLock checkpointTimeoutLock;
+
+    /** Checkpoint page writer factory. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /**
+     * Constructor.
+     *
+     * @param logger Logger producer.
+     * @param igniteInstanceName Ignite instance name.
+     * @param checkpointConfig Checkpoint configuration.
+     * @param workerListener Listener for life-cycle checkpoint worker events.
+     * @param longJvmPauseDetector Long JVM pause detector.
+     * @param filePageStoreManager File page store manager.
+     * @param dataRegions Data regions.
+     * @param storagePath Storage path.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointManager(
+            Function<Class<?>, IgniteLogger> logger,

Review Comment:
   And why would you call this parameter "logger"? It makes no sense!



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
+ *
+ * <p>Implements default checkpointing algorithm which is sharp checkpoint but 
can be replaced by other implementations if needed.
+ *
+ * <p>Represents only an intermediate step in refactoring of checkpointing 
component and may change in the future.
+ *
+ * <p>This checkpoint ensures that all pages marked as dirty under {@link 
#checkpointTimeoutLock} will be consistently saved to disk.
+ *
+ * <p>Configuration of this checkpoint allows the following:
+ * <ul>
+ *     <li>Collecting all pages from configured dataRegions which was marked 
as dirty under {@link #checkpointTimeoutLock}.</li>
+ *     <li>Marking the start of checkpoint on disk.</li>
+ *     <li>Notifying the subscribers of different checkpoint states through 
{@link CheckpointListener}.</li>
+ *     <li>Synchronizing collected pages with disk using {@link 
FilePageStoreManager}.</li>
+ * </ul>
+ */
+public class CheckpointManager implements IgniteComponent {
+    /** Checkpoint worker. */
+    private final Checkpointer checkpointer;
+
+    /** Main checkpoint steps. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Checkpoint markers storage which mark the start and end of each 
checkpoint. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Timeout checkpoint lock which should be used while write to memory 
happened. */
+    private final CheckpointTimeoutLock checkpointTimeoutLock;
+
+    /** Checkpoint page writer factory. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /**
+     * Constructor.
+     *
+     * @param logger Logger producer.
+     * @param igniteInstanceName Ignite instance name.
+     * @param checkpointConfig Checkpoint configuration.
+     * @param workerListener Listener for life-cycle checkpoint worker events.
+     * @param longJvmPauseDetector Long JVM pause detector.
+     * @param filePageStoreManager File page store manager.
+     * @param dataRegions Data regions.
+     * @param storagePath Storage path.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointManager(
+            Function<Class<?>, IgniteLogger> logger,
+            String igniteInstanceName,
+            @Nullable IgniteWorkerListener workerListener,
+            @Nullable LongJvmPauseDetector longJvmPauseDetector,
+            PageMemoryCheckpointConfiguration checkpointConfig,
+            FilePageStoreManager filePageStoreManager,
+            Collection<PageMemoryDataRegion> dataRegions,
+            Path storagePath,
+            // TODO: IGNITE-17017 Move to common config
+            int pageSize
+    ) throws IgniteInternalCheckedException {
+        PageMemoryCheckpointView checkpointConfigView = 
checkpointConfig.value();
+
+        ReentrantReadWriteLockWithTracking reentrantReadWriteLockWithTracking 
= checkpointConfigView.logReadLockHolders()
+                ? new 
ReentrantReadWriteLockWithTracking(logger.apply(CheckpointReadWriteLock.class), 
5_000)
+                : new ReentrantReadWriteLockWithTracking();
+
+        CheckpointReadWriteLock checkpointReadWriteLock = new 
CheckpointReadWriteLock(reentrantReadWriteLockWithTracking);
+
+        checkpointMarkersStorage = new CheckpointMarkersStorage(storagePath);
+
+        checkpointWorkflow = new CheckpointWorkflow(
+                checkpointConfig,
+                checkpointMarkersStorage,
+                checkpointReadWriteLock,
+                dataRegions
+        );
+
+        checkpointPagesWriterFactory = new CheckpointPagesWriterFactory(
+                logger.apply(CheckpointPagesWriterFactory.class),
+                (fullPage, buf, tag) -> 
filePageStoreManager.write(fullPage.groupId(), fullPage.pageId(), buf, tag, 
true),
+                pageSize
+        );
+
+        checkpointer = new Checkpointer(
+                logger.apply(Checkpoint.class),
+                igniteInstanceName,
+                workerListener,
+                longJvmPauseDetector,
+                checkpointWorkflow,
+                checkpointPagesWriterFactory,
+                checkpointConfig
+        );
+
+        checkpointTimeoutLock = new CheckpointTimeoutLock(
+                logger.apply(CheckpointTimeoutLock.class),
+                checkpointReadWriteLock,
+                checkpointConfigView.readLockTimeout(),
+                () -> safeToUpdateAllPageMemories(dataRegions),
+                checkpointer
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        checkpointWorkflow.start();
+
+        checkpointer.start();
+
+        checkpointTimeoutLock.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        IgniteUtils.closeAll(
+                checkpointTimeoutLock::stop,
+                checkpointer::stop,
+                checkpointWorkflow::stop
+        );
+    }
+
+    /**
+     * Returns checkpoint timeout lock which can be used for protection of 
writing to memory.
+     */
+    public CheckpointTimeoutLock checkpointTimeoutLock() {
+        return checkpointTimeoutLock;
+    }
+
+    /**
+     * Adds a listener to be called for the corresponding persistent data 
region.
+     *
+     * @param listener Listener.
+     * @param dataRegion Persistent data region for which listener is 
corresponded to, {@code null} for all regions.
+     */
+    public void addCheckpointListener(CheckpointListener listener, 
PageMemoryDataRegion dataRegion) {
+        checkpointWorkflow.addCheckpointListener(listener, dataRegion);
+    }
+
+    /**
+     * Removes the listener.
+     *
+     * @param listener Listener.
+     */
+    public void removeCheckpointListener(CheckpointListener listener) {
+        checkpointWorkflow.removeCheckpointListener(listener);
+    }
+
+    /**
+     * Start the new checkpoint immediately.
+     *
+     * @param reason Checkpoint reason.
+     * @param finishListener Listener which will be called on finish 
checkpoint.
+     * @return Triggered checkpoint progress.
+     */
+    public CheckpointProgress forceCheckpoint(
+            String reason,
+            @Nullable BiConsumer<Void, Throwable> finishListener

Review Comment:
   I still don't understand this. Don't we have checkpoint futures for various 
stages? Can't we just add listeners to them?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
+ *
+ * <p>Implements default checkpointing algorithm which is sharp checkpoint but 
can be replaced by other implementations if needed.
+ *
+ * <p>Represents only an intermediate step in refactoring of checkpointing 
component and may change in the future.
+ *
+ * <p>This checkpoint ensures that all pages marked as dirty under {@link 
#checkpointTimeoutLock} will be consistently saved to disk.
+ *
+ * <p>Configuration of this checkpoint allows the following:
+ * <ul>
+ *     <li>Collecting all pages from configured dataRegions which was marked 
as dirty under {@link #checkpointTimeoutLock}.</li>
+ *     <li>Marking the start of checkpoint on disk.</li>
+ *     <li>Notifying the subscribers of different checkpoint states through 
{@link CheckpointListener}.</li>
+ *     <li>Synchronizing collected pages with disk using {@link 
FilePageStoreManager}.</li>
+ * </ul>
+ */
+public class CheckpointManager implements IgniteComponent {
+    /** Checkpoint worker. */
+    private final Checkpointer checkpointer;
+
+    /** Main checkpoint steps. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Checkpoint markers storage which mark the start and end of each 
checkpoint. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Timeout checkpoint lock which should be used while write to memory 
happened. */
+    private final CheckpointTimeoutLock checkpointTimeoutLock;
+
+    /** Checkpoint page writer factory. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /**
+     * Constructor.
+     *
+     * @param logger Logger producer.
+     * @param igniteInstanceName Ignite instance name.
+     * @param checkpointConfig Checkpoint configuration.
+     * @param workerListener Listener for life-cycle checkpoint worker events.
+     * @param longJvmPauseDetector Long JVM pause detector.
+     * @param filePageStoreManager File page store manager.
+     * @param dataRegions Data regions.
+     * @param storagePath Storage path.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointManager(
+            Function<Class<?>, IgniteLogger> logger,

Review Comment:
   Is there another way to do this? What was your intention?
   AFAIK, we have only one entry point for getting a logger instance, so why bother?



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static 
org.apache.ignite.internal.pagememory.PageMemoryTestUtils.newDataRegion;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.safeToUpdateAllPageMemories;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.lang.IgniteLogger;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * For {@link CheckpointManager} testing.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class CheckpointManagerTest {
+    @InjectConfiguration
+    private PageMemoryCheckpointConfiguration checkpointConfig;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @Test
+    void testSimple() throws Exception {
+        PageMemoryDataRegion dataRegion = newDataRegion(true, 
mock(PageMemoryImpl.class));
+
+        CheckpointManager checkpointManager = new CheckpointManager(
+                IgniteLogger::forClass,
+                "test",
+                null,

Review Comment:
   Can these be passed with setters instead of constructor parameters? They 
are clearly optional.



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java:
##########
@@ -78,56 +82,193 @@ public void testPageHandleDeallocation() {
     }
 
     @Test
-    void testDirtyPages() throws Exception {
-        PageMemoryImpl memory = (PageMemoryImpl) memory();
+    void testDirtyPages(
+            @InjectConfiguration PageMemoryCheckpointConfiguration 
checkpointConfig,
+            @WorkDirectory Path workDir
+    ) throws Exception {
+        FilePageStoreManager filePageStoreManager = 
createFilePageStoreManager(workDir);
+
+        Collection<PageMemoryDataRegion> dataRegions = new ArrayList<>();
+
+        CheckpointManager checkpointManager = 
createCheckpointManager(checkpointConfig, workDir, filePageStoreManager, 
dataRegions);
+
+        PageMemoryImpl pageMemoryImpl = 
createPageMemoryImpl(defaultSegmentSizes(), filePageStoreManager, 
checkpointManager);
+
+        dataRegions.add(newDataRegion(true, pageMemoryImpl));
 
-        memory.start();
+        filePageStoreManager.start();
+
+        checkpointManager.start();
+
+        pageMemoryImpl.start();
 
         try {
-            Set<FullPageId> dirtyPages = Set.of(allocatePage(memory), 
allocatePage(memory));
+            initGroupFilePageStores(filePageStoreManager);
+
+            checkpointManager.checkpointTimeoutLock().checkpointReadLock();
 
-            assertThat(memory.dirtyPages(), equalTo(dirtyPages));
+            try {
+                Set<FullPageId> dirtyPages = 
Set.of(createDirtyPage(pageMemoryImpl), createDirtyPage(pageMemoryImpl));
+
+                assertThat(pageMemoryImpl.dirtyPages(), equalTo(dirtyPages));
+            } finally {
+                
checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
+            }
 
-            // TODO: IGNITE-16984 After the checkpoint check that there are no 
dirty pages
+            checkpointManager
+                    .forceCheckpoint("for_test_flash_dirty_pages", null)
+                    .futureFor(FINISHED)
+                    .get(100, MILLISECONDS);
+
+            assertThat(pageMemoryImpl.dirtyPages(), empty());
         } finally {
-            memory.stop(true);
+            closeAll(
+                    () -> pageMemoryImpl.stop(true),
+                    checkpointManager::stop,
+                    filePageStoreManager::stop
+            );
         }
     }
 
     @Test
-    void testSafeToUpdate() throws Exception {
+    void testSafeToUpdate(
+            @InjectConfiguration PageMemoryCheckpointConfiguration 
checkpointConfig,
+            @WorkDirectory Path workDir
+    ) throws Exception {
+        FilePageStoreManager filePageStoreManager = 
createFilePageStoreManager(workDir);
+
+        Collection<PageMemoryDataRegion> dataRegions = new ArrayList<>();
+
+        CheckpointManager checkpointManager = 
createCheckpointManager(checkpointConfig, workDir, filePageStoreManager, 
dataRegions);
+
         long systemPageSize = PAGE_SIZE + PAGE_OVERHEAD;
 
-        dataRegionCfg
-                .change(c -> c.changeInitSize(128 * 
systemPageSize).changeMaxSize(128 * systemPageSize))
-                .get(1, SECONDS);
+        dataRegionCfg.change(c -> c.changeInitSize(128 * 
systemPageSize).changeMaxSize(128 * systemPageSize)).get(1, SECONDS);
 
-        PageMemoryImpl memory = memory(new long[]{100 * systemPageSize, 28 * 
systemPageSize});
+        PageMemoryImpl pageMemoryImpl = createPageMemoryImpl(
+                new long[]{100 * systemPageSize, 28 * systemPageSize},
+                filePageStoreManager,
+                checkpointManager
+        );
+
+        dataRegions.add(newDataRegion(true, pageMemoryImpl));
+
+        filePageStoreManager.start();
+
+        checkpointManager.start();
 
-        memory.start();
+        pageMemoryImpl.start();
 
         try {
-            long maxPages = memory.totalPages();
+            initGroupFilePageStores(filePageStoreManager);
+
+            long maxPages = pageMemoryImpl.totalPages();
 
             long maxDirtyPages = (maxPages * 3 / 4);
 
             assertThat(maxDirtyPages, greaterThanOrEqualTo(50L));
 
-            for (int i = 0; i < maxDirtyPages - 1; i++) {
-                allocatePage(memory);
+            checkpointManager.checkpointTimeoutLock().checkpointReadLock();
 
-                assertTrue(memory.safeToUpdate(), "i=" + i);
-            }
+            try {
+                for (int i = 0; i < maxDirtyPages - 1; i++) {
+                    createDirtyPage(pageMemoryImpl);
 
-            for (int i = (int) maxDirtyPages - 1; i < maxPages; i++) {
-                allocatePage(memory);
+                    assertTrue(pageMemoryImpl.safeToUpdate(), "i=" + i);
+                }
 
-                assertFalse(memory.safeToUpdate(), "i=" + i);
+                for (int i = (int) maxDirtyPages - 1; i < maxPages; i++) {
+                    createDirtyPage(pageMemoryImpl);
+
+                    assertFalse(pageMemoryImpl.safeToUpdate(), "i=" + i);
+                }
+            } finally {
+                
checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
             }
 
-            // TODO: IGNITE-16984 After the checkpoint check 
assertTrue(memory.safeToUpdate())
+            checkpointManager
+                    .forceCheckpoint("for_test_safe_to_update", null)
+                    .futureFor(FINISHED)
+                    .get(100, MILLISECONDS);
+
+            assertTrue(pageMemoryImpl.safeToUpdate());
         } finally {
-            memory.stop(true);
+            closeAll(
+                    () -> pageMemoryImpl.stop(true),
+                    checkpointManager::stop,
+                    filePageStoreManager::stop
+            );
+        }
+    }
+
+    protected PageMemoryImpl createPageMemoryImpl(
+            long[] sizes,
+            @Nullable FilePageStoreManager filePageStoreManager,
+            @Nullable CheckpointManager checkpointManager
+    ) {
+        PageIoRegistry ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        return new PageMemoryImpl(
+                new UnsafeMemoryProvider(null),

Review Comment:
   Why is this a parameter? I thought we should derive this instance from the 
configuration. There's a specific config value for the allocator type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to