ibessonov commented on code in PR #791:
URL: https://github.com/apache/ignite-3/pull/791#discussion_r860616861


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory;
+
+import org.apache.ignite.internal.manager.IgniteComponent;
+
+/**
+ * Data region based on {@link PageMemory}.
+ */
+public interface PageMemoryDataRegion extends IgniteComponent {

Review Comment:
   Why is this a component?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java:
##########
@@ -1171,7 +1250,7 @@ public Collection<FullPageId> dirtyPages() {
     /**
      * Page segment.
      */
-    class Segment extends ReentrantReadWriteLock {
+    public class Segment extends ReentrantReadWriteLock {

Review Comment:
   Why is it public?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Wrapper of the classic read write lock with checkpoint features.
+ */
+public class CheckpointReadWriteLock {
+    private final ThreadLocal<Integer> checkpointReadLockHoldCount = 
ThreadLocal.withInitial(() -> 0);
+
+    /**
+     * Any thread with a such prefix is managed by the checkpoint. So some 
conditions can rely on it(ex. we don't need a checkpoint lock
+     * there because checkpoint is already held write lock).
+     */
+    // TODO: IGNITE-16898 I think it needs to be redone or relocated
+    static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
+
+    /** Checkpoint lock. */
+    private final ReentrantReadWriteLockWithTracking checkpointLock;
+
+    /**
+     * Constructor.
+     *
+     * @param checkpointLock Checkpoint lock.
+     */
+    public CheckpointReadWriteLock(ReentrantReadWriteLockWithTracking 
checkpointLock) {
+        this.checkpointLock = checkpointLock;
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     */
+    public void readLock() {
+        if (isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        checkpointLock.readLock().lock();
+
+        checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+    }
+
+    /**
+     * Tries to get a checkpoint read lock.
+     *
+     * @param timeout – Time to wait for the read lock.
+     * @param unit – Time unit of the timeout argument.
+     * @throws IgniteInternalException If failed.
+     */
+    public boolean tryReadLock(long timeout, TimeUnit unit) throws 
InterruptedException {
+        if (isWriteLockHeldByCurrentThread()) {
+            return true;
+        }
+
+        boolean res = checkpointLock.readLock().tryLock(timeout, unit);
+
+        checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);

Review Comment:
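   Isn't this a bug? The hold count is incremented even when "tryLock" times 
out and returns false, i.e. when the read lock was not actually acquired.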



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java:
##########
@@ -1598,4 +1727,327 @@ public interface PageChangeTracker {
          */
         void apply(long page, FullPageId fullPageId, PageMemoryEx 
pageMemoryEx);
     }
+
+    /**
+     * Heuristic method which allows a thread to check if it is safe to start 
memory structure modifications in regard with checkpointing.
+     * May return false-negative result during or after partition eviction.
+     *
+     * @return {@code False} if there are too many dirty pages and a thread 
should wait for a checkpoint to begin.
+     */
+    public boolean safeToUpdate() {
+        if (segments != null) {
+            return safeToUpdate.get();
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns number of pages used in checkpoint buffer.
+     */
+    public int usedCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool;
+
+        return checkpointPool == null ? 0 : checkpointPool.size();
+    }
+
+    /**
+     * Returns max number of pages in checkpoint buffer.
+     */
+    public int maxCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool;
+
+        return checkpointPool == null ? 0 : checkpointPool.pages();
+    }
+
+    private void releaseCheckpointBufferPage(long tmpBufPtr) {
+        checkpointPool.releaseFreePage(tmpBufPtr);
+    }
+
+    /**
+     * Returns {@code true} if it was added to the checkpoint list.
+     *
+     * @param pageId Page ID to check if it was added to the checkpoint list.
+     */
+    boolean isInCheckpoint(FullPageId pageId) {
+        Segment seg = segment(pageId.groupId(), pageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        return pages0 != null && pages0.contains(pageId);
+    }
+
+    /**
+     * Returns {@code true} if remove successfully.
+     *
+     * @param fullPageId Page ID to clear.
+     */
+    boolean clearCheckpoint(FullPageId fullPageId) {
+        Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        assert pages0 != null;
+
+        return pages0.markAsSaved(fullPageId);
+    }
+
+    private void copyPageForCheckpoint(

Review Comment:
   I'd like to have Javadoc for this method, if possible.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java:
##########
@@ -1386,10 +1489,29 @@ public long refreshOutdatedPage(int grpId, long pageId, 
boolean rmv) {
 
             dirty(absPtr, false);
 
+            long tmpBufPtr = tempBufferPointer(absPtr);
+
+            if (tmpBufPtr != INVALID_REL_PTR) {
+                setMemory(checkpointPool.absolute(tmpBufPtr) + PAGE_OVERHEAD, 
pageSize(), (byte) 0);

Review Comment:
   I added a "zeroMemory" method; you should use it instead.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.Collection;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Checkpoint lock for outer usage which should be used to protect data during 
writing to memory. It contains complex logic for the correct
+ * taking of inside checkpoint lock(timeout, force checkpoint, etc.).
+ */
+public class CheckpointTimeoutLock implements IgniteComponent {
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Data regions which should be covered by this lock. */
+    private final Supplier<Collection<PageMemoryDataRegion>> 
dataRegionsSupplier;
+
+    /** Internal checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Service for triggering the checkpoint. */
+    private final Checkpointer checkpointer;
+
+    /** Timeout for checkpoint read lock acquisition in milliseconds. */
+    private volatile long checkpointReadLockTimeout;
+
+    /** Stop flag. */
+    private boolean stop;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param checkpointReadWriteLock Checkpoint read-write lock.
+     * @param checkpointReadLockTimeout Timeout for checkpoint read lock 
acquisition in milliseconds.
+     * @param dataRegionsSupplier Data regions which should be covered by this 
lock.
+     */
+    public CheckpointTimeoutLock(
+            IgniteLogger log,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            long checkpointReadLockTimeout,
+            Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier,
+            Checkpointer checkpointer
+    ) {
+        this.log = log;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointReadLockTimeout = checkpointReadLockTimeout;
+        this.dataRegionsSupplier = dataRegionsSupplier;
+        this.checkpointer = checkpointer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        stop = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        checkpointReadWriteLock.writeLock();
+
+        try {
+            stop = true;
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     * @throws CheckpointReadLockTimeoutException If failed to get checkpoint 
read lock by timeout.
+     */
+    public void checkpointReadLock() {
+        if (checkpointReadWriteLock.isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        long timeout = checkpointReadLockTimeout;
+
+        long start = coarseCurrentTimeMillis();
+
+        boolean interrupted = false;
+
+        try {
+            for (; ; ) {
+                try {
+                    if (timeout > 0 && (coarseCurrentTimeMillis() - start) >= 
timeout) {
+                        failCheckpointReadLock();
+                    }
+
+                    try {
+                        if (timeout > 0) {
+                            long timeout1 = timeout - 
(coarseCurrentTimeMillis() - start);

Review Comment:
   You could calculate this value earlier, because the same expression is 
already used in the check before "failCheckpointReadLock". But this is old 
code, I get it.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java:
##########
@@ -149,6 +171,7 @@ public class PageMemoryImpl implements PageMemoryEx {
     private final DirectMemoryProvider directMemoryProvider;
 
     /** Segments array. */
+    @Nullable

Review Comment:
   What exactly is nullable here? And why? Please reflect this in the comment



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Wrapper of the classic read write lock with checkpoint features.
+ */
+public class CheckpointReadWriteLock {
+    private final ThreadLocal<Integer> checkpointReadLockHoldCount = 
ThreadLocal.withInitial(() -> 0);
+
+    /**
+     * Any thread with a such prefix is managed by the checkpoint. So some 
conditions can rely on it(ex. we don't need a checkpoint lock
+     * there because checkpoint is already held write lock).
+     */
+    // TODO: IGNITE-16898 I think it needs to be redone or relocated
+    static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
+
+    /** Checkpoint lock. */
+    private final ReentrantReadWriteLockWithTracking checkpointLock;
+
+    /**
+     * Constructor.
+     *
+     * @param checkpointLock Checkpoint lock.
+     */
+    public CheckpointReadWriteLock(ReentrantReadWriteLockWithTracking 
checkpointLock) {
+        this.checkpointLock = checkpointLock;
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     */
+    public void readLock() {
+        if (isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        checkpointLock.readLock().lock();
+
+        checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+    }
+
+    /**
+     * Tries to get a checkpoint read lock.
+     *
+     * @param timeout – Time to wait for the read lock.
+     * @param unit – Time unit of the timeout argument.
+     * @throws IgniteInternalException If failed.
+     */
+    public boolean tryReadLock(long timeout, TimeUnit unit) throws 
InterruptedException {
+        if (isWriteLockHeldByCurrentThread()) {
+            return true;
+        }
+
+        boolean res = checkpointLock.readLock().tryLock(timeout, unit);
+
+        checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+
+        return res;
+    }
+
+    /**
+     * Tries to get a checkpoint read lock.
+     *
+     * @return {@code True} if the checkpoint read lock is acquired.
+     */
+    public boolean tryReadLock() {
+        if (isWriteLockHeldByCurrentThread()) {
+            return true;
+        }
+
+        boolean res = checkpointLock.readLock().tryLock();
+
+        checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);

Review Comment:
   Same concern here: I don't understand this increment. It is performed even 
when "tryLock()" returns false.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java:
##########
@@ -1598,4 +1727,327 @@ public interface PageChangeTracker {
          */
         void apply(long page, FullPageId fullPageId, PageMemoryEx 
pageMemoryEx);
     }
+
+    /**
+     * Heuristic method which allows a thread to check if it is safe to start 
memory structure modifications in regard with checkpointing.
+     * May return false-negative result during or after partition eviction.
+     *
+     * @return {@code False} if there are too many dirty pages and a thread 
should wait for a checkpoint to begin.
+     */
+    public boolean safeToUpdate() {
+        if (segments != null) {
+            return safeToUpdate.get();
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns number of pages used in checkpoint buffer.
+     */
+    public int usedCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool;

Review Comment:
   How can this be null?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.Collection;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Checkpoint lock for outer usage which should be used to protect data during 
writing to memory. It contains complex logic for the correct
+ * taking of inside checkpoint lock(timeout, force checkpoint, etc.).
+ */
+public class CheckpointTimeoutLock implements IgniteComponent {
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Data regions which should be covered by this lock. */
+    private final Supplier<Collection<PageMemoryDataRegion>> 
dataRegionsSupplier;
+
+    /** Internal checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Service for triggering the checkpoint. */
+    private final Checkpointer checkpointer;
+
+    /** Timeout for checkpoint read lock acquisition in milliseconds. */
+    private volatile long checkpointReadLockTimeout;
+
+    /** Stop flag. */
+    private boolean stop;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param checkpointReadWriteLock Checkpoint read-write lock.
+     * @param checkpointReadLockTimeout Timeout for checkpoint read lock 
acquisition in milliseconds.
+     * @param dataRegionsSupplier Data regions which should be covered by this 
lock.
+     */
+    public CheckpointTimeoutLock(
+            IgniteLogger log,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            long checkpointReadLockTimeout,
+            Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier,
+            Checkpointer checkpointer
+    ) {
+        this.log = log;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointReadLockTimeout = checkpointReadLockTimeout;
+        this.dataRegionsSupplier = dataRegionsSupplier;
+        this.checkpointer = checkpointer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        stop = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        checkpointReadWriteLock.writeLock();
+
+        try {
+            stop = true;
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     * @throws CheckpointReadLockTimeoutException If failed to get checkpoint 
read lock by timeout.
+     */
+    public void checkpointReadLock() {
+        if (checkpointReadWriteLock.isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        long timeout = checkpointReadLockTimeout;
+
+        long start = coarseCurrentTimeMillis();
+
+        boolean interrupted = false;
+
+        try {
+            for (; ; ) {
+                try {
+                    if (timeout > 0 && (coarseCurrentTimeMillis() - start) >= 
timeout) {
+                        failCheckpointReadLock();
+                    }
+
+                    try {
+                        if (timeout > 0) {
+                            long timeout1 = timeout - 
(coarseCurrentTimeMillis() - start);
+
+                            if (!checkpointReadWriteLock.tryReadLock(timeout1, 
MILLISECONDS)) {
+                                failCheckpointReadLock();
+                            }
+                        } else {
+                            checkpointReadWriteLock.readLock();
+                        }
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+
+                        continue;
+                    }
+
+                    if (stop) {
+                        checkpointReadWriteLock.readUnlock();
+
+                        throw new IgniteInternalException(new 
NodeStoppingException("Failed to get checkpoint read lock"));
+                    }
+
+                    if (checkpointReadWriteLock.getReadHoldCount() > 1 || 
safeToUpdateAllPageMemories() || checkpointer.runner() == null) {
+                        break;
+                    } else {
+                        // If the checkpoint is triggered outside the lock,
+                        // it could cause the checkpoint to fire again for the 
same reason
+                        // (due to a data race between collecting dirty pages 
and triggering the checkpoint).
+                        CheckpointProgress checkpoint = 
checkpointer.scheduleCheckpoint(0, "too many dirty pages");
+
+                        checkpointReadWriteLock.readUnlock();
+
+                        if (timeout > 0 && coarseCurrentTimeMillis() - start 
>= timeout) {
+                            failCheckpointReadLock();
+                        }
+
+                        try {
+                            
getUninterruptibly(checkpoint.futureFor(LOCK_RELEASED));
+                        } catch (ExecutionException e) {
+                            throw new IgniteInternalException("Failed to wait 
for checkpoint begin", e.getCause());
+                        } catch (CancellationException e) {
+                            throw new IgniteInternalException("Failed to wait 
for checkpoint begin", e);
+                        }
+                    }
+                } catch (CheckpointReadLockTimeoutException e) {
+                    log.error(e.getMessage(), e);
+
+                    throw e;
+
+                    // TODO: IGNITE-16899 Should reset the timeout and try 
again

Review Comment:
   Is this an old issue or a new one?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.Collection;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Checkpoint lock for outer usage which should be used to protect data during 
writing to memory. It contains complex logic for the correct
+ * taking of inside checkpoint lock(timeout, force checkpoint, etc.).
+ */
+public class CheckpointTimeoutLock implements IgniteComponent {
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Data regions which should be covered by this lock. */
+    private final Supplier<Collection<PageMemoryDataRegion>> 
dataRegionsSupplier;

Review Comment:
   Using a supplier of data regions doesn't look right to me. All the supplier 
is used for is a "safeToUpdate" check. Can't we at least just pass a 
"safeToUpdate" boolean supplier? That would be cleaner, I believe.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java:
##########
@@ -1598,4 +1727,327 @@ public interface PageChangeTracker {
          */
         void apply(long page, FullPageId fullPageId, PageMemoryEx 
pageMemoryEx);
     }
+
+    /**
+     * Heuristic method which allows a thread to check if it is safe to start 
memory structure modifications in regard with checkpointing.
+     * May return false-negative result during or after partition eviction.
+     *
+     * @return {@code False} if there are too many dirty pages and a thread 
should wait for a checkpoint to begin.
+     */
+    public boolean safeToUpdate() {
+        if (segments != null) {
+            return safeToUpdate.get();
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns number of pages used in checkpoint buffer.
+     */
+    public int usedCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool;
+
+        return checkpointPool == null ? 0 : checkpointPool.size();
+    }
+
+    /**
+     * Returns max number of pages in checkpoint buffer.
+     */
+    public int maxCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool;
+
+        return checkpointPool == null ? 0 : checkpointPool.pages();
+    }
+
+    private void releaseCheckpointBufferPage(long tmpBufPtr) {
+        checkpointPool.releaseFreePage(tmpBufPtr);
+    }
+
+    /**
+     * Returns {@code true} if it was added to the checkpoint list.
+     *
+     * @param pageId Page ID to check if it was added to the checkpoint list.
+     */
+    boolean isInCheckpoint(FullPageId pageId) {
+        Segment seg = segment(pageId.groupId(), pageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        return pages0 != null && pages0.contains(pageId);
+    }
+
+    /**
+     * Returns {@code true} if remove successfully.
+     *
+     * @param fullPageId Page ID to clear.
+     */
+    boolean clearCheckpoint(FullPageId fullPageId) {
+        Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        assert pages0 != null;
+
+        return pages0.markAsSaved(fullPageId);
+    }
+
+    private void copyPageForCheckpoint(
+            long absPtr,
+            FullPageId fullId,
+            ByteBuffer buf,
+            Integer tag,
+            boolean pageSingleAcquire,
+            PageStoreWriter pageStoreWriter
+    ) throws IgniteInternalCheckedException {
+        assert absPtr != 0;
+        assert isAcquired(absPtr) || !isInCheckpoint(fullId);
+
+        // Exception protection flag.
+        // No need to write if exception occurred.
+        boolean canWrite = false;
+
+        boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS);
+
+        if (!locked) {
+            // We release the page only once here because this page will be 
copied sometime later and
+            // will be released properly then.
+            if (!pageSingleAcquire) {
+                PageHeader.releasePage(absPtr);
+            }
+
+            buf.clear();
+
+            if (isInCheckpoint(fullId)) {
+                pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+            }
+
+            return;
+        }
+
+        if (!clearCheckpoint(fullId)) {
+            rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+            if (!pageSingleAcquire) {
+                PageHeader.releasePage(absPtr);
+            }
+
+            return;
+        }
+
+        try {
+            long tmpRelPtr = tempBufferPointer(absPtr);
+
+            if (tmpRelPtr != INVALID_REL_PTR) {
+                tempBufferPointer(absPtr, INVALID_REL_PTR);
+
+                long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
+
+                copyInBuffer(tmpAbsPtr, buf);
+
+                fullPageId(tmpAbsPtr, NULL_PAGE);
+
+                setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+                releaseCheckpointBufferPage(tmpRelPtr);
+
+                // Need release again because we pin page when resolve abs 
pointer,
+                // and page did not have tmp buffer page.
+                if (!pageSingleAcquire) {
+                    PageHeader.releasePage(absPtr);
+                }
+            } else {
+                copyInBuffer(absPtr, buf);
+
+                dirty(absPtr, false);
+            }
+
+            assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + 
hexLong(fullId.pageId());
+            assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId 
= " + hexLong(fullId.pageId());
+
+            canWrite = true;
+        } finally {
+            rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+            if (canWrite) {
+                buf.rewind();
+
+                pageStoreWriter.writePage(fullId, buf, tag);
+
+                buf.rewind();
+            }
+
+            // We pinned the page either when allocated the temp buffer, or 
when resolved abs pointer.
+            // Must release the page only after write unlock.
+            PageHeader.releasePage(absPtr);
+        }
+    }
+
+    /**
+     * Prepare page for write during checkpoint. {@link PageStoreWriter} will 
be called when the page will be ready to write.
+     *
+     * @param fullId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by the {@link
+     * #beginCheckpoint(CompletableFuture)} method call.
+     * @param buf Temporary buffer to write changes into.
+     * @param pageStoreWriter Checkpoint page write context.
+     * @throws IgniteInternalCheckedException If failed to obtain page data.
+     */
+    public void checkpointWritePage(
+            FullPageId fullId,
+            ByteBuffer buf,
+            PageStoreWriter pageStoreWriter
+    ) throws IgniteInternalCheckedException {
+        assert buf.remaining() == pageSize();
+
+        Segment seg = segment(fullId.groupId(), fullId.pageId());
+
+        long absPtr = 0;
+
+        long relPtr;
+
+        int tag;
+
+        boolean pageSingleAcquire = false;
+
+        seg.readLock().lock();
+
+        try {
+            if (!isInCheckpoint(fullId)) {
+                return;
+            }
+
+            relPtr = resolveRelativePointer(seg, fullId, tag = 
generationTag(seg, fullId));
+
+            // Page may have been cleared during eviction. We have nothing to 
do in this case.
+            if (relPtr == INVALID_REL_PTR) {
+                return;
+            }
+
+            if (relPtr != OUTDATED_REL_PTR) {
+                absPtr = seg.absolute(relPtr);
+
+                // Pin the page until page will not be copied. This helpful to 
prevent page replacement of this page.
+                if (tempBufferPointer(absPtr) == INVALID_REL_PTR) {
+                    PageHeader.acquirePage(absPtr);
+                } else {
+                    pageSingleAcquire = true;
+                }
+            }
+        } finally {
+            seg.readLock().unlock();
+        }
+
+        if (relPtr == OUTDATED_REL_PTR) {
+            seg.writeLock().lock();
+
+            try {
+                // Double-check.
+                relPtr = resolveRelativePointer(seg, fullId, 
generationTag(seg, fullId));
+
+                if (relPtr == INVALID_REL_PTR) {
+                    return;
+                }
+
+                if (relPtr == OUTDATED_REL_PTR) {
+                    relPtr = seg.refreshOutdatedPage(
+                            fullId.groupId(),
+                            fullId.effectivePageId(),
+                            true
+                    );
+
+                    seg.pageReplacementPolicy.onRemove(relPtr);
+
+                    seg.pool.releaseFreePage(relPtr);
+                }
+
+                return;
+            } finally {
+                seg.writeLock().unlock();
+            }
+        }
+
+        copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, 
pageStoreWriter);
+    }
+
+    /**
+     * Get arbitrary page from cp buffer.
+     */
+    public FullPageId pullPageFromCpBuffer() {
+        long idx = getLong(checkpointPool.lastAllocatedIdxPtr);
+
+        long lastIdx = ThreadLocalRandom.current().nextLong(idx / 2, idx);
+
+        while (--lastIdx > 1) {
+            assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+            long relative = checkpointPool.relative(lastIdx);
+
+            long freePageAbsPtr = checkpointPool.absolute(relative);
+
+            FullPageId fullPageId = fullPageId(freePageAbsPtr);
+
+            if (fullPageId.pageId() == NULL_PAGE.pageId() || 
fullPageId.groupId() == NULL_PAGE.groupId()) {
+                continue;
+            }
+
+            if (!isInCheckpoint(fullPageId)) {
+                continue;
+            }
+
+            return fullPageId;
+        }
+
+        return NULL_PAGE;
+    }
+
+    /**
+     * Gets a collection of dirty page IDs since the last checkpoint. If a 
dirty page is being written after the checkpointing operation
+     * begun, the modifications will be written to a temporary buffer which 
will be flushed to the main memory after the checkpointing
+     * finished. This method must be called when no concurrent operations on 
pages are performed.
+     *
+     * @param allowToReplace The sign which allows replacing pages from a 
checkpoint by page replacer.
+     * @return Collection view of dirty page IDs.
+     * @throws IgniteInternalException If checkpoint has been already started 
and was not finished.
+     */
+    public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> 
allowToReplace) throws IgniteInternalException {
+        if (segments == null) {
+            return List.of();
+        }
+
+        Collection<FullPageId>[] collections = new Collection[segments.length];
+
+        for (int i = 0; i < segments.length; i++) {
+            Segment seg = segments[i];
+
+            if (seg.checkpointPages != null) {
+                throw new IgniteInternalException("Failed to begin checkpoint 
(it is already in progress).");
+            }
+
+            Collection<FullPageId> dirtyPages = seg.dirtyPages;
+            collections[i] = dirtyPages;
+
+            seg.checkpointPages = new CheckpointPages(dirtyPages, 
allowToReplace);
+
+            seg.resetDirtyPages();
+        }
+
+        safeToUpdate.set(true);
+
+        return CollectionUtils.union(collections);

Review Comment:
   I wonder what's going on in this "union" method. Maybe we should revisit it 
in the future.
   
   Here's my idea: given that we determine a segment by hashing the group id and 
page id, we could implement a faster "sharded" contains method. What do you think?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+/**
+ * Possible checkpoint states. Ordinal is important. Every next state follows 
the previous one.
+ */
+// TODO: IGNITE-16898 Review states
+public enum CheckpointState {
+    /** Checkpoint is waiting to execution. **/
+    SCHEDULED,
+
+    /** Checkpoint was awakened and it is preparing to start. **/
+    LOCK_TAKEN,
+
+    /** Dirty pages snapshot has been taken. **/
+    PAGE_SNAPSHOT_TAKEN,
+
+    /** Checkpoint counted the pages and write lock was released. **/
+    LOCK_RELEASED,
+
+    // TODO: IGNITE-16898 It will most likely need to be removed

Review Comment:
   We will have a checkpoint marker on disk, won't we?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.Collection;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Checkpoint lock for outer usage which should be used to protect data during 
writing to memory. It contains complex logic for the correct
+ * taking of inside checkpoint lock(timeout, force checkpoint, etc.).
+ */
+public class CheckpointTimeoutLock implements IgniteComponent {
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Data regions which should be covered by this lock. */
+    private final Supplier<Collection<PageMemoryDataRegion>> 
dataRegionsSupplier;
+
+    /** Internal checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Service for triggering the checkpoint. */
+    private final Checkpointer checkpointer;
+
+    /** Timeout for checkpoint read lock acquisition in milliseconds. */
+    private volatile long checkpointReadLockTimeout;
+
+    /** Stop flag. */
+    private boolean stop;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param checkpointReadWriteLock Checkpoint read-write lock.
+     * @param checkpointReadLockTimeout Timeout for checkpoint read lock 
acquisition in milliseconds.
+     * @param dataRegionsSupplier Data regions which should be covered by this 
lock.
+     */
+    public CheckpointTimeoutLock(
+            IgniteLogger log,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            long checkpointReadLockTimeout,
+            Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier,
+            Checkpointer checkpointer
+    ) {
+        this.log = log;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointReadLockTimeout = checkpointReadLockTimeout;
+        this.dataRegionsSupplier = dataRegionsSupplier;
+        this.checkpointer = checkpointer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        stop = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        checkpointReadWriteLock.writeLock();
+
+        try {
+            stop = true;
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     * @throws CheckpointReadLockTimeoutException If failed to get checkpoint 
read lock by timeout.
+     */
+    public void checkpointReadLock() {
+        if (checkpointReadWriteLock.isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        long timeout = checkpointReadLockTimeout;
+
+        long start = coarseCurrentTimeMillis();
+
+        boolean interrupted = false;
+
+        try {
+            for (; ; ) {
+                try {
+                    if (timeout > 0 && (coarseCurrentTimeMillis() - start) >= 
timeout) {
+                        failCheckpointReadLock();
+                    }
+
+                    try {
+                        if (timeout > 0) {
+                            long timeout1 = timeout - 
(coarseCurrentTimeMillis() - start);

Review Comment:
   What a name for the variable (`timeout1`)!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to