tkalkirill commented on code in PR #806:
URL: https://github.com/apache/ignite-3/pull/806#discussion_r874755979
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,12 +17,766 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+import static java.lang.Math.max;
+import static java.lang.System.nanoTime;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
/**
- * Empty.
+ * Checkpointer object is used for notification on checkpoint begin, predicate
is {@code nextCheckpointTimestamps - now > 0}.
+ *
+ * <p>Method {@link #scheduleCheckpoint} uses {@link Object#notifyAll()},
{@link #waitCheckpointEvent} uses {@link Object#wait(long)}.
+ *
+ * <p>Checkpointer is one threaded which means that only one checkpoint at the
one moment possible.
+ *
+ * <p>Responsiblity:
+ * <ul>
+ * <li>Provide the API for schedule/trigger the checkpoint.</li>
+ * <li>Schedule new checkpoint after current one according to checkpoint
frequency.</li>
+ * <li>Failure handling.</li>
+ * <li>Managing of page write threads.</li>
+ * <li>Logging and metrics of checkpoint.</li>
+ * </ul>
+ *
+ * <p>Checkpointer steps:
+ * <ul>
+ * <li>Awaiting checkpoint event.</li>
+ * <li>Collect all dirty pages from page memory under checkpoint write
lock.</li>
+ * <li>Start to write dirty pages to page store.</li>
+ * <li>Finish the checkpoint.
+ * </ul>
*/
-// TODO: IGNITE-16935 Continue porting the code
-public abstract class Checkpointer {
- public abstract Thread runner();
+public class Checkpointer extends IgniteWorker implements IgniteComponent {
+ private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint
started ["
+ + "checkpointId=%s, "
+ + "checkpointBeforeWriteLockTime=%dms, "
+ + "checkpointWriteLockWait=%dms, "
+ + "checkpointListenersExecuteTime=%dms, "
+ + "checkpointWriteLockHoldTime=%dms, "
+ + "splitAndSortPagesDuration=%dms, "
+ + "%s"
+ + "pages=%d, "
+ + "reason='%s']";
+
+ /** Avoid the start checkpoint if checkpointer was canceled. */
+ // TODO: IGNITE-16984 Move to config
+ private volatile boolean skipCheckpointOnNodeStop =
getBoolean("IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP", false);
+
+ /** Pause detector. */
+ @Nullable
+ private final LongJvmPauseDetector pauseDetector;
+
+ /** Supplier interval in ms after which the checkpoint is triggered if
there are no other events. */
+ private final LongSupplier checkpointFrequencySupplier;
+
+ /** Strategy of where and how to get the pages. */
+ private final CheckpointWorkflow checkpointWorkflow;
+
+ /** Factory for the creation of page-write workers. */
+ private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+ /** Checkpoint runner thread pool. If {@code null} tasks are to be run in
single thread. */
+ @Nullable
+ private final ThreadPoolExecutor checkpointWritePagesPool;
+
+ /** Next scheduled checkpoint progress. */
+ private volatile CheckpointProgressImpl scheduledCheckpointProgress;
+
+ /** Current checkpoint progress. This field is updated only by checkpoint
thread. */
+ @Nullable
+ private volatile CheckpointProgressImpl currentCheckpointProgress;
+
+ /** Shutdown now. */
+ private volatile boolean shutdownNow;
+
+ /** Last checkpoint timestamp, read/update only in checkpoint thread. */
+ private long lastCheckpointTimestamp;
+
+ @TestOnly
+ @Nullable
+ private volatile CompletableFuture<?> enableChangeAppliedFuture;
+
+ @TestOnly
+ private volatile boolean checkpointsEnabled = true;
+
+ /**
+ * Constructor.
+ *
+ * @param log Logger.
+ * @param igniteInstanceName Name of the Ignite instance.
+ * @param workerListener Listener for life-cycle worker events.
+ * @param detector Long JVM pause detector.
+ * @param checkpointWorkFlow Implementation of checkpoint.
+ * @param factory Page writer factory.
+ * @param checkpointFrequencySupplier Supplier interval in ms after which
the checkpoint is triggered if there are no other events.
+ * @param checkpointWritePageThreads The number of IO-bound threads which
will write pages to disk.
+ */
+ Checkpointer(
+ IgniteLogger log,
+ String igniteInstanceName,
+ @Nullable IgniteWorkerListener workerListener,
+ @Nullable LongJvmPauseDetector detector,
+ CheckpointWorkflow checkpointWorkFlow,
+ CheckpointPagesWriterFactory factory,
+ int checkpointWritePageThreads,
+ LongSupplier checkpointFrequencySupplier
+ ) {
+ super(log, igniteInstanceName, "checkpoint-thread", workerListener);
+
+ this.pauseDetector = detector;
+ // TODO: IGNITE-16984 Move to config: checkpointFrequency *
checkpointFrequencyDeviation see 2.0
+ this.checkpointFrequencySupplier = checkpointFrequencySupplier;
+ this.checkpointWorkflow = checkpointWorkFlow;
+ this.checkpointPagesWriterFactory = factory;
+
+ scheduledCheckpointProgress = new
CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
+
+ // TODO: IGNITE-16984 Move checkpointWritePageThreads to config
+ if (checkpointWritePageThreads > 1) {
+ checkpointWritePagesPool = new ThreadPoolExecutor(
+ checkpointWritePageThreads,
+ checkpointWritePageThreads,
+ 30_000,
+ MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory(CHECKPOINT_RUNNER_THREAD_PREFIX +
"-IO")
+ );
+ } else {
+ checkpointWritePagesPool = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void body() {
+ try {
+ while (!isCancelled()) {
+ waitCheckpointEvent();
+
+ if (skipCheckpointOnNodeStop && (isCancelled() ||
shutdownNow)) {
+ if (log.isInfoEnabled()) {
+ log.warn("Skipping last checkpoint because node is
stopping.");
+ }
+
+ return;
+ }
+
+ CompletableFuture<?> enableChangeAppliedFuture =
this.enableChangeAppliedFuture;
+
+ if (enableChangeAppliedFuture != null) {
+ enableChangeAppliedFuture.complete(null);
+
+ this.enableChangeAppliedFuture = null;
+ }
+
+ if (checkpointsEnabled) {
+ doCheckpoint();
+ } else {
+ synchronized (this) {
+
scheduledCheckpointProgress.nextCheckpointNanos(MILLISECONDS.toNanos(nextCheckpointInterval()));
+ }
+ }
+ }
+
+ // Final run after the cancellation.
+ if (checkpointsEnabled && !shutdownNow) {
+ doCheckpoint();
+ }
+
+ if (!isCancelled.get()) {
+ throw new IllegalStateException("Thread is terminated
unexpectedly: " + name());
+ }
+
+ scheduledCheckpointProgress.fail(new NodeStoppingException("Node
is stopping."));
+ } catch (Throwable t) {
+ scheduledCheckpointProgress.fail(t);
+
+ // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the
exception (err) by the FailureProcessor
+ // We need to handle OutOfMemoryError and the rest in different
ways
+
+ throw new IgniteInternalException(t);
+ }
+ }
+
+ /**
+ * Changes the information for a scheduled checkpoint if it was scheduled
further than {@code delayFromNow}, or do nothing otherwise.
+ *
+ * @param delayFromNow Delay from now in milliseconds.
+ * @param reason Wakeup reason.
+ * @return Nearest scheduled checkpoint which is not started yet (dirty
pages weren't collected yet).
+ */
+ public CheckpointProgress scheduleCheckpoint(long delayFromNow, String
reason) {
+ return scheduleCheckpoint(delayFromNow, reason, null);
+ }
+
+ /**
+ * Changes the information for a scheduled checkpoint if it was scheduled
further than {@code delayFromNow}, or do nothing otherwise.
+ *
+ * @param delayFromNow Delay from now in milliseconds.
+ * @param reason Wakeup reason.
+ * @param finishFutureListener Checkpoint finish listener.
+ * @return Nearest scheduled checkpoint which is not started yet (dirty
pages weren't collected yet).
+ */
+ public CheckpointProgress scheduleCheckpoint(
+ long delayFromNow,
+ String reason,
+ @Nullable BiConsumer<Void, Throwable> finishFutureListener
+ ) {
+ CheckpointProgressImpl current = currentCheckpointProgress;
+
+ // If checkpoint haven't taken write lock yet it shouldn't trigger a
new checkpoint but should return current one.
+ if (finishFutureListener == null && current != null &&
!current.greaterOrEqualTo(LOCK_TAKEN)) {
+ return current;
+ }
+
+ if (finishFutureListener != null) {
+ // To be sure finishFutureListener will always be executed in
checkpoint thread.
+ synchronized (this) {
+ current = scheduledCheckpointProgress;
+
+ current.futureFor(FINISHED).whenComplete(finishFutureListener);
+ }
+ } else {
+ current = scheduledCheckpointProgress;
+ }
+
+ long nextNanos = nanoTime() + MILLISECONDS.toNanos(delayFromNow);
+
+ if (current.nextCheckpointNanos() - nextNanos <= 0) {
+ return current;
+ }
+
+ synchronized (this) {
+ current = scheduledCheckpointProgress;
+
+ if (current.nextCheckpointNanos() - nextNanos > 0) {
+ current.reason(reason);
+
+
current.nextCheckpointNanos(MILLISECONDS.toNanos(delayFromNow));
+ }
+
+ notifyAll();
+ }
+
+ return current;
+ }
+
+ /**
+ * Executes a checkpoint.
+ *
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ void doCheckpoint() throws IgniteInternalCheckedException {
+ Checkpoint chp = null;
+
+ try {
+ CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
+
+ startCheckpointProgress();
+
+ try {
+ chp =
checkpointWorkflow.markCheckpointBegin(lastCheckpointTimestamp,
currentCheckpointProgress, tracker);
+ } catch (Exception e) {
+ if (currentCheckpointProgress != null) {
+ currentCheckpointProgress.fail(e);
+ }
+
+ // TODO: IGNITE-16899 By analogy with 2.0, we need to handle
the exception by the FailureProcessor
+ // In case of checkpoint initialization error node should be
invalidated and stopped.
+
+ // Re-throw as unchecked exception to force stopping
checkpoint thread.
+ throw new IgniteInternalCheckedException(e);
+ }
+
+ updateHeartbeat();
+
+ currentCheckpointProgress.initCounters(chp.dirtyPagesSize);
+
+ if (chp.hasDelta()) {
+ if (log.isInfoEnabled()) {
+ long possibleJvmPauseDuration =
possibleLongJvmPauseDuration(tracker);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format(
+ CHECKPOINT_STARTED_LOG_FORMAT,
+ chp.progress.id(),
+ tracker.beforeWriteLockDuration(),
+ tracker.writeLockWaitDuration(),
+ tracker.onMarkCheckpointBeginDuration(),
+ tracker.writeLockHoldDuration(),
+ tracker.splitAndSortCheckpointPagesDuration(),
+ possibleJvmPauseDuration > 0 ?
"possibleJvmPauseDuration=" + possibleJvmPauseDuration + "ms, " : "",
+ chp.dirtyPagesSize,
+ chp.progress.reason()
+ ));
+ }
+ }
+
+ if (!writePages(tracker, chp.dirtyPages, chp.progress, this,
this::isShutdownNow)) {
+ return;
+ }
+ } else {
+ if (log.isInfoEnabled()) {
+ log.info(String.format(
+ "Skipping checkpoint (no pages were modified) ["
+ + "checkpointBeforeWriteLockTime=%dms,
checkpointWriteLockWait=%dms, "
+ + "checkpointListenersExecuteTime=%dms,
checkpointWriteLockHoldTime=%dms, reason='%s']",
+ tracker.beforeWriteLockDuration(),
+ tracker.writeLockWaitDuration(),
+ tracker.onMarkCheckpointBeginDuration(),
+ tracker.writeLockHoldDuration(),
+ chp.progress.reason()
+ ));
+ }
+
+ tracker.onPagesWriteStart();
+ tracker.onFsyncStart();
+ }
+
+ // Must mark successful checkpoint only if there are no exceptions
or interrupts.
+ checkpointWorkflow.markCheckpointEnd(chp);
+
+ tracker.onCheckpointEnd();
+
+ if (chp.hasDelta()) {
+ if (log.isInfoEnabled()) {
+ log.info(String.format(
+ "Checkpoint finished [checkpointId=%s, pages=%d,
pagesWriteTime=%dms, fsyncTime=%dms, totalTime=%dms]",
+ chp.progress.id(),
+ chp.dirtyPagesSize,
+ tracker.pagesWriteDuration(),
+ tracker.fsyncDuration(),
+ tracker.totalDuration()
+ ));
+ }
+ }
+ } catch (IgniteInternalCheckedException e) {
+ if (chp != null) {
+ chp.progress.fail(e);
+ }
+
+ // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the
exception by the FailureProcessor
+
+ throw e;
+ }
+ }
+
+ /**
+ * Writes dirty pages to the appropriate stores.
+ *
+ * @param tracker Checkpoint metrics tracker.
+ * @param checkpointPages Checkpoint pages to write.
+ * @param currentCheckpointProgress Current checkpoint progress.
+ * @param workProgressDispatcher Work progress dispatcher.
+ * @param shutdownNow Checker of stop operation.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ boolean writePages(
+ CheckpointMetricsTracker tracker,
+ IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId>
checkpointPages,
+ CheckpointProgressImpl currentCheckpointProgress,
+ WorkProgressDispatcher workProgressDispatcher,
+ BooleanSupplier shutdownNow
+ ) throws IgniteInternalCheckedException {
+ ThreadPoolExecutor pageWritePool = checkpointWritePagesPool;
+
+ int checkpointWritePageThreads = pageWritePool == null ? 1 :
pageWritePool.getMaximumPoolSize();
+
+ // Identity stores set.
+ ConcurrentMap<PageStore, LongAdder> updStores = new
ConcurrentHashMap<>();
+
+ CompletableFuture<?>[] futures = new
CompletableFuture[checkpointWritePageThreads];
+
+ tracker.onPagesWriteStart();
+
+ for (int i = 0; i < checkpointWritePageThreads; i++) {
+ CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
+ tracker,
+ checkpointPages,
+ updStores,
+ futures[i] = new CompletableFuture<>(),
+ workProgressDispatcher::updateHeartbeat,
+ currentCheckpointProgress,
+ shutdownNow
+ );
+
+ if (pageWritePool == null) {
+ write.run();
+ } else {
+ try {
+ pageWritePool.execute(write);
+ } catch (RejectedExecutionException ignore) {
+ // Run the task synchronously.
+ write.run();
+ }
+ }
+ }
+
+ workProgressDispatcher.updateHeartbeat();
+
+ // Wait and check for errors.
+ CompletableFuture.allOf(futures).join();
+
+ // Must re-check shutdown flag here because threads may have skipped
some pages.
+ // If so, we should not put finish checkpoint mark.
+ if (shutdownNow.getAsBoolean()) {
+ currentCheckpointProgress.fail(new NodeStoppingException("Node is
stopping."));
+
+ return false;
+ }
+
+ tracker.onFsyncStart();
+
+ syncUpdatedStores(updStores);
+
+ if (shutdownNow.getAsBoolean()) {
+ currentCheckpointProgress.fail(new NodeStoppingException("Node is
stopping."));
+
+ return false;
+ }
+
+ return true;
+ }
+
+ private void syncUpdatedStores(
+ ConcurrentMap<PageStore, LongAdder> updatedStoresForSync
+ ) throws IgniteInternalCheckedException {
+ ThreadPoolExecutor pageWritePool = checkpointWritePagesPool;
+
+ if (pageWritePool == null) {
+ for (Map.Entry<PageStore, LongAdder> updStoreEntry :
updatedStoresForSync.entrySet()) {
+ if (shutdownNow) {
+ return;
+ }
+
+ blockingSectionBegin();
+
+ try {
+ updStoreEntry.getKey().sync();
+ } finally {
+ blockingSectionEnd();
+ }
+
+
currentCheckpointProgress.syncedPagesCounter().addAndGet(updStoreEntry.getValue().intValue());
+ }
+ } else {
+ int checkpointThreads = pageWritePool.getMaximumPoolSize();
+
+ CompletableFuture<?>[] futures = new
CompletableFuture[checkpointThreads];
+
+ for (int i = 0; i < checkpointThreads; i++) {
+ futures[i] = new CompletableFuture<>();
+ }
+
+ BlockingQueue<Entry<PageStore, LongAdder>> queue = new
LinkedBlockingQueue<>(updatedStoresForSync.entrySet());
+
+ for (int i = 0; i < checkpointThreads; i++) {
+ int threadIdx = i;
+
+ pageWritePool.execute(() -> {
+ Map.Entry<PageStore, LongAdder> updStoreEntry =
queue.poll();
+
+ try {
+ while (updStoreEntry != null) {
+ if (shutdownNow) {
+ return;
+ }
+
+ blockingSectionBegin();
+
+ try {
+ updStoreEntry.getKey().sync();
+ } finally {
+ blockingSectionEnd();
+ }
+
+
currentCheckpointProgress.syncedPagesCounter().addAndGet(updStoreEntry.getValue().intValue());
+
+ updStoreEntry = queue.poll();
+ }
+
+ futures[threadIdx].complete(null);
+ } catch (Throwable t) {
+ futures[threadIdx].completeExceptionally(t);
+ }
+ });
+ }
+
+ blockingSectionBegin();
+
+ try {
+ CompletableFuture.allOf(futures).join();
+ } finally {
+ blockingSectionEnd();
+ }
+ }
+ }
+
+ /**
+ * Waiting until the next checkpoint time.
+ */
+ void waitCheckpointEvent() {
+ try {
+ synchronized (this) {
+ long remaining =
NANOSECONDS.toMillis(scheduledCheckpointProgress.nextCheckpointNanos() -
nanoTime());
+
+ while (remaining > 0 && !isCancelled()) {
+ blockingSectionBegin();
+
+ try {
+ wait(remaining);
+
+ remaining =
NANOSECONDS.toMillis(scheduledCheckpointProgress.nextCheckpointNanos() -
nanoTime());
+ } finally {
+ blockingSectionEnd();
+ }
+ }
+ }
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+
+ isCancelled.set(true);
+ }
+ }
+
+ /**
+ * Returns duration of possible JVM pause, if it was detected, or {@code
-1} otherwise.
+ *
+ * @param tracker Checkpoint metrics tracker.
+ */
+ private long possibleLongJvmPauseDuration(CheckpointMetricsTracker
tracker) {
+ if (pauseDetector != null) {
+ if (tracker.writeLockWaitDuration() +
tracker.writeLockHoldDuration() > pauseDetector.longJvmPauseThreshold()) {
+ long now = coarseCurrentTimeMillis();
+
+ // We must get last wake-up time before search possible pause
in events map.
+ long wakeUpTime = pauseDetector.getLastWakeUpTime();
+
+ IgniteBiTuple<Long, Long> lastLongPause =
pauseDetector.getLastLongPause();
+
+ if (lastLongPause != null && tracker.checkpointStartTime() <
lastLongPause.get1()) {
+ return lastLongPause.get2();
+ }
+
+ if (now - wakeUpTime > pauseDetector.longJvmPauseThreshold()) {
+ return now - wakeUpTime;
+ }
+ }
+ }
+
+ return -1L;
+ }
+
+ /**
+ * Update the current checkpoint info from the scheduled one.
+ */
+ void startCheckpointProgress() {
+ long checkpointStartTimestamp = coarseCurrentTimeMillis();
+
+ // This can happen in an unlikely event of two checkpoints happening
within a currentTimeMillis() granularity window.
+ if (checkpointStartTimestamp == lastCheckpointTimestamp) {
+ checkpointStartTimestamp++;
+ }
+
+ lastCheckpointTimestamp = checkpointStartTimestamp;
+
+ synchronized (this) {
+ CheckpointProgressImpl curr = scheduledCheckpointProgress;
+
+ if (curr.reason() == null) {
+ curr.reason("timeout");
+ }
+
+ // It is important that we assign a new progress object before
checkpoint mark in page memory.
+ scheduledCheckpointProgress = new
CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
+
+ currentCheckpointProgress = curr;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void cancel() {
+ if (log.isDebugEnabled()) {
+ log.debug("Cancelling grid runnable: " + this);
+ }
+
+ // Do not interrupt runner thread.
+ isCancelled.set(true);
+
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ /**
+ * Stopping all checkpoint activity immediately even if the current
checkpoint is in progress.
+ */
+ public void shutdownNow() {
+ shutdownNow = true;
+
+ if (!isCancelled.get()) {
+ cancel();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ if (runner() != null) {
+ return;
+ }
+
+ assert runner() == null : "Checkpointer is running.";
+
+ new IgniteThread(igniteInstanceName(), name(), this).start();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ // Let's write the data.
+ shutdownCheckpointer(true);
+ }
+
+ /**
+ * Shutdown checkpointer.
+ *
+ * @param cancel Cancel flag.
+ */
+ public void shutdownCheckpointer(boolean cancel) {
+ if (cancel) {
+ shutdownNow();
+ } else {
+ cancel();
+ }
+
+ try {
+ join();
+ } catch (InterruptedException ignore) {
+ log.warn("Was interrupted while waiting for checkpointer shutdown,
will not wait for checkpoint to finish.");
+
+ Thread.currentThread().interrupt();
+
+ shutdownNow();
+
+ while (true) {
+ try {
+ join();
+
+ scheduledCheckpointProgress.fail(new
NodeStoppingException("Checkpointer is stopped during node stop."));
+
+ break;
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ Thread.currentThread().interrupt();
+ }
+
+ if (checkpointWritePagesPool != null) {
+ shutdownAndAwaitTermination(checkpointWritePagesPool, 2, MINUTES);
+ }
+ }
+
+ /**
+ * Returns progress of current checkpoint, last finished one or {@code
null}, if checkpoint has never started.
+ */
+ public @Nullable CheckpointProgress currentProgress() {
+ return currentCheckpointProgress;
+ }
+
+ /**
+ * Returns progress of scheduled checkpoint.
+ */
+ CheckpointProgress scheduledProgress() {
+ return scheduledCheckpointProgress;
+ }
+
+ /**
+ * Returns {@code true} if checkpoint should be stopped immediately.
+ */
+ boolean isShutdownNow() {
+ return shutdownNow;
+ }
+
+ /**
+ * Skip checkpoint on node stop.
+ *
+ * @param skip If {@code true} skips checkpoint on node stop.
+ */
+ public void skipCheckpointOnNodeStop(boolean skip) {
+ skipCheckpointOnNodeStop = skip;
+ }
+
+ /**
+ * Gets a checkpoint interval with a randomized delay in mills.
+ *
+ * <p>It helps when the cluster makes a checkpoint in the same time in
every node.
+ */
+ private long nextCheckpointInterval() {
+ long checkpointFrequency = checkpointFrequencySupplier.getAsLong();
+
+ long startDelay =
ThreadLocalRandom.current().nextLong(max(safeAbs(checkpointFrequency) / 100, 1))
+ - max(safeAbs(checkpointFrequency) / 200, 1);
+
+ return safeAbs(checkpointFrequency + startDelay);
+ }
+
+ /**
+ * For test use only.
+ *
+ * @deprecated Should be rewritten to public API.
+ */
+ @TestOnly
+ public CompletableFuture<?> enableCheckpoints(boolean enable) {
Review Comment:
Remove it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]