Repository: cassandra
Updated Branches:
  refs/heads/trunk 209ea6e9b -> e39dc56ed
Add 'die' policy for commit log and disk failure

patch by John Sumsion and Josh McKenzie, reviewed by blerer for CASSANDRA-7927

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4df271f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4df271f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4df271f

Branch: refs/heads/trunk
Commit: d4df271f754ac4d5afd785d4043ccaaeb907a6ad
Parents: 860cde7
Author: Joshua McKenzie <jmcken...@apache.org>
Authored: Fri Oct 24 12:27:20 2014 -0500
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Fri Oct 24 12:27:20 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 conf/cassandra.yaml | 5 +-
 .../org/apache/cassandra/config/Config.java | 2 +
 .../cassandra/db/commitlog/CommitLog.java | 12 ++++-
 .../org/apache/cassandra/io/util/FileUtils.java | 10 +++-
 .../cassandra/utils/JVMStabilityInspector.java | 50 ++++++++++++++++++-
 .../org/apache/cassandra/db/CommitLogTest.java | 45 +++++++++++------
 .../utils/JVMStabilityInspectorTest.java | 51 ++++++++++++++++++++
 .../apache/cassandra/utils/KillerForTests.java | 43 +++++++++++++++++
 9 files changed, 197 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c136c5e..4ed07a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Add 'die' policy for commit log and disk failure (CASSANDRA-7927)
  * Fix installing as service on Windows (CASSANDRA-8115)
  * Fix CREATE TABLE for CQL2 (CASSANDRA-8144)
  * Avoid boxing in ColumnStats min/max trackers (CASSANDRA-8109)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/conf/cassandra.yaml
