ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r869981682
##########
modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java:
##########
@@ -31,10 +31,11 @@ public class FastTimestamps {
private static void startUpdater() {
Thread updater = new Thread("FastTimestamps updater") {
+ /** {@inheritDoc} */
@Override
public void run() {
while (true) {
- coarseCurrentTimeMillis = System.currentTimeMillis();
+ coarseCurrentTimeMillis =
Math.max(coarseCurrentTimeMillis, System.currentTimeMillis());
Review Comment:
I still don't think that this change is necessary. Can you please roll it
back?
Here's my motivation: if time goes back, the old implementation would start
counting again from the past. The new implementation will keep returning the
same value for some time, which can be even worse IMO.
##########
modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java:
##########
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static
org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_IDLE;
+import static
org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STARTED;
+import static
org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STOPPED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link IgniteWorker} testing.
+ */
+public class IgniteWorkerTest {
+ static final String CLEANUP = "cleanup";
+
+ private final IgniteLogger log =
IgniteLogger.forClass(IgniteWorkerTest.class);
+
+ @Test
+ void testNewIgniteWorker() {
+ IgniteWorker worker = new NoopWorker(log, null);
+
+ assertEquals("testNode", worker.igniteInstanceName());
+ assertEquals("testWorker", worker.name());
+
+ assertEquals(0, worker.heartbeat());
+
+ assertFalse(worker.isCancelled());
+ assertFalse(worker.isDone());
+
+ assertNull(worker.runner());
+ }
+
+ @Test
+ void testBlockingSelection() {
+ IgniteWorker worker = new NoopWorker(log, null);
+
+ long currentTimeMillis = coarseCurrentTimeMillis();
+
+ worker.blockingSectionBegin();
+
+ assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+ worker.blockingSectionEnd();
+
+ assertThat(worker.heartbeat(),
greaterThanOrEqualTo(currentTimeMillis));
+
+ // Checks update heartbeat after blockingSectionBegin().
+
+ worker.blockingSectionBegin();
+
+ assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+ worker.updateHeartbeat();
+
+ assertThat(worker.heartbeat(),
greaterThanOrEqualTo(currentTimeMillis));
+
+ worker.blockingSectionEnd();
+
+ assertThat(worker.heartbeat(),
greaterThanOrEqualTo(currentTimeMillis));
+ }
+
+ @Test
+ void testUpdateHeartbeat() throws Exception {
+ IgniteWorker worker = new NoopWorker(log, null);
+
+ long currentTimeMillis = coarseCurrentTimeMillis();
+
+ worker.updateHeartbeat();
+
+ long heartbeat = worker.heartbeat();
+
+ assertThat(heartbeat, greaterThanOrEqualTo(currentTimeMillis));
+
+ Thread.sleep(10);
+
+ assertEquals(heartbeat, worker.heartbeat());
+
+ worker.updateHeartbeat();
+
+ assertThat(worker.heartbeat(), greaterThan(heartbeat));
+ }
+
+ @Test
+ void testIdle() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener);
+
+ worker.onIdle();
+
+ assertThat(events, equalTo(List.of(ON_IDLE)));
+ }
+
+ @Test
+ void testRun() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events) {
+ /** {@inheritDoc} */
+ @Override
+ public void onStarted(IgniteWorker worker) {
+ super.onStarted(worker);
+
+ assertThat(worker.heartbeat(),
lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertSame(Thread.currentThread(), worker.runner());
+ assertFalse(worker.isCancelled());
+ assertFalse(worker.isDone());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onStopped(IgniteWorker worker) {
+ super.onStopped(worker);
+
+ assertThat(worker.heartbeat(),
lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertSame(Thread.currentThread(), worker.runner());
+ assertFalse(worker.isCancelled());
+ assertTrue(worker.isDone());
+ }
+ };
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void cleanup() {
+ events.add(CLEANUP);
+ }
+ };
+
+ worker.run();
+
+ assertThat(events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+ assertThat(worker.heartbeat(),
lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertNull(worker.runner());
+ assertFalse(worker.isCancelled());
+ assertTrue(worker.isDone());
+ }
+
+ @Test
+ void testInterruptFromBody() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void body() throws InterruptedException {
+ throw new InterruptedException();
Review Comment:
I think you should also interrupt the thread before throwing the exception.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+
+/**
+ * Data class of checkpoint information.
+ */
+class Checkpoint {
+ /** Checkpoint pages. */
+ final GridConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> dirtyPages;
+
+ /** Checkpoint progress status. */
+ final CheckpointProgressImpl progress;
+
+ /** Number of dirty pages. */
+ final int pagesSize;
Review Comment:
Can you please rename it? Like, "dirtyPagesSize" maybe?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Holder of information about dirty pages by {@link PageMemoryImpl} for
checkpoint.
+ */
+class CheckpointDirtyPagesInfoHolder {
+ /** Total number of dirty pages. */
+ final int pageCount;
+
+ /** Collection of dirty pages per {@link PageMemoryImpl} distribution. */
+ final Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>>
pages;
Review Comment:
Why is it not a map? What do you think?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it
may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+ /** Earliest checkpoint map changes threshold system properties. */
+ public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD =
"IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+ /** Checkpoint start marker. */
+ private static final String CHECKPOINT_START_MARKER = "START";
+
+ /** Checkpoint end marker. */
+ private static final String CHECKPOINT_END_MARKER = "END";
+
+ /** Checkpoint marker file name pattern. */
+ private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN =
Pattern.compile("(.*)-(START|END)\\.bin");
+
+ /** Checkpoint metadata directory ("cp"), contains files with checkpoint
start and end markers. */
+ private final Path checkpointDir;
+
+ /** Checkpoint IDs. */
+ private final Set<UUID> checkpointIds;
+
+ /** Earliest checkpoint map changes threshold. */
+ // TODO: IGNITE-16935 Move to config
+ private final int earliestCheckpointChangesThreshold =
getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+ /**
+ * Constructor.
+ *
+ * @param storagePath Storage path.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public CheckpointMarkersStorage(
+ Path storagePath
+ ) throws IgniteInternalCheckedException {
+ checkpointDir = storagePath.resolve("cp");
+
+ try {
+ createDirectories(checkpointDir);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create
directory for checkpoint metadata: " + checkpointDir, e);
+ }
+
+ checkCheckpointDir(checkpointDir);
+
+ try {
+ checkpointIds = list(checkpointDir)
+
.map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+ .collect(toCollection(ConcurrentHashMap::newKeySet));
Review Comment:
I don't see a place where you would sort or order cp markers. But they must
be ordered, right? Are you planning to do it in the future? Please add a TODO then.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+class CheckpointProgressImpl implements CheckpointProgress {
+ /** Checkpoint id. */
+ private final UUID id = UUID.randomUUID();
+
+ /** Scheduled time of checkpoint. */
+ private volatile long nextCheckpointNanos;
+
+ /** Current checkpoint state. */
+ private volatile AtomicReference<CheckpointState> state = new
AtomicReference<>(SCHEDULED);
+
+ /** Future which would be finished when corresponds state is set. */
+ private final Map<CheckpointState, CompletableFuture<Void>> stateFutures =
new ConcurrentHashMap<>();
+
+ /** Wakeup reason. */
+ private volatile String reason;
+
+ /** Number of dirty pages in current checkpoint at the beginning of
checkpoint. */
+ private volatile int currCheckpointPagesCnt;
+
+ /** Cause of fail, which has happened during the checkpoint or {@code
null} if checkpoint was successful. */
+ @Nullable
+ private volatile Throwable failCause;
+
+ /** Counter for written checkpoint pages. Not {@link null} only if
checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger writtenPagesCntr;
+
+ /** Counter for fsynced checkpoint pages. Not {@link null} only if
checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger syncedPagesCntr;
+
+ /** Counter for evicted checkpoint pages. Not {@link null} only if
checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger evictedPagesCntr;
+
+ /**
+ * Constructor.
+ *
+ * @param nextCheckpointTimeout Timeout until next checkpoint in nanos.
+ */
+ CheckpointProgressImpl(long nextCheckpointTimeout) {
+ nextCheckpointNanos(nextCheckpointTimeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable String reason() {
+ return reason;
+ }
+
+ /**
+ * Sets description of the reason of the current checkpoint.
+ *
+ * @param reason New wakeup reason.
+ */
+ public void reason(String reason) {
+ this.reason = reason;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean inProgress() {
+ return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> futureFor(CheckpointState state) {
+ CompletableFuture<Void> stateFut = stateFutures.computeIfAbsent(state,
(k) -> new CompletableFuture<>());
+
+ if (greaterOrEqualTo(state)) {
+ completeFuture(stateFut, failCause);
+ }
+
+ return stateFut;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int currentCheckpointPagesCount() {
+ return currCheckpointPagesCnt;
+ }
+
+ /**
+ * Sets current checkpoint pages num to store.
+ *
+ * @param num Pages to store.
+ */
+ public void currentCheckpointPagesCount(int num) {
+ currCheckpointPagesCnt = num;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable AtomicInteger writtenPagesCounter() {
+ return writtenPagesCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable AtomicInteger syncedPagesCounter() {
+ return syncedPagesCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable AtomicInteger evictedPagesCounter() {
+ return evictedPagesCntr;
+ }
+
+ /**
+ * Returns scheduled time of checkpoint.
+ */
+ public long nextCheckpointNanos() {
+ return nextCheckpointNanos;
+ }
+
+ /**
+ * Sets new scheduled time of checkpoint.
+ *
+ * @param nextCheckpointNanos New scheduled time of checkpoint in nanos.
+ */
+ public void nextCheckpointNanos(long nextCheckpointNanos) {
Review Comment:
This is SO confusing! The field represents an instant in time, but the parameter
is a time interval. And they have the same name. What the hell?
And why would anyone pass a negative value to this method?
Why a year? The Javadoc says nothing about it. I need explanations.
##########
modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import java.util.EventListener;
+
+/**
+ * This interface defines worker listener.
+ */
+public interface IgniteWorkerListener extends EventListener {
Review Comment:
Please remove "extends EventListener", no one uses it
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it
may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+ /** Earliest checkpoint map changes threshold system properties. */
+ public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD =
"IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+ /** Checkpoint start marker. */
+ private static final String CHECKPOINT_START_MARKER = "START";
+
+ /** Checkpoint end marker. */
+ private static final String CHECKPOINT_END_MARKER = "END";
+
+ /** Checkpoint marker file name pattern. */
+ private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN =
Pattern.compile("(.*)-(START|END)\\.bin");
+
+ /** Checkpoint metadata directory ("cp"), contains files with checkpoint
start and end markers. */
+ private final Path checkpointDir;
+
+ /** Checkpoint IDs. */
+ private final Set<UUID> checkpointIds;
+
+ /** Earliest checkpoint map changes threshold. */
+ // TODO: IGNITE-16935 Move to config
Review Comment:
This should be managed by a merger process. You can't just delete markers,
can you? I'd remove most of the code related to this property.
##########
modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extension to standard {@link Runnable} interface.
+ *
+ * <p>Adds proper details to be used with {@link Executor} implementations.
+ *
+ * <p>Only for internal use.
Review Comment:
It's in an "internal" package; is this comment necessary? Was it in the
original code?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it
may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+ /** Earliest checkpoint map changes threshold system properties. */
+ public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD =
"IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+ /** Checkpoint start marker. */
+ private static final String CHECKPOINT_START_MARKER = "START";
+
+ /** Checkpoint end marker. */
+ private static final String CHECKPOINT_END_MARKER = "END";
+
+ /** Checkpoint marker file name pattern. */
+ private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN =
Pattern.compile("(.*)-(START|END)\\.bin");
+
+ /** Checkpoint metadata directory ("cp"), contains files with checkpoint
start and end markers. */
+ private final Path checkpointDir;
+
+ /** Checkpoint IDs. */
+ private final Set<UUID> checkpointIds;
+
+ /** Earliest checkpoint map changes threshold. */
+ // TODO: IGNITE-16935 Move to config
+ private final int earliestCheckpointChangesThreshold =
getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+ /**
+ * Constructor.
+ *
+ * @param storagePath Storage path.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public CheckpointMarkersStorage(
+ Path storagePath
+ ) throws IgniteInternalCheckedException {
+ checkpointDir = storagePath.resolve("cp");
+
+ try {
+ createDirectories(checkpointDir);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create
directory for checkpoint metadata: " + checkpointDir, e);
+ }
+
+ checkCheckpointDir(checkpointDir);
+
+ try {
+ checkpointIds = list(checkpointDir)
+
.map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+ .collect(toCollection(ConcurrentHashMap::newKeySet));
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not reads
checkpoint markers: " + checkpointDir, e);
Review Comment:
```suggestion
throw new IgniteInternalCheckedException("Could not read
checkpoint markers: " + checkpointDir, e);
```
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java:
##########
@@ -17,16 +17,52 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.Nullable;
/**
* Represents information of progress of a current checkpoint and allows
obtaining future to wait for a particular checkpoint state.
*/
-// TODO: IGNITE-16898 Continue porting the code
public interface CheckpointProgress {
+ /**
+ * Returns checkpoint ID.
+ */
+ UUID id();
+
+ /**
+ * Returns description of the reason of the current checkpoint.
+ */
+ @Nullable String reason();
+
+ /**
+ * Return {@code true} If checkpoint already started but have not finished
yet.
+ */
+ boolean inProgress();
+
/**
* Returns future which can be used for detection when current checkpoint
reaches the specific state.
*/
CompletableFuture<?> futureFor(CheckpointState state);
+
+ /**
+ * Returns number of dirty pages in current checkpoint. If checkpoint is
not running, returns {@code 0}.
+ */
+ int currentCheckpointPagesCount();
+
+ /**
+ * Returns counter for written checkpoint pages. Not {@code null} only if
checkpoint is running.
+ */
+ @Nullable AtomicInteger writtenPagesCounter();
Review Comment:
Why would you return internal atomic integer instances? What's the deal with
it?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+class CheckpointProgressImpl implements CheckpointProgress {
+ /** Checkpoint id. */
+ private final UUID id = UUID.randomUUID();
+
+ /** Scheduled time of checkpoint. */
+ private volatile long nextCheckpointNanos;
+
+ /** Current checkpoint state. */
+ private volatile AtomicReference<CheckpointState> state = new
AtomicReference<>(SCHEDULED);
+
+ /** Future which would be finished when corresponds state is set. */
+ private final Map<CheckpointState, CompletableFuture<Void>> stateFutures =
new ConcurrentHashMap<>();
+
+ /** Wakeup reason. */
+ private volatile String reason;
+
+ /** Number of dirty pages in current checkpoint at the beginning of
checkpoint. */
+ private volatile int currCheckpointPagesCnt;
+
+ /** Cause of fail, which has happened during the checkpoint or {@code
null} if checkpoint was successful. */
+ @Nullable
+ private volatile Throwable failCause;
+
+ /** Counter for written checkpoint pages. Not {@link null} only if
checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger writtenPagesCntr;
+
+ /** Counter for fsynced checkpoint pages. Not {@link null} only if
checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger syncedPagesCntr;
+
+ /** Counter for evicted checkpoint pages. Not {@link null} only if
checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger evictedPagesCntr;
+
+ /**
+ * Constructor.
+ *
+ * @param nextCheckpointTimeout Timeout until next checkpoint in nanos.
+ */
+ CheckpointProgressImpl(long nextCheckpointTimeout) {
+ nextCheckpointNanos(nextCheckpointTimeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable String reason() {
+ return reason;
+ }
+
+ /**
+ * Sets description of the reason of the current checkpoint.
+ *
+ * @param reason New wakeup reason.
+ */
+ public void reason(String reason) {
+ this.reason = reason;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean inProgress() {
+ return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> futureFor(CheckpointState state) {
Review Comment:
I have a feeling that there's a data race somewhere. Can we discuss it
later? I know that it's not your code.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,24 +17,12 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import org.jetbrains.annotations.Nullable;
-
/**
* Empty.
*/
-// TODO: IGNITE-16898 Continue porting the code
+// TODO: IGNITE-16935 Continue porting the code
public abstract class Checkpointer {
- /**
- * Changes the information for a scheduled checkpoint if it was scheduled
further than {@code delayFromNow}, or do nothing otherwise.
- *
- * @param delayFromNow Delay from now in milliseconds.
- * @param reason Wakeup reason.
- * @return Nearest scheduled checkpoint which is not started yet (dirty
pages weren't collected yet).
- */
- public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow,
String reason);
+ public abstract Thread runner();
Review Comment:
You removed comments and reordered methods. What was the reason?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/GridConcurrentMultiPairQueue.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>The only guarantee {@link #next} provided is sequentially emptify values
per key array. i.e. input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> and further sequence of {@code poll} or {@code forEach} calls may
produce output like:
+ * <br> [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]
+ *
+ * @param <K> The type of key in input pair collection.
+ * @param <V> The type of value array.
+ */
+public class GridConcurrentMultiPairQueue<K, V> {
+ /** Empty pair queue. */
+ public static final GridConcurrentMultiPairQueue EMPTY = new
GridConcurrentMultiPairQueue<>(Map.of());
+
+ /** Inner holder. */
+ private final V[][] vals;
+
+ /** Storage for every array length. */
+ private final int[] lenSeq;
+
+ /** Current absolute position. */
+ private final AtomicInteger pos = new AtomicInteger();
+
+ /** Precalculated max position. */
+ private final int maxPos;
+
+ /** Keys array. */
+ private final K[] keysArr;
+
+ /**
+ * Constructor.
+ *
+ * @param items Items.
+ */
+ public GridConcurrentMultiPairQueue(Map<K, ? extends Collection<V>> items)
{
+ int pairCnt = (int)
items.entrySet().stream().map(Map.Entry::getValue).filter(k -> k.size() >
0).count();
Review Comment:
"k -> k.size() > 0" should be replaced with "k -> !k.isEmpty()"
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.GridConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class responsibility is to complement {@link Checkpointer} class with
side logic of checkpointing like checkpoint listeners
+ * notifications, collect dirt pages etc.
+ *
+ * <p>It allows {@link Checkpointer} class is to focus on its main
responsibility: synchronizing memory with disk.
+ *
+ * <p>Additional actions needed during checkpoint are implemented in this
class.
+ *
+ * <p>Two main blocks of logic this class is responsible for:
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointBegin} - Initialization of next
checkpoint. It collects all required info.
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last
checkpoint.
+ */
+class CheckpointWorkflow implements IgniteComponent {
+ /**
+ * Starting from this number of dirty pages in checkpoint, array will be
sorted with {@link Arrays#parallelSort(Comparable[])} in case
+ * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+ */
+ public static final String CHECKPOINT_PARALLEL_SORT_THRESHOLD =
"CHECKPOINT_PARALLEL_SORT_THRESHOLD";
+
+ /**
+ * Starting from this number of dirty pages in checkpoint, array will be
sorted with {@link Arrays#parallelSort(Comparable[])} in case
+ * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+ */
+ // TODO: IGNITE-16935 Move to configuration
+ private final int parallelSortThreshold =
getInteger(CHECKPOINT_PARALLEL_SORT_THRESHOLD, 512 * 1024);
+
+ /** This number of threads will be created and used for parallel sorting.
*/
+ private static final int PARALLEL_SORT_THREADS =
Math.min(Runtime.getRuntime().availableProcessors(), 8);
+
+ /** Checkpoint marker storage. */
+ private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+ /** Checkpoint lock. */
+ private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+ /** Supplier of persistent data regions for the checkpointing. */
+ private final Supplier<Collection<PageMemoryDataRegion>>
dataRegionsSupplier;
+
+ /** Checkpoint write order configuration. */
+ private final CheckpointWriteOrder checkpointWriteOrder;
+
+ /** Collections of checkpoint listeners. */
+ private final List<IgniteBiTuple<CheckpointListener,
PageMemoryDataRegion>> listeners = new CopyOnWriteArrayList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param checkpointMarkersStorage Checkpoint marker storage.
+ * @param checkpointReadWriteLock Checkpoint read write lock.
+ * @param checkpointWriteOrder Checkpoint write order.
+ * @param dataRegionsSupplier Supplier of persistent data regions for the
checkpointing.
+ */
+ public CheckpointWorkflow(
+ CheckpointMarkersStorage checkpointMarkersStorage,
+ CheckpointReadWriteLock checkpointReadWriteLock,
+ CheckpointWriteOrder checkpointWriteOrder,
+ Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier
+ ) {
+ this.checkpointMarkersStorage = checkpointMarkersStorage;
+ this.checkpointReadWriteLock = checkpointReadWriteLock;
+ this.checkpointWriteOrder = checkpointWriteOrder;
+ this.dataRegionsSupplier = dataRegionsSupplier;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() {
+ listeners.clear();
+ }
+
+ /**
+ * First stage of checkpoint which collects demanded information (dirty
pages mostly).
+ *
+ * @param startCheckpointTimestamp Checkpoint start timestamp.
+ * @param curr Current checkpoint event info.
+ * @return Checkpoint collected info.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public Checkpoint markCheckpointBegin(
+ long startCheckpointTimestamp,
+ CheckpointProgressImpl curr
+ ) throws IgniteInternalCheckedException {
+ Collection<PageMemoryDataRegion> dataRegions =
dataRegionsSupplier.get();
+
+ List<CheckpointListener> listeners =
collectCheckpointListeners(dataRegions);
+
+ checkpointReadWriteLock.readLock();
+
+ try {
+ for (CheckpointListener listener : listeners) {
+ listener.beforeCheckpointBegin(curr);
+ }
+ } finally {
+ checkpointReadWriteLock.readUnlock();
+ }
+
+ checkpointReadWriteLock.writeLock();
+
+ CheckpointDirtyPagesInfoHolder dirtyPages;
+
+ try {
+ curr.transitTo(LOCK_TAKEN);
+
+ for (CheckpointListener listener : listeners) {
+ listener.onMarkCheckpointBegin(curr);
+ }
+
+ // There are allowable to replace pages only after checkpoint
entry was stored to disk.
+ dirtyPages = beginCheckpoint(dataRegions,
curr.futureFor(MARKER_STORED_TO_DISK));
+
+ curr.currentCheckpointPagesCount(dirtyPages.pageCount);
+
+ curr.transitTo(PAGE_SNAPSHOT_TAKEN);
+ } finally {
+ checkpointReadWriteLock.writeUnlock();
+ }
+
+ curr.transitTo(LOCK_RELEASED);
+
+ for (CheckpointListener listener : listeners) {
+ listener.onCheckpointBegin(curr);
+ }
+
+ if (dirtyPages.pageCount > 0) {
+ checkpointMarkersStorage.onCheckpointBegin(curr.id());
+
+ curr.transitTo(MARKER_STORED_TO_DISK);
+
+ return new Checkpoint(splitAndSortCpPagesIfNeeded(dirtyPages),
curr);
+ }
+
+ return new Checkpoint(EMPTY, curr);
+ }
+
+ /**
+ * Do some actions on checkpoint finish (After all pages were written to
disk).
+ *
+ * @param chp Checkpoint snapshot.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public void markCheckpointEnd(Checkpoint chp) throws
IgniteInternalCheckedException {
+ Collection<PageMemoryDataRegion> dataRegions =
dataRegionsSupplier.get();
Review Comment:
Why a supplier instead of a collection?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it
may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+ /** Earliest checkpoint map changes threshold system properties. */
+ public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD =
"IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+ /** Checkpoint start marker. */
+ private static final String CHECKPOINT_START_MARKER = "START";
+
+ /** Checkpoint end marker. */
+ private static final String CHECKPOINT_END_MARKER = "END";
+
+ /** Checkpoint marker file name pattern. */
+ private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN =
Pattern.compile("(.*)-(START|END)\\.bin");
+
+ /** Checkpoint metadata directory ("cp"), contains files with checkpoint
start and end markers. */
+ private final Path checkpointDir;
+
+ /** Checkpoint IDs. */
+ private final Set<UUID> checkpointIds;
+
+ /** Earliest checkpoint map changes threshold. */
+ // TODO: IGNITE-16935 Move to config
+ private final int earliestCheckpointChangesThreshold =
getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+ /**
+ * Constructor.
+ *
+ * @param storagePath Storage path.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public CheckpointMarkersStorage(
+ Path storagePath
+ ) throws IgniteInternalCheckedException {
+ checkpointDir = storagePath.resolve("cp");
+
+ try {
+ createDirectories(checkpointDir);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create
directory for checkpoint metadata: " + checkpointDir, e);
+ }
+
+ checkCheckpointDir(checkpointDir);
+
+ try {
+ checkpointIds = list(checkpointDir)
+
.map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+ .collect(toCollection(ConcurrentHashMap::newKeySet));
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not reads
checkpoint markers: " + checkpointDir, e);
+ }
+ }
+
+ /**
+ * Callback at the start of the checkpoint.
+ *
+ * <p>Creates a start marker for a checkpoint.
+ *
+ * @param checkpointId Checkpoint id.
+ */
+ public void onCheckpointBegin(UUID checkpointId) throws
IgniteInternalCheckedException {
+ assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+ Path checkpointStartMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_START_MARKER));
+
+ try {
+ createFile(checkpointStartMarker);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create start
checkpoint marker: " + checkpointStartMarker, e);
+ }
+
+ checkpointIds.add(checkpointId);
+ }
+
+ /**
+ * Callback at the end of the checkpoint.
+ *
+ * <p>Creates an end marker for a checkpoint.
+ *
+ * @param checkpointId Checkpoint id.
+ */
+ public void onCheckpointEnd(UUID checkpointId) throws
IgniteInternalCheckedException {
+ assert checkpointIds.contains(checkpointId) : checkpointId;
+
+ Path checkpointEndMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_END_MARKER));
+
+ try {
+ createFile(checkpointEndMarker);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create end
checkpoint marker: " + checkpointEndMarker, e);
+ }
+
+ if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
+ for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); )
{
+ UUID id = it.next();
+
+ if (!id.equals(checkpointId)) {
+ removeCheckpointMarkers(id);
+
+ it.remove();
+ }
+ }
+ }
+ }
+
+ private void removeCheckpointMarkers(UUID checkpointId) {
+ Path startMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_START_MARKER));
+ Path endMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_END_MARKER));
+
+ if (exists(startMarker)) {
+ startMarker.toFile().delete();
+ }
+
+ if (exists(endMarker)) {
+ endMarker.toFile().delete();
+ }
+ }
+
+ /**
+ * Checks that the directory contains only paired (start and end)
checkpoint markers.
+ */
+ private static void checkCheckpointDir(Path checkpointDir) throws
IgniteInternalCheckedException {
+ assert isDirectory(checkpointDir) : checkpointDir;
+
+ try {
+ Map<Boolean, List<Path>> files = list(checkpointDir)
Review Comment:
Is this old code?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it
may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+ /** Earliest checkpoint map changes threshold system properties. */
+ public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD =
"IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+ /** Checkpoint start marker. */
+ private static final String CHECKPOINT_START_MARKER = "START";
+
+ /** Checkpoint end marker. */
+ private static final String CHECKPOINT_END_MARKER = "END";
+
+ /** Checkpoint marker file name pattern. */
+ private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN =
Pattern.compile("(.*)-(START|END)\\.bin");
+
+ /** Checkpoint metadata directory ("cp"), contains files with checkpoint
start and end markers. */
+ private final Path checkpointDir;
+
+ /** Checkpoint IDs. */
+ private final Set<UUID> checkpointIds;
+
+ /** Earliest checkpoint map changes threshold. */
+ // TODO: IGNITE-16935 Move to config
+ private final int earliestCheckpointChangesThreshold =
getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+ /**
+ * Constructor.
+ *
+ * @param storagePath Storage path.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public CheckpointMarkersStorage(
+ Path storagePath
+ ) throws IgniteInternalCheckedException {
+ checkpointDir = storagePath.resolve("cp");
+
+ try {
+ createDirectories(checkpointDir);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create
directory for checkpoint metadata: " + checkpointDir, e);
+ }
+
+ checkCheckpointDir(checkpointDir);
+
+ try {
+ checkpointIds = list(checkpointDir)
+
.map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+ .collect(toCollection(ConcurrentHashMap::newKeySet));
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not reads
checkpoint markers: " + checkpointDir, e);
+ }
+ }
+
+ /**
+ * Callback at the start of the checkpoint.
+ *
+ * <p>Creates a start marker for a checkpoint.
+ *
+ * @param checkpointId Checkpoint id.
+ */
+ public void onCheckpointBegin(UUID checkpointId) throws
IgniteInternalCheckedException {
+ assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+ Path checkpointStartMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_START_MARKER));
+
+ try {
+ createFile(checkpointStartMarker);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create start
checkpoint marker: " + checkpointStartMarker, e);
+ }
+
+ checkpointIds.add(checkpointId);
+ }
+
+ /**
+ * Callback at the end of the checkpoint.
+ *
+ * <p>Creates an end marker for a checkpoint.
+ *
+ * @param checkpointId Checkpoint id.
+ */
+ public void onCheckpointEnd(UUID checkpointId) throws
IgniteInternalCheckedException {
+ assert checkpointIds.contains(checkpointId) : checkpointId;
+
+ Path checkpointEndMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_END_MARKER));
+
+ try {
+ createFile(checkpointEndMarker);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Could not create end
checkpoint marker: " + checkpointEndMarker, e);
+ }
+
+ if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
+ for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); )
{
+ UUID id = it.next();
+
+ if (!id.equals(checkpointId)) {
+ removeCheckpointMarkers(id);
+
+ it.remove();
+ }
+ }
+ }
+ }
+
+ private void removeCheckpointMarkers(UUID checkpointId) {
+ Path startMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_START_MARKER));
+ Path endMarker =
checkpointDir.resolve(checkpointMarkerFileName(checkpointId,
CHECKPOINT_END_MARKER));
+
+ if (exists(startMarker)) {
+ startMarker.toFile().delete();
+ }
+
+ if (exists(endMarker)) {
+ endMarker.toFile().delete();
+ }
+ }
+
+ /**
+ * Checks that the directory contains only paired (start and end)
checkpoint markers.
+ */
+ private static void checkCheckpointDir(Path checkpointDir) throws
IgniteInternalCheckedException {
+ assert isDirectory(checkpointDir) : checkpointDir;
+
+ try {
+ Map<Boolean, List<Path>> files = list(checkpointDir)
+ .collect(partitioningBy(path ->
parseCheckpointIdFromMarkerFile(path) != null));
+
+ if (!files.get(false).isEmpty()) {
+ throw new IgniteInternalCheckedException(
Review Comment:
Will it fail in the presence of a "tmp" marker file? I guess so.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/GridConcurrentMultiPairQueue.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>The only guarantee {@link #next} provided is sequentially emptify values
per key array. i.e. input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> and further sequence of {@code poll} or {@code forEach} calls may
produce output like:
+ * <br> [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]
Review Comment:
I don't get this order. Do you? Why is it "3, 1, 2"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]