Ensure commit log stop policy is enforced at startup

patch by paulo; reviewed by benedict for CASSANDRA-8515
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b7934f1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b7934f1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b7934f1 Branch: refs/heads/cassandra-2.2 Commit: 3b7934f1aa20d2210866afd9b88472e9cb1aed8d Parents: 98a08eb Author: Paulo Motta <pauloricard...@gmail.com> Authored: Mon Jul 13 19:35:50 2015 -0300 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Sun Aug 9 09:34:05 2015 +0200 ---------------------------------------------------------------------- .../cassandra/service/CassandraDaemon.java | 7 ++ .../cassandra/utils/JVMStabilityInspector.java | 9 +- .../org/apache/cassandra/db/CommitLogTest.java | 110 +++++++++++++++++++ .../apache/cassandra/utils/KillerForTests.java | 11 +- 4 files changed, 134 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b7934f1/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 2c141a6..d078203 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -34,6 +34,7 @@ import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXServiceURL; import javax.management.remote.rmi.RMIConnectorServer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; @@ -425,6 +426,12 @@ public class CassandraDaemon int nativePort = DatabaseDescriptor.getNativeTransportPort(); nativeServer = new 
org.apache.cassandra.transport.Server(nativeAddr, nativePort); + completeSetup(); + } + + @VisibleForTesting + public void completeSetup() + { setupCompleted = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b7934f1/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index c0ab84f..de396bb 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -28,7 +28,9 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.thrift.Cassandra; /** * Responsible for deciding whether to kill the JVM if it gets in an "unstable" state (think OOM). 
@@ -38,6 +40,7 @@ public final class JVMStabilityInspector private static final Logger logger = LoggerFactory.getLogger(JVMStabilityInspector.class); private static Killer killer = new Killer(); + private JVMStabilityInspector() {} /** @@ -66,7 +69,11 @@ public final class JVMStabilityInspector public static void inspectCommitLogThrowable(Throwable t) { - if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die) + if (!StorageService.instance.isSetupCompleted()) + { + logger.error("Exiting due to error while processing commit log during initialization.", t); + killer.killCurrentJVM(t, true); + } else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die) killer.killCurrentJVM(t); else inspectThrowable(t); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b7934f1/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index 9a8a1dc..1c3daab 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogSegmentManager; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.composites.CellName; @@ -43,6 +44,7 @@ import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.CassandraDaemon; import 
org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -279,6 +281,10 @@ public class CommitLogTest extends SchemaLoader @Test public void testCommitFailurePolicy_stop() throws ConfigurationException { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + // Need storage service active so stop policy can shutdown gossip StorageService.instance.initServer(); Assert.assertTrue(Gossiper.instance.isEnabled()); @@ -299,6 +305,10 @@ public class CommitLogTest extends SchemaLoader @Test public void testCommitFailurePolicy_die() { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + KillerForTests killerForTests = new KillerForTests(); JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); @@ -307,11 +317,111 @@ public class CommitLogTest extends SchemaLoader DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); CommitLog.handleCommitError("Testing die policy", new Throwable()); Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitFailurePolicy_mustDieIfNotStartedUp() + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + 
StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + //even though policy is ignore, JVM must die because Daemon has not finished initializing + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + CommitLog.handleCommitError("Testing die policy", new Throwable()); + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitLogFailureBeforeInitialization_mustKillJVM() throws Exception + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + StorageService.instance.registerDaemon(daemon); + + //let's make the commit log directory non-writable + File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); + commitLogDir.setWritable(false); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + + //now let's create a commit log segment manager and wait for it to fail + new CommitLogSegmentManager(); + + //busy wait since commitlogsegmentmanager spawns another thread + int retries = 0; + while (!killerForTests.wasKilled() && retries++ < 5) + Thread.sleep(10); + + //since failure was before CassandraDaemon startup, the JVM must be killed + 
Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + commitLogDir.setWritable(true); + } + } + + @Test + public void testCommitLogFailureAfterInitialization_mustRespectFailurePolicy() throws Exception + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + //let's make the commit log directory non-writable + File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); + commitLogDir.setWritable(false); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + + //now let's create a commit log segment manager and wait for it to fail + new CommitLogSegmentManager(); + + //wait commit log segment manager thread to execute + Thread.sleep(50); + + //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup + Assert.assertFalse(killerForTests.wasKilled()); } finally { DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); JVMStabilityInspector.replaceKiller(originalKiller); + commitLogDir.setWritable(true); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b7934f1/test/unit/org/apache/cassandra/utils/KillerForTests.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/KillerForTests.java 
b/test/unit/org/apache/cassandra/utils/KillerForTests.java index d488f97..abc7952 100644 --- a/test/unit/org/apache/cassandra/utils/KillerForTests.java +++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java @@ -24,11 +24,13 @@ package org.apache.cassandra.utils; public class KillerForTests extends JVMStabilityInspector.Killer { private boolean killed = false; + private boolean quiet = false; @Override - protected void killCurrentJVM(Throwable t) + protected void killCurrentJVM(Throwable t, boolean quiet) { - killed = true; + this.killed = true; + this.quiet = quiet; } public boolean wasKilled() @@ -36,6 +38,11 @@ public class KillerForTests extends JVMStabilityInspector.Killer return killed; } + public boolean wasKilledQuietly() + { + return quiet; + } + public void reset() { killed = false;