This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new abc808a  IGNITE-12006 Fixed threads may be parked for indefinite time during throttling after spurious wakeups - Fixes #6712.
abc808a is described below

commit abc808afe166ffd0698992fdc9bbee4f888cba76
Author: Sergey Antonov <antonovserge...@gmail.com>
AuthorDate: Tue Jul 30 12:05:05 2019 +0300

    IGNITE-12006 Fixed threads may be parked for indefinite time during throttling after spurious wakeups - Fixes #6712.
---
 .../GridCacheDatabaseSharedManager.java            |  3 ++
 .../cache/persistence/pagemem/PageMemoryImpl.java  | 21 ++++++--
 .../persistence/pagemem/PagesWriteThrottle.java    | 26 +++++++--
 .../pagemem/PagesWriteThrottlePolicy.java          | 10 +++-
 .../pagemem/IgniteThrottlingUnitTest.java          | 61 ++++++++++++++++++++++
 .../apache/ignite/util/GridCommandHandlerTest.java |  2 +
 6 files changed, 111 insertions(+), 12 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index b1a21de..c11b67c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -4731,6 +4731,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 if (pagesToRetry.isEmpty())
                     doneFut.onDone((Void)null);
                 else {
+                    LT.warn(log, pagesToRetry.size() + " checkpoint pages were not written yet due to unsuccessful " +
+                        "page write lock acquisition and will be retried");
+
                     if (retryWriteExecutor == null) {
                         while (!pagesToRetry.isEmpty())
                             pagesToRetry = writePages(pagesToRetry);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 7877eec..8a543c6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -926,7 +926,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             // We pinned the page when allocated the temp buffer, release it now.
             PageHeader.releasePage(absPtr);
 
-            checkpointPool.releaseFreePage(tmpBufPtr);
+            releaseCheckpointBufferPage(tmpBufPtr);
         }
 
         if (rmv)
@@ -945,6 +945,14 @@ public class PageMemoryImpl implements PageMemoryEx {
         return relPtr;
     }
 
+    /** */
+    private void releaseCheckpointBufferPage(long tmpBufPtr) {
+        int resCntr = checkpointPool.releaseFreePage(tmpBufPtr);
+
+        if (resCntr == checkpointBufferPagesSize() / 2 && writeThrottle != null)
+            writeThrottle.tryWakeupThrottledThreads();
+    }
+
     /**
      * Restores page from WAL page snapshot & delta records.
      *
@@ -1298,7 +1306,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 if (tracker != null)
                     tracker.onCowPageWritten();
 
-                checkpointPool.releaseFreePage(tmpRelPtr);
+                releaseCheckpointBufferPage(tmpRelPtr);
 
                 // Need release again because we pin page when resolve abs pointer,
                 // and page did not have tmp buffer page.
@@ -1934,14 +1942,17 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         /**
          * @param relPtr Relative pointer to free.
+         * @return Resulting number of pages in pool if pages counter is enabled, 0 otherwise.
          */
-        private void releaseFreePage(long relPtr) {
+        private int releaseFreePage(long relPtr) {
             long absPtr = absolute(relPtr);
 
             assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr);
 
+            int resCntr = 0;
+
             if (pagesCntr != null)
-                pagesCntr.getAndDecrement();
+                resCntr = pagesCntr.decrementAndGet();
 
             while (true) {
                 long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr);
@@ -1951,7 +1962,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 GridUnsafe.putLong(absPtr, freePageRelPtr);
 
                 if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, relPtr))
-                    return;
+                    return resCntr;
             }
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index e7fb0fb..1586599 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -92,11 +92,8 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
 
         boolean shouldThrottle = false;
 
-        if (isPageInCheckpoint) {
-            int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD);
-
-            shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
-        }
+        if (isPageInCheckpoint)
+            shouldThrottle = shouldThrottle();
 
         if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
             AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter();
@@ -150,6 +147,16 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
     }
 
     /** {@inheritDoc} */
+    @Override public void tryWakeupThrottledThreads() {
+        if (!shouldThrottle()) {
+            inCheckpointBackoffCntr.set(0);
+
+            parkThrds.forEach(LockSupport::unpark);
+            parkThrds.clear();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void onBeginCheckpoint() {
     }
 
@@ -159,4 +166,13 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
 
         notInCheckpointBackoffCntr.set(0);
     }
+
+    /**
+     * @return {@code True} if throttling should be enabled, and {@code False} otherwise.
+     */
+    private boolean shouldThrottle() {
+        int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD);
+
+        return pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
index e6aab79..5466ae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
@@ -17,9 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
-import org.apache.ignite.IgniteSystemProperties;
-
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteSystemProperties;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THROTTLE_LOG_THRESHOLD;
 
@@ -38,6 +37,13 @@ public interface PagesWriteThrottlePolicy {
     void onMarkDirty(boolean isPageInCheckpoint);
 
     /**
+     * Callback to try wakeup throttled threads.
+     */
+    default void tryWakeupThrottledThreads() {
+        // No-op.
+    }
+
+    /**
      * Callback to notify throttling policy checkpoint was started.
      */
     void onBeginCheckpoint();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index c4a26a8..4dad75c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -16,11 +16,15 @@
  */
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import org.apache.ignite.logger.NullLogger;
@@ -29,7 +33,10 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import static java.lang.Thread.State.TIMED_WAITING;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
@@ -226,6 +233,60 @@ public class IgniteThrottlingUnitTest {
         assertTrue(time == 0);
     }
 
+    /** */
+    @Test
+    public void wakeupThrottledThread() throws IgniteInterruptedCheckedException {
+        PagesWriteThrottlePolicy plc = new PagesWriteThrottle(pageMemory2g, null, stateChecker, true, log);
+
+        AtomicBoolean stopLoad = new AtomicBoolean();
+        List<Thread> loadThreads = new ArrayList<>();
+
+        for (int i = 0; i < 3; i++) {
+            loadThreads.add(new Thread(
+                () -> {
+                    while (!stopLoad.get())
+                        plc.onMarkDirty(true);
+                },
+                "load-" + i
+            ));
+        }
+
+        when(pageMemory2g.checkpointBufferPagesSize()).thenReturn(100);
+        when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(70);
+
+        try {
+            loadThreads.forEach(Thread::start);
+
+            for (int i = 0; i < 100_000; i++)
+                loadThreads.forEach(LockSupport::unpark);
+
+            // Awaiting that all load threads are parked.
+            for (Thread t : loadThreads)
+                assertTrue(t.getName(), waitForCondition(() -> t.getState() == TIMED_WAITING, 500L));
+
+            plc.tryWakeupThrottledThreads();
+
+            // Threads shouldn't wakeup because of throttling enabled.
+            for (Thread t : loadThreads)
+                assertEquals(t.getName(), TIMED_WAITING, t.getState());
+
+            // Disable throttling
+            when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(50);
+
+            plc.tryWakeupThrottledThreads();
+
+            // Awaiting that all load threads are unparked.
+            for (Thread t : loadThreads)
+                assertTrue(t.getName(), waitForCondition(() -> t.getState() != TIMED_WAITING, 500L));
+
+            for (Thread t : loadThreads)
+                assertNotEquals(t.getName(), TIMED_WAITING, t.getState());
+        }
+        finally {
+            stopLoad.set(true);
+        }
+    }
+
     /**
      *
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 94c7676..f6736cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -1944,6 +1944,8 @@ public class GridCommandHandlerTest extends GridCommandHandlerAbstractTest {
 
         startGrid(0);
 
+        forceCheckpoint();
+
         awaitPartitionMapExchange();
 
         injectTestSystemOut();

Reply via email to