This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new ca21384f85 IGNITE-21234 Introduce a more flexible way of triggering "too many dirty pages" checkpoints (#3033) ca21384f85 is described below commit ca21384f85e8c779258cb3b21f54b6c30a7071e4 Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Tue Jan 16 14:26:30 2024 +0300 IGNITE-21234 Introduce a more flexible way of triggering "too many dirty pages" checkpoints (#3033) --- .../pagememory/persistence/CheckpointUrgency.java | 41 +++++++++++++++++++ .../persistence/PersistentPageMemory.java | 47 ++++++++++++++-------- .../persistence/checkpoint/CheckpointManager.java | 27 +++++++++---- .../checkpoint/CheckpointTimeoutLock.java | 28 ++++++++----- .../PersistentPageMemoryNoLoadTest.java | 32 ++++++++++----- .../checkpoint/CheckpointManagerTest.java | 44 ++++++++++---------- .../checkpoint/CheckpointTimeoutLockTest.java | 33 ++++++++------- .../checkpoint/CheckpointTestUtils.java | 3 +- 8 files changed, 175 insertions(+), 80 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/CheckpointUrgency.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/CheckpointUrgency.java new file mode 100644 index 0000000000..913f513a0d --- /dev/null +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/CheckpointUrgency.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence; + +/** + * Checkpoint urgency status enum. + */ +public enum CheckpointUrgency { + /** + * Signifies that there's enough free space in page memory to allow taking checkpoint read locks, and there's no need to trigger + * checkpoint. + */ + NOT_REQUIRED, + + /** + * Signifies that there's still enough free space in page memory to allow taking checkpoint read locks, but it's more limited and we're + * at the point where we should schedule a checkpoint. + */ + SHOULD_TRIGGER, + + /** + * Signifies that there might not be enough free space in page memory to allow taking checkpoint read locks, and we should wait until + * scheduled checkpoint is started and collection of dirty pages is empty once again. 
+ */ + MUST_TRIGGER +} diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java index 6543970f40..0a17bf563b 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java @@ -27,6 +27,9 @@ import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId; import static org.apache.ignite.internal.pagememory.io.PageIo.getType; import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion; import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.SHOULD_TRIGGER; import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty; import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId; import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired; @@ -71,6 +74,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -197,8 +201,8 @@ public class PersistentPageMemory implements PageMemory { /** {@code False} if memory was not started or already stopped and 
is not supposed for any usage. */ private volatile boolean started; - /** See {@link #safeToUpdate()}. */ - private final AtomicBoolean safeToUpdate = new AtomicBoolean(true); + /** See {@link #checkpointUrgency()}. */ + private final AtomicReference<CheckpointUrgency> checkpointUrgency = new AtomicReference<>(NOT_REQUIRED); /** Checkpoint page pool, {@code null} if not {@link #start() started}. */ @Nullable @@ -1237,8 +1241,18 @@ public class PersistentPageMemory implements PageMemory { Segment seg = segment(pageId.groupId(), pageId.pageId()); if (seg.dirtyPages.add(pageId)) { - if (seg.dirtyPagesCntr.incrementAndGet() >= seg.maxDirtyPages) { - safeToUpdate.set(false); + long dirtyPagesCnt = seg.dirtyPagesCntr.incrementAndGet(); + + if (dirtyPagesCnt >= seg.dirtyPagesSoftThreshold) { + CheckpointUrgency urgency = checkpointUrgency.get(); + + if (urgency != MUST_TRIGGER) { + if (dirtyPagesCnt >= seg.dirtyPagesHardThreshold) { + checkpointUrgency.set(MUST_TRIGGER); + } else if (urgency != SHOULD_TRIGGER) { + checkpointUrgency.compareAndSet(NOT_REQUIRED, SHOULD_TRIGGER); + } + } } } } @@ -1332,8 +1346,11 @@ public class PersistentPageMemory implements PageMemory { @Nullable private volatile CheckpointPages checkpointPages; - /** Maximum number of dirty pages. */ - private final long maxDirtyPages; + /** Maximum number of dirty pages for {@link CheckpointUrgency#NOT_REQUIRED}. */ + private final long dirtyPagesSoftThreshold; + + /** Maximum number of dirty pages for {@link CheckpointUrgency#SHOULD_TRIGGER}. */ + private final long dirtyPagesHardThreshold; /** Initial partition generation. 
*/ private static final int INIT_PART_GENERATION = 1; @@ -1381,7 +1398,8 @@ public class PersistentPageMemory implements PageMemory { pool.pages() ); - maxDirtyPages = pool.pages() * 3L / 4; + dirtyPagesSoftThreshold = pool.pages() * 3L / 4; + dirtyPagesHardThreshold = pool.pages() * 9L / 10; } /** @@ -1571,7 +1589,8 @@ public class PersistentPageMemory implements PageMemory { return new IgniteOutOfMemoryException("Failed to find a page for eviction (" + reason + ") [" + "segmentCapacity=" + loadedPages.capacity() + ", loaded=" + loadedPages.size() - + ", maxDirtyPages=" + maxDirtyPages + + ", dirtyPagesSoftThreshold=" + dirtyPagesSoftThreshold + + ", dirtyPagesHardThreshold=" + dirtyPagesHardThreshold + ", dirtyPages=" + dirtyPagesCntr + ", pinned=" + acquiredPages() + ']' + lineSeparator() + "Out of memory in data region [" @@ -1743,14 +1762,10 @@ public class PersistentPageMemory implements PageMemory { * Heuristic method which allows a thread to check if it is safe to start memory structure modifications in regard with checkpointing. * May return false-negative result during or after partition eviction. * - * @return {@code False} if there are too many dirty pages and a thread should wait for a checkpoint to begin. 
+ * @see CheckpointUrgency */ - public boolean safeToUpdate() { - if (segments != null) { - return safeToUpdate.get(); - } - - return true; + public CheckpointUrgency checkpointUrgency() { + return checkpointUrgency.get(); } /** @@ -2055,7 +2070,7 @@ public class PersistentPageMemory implements PageMemory { segment.resetDirtyPages(); } - safeToUpdate.set(true); + checkpointUrgency.set(NOT_REQUIRED); return CollectionUtils.concat(dirtyPageIds); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index fc40fcfa8c..65d1ffeace 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED; + import java.nio.ByteBuffer; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -29,6 +32,7 @@ import org.apache.ignite.internal.pagememory.PageMemory; import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration; import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; +import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager; import 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; @@ -152,7 +156,7 @@ public class CheckpointManager { checkpointTimeoutLock = new CheckpointTimeoutLock( checkpointReadWriteLock, checkpointConfigView.readLockTimeout(), - () -> safeToUpdateAllPageMemories(dataRegions), + () -> checkpointUrgency(dataRegions), checkpointer ); } @@ -244,19 +248,28 @@ public class CheckpointManager { } /** - * Returns {@link true} if it is safe for all {@link DataRegion data regions} to update their {@link PageMemory}. + * Returns checkpoint urgency status. {@link CheckpointUrgency#NOT_REQUIRED} if it is safe for all {@link DataRegion data regions} to + * update their {@link PageMemory}. * * @param dataRegions Data regions. - * @see PersistentPageMemory#safeToUpdate() + * @see PersistentPageMemory#checkpointUrgency() + * @see CheckpointUrgency */ - static boolean safeToUpdateAllPageMemories(Collection<? extends DataRegion<PersistentPageMemory>> dataRegions) { + static CheckpointUrgency checkpointUrgency(Collection<? 
extends DataRegion<PersistentPageMemory>> dataRegions) { + CheckpointUrgency urgency = NOT_REQUIRED; for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) { - if (!dataRegion.pageMemory().safeToUpdate()) { - return false; + CheckpointUrgency regionCheckpointUrgency = dataRegion.pageMemory().checkpointUrgency(); + + if (regionCheckpointUrgency.compareTo(urgency) > 0) { + urgency = regionCheckpointUrgency; + } + + if (urgency == MUST_TRIGGER) { + return MUST_TRIGGER; } } - return true; + return urgency; } /** diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java index cf918ebd02..852437962b 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java @@ -24,11 +24,12 @@ import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.function.BooleanSupplier; +import java.util.function.Supplier; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; /** @@ -40,10 +41,9 @@ public class CheckpointTimeoutLock { protected static final IgniteLogger LOG = Loggers.forClass(CheckpointTimeoutLock.class); /** - * {@link PersistentPageMemory#safeToUpdate() Safe update check} for all page memories, should return 
{@code false} if there are many - * dirty pages and a checkpoint is needed. + * {@link PersistentPageMemory#checkpointUrgency() Checkpoint urgency check} for all page memories. */ - private final BooleanSupplier safeToUpdateAllPageMemories; + private final Supplier<CheckpointUrgency> urgencySupplier; /** Internal checkpoint lock. */ private final CheckpointReadWriteLock checkpointReadWriteLock; @@ -62,19 +62,18 @@ public class CheckpointTimeoutLock { * * @param checkpointReadWriteLock Checkpoint read-write lock. * @param checkpointReadLockTimeout Timeout for checkpoint read lock acquisition in milliseconds. - * @param safeToUpdateAllPageMemories {@link PersistentPageMemory#safeToUpdate() Safe update check} for all page memories, should return - * {@code false} if there are many dirty pages and a checkpoint is needed. + * @param urgencySupplier {@link PersistentPageMemory#checkpointUrgency() Checkpoint urgency check} for all page memories. * @param checkpointer Service for triggering the checkpoint. 
*/ public CheckpointTimeoutLock( CheckpointReadWriteLock checkpointReadWriteLock, long checkpointReadLockTimeout, - BooleanSupplier safeToUpdateAllPageMemories, + Supplier<CheckpointUrgency> urgencySupplier, Checkpointer checkpointer ) { this.checkpointReadWriteLock = checkpointReadWriteLock; this.checkpointReadLockTimeout = checkpointReadLockTimeout; - this.safeToUpdateAllPageMemories = safeToUpdateAllPageMemories; + this.urgencySupplier = urgencySupplier; this.checkpointer = checkpointer; } @@ -142,17 +141,26 @@ public class CheckpointTimeoutLock { throw new IgniteInternalException(new NodeStoppingException("Failed to get checkpoint read lock")); } + CheckpointUrgency urgency; + if (checkpointReadWriteLock.getReadHoldCount() > 1 - || safeToUpdateAllPageMemories.getAsBoolean() || checkpointer.runner() == null + || (urgency = urgencySupplier.get()) == CheckpointUrgency.NOT_REQUIRED ) { - break; + return; } else { // If the checkpoint is triggered outside the lock, // it could cause the checkpoint to fire again for the same reason // (due to a data race between collecting dirty pages and triggering the checkpoint). CheckpointProgress checkpoint = checkpointer.scheduleCheckpoint(0, "too many dirty pages"); + if (urgency != CheckpointUrgency.MUST_TRIGGER) { + // Allow to take the checkpoint read lock, if urgency is not "must trigger". We optimistically assume that + // triggered checkpoint will start soon, without us having to explicitly wait for it and without page memory + // overflow. 
+ return; + } + checkpointReadWriteLock.readUnlock(); if (timeout > 0 && coarseCurrentTimeMillis() - start >= timeout) { diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java index 0ee440c756..dd5eef40ff 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.pagememory.persistence; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.SHOULD_TRIGGER; import static org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.PAGE_OVERHEAD; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED; @@ -31,8 +34,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -181,7 +183,7 @@ public class 
PersistentPageMemoryNoLoadTest extends AbstractPageMemoryNoLoadSelf } @Test - void testSafeToUpdate( + void testCheckpointUrgency( @InjectConfiguration PageMemoryCheckpointConfiguration checkpointConfig, @WorkDirectory Path workDir ) throws Exception { @@ -223,23 +225,33 @@ public class PersistentPageMemoryNoLoadTest extends AbstractPageMemoryNoLoadSelf long maxPages = pageMemory.totalPages(); - long maxDirtyPages = (maxPages * 3 / 4); + long dirtyPagesSoftThreshold = (maxPages * 3 / 4); + long dirtyPagesHardThreshold = (maxPages * 9 / 10); - assertThat(maxDirtyPages, greaterThanOrEqualTo(50L)); + assertThat(dirtyPagesSoftThreshold, greaterThanOrEqualTo(70L)); + assertThat(dirtyPagesHardThreshold, greaterThanOrEqualTo(80L)); checkpointManager.checkpointTimeoutLock().checkpointReadLock(); try { - for (int i = 0; i < maxDirtyPages - 1; i++) { + int i = 0; + + for (; i < dirtyPagesSoftThreshold - 1; i++) { + createDirtyPage(pageMemory); + + assertEquals(NOT_REQUIRED, pageMemory.checkpointUrgency(), "i=" + i); + } + + for (; i < dirtyPagesHardThreshold - 1; i++) { createDirtyPage(pageMemory); - assertTrue(pageMemory.safeToUpdate(), "i=" + i); + assertEquals(SHOULD_TRIGGER, pageMemory.checkpointUrgency(), "i=" + i); } - for (int i = (int) maxDirtyPages - 1; i < maxPages; i++) { + for (; i < maxPages; i++) { createDirtyPage(pageMemory); - assertFalse(pageMemory.safeToUpdate(), "i=" + i); + assertEquals(MUST_TRIGGER, pageMemory.checkpointUrgency(), "i=" + i); } } finally { checkpointManager.checkpointTimeoutLock().checkpointReadUnlock(); @@ -250,7 +262,7 @@ public class PersistentPageMemoryNoLoadTest extends AbstractPageMemoryNoLoadSelf .futureFor(FINISHED) .get(1, SECONDS); - assertTrue(pageMemory.safeToUpdate()); + assertEquals(NOT_REQUIRED, pageMemory.checkpointUrgency()); } finally { closeAll( () -> pageMemory.stop(true), diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java index f61a19205a..1a84c1f925 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java @@ -18,14 +18,16 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.SHOULD_TRIGGER; +import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.checkpointUrgency; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.pageIndexesForDeltaFilePageStore; -import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.safeToUpdateAllPageMemories; import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -38,7 +40,6 @@ import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntFunction; import java.util.function.Supplier; @@ -49,6 +50,7 @@ import org.apache.ignite.internal.pagememory.DataRegion; import org.apache.ignite.internal.pagememory.FullPageId; import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; +import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; @@ -103,36 +105,36 @@ public class CheckpointManagerTest extends BaseIgniteAbstractTest { } @Test - void testSafeToUpdateAllPageMemories() { - assertTrue(safeToUpdateAllPageMemories(List.of())); + void testCheckpointUrgency() { + assertEquals(NOT_REQUIRED, checkpointUrgency(List.of())); - AtomicBoolean safeToUpdate0 = new AtomicBoolean(); - AtomicBoolean safeToUpdate1 = new AtomicBoolean(); + AtomicReference<CheckpointUrgency> urgency0 = new AtomicReference<>(MUST_TRIGGER); + AtomicReference<CheckpointUrgency> urgency1 = new AtomicReference<>(SHOULD_TRIGGER); PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class); PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class); - when(pageMemory0.safeToUpdate()).then(answer -> safeToUpdate0.get()); - when(pageMemory1.safeToUpdate()).then(answer -> safeToUpdate1.get()); + when(pageMemory0.checkpointUrgency()).then(answer -> urgency0.get()); + when(pageMemory1.checkpointUrgency()).then(answer -> urgency1.get()); DataRegion<PersistentPageMemory> dataRegion0 = () -> pageMemory0; DataRegion<PersistentPageMemory> dataRegion1 = () -> pageMemory1; - assertFalse(safeToUpdateAllPageMemories(List.of(dataRegion0))); - 
assertFalse(safeToUpdateAllPageMemories(List.of(dataRegion1))); - assertFalse(safeToUpdateAllPageMemories(List.of(dataRegion0, dataRegion1))); + assertEquals(MUST_TRIGGER, checkpointUrgency(List.of(dataRegion0))); + assertEquals(SHOULD_TRIGGER, checkpointUrgency(List.of(dataRegion1))); + assertEquals(MUST_TRIGGER, checkpointUrgency(List.of(dataRegion0, dataRegion1))); - safeToUpdate0.set(true); + urgency0.set(NOT_REQUIRED); - assertTrue(safeToUpdateAllPageMemories(List.of(dataRegion0))); - assertFalse(safeToUpdateAllPageMemories(List.of(dataRegion1))); - assertFalse(safeToUpdateAllPageMemories(List.of(dataRegion0, dataRegion1))); + assertEquals(NOT_REQUIRED, checkpointUrgency(List.of(dataRegion0))); + assertEquals(SHOULD_TRIGGER, checkpointUrgency(List.of(dataRegion1))); + assertEquals(SHOULD_TRIGGER, checkpointUrgency(List.of(dataRegion0, dataRegion1))); - safeToUpdate1.set(true); + urgency1.set(NOT_REQUIRED); - assertTrue(safeToUpdateAllPageMemories(List.of(dataRegion0))); - assertTrue(safeToUpdateAllPageMemories(List.of(dataRegion1))); - assertTrue(safeToUpdateAllPageMemories(List.of(dataRegion0, dataRegion1))); + assertEquals(NOT_REQUIRED, checkpointUrgency(List.of(dataRegion0))); + assertEquals(NOT_REQUIRED, checkpointUrgency(List.of(dataRegion1))); + assertEquals(NOT_REQUIRED, checkpointUrgency(List.of(dataRegion0, dataRegion1))); } @Test diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java index 60f7c27157..3ac4299085 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java @@ -20,6 +20,8 @@ package 
org.apache.ignite.internal.pagememory.persistence.checkpoint; import static java.lang.Thread.currentThread; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER; +import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; import static org.apache.ignite.internal.util.IgniteUtils.closeAll; @@ -38,9 +40,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -64,7 +67,7 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { @Test void testCheckpointReadLockTimeout() { - timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), Long.MAX_VALUE, () -> true, mock(Checkpointer.class)); + timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), Long.MAX_VALUE, () -> NOT_REQUIRED, mock(Checkpointer.class)); timeoutLock.start(); @@ -77,8 +80,8 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { @Test void testCheckpointReadLock() throws Exception { - CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(newReadWriteLock(), 0, () -> true, 
mock(Checkpointer.class)); - CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(newReadWriteLock(), 1_000, () -> true, mock(Checkpointer.class)); + var timeoutLock0 = new CheckpointTimeoutLock(newReadWriteLock(), 0, () -> NOT_REQUIRED, mock(Checkpointer.class)); + var timeoutLock1 = new CheckpointTimeoutLock(newReadWriteLock(), 1_000, () -> NOT_REQUIRED, mock(Checkpointer.class)); try { timeoutLock0.start(); @@ -103,7 +106,7 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { void testCheckpointReadLockWithWriteLockHeldByCurrentThread() { CheckpointReadWriteLock readWriteLock = newReadWriteLock(); - timeoutLock = new CheckpointTimeoutLock(readWriteLock, 1_000, () -> true, mock(Checkpointer.class)); + timeoutLock = new CheckpointTimeoutLock(readWriteLock, 1_000, () -> NOT_REQUIRED, mock(Checkpointer.class)); timeoutLock.start(); @@ -122,7 +125,7 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { @Test void testCheckpointReadLockFailOnNodeStop() { - timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), Long.MAX_VALUE, () -> true, mock(Checkpointer.class)); + timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), Long.MAX_VALUE, () -> NOT_REQUIRED, mock(Checkpointer.class)); timeoutLock.stop(); @@ -136,8 +139,8 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { CheckpointReadWriteLock readWriteLock0 = newReadWriteLock(); CheckpointReadWriteLock readWriteLock1 = newReadWriteLock(); - CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(readWriteLock0, 0, () -> true, mock(Checkpointer.class)); - CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(readWriteLock1, 100, () -> true, mock(Checkpointer.class)); + var timeoutLock0 = new CheckpointTimeoutLock(readWriteLock0, 0, () -> NOT_REQUIRED, mock(Checkpointer.class)); + var timeoutLock1 = new CheckpointTimeoutLock(readWriteLock1, 100, () -> NOT_REQUIRED, mock(Checkpointer.class)); try { timeoutLock0.start(); 
@@ -172,8 +175,8 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { CheckpointReadWriteLock readWriteLock0 = newReadWriteLock(); CheckpointReadWriteLock readWriteLock1 = newReadWriteLock(); - CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(readWriteLock0, 0, () -> true, mock(Checkpointer.class)); - CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(readWriteLock1, 100, () -> true, mock(Checkpointer.class)); + var timeoutLock0 = new CheckpointTimeoutLock(readWriteLock0, 0, () -> NOT_REQUIRED, mock(Checkpointer.class)); + var timeoutLock1 = new CheckpointTimeoutLock(readWriteLock1, 100, () -> NOT_REQUIRED, mock(Checkpointer.class)); try { timeoutLock0.start(); @@ -240,9 +243,9 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { Checkpointer checkpointer = newCheckpointer(currentThread(), lockRealiseFuture); - AtomicBoolean safeToUpdate = new AtomicBoolean(); + AtomicReference<CheckpointUrgency> urgency = new AtomicReference<>(MUST_TRIGGER); - timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), 0, safeToUpdate::get, checkpointer); + timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), 0, urgency::get, checkpointer); timeoutLock.start(); @@ -254,7 +257,7 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { assertTrue(latch.await(100, MILLISECONDS)); - safeToUpdate.set(true); + urgency.set(NOT_REQUIRED); readLockFuture.get(100, MILLISECONDS); } @@ -263,7 +266,7 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest { void testFailureLockReleasedFuture() { Checkpointer checkpointer = newCheckpointer(currentThread(), failedFuture(new Exception("test"))); - timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), 0, () -> false, checkpointer); + timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), 0, () -> MUST_TRIGGER, checkpointer); timeoutLock.start(); @@ -281,7 +284,7 @@ public class CheckpointTimeoutLockTest extends 
BaseIgniteAbstractTest { Checkpointer checkpointer = newCheckpointer(currentThread(), future); - timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), 0, () -> false, checkpointer); + timeoutLock = new CheckpointTimeoutLock(newReadWriteLock(), 0, () -> MUST_TRIGGER, checkpointer); timeoutLock.start(); diff --git a/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java index bd4b630603..76389adc61 100644 --- a/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java +++ b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.pagememory.FullPageId; +import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PartitionMeta; import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager; @@ -59,7 +60,7 @@ public class CheckpointTestUtils { return new CheckpointTimeoutLock( mock(CheckpointReadWriteLock.class), Long.MAX_VALUE, - () -> true, + () -> CheckpointUrgency.NOT_REQUIRED, mock(Checkpointer.class) ) { @Override