tkalkirill commented on code in PR #806:
URL: https://github.com/apache/ignite-3/pull/806#discussion_r874714073
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,12 +17,766 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+import static java.lang.Math.max;
+import static java.lang.System.nanoTime;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
/**
- * Empty.
+ * Checkpointer object is used for notification on checkpoint begin, predicate
is {@code nextCheckpointTimestamps - now > 0}.
+ *
+ * <p>Method {@link #scheduleCheckpoint} uses {@link Object#notifyAll()},
{@link #waitCheckpointEvent} uses {@link Object#wait(long)}.
+ *
+ * <p>Checkpointer is one threaded which means that only one checkpoint at the
one moment possible.
+ *
+ * <p>Responsiblity:
+ * <ul>
+ * <li>Provide the API for schedule/trigger the checkpoint.</li>
+ * <li>Schedule new checkpoint after current one according to checkpoint
frequency.</li>
+ * <li>Failure handling.</li>
+ * <li>Managing of page write threads.</li>
+ * <li>Logging and metrics of checkpoint.</li>
+ * </ul>
+ *
+ * <p>Checkpointer steps:
+ * <ul>
+ * <li>Awaiting checkpoint event.</li>
+ * <li>Collect all dirty pages from page memory under checkpoint write
lock.</li>
+ * <li>Start to write dirty pages to page store.</li>
+ * <li>Finish the checkpoint.
+ * </ul>
*/
-// TODO: IGNITE-16935 Continue porting the code
-public abstract class Checkpointer {
- public abstract Thread runner();
+public class Checkpointer extends IgniteWorker implements IgniteComponent {
+ private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint
started ["
+ + "checkpointId=%s, "
+ + "checkpointBeforeWriteLockTime=%dms, "
+ + "checkpointWriteLockWait=%dms, "
+ + "checkpointListenersExecuteTime=%dms, "
+ + "checkpointWriteLockHoldTime=%dms, "
+ + "splitAndSortPagesDuration=%dms, "
+ + "%s"
+ + "pages=%d, "
+ + "reason='%s']";
+
+ /** Avoid the start checkpoint if checkpointer was canceled. */
+ // TODO: IGNITE-16984 Move to config
+ private volatile boolean skipCheckpointOnNodeStop =
getBoolean("IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP", false);
+
+ /** Pause detector. */
+ @Nullable
+ private final LongJvmPauseDetector pauseDetector;
+
+ /** Supplier interval in ms after which the checkpoint is triggered if
there are no other events. */
+ private final LongSupplier checkpointFrequencySupplier;
+
+ /** Strategy of where and how to get the pages. */
+ private final CheckpointWorkflow checkpointWorkflow;
+
+ /** Factory for the creation of page-write workers. */
+ private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+ /** Checkpoint runner thread pool. If {@code null} tasks are to be run in
single thread. */
+ @Nullable
+ private final ThreadPoolExecutor checkpointWritePagesPool;
+
+ /** Next scheduled checkpoint progress. */
+ private volatile CheckpointProgressImpl scheduledCheckpointProgress;
+
+ /** Current checkpoint progress. This field is updated only by checkpoint
thread. */
+ @Nullable
+ private volatile CheckpointProgressImpl currentCheckpointProgress;
+
+ /** Shutdown now. */
+ private volatile boolean shutdownNow;
+
+ /** Last checkpoint timestamp, read/update only in checkpoint thread. */
+ private long lastCheckpointTimestamp;
+
+ @TestOnly
+ @Nullable
+ private volatile CompletableFuture<?> enableChangeAppliedFuture;
+
+ @TestOnly
+ private volatile boolean checkpointsEnabled = true;
+
+ /**
+ * Constructor.
+ *
+ * @param log Logger.
+ * @param igniteInstanceName Name of the Ignite instance.
+ * @param workerListener Listener for life-cycle worker events.
+ * @param detector Long JVM pause detector.
+ * @param checkpointWorkFlow Implementation of checkpoint.
+ * @param factory Page writer factory.
+ * @param checkpointFrequencySupplier Supplier interval in ms after which
the checkpoint is triggered if there are no other events.
+ * @param checkpointWritePageThreads The number of IO-bound threads which
will write pages to disk.
+ */
+ Checkpointer(
+ IgniteLogger log,
+ String igniteInstanceName,
+ @Nullable IgniteWorkerListener workerListener,
+ @Nullable LongJvmPauseDetector detector,
+ CheckpointWorkflow checkpointWorkFlow,
+ CheckpointPagesWriterFactory factory,
+ int checkpointWritePageThreads,
+ LongSupplier checkpointFrequencySupplier
+ ) {
+ super(log, igniteInstanceName, "checkpoint-thread", workerListener);
+
+ this.pauseDetector = detector;
+ // TODO: IGNITE-16984 Move to config: checkpointFrequency *
checkpointFrequencyDeviation see 2.0
+ this.checkpointFrequencySupplier = checkpointFrequencySupplier;
+ this.checkpointWorkflow = checkpointWorkFlow;
+ this.checkpointPagesWriterFactory = factory;
+
+ scheduledCheckpointProgress = new
CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
+
+ // TODO: IGNITE-16984 Move checkpointWritePageThreads to config
+ if (checkpointWritePageThreads > 1) {
+ checkpointWritePagesPool = new ThreadPoolExecutor(
+ checkpointWritePageThreads,
+ checkpointWritePageThreads,
+ 30_000,
+ MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory(CHECKPOINT_RUNNER_THREAD_PREFIX +
"-IO")
+ );
+ } else {
+ checkpointWritePagesPool = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void body() {
+ try {
+ while (!isCancelled()) {
+ waitCheckpointEvent();
+
+ if (skipCheckpointOnNodeStop && (isCancelled() ||
shutdownNow)) {
+ if (log.isInfoEnabled()) {
+ log.warn("Skipping last checkpoint because node is
stopping.");
Review Comment:
Fix it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]