Phillippko commented on code in PR #5365:
URL: https://github.com/apache/ignite-3/pull/5365#discussion_r1984741784


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteSpeedBasedThrottle.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is 
overflowed.
+ * When the page in question is not included in the current checkpoint and 
Checkpoint Buffer is filled over 2/3,
+ * uses exponentially growing sleep time to throttle.
+ * Otherwise, uses average checkpoint write speed and instant speed of marking 
pages as dirty.<br>
+ *
+ * @see <a 
href="https://github.com/apache/ignite/tree/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem#speed-based-throttling";>Speed-based
 throttling description</a>

Review Comment:
   link to AI2 docs



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/ProgressSpeedCalculationTest.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Copyright 2021 GridGain Systems, Inc. and Contributors.

Review Comment:
   old copyright



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/IgniteThrottlingUnitTest.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import static java.lang.Thread.State.TIMED_WAITING;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.Constants;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link PagesWriteSpeedBasedThrottle} ported from Ignite 2.
+ */
+public class IgniteThrottlingUnitTest extends IgniteAbstractTest {

Review Comment:
   ```suggestion
   public class IgniteThrottlingTest extends IgniteAbstractTest {
   ```
   
   



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteSpeedBasedThrottle.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is 
overflowed.
+ * When the page in question is not included in the current checkpoint and 
Checkpoint Buffer is filled over 2/3,
+ * uses exponentially growing sleep time to throttle.
+ * Otherwise, uses average checkpoint write speed and instant speed of marking 
pages as dirty.<br>
+ *
+ * @see <a 
href="https://github.com/apache/ignite/tree/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem#speed-based-throttling";>Speed-based
 throttling description</a>
+ */
+public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(PagesWriteSpeedBasedThrottle.class);
+
+    /**
+     * Throttling 'duration' used to signal that no throttling is needed, and 
no certain side-effects are allowed
+     * (like stats collection).
+     */
+    static final long NO_THROTTLING_MARKER = Long.MIN_VALUE;
+
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /** Checkpoint progress provider. */
+    private final Supplier<CheckpointProgress> cpProgress;
+
+    /** Threads set. Contains threads which are currently parked because of 
throttling. */
+    private final Set<Thread> parkedThreads = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Used for calculating speed of marking pages dirty.
+     * Value from past 750-1000 millis only.
+     * {@link IntervalBasedMeasurement#getSpeedOpsPerSec(long)} returns pages 
marked/second.
+     * {@link IntervalBasedMeasurement#getAverage()} returns average throttle 
time.
+     * */
+    private final IntervalBasedMeasurement markSpeedAndAvgParkTime = new 
IntervalBasedMeasurement(250, 3);
+
+    /** Checkpoint lock state provider. */
+    private final CheckpointLockStateChecker cpLockStateChecker;
+
+    /** Previous warning time, nanos. */
+    private final AtomicLong prevWarnTime = new AtomicLong();
+
+    /** Warning min delay nanoseconds. */
+    private static final long WARN_MIN_DELAY_NS = TimeUnit.SECONDS.toNanos(10);
+
+    /** Warning threshold: minimal level of pressure that causes warning 
messages to log. */
+    private static final double WARN_THRESHOLD = 0.2;
+
+    /** Checkpoint buffer protection logic. */
+    private final ExponentialBackoffThrottlingStrategy cpBufferProtector = new 
ExponentialBackoffThrottlingStrategy();
+
+    /** Clean pages protection logic. */
+    private final SpeedBasedMemoryConsumptionThrottlingStrategy 
cleanPagesProtector;
+
+    /** Checkpoint Buffer-related logic used to keep it safe. */
+    private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;
+
+    /**
+     * Constructor.
+     *
+     * @param pageMemory Page memory.
+     * @param cpProgress Database manager.
+     * @param stateChecker Checkpoint lock state provider.
+     */
+    public PagesWriteSpeedBasedThrottle(
+            PersistentPageMemory pageMemory,
+            Supplier<CheckpointProgress> cpProgress,
+            CheckpointLockStateChecker stateChecker
+    ) {
+        this.pageMemory = pageMemory;
+        this.cpProgress = cpProgress;
+        cpLockStateChecker = stateChecker;
+
+        cleanPagesProtector = new 
SpeedBasedMemoryConsumptionThrottlingStrategy(pageMemory, cpProgress,
+            markSpeedAndAvgParkTime);
+        cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
+    }
+
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+        assert cpLockStateChecker.checkpointLockIsHeldByThread();
+
+        long curNanoTime = System.nanoTime();
+        long throttleParkTimeNs = 
computeThrottlingParkTime(isPageInCheckpoint, curNanoTime);
+
+        if (throttleParkTimeNs == NO_THROTTLING_MARKER) {
+            return;
+        } else if (throttleParkTimeNs > 0) {
+            recurrentLogIfNeeded();
+            doPark(throttleParkTimeNs);
+        }
+
+        
markSpeedAndAvgParkTime.addMeasurementForAverageCalculation(throttleParkTimeNs);
+    }
+
+    private long computeThrottlingParkTime(boolean isPageInCheckpoint, long 
curNanoTime) {
+        if (isPageInCheckpoint && isCpBufferOverflowThresholdExceeded()) {
+            return cpBufferProtector.protectionParkTime();
+        } else {
+            if (isPageInCheckpoint) {
+                // The fact that we are here means that we checked whether CP 
Buffer is in danger zone and found that
+                // it is ok, so its protector may relax, hence we reset it.
+                cpBufferProtector.resetBackoff();
+            }
+            return cleanPagesProtector.protectionParkTime(curNanoTime);
+        }
+    }
+
+    /**
+     * Disables the current thread for thread scheduling purposes. May be 
overriden by subclasses for tests.
+     *
+     * @param throttleParkTimeNs The maximum number of nanoseconds to wait.
+     */
+    protected void doPark(long throttleParkTimeNs) {
+        if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+            LOG.warn("Parking thread=" + Thread.currentThread().getName()
+                    + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
+        }
+
+        parkedThreads.add(Thread.currentThread());
+
+        try {
+            LockSupport.parkNanos(throttleParkTimeNs);
+        } finally {
+            parkedThreads.remove(Thread.currentThread());
+        }
+    }
+
+    /**
+     * Returns a number of written pages.
+     */
+    private int cpWrittenPages() {
+        CheckpointProgress cpProgress = this.cpProgress.get();
+
+        if (cpProgress == null) {
+            return 0;
+        }
+
+        return cpProgress.writtenPages();
+    }
+
+    /**
+     * Prints warning to log if throttling is occurred and requires markable 
amount of time.
+     */
+    private void recurrentLogIfNeeded() {
+        long prevWarningNs = prevWarnTime.get();
+        long curNs = System.nanoTime();
+
+        if (prevWarningNs != 0 && (curNs - prevWarningNs) <= 
WARN_MIN_DELAY_NS) {
+            return;
+        }
+
+        double weight = throttleWeight();
+        if (weight <= WARN_THRESHOLD) {
+            return;
+        }
+
+        if (prevWarnTime.compareAndSet(prevWarningNs, curNs) && 
LOG.isInfoEnabled()) {
+            String msg = String.format("Throttling is applied to page 
modifications "
+                            + "[fractionOfParkTime=%.2f, markDirty=%d 
pages/sec, checkpointWrite=%d pages/sec, "
+                            + "estIdealMarkDirty=%d pages/sec, curDirty=%.2f, 
maxDirty=%.2f, avgParkTime=%d ns, "
+                            + "pages: (total=%d, evicted=%d, written=%d, 
synced=%d, cpBufUsed=%d, cpBufTotal=%d)]",
+                    weight, getMarkDirtySpeed(), getCpWriteSpeed(),
+                    getLastEstimatedSpeedForMarkAll(), getCurrDirtyRatio(), 
getTargetDirtyRatio(), throttleParkTime(),
+                    cleanPagesProtector.cpTotalPages(), 
cleanPagesProtector.cpEvictedPages(), cpWrittenPages(),
+                    cleanPagesProtector.cpSyncedPages(),
+                    pageMemory.usedCheckpointBufferPages(), 
pageMemory.maxCheckpointBufferPages());
+
+            LOG.info(msg);
+        }
+    }
+
+    /**
+     * This is only used in tests.
+     *
+     * @param dirtyPagesRatio Actual percent of dirty pages.
+     * @param fullyCompletedPages Written & fsynced pages count.
+     * @param cpTotalPages Total checkpoint scope.
+     * @param threads Number of threads providing data during current 
checkpoint.
+     * @param markDirtySpeed Registered mark dirty speed, pages/sec.
+     * @param curCpWriteSpeed Average checkpoint write speed, pages/sec.
+     * @return Time in nanoseconds to part or 0 if throttling is not required.
+     */
+    @TestOnly
+    long getCleanPagesProtectionParkTime(
+            double dirtyPagesRatio,
+            long fullyCompletedPages,
+            int cpTotalPages,
+            int threads,
+            long markDirtySpeed,
+            long curCpWriteSpeed) {
+        return cleanPagesProtector.getParkTime(dirtyPagesRatio, 
fullyCompletedPages, cpTotalPages, threads,
+                markDirtySpeed, curCpWriteSpeed);
+    }
+
+    @Override public void onBeginCheckpoint() {
+        cleanPagesProtector.reset();
+    }
+
+    @Override public void onFinishCheckpoint() {
+        cpBufferProtector.resetBackoff();
+
+        cleanPagesProtector.finish();
+        markSpeedAndAvgParkTime.finishInterval();
+        unparkParkedThreads();
+    }
+
+    /**
+     * Unparks parked threads.

Review Comment:
   I think it won't hurt to make easy fixes in ported code. I.e. we definitely 
don't need these javadocs like that (private method + duplicates the naming) 
and it is simple to remove them



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1425,6 +1442,13 @@ private void close() {
             }
         }
 
+        /**
+         * Returns a dirty pages ratio.

Review Comment:
   let's remove this javadoc :) Private method + duplicates the name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to