Fix regression of lagging commitlog flush log message; patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf Branch: refs/heads/trunk Commit: 214a3abfcc25460af50805b543a5833697a1b341 Parents: 38096da Author: Jason Brown <jasedbr...@gmail.com> Authored: Fri Jun 1 05:45:23 2018 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Tue Jun 5 13:47:37 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/AbstractCommitLogService.java | 85 +++++++++++++------- .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++- 3 files changed, 104 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 16fe6d1..dfdfbfd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.17 + * Fix regression of lagging commitlog flush log message (CASSANDRA-14451) * Add Missing dependencies in pom-all (CASSANDRA-14422) * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447) * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121) http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 1cee55d..0845bd5 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; + public abstract class AbstractCommitLogService { /** @@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService // sync and signal long pollStarted = clock.currentTimeMillis(); - if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested) + boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested; + if (flushToDisk) { // in this branch, we want to flush the commit log to disk syncRequested = false; commitLog.sync(shutdown, true); lastSyncedAt = pollStarted; syncComplete.signalAll(); + syncCount++; } else { @@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService commitLog.sync(false, false); } - // sleep any time we have left before the next one is due long now = clock.currentTimeMillis(); - long sleep = pollStarted + markerIntervalMillis - now; - if (sleep < 0) - { - // if we have lagged noticeably, update our lag counter - if (firstLagAt == 0) - { - firstLagAt = now; - totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; - } - syncExceededIntervalBy -= sleep; - lagCount++; - } - syncCount++; - totalSyncDuration += now - pollStarted; - - if (firstLagAt > 0) - { - //Only reset the lag tracking if it actually logged this time - boolean logged = NoSpamLogger.log( - logger, - NoSpamLogger.Level.WARN, - 5, - TimeUnit.MINUTES, - "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", - syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount)); - if (logged) - firstLagAt = 0; - } + if 
(flushToDisk) + maybeLogFlushLag(pollStarted, now); if (!run) return false; // if we have lagged this round, we probably have work to do already so we don't sleep + long sleep = pollStarted + markerIntervalMillis - now; if (sleep < 0) return true; @@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService } return true; } + + /** + * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}. + */ + @VisibleForTesting + boolean maybeLogFlushLag(long pollStarted, long now) + { + long flushDuration = now - pollStarted; + totalSyncDuration += flushDuration; + + // this is the timestamp by which we should have completed the flush + long maxFlushTimestamp = pollStarted + syncIntervalMillis; + if (maxFlushTimestamp > now) + return false; + + // if we have lagged noticeably, update our lag counter + if (firstLagAt == 0) + { + firstLagAt = now; + syncExceededIntervalBy = lagCount = 0; + syncCount = 1; + totalSyncDuration = flushDuration; + } + syncExceededIntervalBy += now - maxFlushTimestamp; + lagCount++; + + if (firstLagAt > 0) + { + //Only reset the lag tracking if it actually logged this time + boolean logged = NoSpamLogger.log( + logger, + NoSpamLogger.Level.WARN, + 5, + TimeUnit.MINUTES, + "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", + syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount)); + if (logged) + firstLagAt = 0; + } + return true; + } + + @VisibleForTesting + long getTotalSyncDuration() + { + return totalSyncDuration; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java index 5a46e5f..6f51eaf 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FreeRunningClock; @@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2; FreeRunningClock clock = new FreeRunningClock(); FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); - AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock); + SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock); FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog; // at time 0 @@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest markCount.incrementAndGet(); } } + + @Test + public void maybeLogFlushLag_MustLog() + { + long syncTimeMillis = 10; + SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock()); + long pollStarted = 1; + long now = pollStarted + (syncTimeMillis * 2); + Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration()); + } + + @Test + public void maybeLogFlushLag_NoLog() + { + long syncTimeMillis = 10; + SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock()); + long pollStarted = 1; + long now = pollStarted + (syncTimeMillis - 1); + 
Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration()); + } + + /** + * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly + */ + @Test + public void maybeLogFlushLag_MultipleOperations() + { + long syncTimeMillis = 10; + SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock()); + + long pollStarted = 1; + long now = pollStarted + (syncTimeMillis - 1); + + int runCount = 12; + for (int i = 1; i <= runCount; i++) + { + Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration()); + } + + now = pollStarted + (syncTimeMillis * 2); + Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org