---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index c95c68c..74a8cfb 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -104,6 +104,8 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner # commitlog_directory: /var/lib/cassandra/commitlog # policy for data disk failures: +# die: shut down gossip and Thrift and kill the JVM for any fs errors or +# single-sstable errors, so the node can be replaced. # stop_paranoid: shut down gossip and Thrift even for single-sstable errors. # stop: shut down gossip and Thrift, leaving the node effectively dead, but # can still be inspected via JMX. @@ -114,9 +116,10 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner disk_failure_policy: stop # policy for commit disk failures: +# die: shut down gossip and Thrift and kill the JVM, so the node can be replaced. # stop: shut down gossip and Thrift, leaving the node effectively dead, but # can still be inspected via JMX. 
-# stop_commit: shutdown the commit log, letting writes collect but +# stop_commit: shutdown the commit log, letting writes collect but # continuing to service reads, as in pre-2.0.5 Cassandra # ignore: ignore fatal errors and let the batches fail commit_failure_policy: stop http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index e2df89f..5f16239 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -299,6 +299,7 @@ public class Config stop, ignore, stop_paranoid, + die } public static enum CommitFailurePolicy @@ -306,6 +307,7 @@ public class Config stop, stop_commit, ignore, + die, } public static enum RequestSchedulerId http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index d38c4ed..ee9ca14 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -25,9 +25,12 @@ import java.util.*; import javax.management.MBeanServer; import javax.management.ObjectName; -import org.apache.commons.lang3.StringUtils; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; + import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; @@ -36,6 +39,7 @@ import org.apache.cassandra.io.util.DataOutputByteBuffer; import 
org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.PureJavaCrc32; import static org.apache.cassandra.db.commitlog.CommitLogSegment.*; @@ -349,10 +353,14 @@ public class CommitLog implements CommitLogMBean return allocator.getActiveSegments().size(); } - static boolean handleCommitError(String message, Throwable t) + @VisibleForTesting + public static boolean handleCommitError(String message, Throwable t) { + JVMStabilityInspector.inspectCommitLogThrowable(t); switch (DatabaseDescriptor.getCommitFailurePolicy()) { + // Needed here for unit tests to not fail on default assertion + case die: case stop: StorageService.instance.stopTransports(); case stop_commit: http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index e590918..295679e 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -395,12 +395,18 @@ public class FileUtils public static void handleCorruptSSTable(CorruptSSTableException e) { - if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid) - StorageService.instance.stopTransports(); + JVMStabilityInspector.inspectThrowable(e); + switch (DatabaseDescriptor.getDiskFailurePolicy()) + { + case stop_paranoid: + StorageService.instance.stopTransports(); + break; + } } public static void handleFSError(FSError e) { + JVMStabilityInspector.inspectThrowable(e); switch (DatabaseDescriptor.getDiskFailurePolicy()) { case stop_paranoid: 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 9fdc5ea..bcff172 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -17,15 +17,28 @@ */ package org.apache.cassandra.utils; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.service.StorageService; -public class JVMStabilityInspector +/** + * Responsible for deciding whether to kill the JVM if it gets in an "unstable" state (think OOM). + */ +public final class JVMStabilityInspector { private static final Logger logger = LoggerFactory.getLogger(JVMStabilityInspector.class); + private static Killer killer = new Killer(); + + private JVMStabilityInspector() {} + /** - * Certain Throwables and Exceptions represent "Stop" conditions for the server. + * Certain Throwables and Exceptions represent "Die" conditions for the server. 
* @param t * The Throwable to check for server-stop conditions */ @@ -34,7 +47,40 @@ public class JVMStabilityInspector boolean isUnstable = false; if (t instanceof OutOfMemoryError) isUnstable = true; + + if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.die) + if (t instanceof FSError || t instanceof CorruptSSTableException) + isUnstable = true; + if (isUnstable) + killer.killCurrentJVM(t); + } + + public static void inspectCommitLogThrowable(Throwable t) + { + if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die) + killer.killCurrentJVM(t); + else + inspectThrowable(t); + } + + @VisibleForTesting + public static Killer replaceKiller(Killer newKiller) { + Killer oldKiller = JVMStabilityInspector.killer; + JVMStabilityInspector.killer = newKiller; + return oldKiller; + } + + @VisibleForTesting + public static class Killer + { + /** + * Certain situations represent "Die" conditions for the server, and if so, the reason is logged and the current JVM is killed. + * + * @param t + * The Throwable to log before killing the current JVM + */ + protected void killCurrentJVM(Throwable t) { t.printStackTrace(System.err); logger.error("JVM state determined to be unstable. 
Exiting forcefully due to:", t); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index ed9601d..8a1bb0c 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -23,11 +23,9 @@ import java.io.*; import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; import java.util.zip.Checksum; -import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Assert; import org.junit.Test; @@ -42,10 +40,14 @@ import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -275,29 +277,42 @@ public class CommitLogTest extends SchemaLoader } @Test - public void testCommitFailurePolicy_stop() + public void testCommitFailurePolicy_stop() throws ConfigurationException { - File commitDir = new File(DatabaseDescriptor.getCommitLogLocation()); + // Need storage service active so stop policy can shutdown gossip + StorageService.instance.initServer(); + Assert.assertTrue(Gossiper.instance.isEnabled()); + 
Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); try { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); - commitDir.setWritable(false); - Mutation rm = new Mutation("Keyspace1", bytes("k")); - rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0); + CommitLog.handleCommitError("Test stop error", new Throwable()); + Assert.assertFalse(Gossiper.instance.isEnabled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + } + } - // Adding it twice (won't change segment) - CommitLog.instance.add(rm); - Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS); - Assert.assertFalse(StorageService.instance.isRPCServerRunning()); - Assert.assertFalse(StorageService.instance.isNativeTransportRunning()); - Assert.assertFalse(StorageService.instance.isInitialized()); + @Test + public void testCommitFailurePolicy_die() + { + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); + CommitLog.handleCommitError("Testing die policy", new Throwable()); + Assert.assertTrue(killerForTests.wasKilled()); } finally { - commitDir.setWritable(true); + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java new file mode 100644 index 0000000..e2a5107 --- 
/dev/null +++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java @@ -0,0 +1,51 @@ +package org.apache.cassandra.utils; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSReadError; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class JVMStabilityInspectorTest +{ + @Test + public void testKill() throws Exception + { + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + + Config.DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + Config.CommitFailurePolicy oldCommitPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + killerForTests.reset(); + JVMStabilityInspector.inspectThrowable(new IOException()); + assertFalse(killerForTests.wasKilled()); + + killerForTests.reset(); + JVMStabilityInspector.inspectThrowable(new OutOfMemoryError()); + assertTrue(killerForTests.wasKilled()); + + DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die); + killerForTests.reset(); + JVMStabilityInspector.inspectThrowable(new FSReadError(new IOException(), "blah")); + assertTrue(killerForTests.wasKilled()); + + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); + killerForTests.reset(); + JVMStabilityInspector.inspectCommitLogThrowable(new Throwable()); + assertTrue(killerForTests.wasKilled()); + } + finally + { + JVMStabilityInspector.replaceKiller(originalKiller); + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + DatabaseDescriptor.setCommitFailurePolicy(oldCommitPolicy); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/test/unit/org/apache/cassandra/utils/KillerForTests.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/utils/KillerForTests.java b/test/unit/org/apache/cassandra/utils/KillerForTests.java new file mode 100644 index 0000000..83cd7fc --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +/** + * Responsible for stubbing out the System.exit() logic during unit tests. + */ +public class KillerForTests extends JVMStabilityInspector.Killer +{ + private boolean killed = false; + + @Override + protected void killCurrentJVM(Throwable t) + { + killed = true; + } + + public boolean wasKilled() + { + return killed; + } + + public void reset() + { + killed = false; + } +}