This is an automated email from the ASF dual-hosted git repository. dgovorukhin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new abc808a IGNITE-12006 Fixed threads may be parked for indefinite time during throttling after spurious wakeups - Fixes #6712. abc808a is described below commit abc808afe166ffd0698992fdc9bbee4f888cba76 Author: Sergey Antonov <antonovserge...@gmail.com> AuthorDate: Tue Jul 30 12:05:05 2019 +0300 IGNITE-12006 Fixed threads may be parked for indefinite time during throttling after spurious wakeups - Fixes #6712. --- .../GridCacheDatabaseSharedManager.java | 3 ++ .../cache/persistence/pagemem/PageMemoryImpl.java | 21 ++++++-- .../persistence/pagemem/PagesWriteThrottle.java | 26 +++++++-- .../pagemem/PagesWriteThrottlePolicy.java | 10 +++- .../pagemem/IgniteThrottlingUnitTest.java | 61 ++++++++++++++++++++++ .../apache/ignite/util/GridCommandHandlerTest.java | 2 + 6 files changed, 111 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index b1a21de..c11b67c 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -4731,6 +4731,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (pagesToRetry.isEmpty()) doneFut.onDone((Void)null); else { + LT.warn(log, pagesToRetry.size() + " checkpoint pages were not written yet due to unsuccessful " + + "page write lock acquisition and will be retried"); + if (retryWriteExecutor == null) { while (!pagesToRetry.isEmpty()) pagesToRetry = writePages(pagesToRetry); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 7877eec..8a543c6 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -926,7 +926,7 @@ public class PageMemoryImpl implements PageMemoryEx { // We pinned the page when allocated the temp buffer, release it now. PageHeader.releasePage(absPtr); - checkpointPool.releaseFreePage(tmpBufPtr); + releaseCheckpointBufferPage(tmpBufPtr); } if (rmv) @@ -945,6 +945,14 @@ public class PageMemoryImpl implements PageMemoryEx { return relPtr; } + /** */ + private void releaseCheckpointBufferPage(long tmpBufPtr) { + int resCntr = checkpointPool.releaseFreePage(tmpBufPtr); + + if (resCntr == checkpointBufferPagesSize() / 2 && writeThrottle != null) + writeThrottle.tryWakeupThrottledThreads(); + } + /** * Restores page from WAL page snapshot & delta records. * @@ -1298,7 +1306,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (tracker != null) tracker.onCowPageWritten(); - checkpointPool.releaseFreePage(tmpRelPtr); + releaseCheckpointBufferPage(tmpRelPtr); // Need release again because we pin page when resolve abs pointer, // and page did not have tmp buffer page. @@ -1934,14 +1942,17 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param relPtr Relative pointer to free. + * @return Resulting number of pages in pool if pages counter is enabled, 0 otherwise. */ - private void releaseFreePage(long relPtr) { + private int releaseFreePage(long relPtr) { long absPtr = absolute(relPtr); assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr); + int resCntr = 0; + if (pagesCntr != null) - pagesCntr.getAndDecrement(); + resCntr = pagesCntr.decrementAndGet(); while (true) { long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr); @@ -1951,7 +1962,7 @@ public class PageMemoryImpl implements PageMemoryEx { GridUnsafe.putLong(absPtr, freePageRelPtr); if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, relPtr)) - return; + return resCntr; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java index e7fb0fb..1586599 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java @@ -92,11 +92,8 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { boolean shouldThrottle = false; - if (isPageInCheckpoint) { - int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD); - - shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit; - } + if (isPageInCheckpoint) + shouldThrottle = shouldThrottle(); if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) { AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); @@ -150,6 +147,16 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { } /** {@inheritDoc} */ + @Override public void tryWakeupThrottledThreads() { + if (!shouldThrottle()) { + inCheckpointBackoffCntr.set(0); + + parkThrds.forEach(LockSupport::unpark); + parkThrds.clear(); + } + } + + /** {@inheritDoc} */ @Override public void onBeginCheckpoint() { } @@ -159,4 +166,13 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { notInCheckpointBackoffCntr.set(0); } + + /** + * @return {@code True} if throttling should be enabled, and {@code False} otherwise. + */ + private boolean shouldThrottle() { + int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD); + + return pageMemory.checkpointBufferPagesCount() > checkpointBufLimit; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java index e6aab79..5466ae8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java @@ -17,9 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; -import org.apache.ignite.IgniteSystemProperties; - import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteSystemProperties; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THROTTLE_LOG_THRESHOLD; @@ -38,6 +37,13 @@ public interface PagesWriteThrottlePolicy { void onMarkDirty(boolean isPageInCheckpoint); /** + * Callback to try wakeup throttled threads. + */ + default void tryWakeupThrottledThreads() { + // No-op. + } + + /** * Callback to notify throttling policy checkpoint was started. */ void onBeginCheckpoint(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java index c4a26a8..4dad75c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java @@ -16,11 +16,15 @@ */ package org.apache.ignite.internal.processors.cache.persistence.pagemem; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.logger.NullLogger; @@ -29,7 +33,10 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import static java.lang.Thread.State.TIMED_WAITING; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -226,6 +233,60 @@ public class IgniteThrottlingUnitTest { assertTrue(time == 0); } + /** */ + @Test + public void wakeupThrottledThread() throws IgniteInterruptedCheckedException { + PagesWriteThrottlePolicy plc = new PagesWriteThrottle(pageMemory2g, null, stateChecker, true, log); + + AtomicBoolean stopLoad = new AtomicBoolean(); + List<Thread> loadThreads = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { + loadThreads.add(new Thread( + () -> { + while (!stopLoad.get()) + plc.onMarkDirty(true); + }, + "load-" + i + )); + } + + when(pageMemory2g.checkpointBufferPagesSize()).thenReturn(100); + when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(70); + + try { + loadThreads.forEach(Thread::start); + + for (int i = 0; i < 100_000; i++) + loadThreads.forEach(LockSupport::unpark); + + // Awaiting that all load threads are parked. + for (Thread t : loadThreads) + assertTrue(t.getName(), waitForCondition(() -> t.getState() == TIMED_WAITING, 500L)); + + plc.tryWakeupThrottledThreads(); + + // Threads shouldn't wakeup because of throttling enabled. + for (Thread t : loadThreads) + assertEquals(t.getName(), TIMED_WAITING, t.getState()); + + // Disable throttling + when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(50); + + plc.tryWakeupThrottledThreads(); + + // Awaiting that all load threads are unparked. + for (Thread t : loadThreads) + assertTrue(t.getName(), waitForCondition(() -> t.getState() != TIMED_WAITING, 500L)); + + for (Thread t : loadThreads) + assertNotEquals(t.getName(), TIMED_WAITING, t.getState()); + } + finally { + stopLoad.set(true); + } + } + /** * */ diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index 94c7676..f6736cf 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -1944,6 +1944,8 @@ public class GridCommandHandlerTest extends GridCommandHandlerAbstractTest { startGrid(0); + forceCheckpoint(); + awaitPartitionMapExchange(); injectTestSystemOut();