Fix regression of lagging commitlog flush log message

patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf

Branch: refs/heads/trunk
Commit: 214a3abfcc25460af50805b543a5833697a1b341
Parents: 38096da
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Fri Jun 1 05:45:23 2018 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Tue Jun 5 13:47:37 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 85 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fe6d1..dfdfbfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
  * Add Missing dependencies in pom-all (CASSANDRA-14422)
  * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
  * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 1cee55d..0845bd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class AbstractCommitLogService
 {
     /**
@@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService
 
                 // sync and signal
                 long pollStarted = clock.currentTimeMillis();
-                if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+                if (flushToDisk)
                 {
                     // in this branch, we want to flush the commit log to disk
                     syncRequested = false;
                     commitLog.sync(shutdown, true);
                     lastSyncedAt = pollStarted;
                     syncComplete.signalAll();
+                    syncCount++;
                 }
                 else
                 {
@@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService
                     commitLog.sync(false, false);
                 }
 
-                // sleep any time we have left before the next one is due
                 long now = clock.currentTimeMillis();
-                long sleep = pollStarted + markerIntervalMillis - now;
-                if (sleep < 0)
-                {
-                    // if we have lagged noticeably, update our lag counter
-                    if (firstLagAt == 0)
-                    {
-                        firstLagAt = now;
-                        totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
-                    }
-                    syncExceededIntervalBy -= sleep;
-                    lagCount++;
-                }
-                syncCount++;
-                totalSyncDuration += now - pollStarted;
-
-                if (firstLagAt > 0)
-                {
-                    //Only reset the lag tracking if it actually logged this time
-                    boolean logged = NoSpamLogger.log(
-                    logger,
-                    NoSpamLogger.Level.WARN,
-                    5,
-                    TimeUnit.MINUTES,
-                    "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                    syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
-                    if (logged)
-                        firstLagAt = 0;
-                }
+                if (flushToDisk)
+                    maybeLogFlushLag(pollStarted, now);
 
                 if (!run)
                     return false;
 
                 // if we have lagged this round, we probably have work to do already so we don't sleep
+                long sleep = pollStarted + markerIntervalMillis - now;
                 if (sleep < 0)
                     return true;
 
@@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService
             }
             return true;
         }
+
+        /**
+         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
+         */
+        @VisibleForTesting
+        boolean maybeLogFlushLag(long pollStarted, long now)
+        {
+            long flushDuration = now - pollStarted;
+            totalSyncDuration += flushDuration;
+
+            // this is the timestamp by which we should have completed the flush
+            long maxFlushTimestamp = pollStarted + syncIntervalMillis;
+            if (maxFlushTimestamp > now)
+                return false;
+
+            // if we have lagged noticeably, update our lag counter
+            if (firstLagAt == 0)
+            {
+                firstLagAt = now;
+                syncExceededIntervalBy = lagCount = 0;
+                syncCount = 1;
+                totalSyncDuration = flushDuration;
+            }
+            syncExceededIntervalBy += now - maxFlushTimestamp;
+            lagCount++;
+
+            if (firstLagAt > 0)
+            {
+                //Only reset the lag tracking if it actually logged this time
+                boolean logged = NoSpamLogger.log(
+                logger,
+                NoSpamLogger.Level.WARN,
+                5,
+                TimeUnit.MINUTES,
+                "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+                syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+                if (logged)
+                    firstLagAt = 0;
+            }
+            return true;
+        }
+
+        @VisibleForTesting
+        long getTotalSyncDuration()
+        {
+            return totalSyncDuration;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 5a46e5f..6f51eaf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FreeRunningClock;
 
@@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest
         long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
         FreeRunningClock clock = new FreeRunningClock();
         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
-        AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+        SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
         FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
 
         // at time 0
@@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest
                 markCount.incrementAndGet();
         }
     }
+
+    @Test
+    public void maybeLogFlushLag_MustLog()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis * 2);
+        Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
+
+    @Test
+    public void maybeLogFlushLag_NoLog()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis - 1);
+        Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
+
+    /**
+     * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+     */
+    @Test
+    public void maybeLogFlushLag_MultipleOperations()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis - 1);
+
+        int runCount = 12;
+        for (int i = 1; i <= runCount; i++)
+        {
+            Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+            Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+        }
+
+        now = pollStarted + (syncTimeMillis * 2);
+        Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to