tkalkirill commented on code in PR #791:
URL: https://github.com/apache/ignite-3/pull/791#discussion_r860961607


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.Collection;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Checkpoint lock for external use that protects data while it is being 
written to memory. It contains the logic for correctly
+ * acquiring the internal checkpoint lock (timeout, forced checkpoint, etc.).
+ */
+public class CheckpointTimeoutLock implements IgniteComponent {
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Data regions which should be covered by this lock. */
+    private final Supplier<Collection<PageMemoryDataRegion>> 
dataRegionsSupplier;
+
+    /** Internal checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Service for triggering the checkpoint. */
+    private final Checkpointer checkpointer;
+
+    /** Timeout for checkpoint read lock acquisition in milliseconds. */
+    private volatile long checkpointReadLockTimeout;
+
+    /** Stop flag. */
+    private boolean stop;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param checkpointReadWriteLock Checkpoint read-write lock.
+     * @param checkpointReadLockTimeout Timeout for checkpoint read lock 
acquisition in milliseconds;
+     *      a value {@code <= 0} makes {@link #checkpointReadLock()} acquire the read lock without a timeout.
+     * @param dataRegionsSupplier Data regions which should be covered by this 
lock.
+     * @param checkpointer Service for triggering the checkpoint.
+     */
+    public CheckpointTimeoutLock(
+            IgniteLogger log,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            long checkpointReadLockTimeout,
+            Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier,
+            Checkpointer checkpointer
+    ) {
+        this.log = log;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointReadLockTimeout = checkpointReadLockTimeout;
+        this.dataRegionsSupplier = dataRegionsSupplier;
+        this.checkpointer = checkpointer;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Clears the stop flag so that {@link #checkpointReadLock()} no longer rejects callers with a node-stopping error.
+     */
+    @Override
+    public void start() {
+        stop = false;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Sets the stop flag while holding the checkpoint write lock and releases the lock afterwards. Once the flag is set,
+     * {@link #checkpointReadLock()} releases the read lock it has just taken and throws an {@code IgniteInternalException}
+     * wrapping a {@code NodeStoppingException}.
+     */
+    @Override
+    public void stop() {
+        checkpointReadWriteLock.writeLock();
+
+        try {
+            stop = true;
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     * @throws CheckpointReadLockTimeoutException If failed to get checkpoint 
read lock by timeout.
+     */
+    public void checkpointReadLock() {
+        if (checkpointReadWriteLock.isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        long timeout = checkpointReadLockTimeout;
+
+        long start = coarseCurrentTimeMillis();
+
+        boolean interrupted = false;
+
+        try {
+            for (; ; ) {
+                try {
+                    if (timeout > 0 && (coarseCurrentTimeMillis() - start) >= 
timeout) {
+                        failCheckpointReadLock();
+                    }
+
+                    try {
+                        if (timeout > 0) {
+                            long timeout1 = timeout - 
(coarseCurrentTimeMillis() - start);
+
+                            if (!checkpointReadWriteLock.tryReadLock(timeout1, 
MILLISECONDS)) {
+                                failCheckpointReadLock();
+                            }
+                        } else {
+                            checkpointReadWriteLock.readLock();
+                        }
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+
+                        continue;
+                    }
+
+                    if (stop) {
+                        checkpointReadWriteLock.readUnlock();
+
+                        throw new IgniteInternalException(new 
NodeStoppingException("Failed to get checkpoint read lock"));
+                    }
+
+                    if (checkpointReadWriteLock.getReadHoldCount() > 1 || 
safeToUpdateAllPageMemories() || checkpointer.runner() == null) {
+                        break;
+                    } else {
+                        // If the checkpoint is triggered outside the lock,
+                        // it could cause the checkpoint to fire again for the 
same reason
+                        // (due to a data race between collecting dirty pages 
and triggering the checkpoint).
+                        CheckpointProgress checkpoint = 
checkpointer.scheduleCheckpoint(0, "too many dirty pages");
+
+                        checkpointReadWriteLock.readUnlock();
+
+                        if (timeout > 0 && coarseCurrentTimeMillis() - start 
>= timeout) {
+                            failCheckpointReadLock();
+                        }
+
+                        try {
+                            
getUninterruptibly(checkpoint.futureFor(LOCK_RELEASED));
+                        } catch (ExecutionException e) {
+                            throw new IgniteInternalException("Failed to wait 
for checkpoint begin", e.getCause());
+                        } catch (CancellationException e) {
+                            throw new IgniteInternalException("Failed to wait 
for checkpoint begin", e);
+                        }
+                    }
+                } catch (CheckpointReadLockTimeoutException e) {
+                    log.error(e.getMessage(), e);
+
+                    throw e;
+
+                    // TODO: IGNITE-16899 Should reset the timeout and try 
again

Review Comment:
   It's new.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to