Repository: cassandra Updated Branches: refs/heads/trunk ab7aa57b3 -> 2c6924b56
Add additional statistics for speculative retry. Patch by Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-13373 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2c6924b5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2c6924b5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2c6924b5 Branch: refs/heads/trunk Commit: 2c6924b561ddf0b0df9315946b21260d6e27fdb9 Parents: ab7aa57 Author: Ariel Weisberg <aweisb...@apple.com> Authored: Thu Mar 23 19:25:54 2017 -0400 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Mon Apr 3 18:24:02 2017 -0400 ---------------------------------------------------------------------- doc/source/operating/metrics.rst | 3 + .../cassandra/metrics/KeyspaceMetrics.java | 52 +++++ .../apache/cassandra/metrics/TableMetrics.java | 12 ++ .../cassandra/service/AbstractReadExecutor.java | 102 ++++++--- .../apache/cassandra/service/ReadCallback.java | 2 +- .../cassandra/service/ReadExecutorTest.java | 215 +++++++++++++++++++ 6 files changed, 360 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/doc/source/operating/metrics.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst index af2e36e..6e1b212 100644 --- a/doc/source/operating/metrics.rst +++ b/doc/source/operating/metrics.rst @@ -127,6 +127,9 @@ CasPropose Latency Latency of paxos propose CasCommit Latency Latency of paxos commit round. PercentRepaired Gauge<Double> Percent of table data that is repaired on disk. SpeculativeRetries Counter Number of times speculative retries were sent for this table. 
+SpeculativeFailedRetries Counter Number of speculative retries that failed to prevent a timeout +SpeculativeInsufficientReplicas Counter Number of speculative retries that couldn't be attempted due to lack of replicas +SpeculativeSampleLatencyNanos Gauge<Long> Number of nanoseconds to wait before speculation is attempted. Value may be statically configured or updated periodically based on coordinator latency. WaitingOnFreeMemtableSpace Histogram Histogram of time spent waiting for free memtable space, either on- or off-heap. DroppedMutations Counter Number of dropped mutations on this table. ======================================= ============== =========== http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 63f8dd0..3c6b604 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -97,6 +97,12 @@ public class KeyspaceMetrics public final Counter writeFailedIdealCL; /** Ideal CL write latency metrics */ public final LatencyMetrics idealCLWriteLatency; + /** Speculative retries **/ + public final Counter speculativeRetries; + /** Speculative retry occurred but still timed out **/ + public final Counter speculativeFailedRetries; + /** Needed to speculate, but didn't have enough replicas **/ + public final Counter speculativeInsufficientReplicas; public final MetricNameFactory factory; private Keyspace keyspace; @@ -244,6 +250,28 @@ public class KeyspaceMetrics casCommit = new LatencyMetrics(factory, "CasCommit"); writeFailedIdealCL = Metrics.counter(factory.createMetricName("WriteFailedIdealCL")); idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite"); + + speculativeRetries =
createKeyspaceCounter("SpeculativeRetries", new MetricValue() + { + public Long getValue(TableMetrics metric) + { + return metric.speculativeRetries.getCount(); + } + }); + speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", new MetricValue() + { + public Long getValue(TableMetrics metric) + { + return metric.speculativeFailedRetries.getCount(); + } + }); + speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", new MetricValue() + { + public Long getValue(TableMetrics metric) + { + return metric.speculativeInsufficientReplicas.getCount(); + } + }); } /** @@ -298,6 +326,30 @@ public class KeyspaceMetrics }); } + /** + * Creates a counter that will sum the current value of a metric for all column families in this keyspace + * @param name + * @param extractor + * @return Counter that computes sum of MetricValue.getValue() + */ + private Counter createKeyspaceCounter(String name, final MetricValue extractor) + { + allMetrics.add(name); + return Metrics.register(factory.createMetricName(name), new Counter() + { + @Override + public long getCount() + { + long sum = 0; + for (ColumnFamilyStore cf : keyspace.getColumnFamilyStores()) + { + sum += extractor.getValue(cf.metric); + } + return sum; + } + }); + } + static class KeyspaceMetricNameFactory implements MetricNameFactory { private final String keyspaceName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 126abed..c4b0000 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -162,6 +162,9 @@ public class TableMetrics private static final MetricNameFactory globalAliasFactory = new 
AllTableMetricNameFactory("ColumnFamily"); public final Counter speculativeRetries; + public final Counter speculativeFailedRetries; + public final Counter speculativeInsufficientReplicas; + public final Gauge<Long> speculativeSampleLatencyNanos; public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read"); public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write"); @@ -633,6 +636,15 @@ public class TableMetrics } }); speculativeRetries = createTableCounter("SpeculativeRetries"); + speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries"); + speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas"); + speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", new Gauge<Long>() + { + public Long getValue() + { + return cfs.sampleLatencyNanos; + } + }); keyCacheHitRate = Metrics.register(factory.createMetricName("KeyCacheHitRate"), aliasFactory.createMetricName("KeyCacheHitRate"), new RatioGauge() http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 7a82187..956a40a 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -63,12 +63,14 @@ public abstract class AbstractReadExecutor protected final List<InetAddress> targetReplicas; protected final ReadCallback handler; protected final TraceState traceState; + protected final ColumnFamilyStore cfs; - AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime) + 
AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime) { this.command = command; this.targetReplicas = targetReplicas; this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas, queryStartNanoTime); + this.cfs = cfs; this.traceState = Tracing.instance.get(); // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes @@ -143,7 +145,21 @@ public abstract class AbstractReadExecutor */ public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException { - return handler.get(); + try + { + return handler.get(); + } + catch (ReadTimeoutException e) + { + try + { + onReadTimeout(); + } + finally + { + throw e; + } + } } private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata) @@ -187,12 +203,16 @@ public abstract class AbstractReadExecutor ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry; - // Speculative retry is disabled *OR* there are simply no extra replicas to speculate. + // Speculative retry is disabled *OR* // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses if (retry.equals(SpeculativeRetryParam.NONE) - || consistencyLevel == ConsistencyLevel.EACH_QUORUM - || consistencyLevel.blockFor(keyspace) == allReplicas.size()) - return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); + || consistencyLevel == ConsistencyLevel.EACH_QUORUM) + return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, false); + + // There are simply no extra replicas to speculate.
+ // Handle this separately so it can log failed attempts to speculate due to lack of replicas + if (consistencyLevel.blockFor(keyspace) == allReplicas.size()) + return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, true); if (targetReplicas.size() == allReplicas.size()) { @@ -225,11 +245,34 @@ public abstract class AbstractReadExecutor return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); } + /** + * Waits up to cfs.sampleLatencyNanos on the read handler and returns true if it was not signalled in that + * time, i.e. speculative reads should be sent. Returns false without waiting if sampleLatencyNanos exceeds the read timeout. + */ + boolean shouldSpeculateAndMaybeWait() + { + // no latency information, or we're overloaded + if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout())) + return false; + + return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS); + } + + void onReadTimeout() {} + public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor { - public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime) + /** + * If never speculating due to lack of replicas, + * log it as a failure if speculation should have happened + * but couldn't due to lack of replicas + */ + private final boolean logFailedSpeculation; + + public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation) { - super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); + super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); + this.logFailedSpeculation = logFailedSpeculation; } public void executeAsync() @@ -241,7 +284,10 @@ public abstract class AbstractReadExecutor public void maybeTryAdditionalReplicas() { - //
no-op + if (logFailedSpeculation && shouldSpeculateAndMaybeWait()) + { + cfs.metric.speculativeInsufficientReplicas.inc(); + } } public Collection<InetAddress> getContactedReplicas() @@ -250,9 +296,8 @@ public abstract class AbstractReadExecutor } } - private static class SpeculatingReadExecutor extends AbstractReadExecutor + static class SpeculatingReadExecutor extends AbstractReadExecutor { - private final ColumnFamilyStore cfs; private volatile boolean speculated = false; public SpeculatingReadExecutor(Keyspace keyspace, @@ -262,8 +307,7 @@ public abstract class AbstractReadExecutor List<InetAddress> targetReplicas, long queryStartNanoTime) { - super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); - this.cfs = cfs; + super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); } public void executeAsync() @@ -293,12 +337,11 @@ public abstract class AbstractReadExecutor public void maybeTryAdditionalReplicas() { - // no latency information, or we're overloaded - if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout())) - return; - - if (!handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS)) + if (shouldSpeculateAndMaybeWait()) { + //Handle speculation stats first in case the callback fires immediately + speculated = true; + cfs.metric.speculativeRetries.inc(); // Could be waiting on the data, or on enough digests. ReadCommand retryCommand = command; if (handler.resolver.isDataPresent()) @@ -309,9 +352,6 @@ public abstract class AbstractReadExecutor traceState.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler); - speculated = true; - - cfs.metric.speculativeRetries.inc(); } } @@ -321,12 +361,19 @@ public abstract class AbstractReadExecutor ?
targetReplicas : targetReplicas.subList(0, targetReplicas.size() - 1); } + + @Override + void onReadTimeout() + { + //speculated stays false when shouldSpeculateAndMaybeWait() declined (e.g. we're overloaded), + //so only count a failed speculative retry if speculation was actually attempted + if (speculated) + cfs.metric.speculativeFailedRetries.inc(); + } } private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor { - private final ColumnFamilyStore cfs; - public AlwaysSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, @@ -334,8 +381,7 @@ public abstract class AbstractReadExecutor List<InetAddress> targetReplicas, long queryStartNanoTime) { - super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime); - this.cfs = cfs; + super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); } public void maybeTryAdditionalReplicas() @@ -356,5 +402,11 @@ public abstract class AbstractReadExecutor makeDigestRequests(targetReplicas.subList(2, targetReplicas.size())); cfs.metric.speculativeRetries.inc(); } + + @Override + void onReadTimeout() + { + cfs.metric.speculativeFailedRetries.inc(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 2e75e1f..7ee6386 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -53,7 +53,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class ); public final ResponseResolver resolver; - private final SimpleCondition condition = new SimpleCondition(); + final SimpleCondition condition = new SimpleCondition(); private final long queryStartNanoTime; final
int blockfor; final List<InetAddress> endpoints; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/test/unit/org/apache/cassandra/service/ReadExecutorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java new file mode 100644 index 0000000..fca8eca --- /dev/null +++ b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ReadExecutorTest +{ + static Keyspace ks; + static ColumnFamilyStore cfs; + static List<InetAddress> targets; + + @BeforeClass + public static void setUpClass() throws Throwable + { + SchemaLoader.loadSchema(); + SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar")); + ks = Keyspace.open("Foo"); + cfs = ks.getColumnFamilyStore("Bar"); + targets = ImmutableList.of(InetAddress.getByName("127.0.0.255"), InetAddress.getByName("127.0.0.254"), InetAddress.getByName("127.0.0.253")); + cfs.sampleLatencyNanos = 0; + } + + @Before + public void resetCounters() throws Throwable + { + cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount()); + cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount()); + cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount()); + } + + /** + * If speculation would have been beneficial but could not be attempted due to lack of 
replicas + * count that it occured + */ + @Test + public void testUnableToSpeculate() throws Throwable + { + assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount()); + assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount()); + AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true); + executor.maybeTryAdditionalReplicas(); + try + { + executor.get(); + fail(); + } + catch (ReadTimeoutException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount()); + assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount()); + + //Shouldn't increment + executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false); + executor.maybeTryAdditionalReplicas(); + try + { + executor.get(); + fail(); + } + catch (ReadTimeoutException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount()); + assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount()); + } + + /** + * Test that speculation when it is attempted is countedc, and when it succeed + * no failure is counted. 
+ */ + @Test + public void testSpeculateSucceeded() throws Throwable + { + assertEquals(0, cfs.metric.speculativeRetries.getCount()); + assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(0, ks.metric.speculativeRetries.getCount()); + assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); + AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime()); + executor.maybeTryAdditionalReplicas(); + new Thread() + { + @Override + public void run() + { + //Failures end the read promptly but don't require mock data to be suppleid + executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + executor.handler.condition.signalAll(); + } + }.start(); + + try + { + executor.get(); + fail(); + } + catch (ReadFailureException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeRetries.getCount()); + assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(1, ks.metric.speculativeRetries.getCount()); + assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); + + } + + /** + * Test that speculation failure statistics are incremented if speculation occurs + * and the read still times out. 
+ */ + @Test + public void testSpeculateFailed() throws Throwable + { + assertEquals(0, cfs.metric.speculativeRetries.getCount()); + assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(0, ks.metric.speculativeRetries.getCount()); + assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); + AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime()); + executor.maybeTryAdditionalReplicas(); + try + { + executor.get(); + fail(); + } + catch (ReadTimeoutException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeRetries.getCount()); + assertEquals(1, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(1, ks.metric.speculativeRetries.getCount()); + assertEquals(1, ks.metric.speculativeFailedRetries.getCount()); + } + + public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand + { + private final long timeout; + + MockSinglePartitionReadCommand() + { + this(0); + } + + MockSinglePartitionReadCommand(long timeout) + { + super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null); + this.timeout = timeout; + } + + @Override + public long getTimeout() + { + return timeout; + } + + @Override + public MessageOut createMessage() + { + return new MessageOut(MessagingService.Verb.BATCH_REMOVE) + { + @Override + public int serializedSize(int version) + { + return 0; + } + }; + } + + } + +}