Repository: cassandra
Updated Branches:
  refs/heads/trunk ab7aa57b3 -> 2c6924b56


Add additional statistics for speculative retry.

Patch by Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-13373


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2c6924b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2c6924b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2c6924b5

Branch: refs/heads/trunk
Commit: 2c6924b561ddf0b0df9315946b21260d6e27fdb9
Parents: ab7aa57
Author: Ariel Weisberg <aweisb...@apple.com>
Authored: Thu Mar 23 19:25:54 2017 -0400
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Mon Apr 3 18:24:02 2017 -0400

----------------------------------------------------------------------
 doc/source/operating/metrics.rst                |   3 +
 .../cassandra/metrics/KeyspaceMetrics.java      |  52 +++++
 .../apache/cassandra/metrics/TableMetrics.java  |  12 ++
 .../cassandra/service/AbstractReadExecutor.java | 102 ++++++---
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../cassandra/service/ReadExecutorTest.java     | 215 +++++++++++++++++++
 6 files changed, 360 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index af2e36e..6e1b212 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -127,6 +127,9 @@ CasPropose                              Latency        Latency of paxos propose
 CasCommit                               Latency        Latency of paxos commit round.
 PercentRepaired                         Gauge<Double>  Percent of table data that is repaired on disk.
 SpeculativeRetries                      Counter        Number of times speculative retries were sent for this table.
+SpeculativeFailedRetries                Counter        Number of speculative retries that failed to prevent a timeout
+SpeculativeInsufficientReplicas         Counter        Number of speculative retries that couldn't be attempted due to lack of replicas
+SpeculativeSampleLatencyNanos           Gauge<Long>    Number of nanoseconds to wait before speculation is attempted. Value may be statically configured or updated periodically based on coordinator latency.
 WaitingOnFreeMemtableSpace              Histogram      Histogram of time spent waiting for free memtable space, either on- or off-heap.
 DroppedMutations                        Counter        Number of dropped mutations on this table.
 ======================================= ============== ===========
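
For operators, the three new table-level counters above are exposed over JMX like the existing table metrics. Below is a minimal sketch of reading them programmatically; the endpoint (127.0.0.1:7199), the keyspace/table names (ks/tbl), and the assumption that the counters surface a "Count" attribute are illustrative choices, not part of this patch.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SpeculativeRetryMetricsReader
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical endpoint and table; substitute your own cluster, keyspace, and table.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            for (String name : new String[] { "SpeculativeRetries",
                                              "SpeculativeFailedRetries",
                                              "SpeculativeInsufficientReplicas" })
            {
                // Table metrics follow the usual org.apache.cassandra.metrics:type=Table naming pattern.
                ObjectName mbean = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=ks,scope=tbl,name=" + name);
                System.out.println(name + " = " + mbs.getAttribute(mbean, "Count"));
            }
        }
    }
}

SpeculativeSampleLatencyNanos, being a gauge, would typically be read through a "Value" attribute instead.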

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 63f8dd0..3c6b604 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -97,6 +97,12 @@ public class KeyspaceMetrics
     public final Counter writeFailedIdealCL;
     /** Ideal CL write latency metrics */
     public final LatencyMetrics idealCLWriteLatency;
+    /** Speculative retries **/
+    public final Counter speculativeRetries;
+    /** Speculative retry occurred but still timed out **/
+    public final Counter speculativeFailedRetries;
+    /** Needed to speculate, but didn't have enough replicas **/
+    public final Counter speculativeInsufficientReplicas;
 
     public final MetricNameFactory factory;
     private Keyspace keyspace;
@@ -244,6 +250,28 @@ public class KeyspaceMetrics
         casCommit = new LatencyMetrics(factory, "CasCommit");
         writeFailedIdealCL = Metrics.counter(factory.createMetricName("WriteFailedIdealCL"));
         idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite");
+
+        speculativeRetries = createKeyspaceCounter("SpeculativeRetries", new MetricValue()
+        {
+            public Long getValue(TableMetrics metric)
+            {
+                return metric.speculativeRetries.getCount();
+            }
+        });
+        speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", new MetricValue()
+        {
+            public Long getValue(TableMetrics metric)
+            {
+                return metric.speculativeFailedRetries.getCount();
+            }
+        });
+        speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", new MetricValue()
+        {
+            public Long getValue(TableMetrics metric)
+            {
+                return metric.speculativeInsufficientReplicas.getCount();
+            }
+        });
     }
 
     /**
@@ -298,6 +326,30 @@ public class KeyspaceMetrics
         });
     }
 
+    /**
+     * Creates a counter that will sum the current value of a metric for all column families in this keyspace
+     * @param name name to register the keyspace-level counter under
+     * @param extractor extracts the per-table value to be summed
+     * @return Counter that computes sum of MetricValue.getValue()
+     */
+    private Counter createKeyspaceCounter(String name, final MetricValue extractor)
+    {
+        allMetrics.add(name);
+        return Metrics.register(factory.createMetricName(name), new Counter()
+        {
+            @Override
+            public long getCount()
+            {
+                long sum = 0;
+                for (ColumnFamilyStore cf : keyspace.getColumnFamilyStores())
+                {
+                    sum += extractor.getValue(cf.metric);
+                }
+                return sum;
+            }
+        });
+    }
+
     static class KeyspaceMetricNameFactory implements MetricNameFactory
     {
         private final String keyspaceName;
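
The createKeyspaceCounter helper above mirrors the existing createKeyspaceGauge pattern: it registers a read-only Counter whose getCount() walks the keyspace's tables and sums the per-table values on demand, so the keyspace-level figure never has to be incremented separately. A standalone sketch of that aggregation pattern, using the codahale metrics classes directly and hypothetical stand-in types rather than the real Cassandra ones:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.ToLongFunction;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public class AggregatingCounterSketch
{
    // Hypothetical stand-in for a table's metric holder.
    static class TableMetricsStub
    {
        final Counter speculativeRetries = new Counter();
    }

    static final List<TableMetricsStub> tables = new CopyOnWriteArrayList<>();

    /** Registers a Counter whose count is recomputed from every table each time it is read. */
    static Counter registerKeyspaceCounter(MetricRegistry registry, String name, ToLongFunction<TableMetricsStub> extractor)
    {
        return registry.register(name, new Counter()
        {
            @Override
            public long getCount()
            {
                long sum = 0;
                for (TableMetricsStub table : tables)
                    sum += extractor.applyAsLong(table);
                return sum;
            }
        });
    }

    public static void main(String[] args)
    {
        MetricRegistry registry = new MetricRegistry();
        Counter keyspaceRetries = registerKeyspaceCounter(registry, "ks.SpeculativeRetries",
                                                          t -> t.speculativeRetries.getCount());
        TableMetricsStub t1 = new TableMetricsStub();
        TableMetricsStub t2 = new TableMetricsStub();
        tables.add(t1);
        tables.add(t2);
        t1.speculativeRetries.inc(2);
        t2.speculativeRetries.inc(3);
        System.out.println(keyspaceRetries.getCount()); // prints 5
    }
}

Reads of the keyspace-level counter are O(number of tables), which is acceptable because metrics are polled far less often than the hot path increments the per-table counters.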

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 126abed..c4b0000 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -162,6 +162,9 @@ public class TableMetrics
     private static final MetricNameFactory globalAliasFactory = new AllTableMetricNameFactory("ColumnFamily");
 
     public final Counter speculativeRetries;
+    public final Counter speculativeFailedRetries;
+    public final Counter speculativeInsufficientReplicas;
+    public final Gauge<Long> speculativeSampleLatencyNanos;
 
     public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
@@ -633,6 +636,15 @@ public class TableMetrics
             }
         });
         speculativeRetries = createTableCounter("SpeculativeRetries");
+        speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries");
+        speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas");
+        speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", new Gauge<Long>()
+        {
+            public Long getValue()
+            {
+                return cfs.sampleLatencyNanos;
+            }
+        });
         keyCacheHitRate = Metrics.register(factory.createMetricName("KeyCacheHitRate"),
                                            aliasFactory.createMetricName("KeyCacheHitRate"),
                                            new RatioGauge()
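
SpeculativeSampleLatencyNanos is registered as a Gauge<Long> rather than a Counter because it reports the table's current speculation threshold (cfs.sampleLatencyNanos) instead of an accumulating count. A minimal sketch of that wrap-a-field gauge pattern, again with codahale metrics and a hypothetical AtomicLong standing in for the real field:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

public class SampleLatencyGaugeSketch
{
    // Hypothetical stand-in for cfs.sampleLatencyNanos: written elsewhere, read by the gauge on demand.
    static final AtomicLong sampleLatencyNanos = new AtomicLong(TimeUnit.MILLISECONDS.toNanos(50));

    public static void main(String[] args)
    {
        MetricRegistry registry = new MetricRegistry();
        Gauge<Long> gauge = sampleLatencyNanos::get; // re-reads the field on every poll
        registry.register("SpeculativeSampleLatencyNanos", gauge);

        System.out.println(gauge.getValue()); // 50000000

        // A background latency sampler can update the field at any time;
        // the gauge simply reports whatever the current value is.
        sampleLatencyNanos.set(TimeUnit.MILLISECONDS.toNanos(20));
        System.out.println(gauge.getValue()); // 20000000
    }
}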

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7a82187..956a40a 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -63,12 +63,14 @@ public abstract class AbstractReadExecutor
     protected final List<InetAddress> targetReplicas;
     protected final ReadCallback handler;
     protected final TraceState traceState;
+    protected final ColumnFamilyStore cfs;
 
-    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
+    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
         this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas, queryStartNanoTime);
+        this.cfs = cfs;
         this.traceState = Tracing.instance.get();
 
         // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
@@ -143,7 +145,21 @@ public abstract class AbstractReadExecutor
      */
     public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
-        return handler.get();
+        try
+        {
+            return handler.get();
+        }
+        catch (ReadTimeoutException e)
+        {
+            try
+            {
+                onReadTimeout();
+            }
+            finally
+            {
+                throw e;
+            }
+        }
     }
 
     private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata)
@@ -187,12 +203,16 @@ public abstract class AbstractReadExecutor
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
         SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry;
 
-        // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
+        // Speculative retry is disabled *OR*
         // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
         if (retry.equals(SpeculativeRetryParam.NONE)
-            || consistencyLevel == ConsistencyLevel.EACH_QUORUM
-            || consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+            || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
+            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, false);
+
+        // There are simply no extra replicas to speculate.
+        // Handle this separately so it can log failed attempts to speculate due to lack of replicas
+        if (consistencyLevel.blockFor(keyspace) == allReplicas.size())
+            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, true);
 
         if (targetReplicas.size() == allReplicas.size())
         {
@@ -225,11 +245,34 @@ public abstract class AbstractReadExecutor
             return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
     }
 
+    /**
+     *  Returns true if speculation should occur; if it should, this blocks until it is time to
+     *  send the speculative reads
+     */
+    boolean shouldSpeculateAndMaybeWait()
+    {
+        // no latency information, or we're overloaded
+        if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
+            return false;
+
+        return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+    }
+
+    void onReadTimeout() {}
+
     public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
+        /**
+         * If speculation cannot be attempted because there are no extra replicas,
+         * record the missed speculation whenever a retry would
+         * otherwise have been sent.
+         */
+        private final boolean logFailedSpeculation;
+
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+            this.logFailedSpeculation = logFailedSpeculation;
         }
 
         public void executeAsync()
@@ -241,7 +284,10 @@ public abstract class AbstractReadExecutor
 
         public void maybeTryAdditionalReplicas()
         {
-            // no-op
+            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
+            {
+                cfs.metric.speculativeInsufficientReplicas.inc();
+            }
         }
 
         public Collection<InetAddress> getContactedReplicas()
@@ -250,9 +296,8 @@ public abstract class AbstractReadExecutor
         }
     }
 
-    private static class SpeculatingReadExecutor extends AbstractReadExecutor
+    static class SpeculatingReadExecutor extends AbstractReadExecutor
     {
-        private final ColumnFamilyStore cfs;
         private volatile boolean speculated = false;
 
         public SpeculatingReadExecutor(Keyspace keyspace,
@@ -262,8 +307,7 @@ public abstract class AbstractReadExecutor
                                        List<InetAddress> targetReplicas,
                                        long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
-            this.cfs = cfs;
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
         }
 
         public void executeAsync()
@@ -293,12 +337,11 @@ public abstract class AbstractReadExecutor
 
         public void maybeTryAdditionalReplicas()
         {
-            // no latency information, or we're overloaded
-            if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
-                return;
-
-            if (!handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS))
+            if (shouldSpeculateAndMaybeWait())
             {
+                //Handle speculation stats first in case the callback fires immediately
+                speculated = true;
+                cfs.metric.speculativeRetries.inc();
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
                 if (handler.resolver.isDataPresent())
@@ -309,9 +352,6 @@ public abstract class AbstractReadExecutor
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
                 MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
-                speculated = true;
-
-                cfs.metric.speculativeRetries.inc();
             }
         }
 
@@ -321,12 +361,19 @@ public abstract class AbstractReadExecutor
                  ? targetReplicas
                  : targetReplicas.subList(0, targetReplicas.size() - 1);
         }
+
+        @Override
+        void onReadTimeout()
+        {
+            //Shouldn't be possible to get here without first attempting to speculate even if the
+            //timing is bad
+            assert speculated;
+            cfs.metric.speculativeFailedRetries.inc();
+        }
     }
 
     private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        private final ColumnFamilyStore cfs;
-
         public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
                                              ColumnFamilyStore cfs,
                                              ReadCommand command,
@@ -334,8 +381,7 @@ public abstract class AbstractReadExecutor
                                              List<InetAddress> targetReplicas,
                                              long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
-            this.cfs = cfs;
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -356,5 +402,11 @@ public abstract class AbstractReadExecutor
                 makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
             cfs.metric.speculativeRetries.inc();
         }
+
+        @Override
+        void onReadTimeout()
+        {
+            cfs.metric.speculativeFailedRetries.inc();
+        }
     }
 }
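
The executor changes above funnel the "wait, then maybe speculate" decision through shouldSpeculateAndMaybeWait() and the timeout accounting through onReadTimeout(), so every subclass updates the retry, failed-retry, and insufficient-replica counters consistently. A simplified, self-contained sketch of that control flow, using a CountDownLatch and AtomicLong counters as hypothetical stand-ins for ReadCallback's condition and the table metrics:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class SpeculatingReadSketch
{
    // Hypothetical stand-ins for the table metrics touched by the patch.
    static final AtomicLong speculativeRetries = new AtomicLong();
    static final AtomicLong speculativeFailedRetries = new AtomicLong();

    final CountDownLatch responses;   // plays the role of ReadCallback's condition
    final long sampleLatencyNanos;    // plays the role of cfs.sampleLatencyNanos
    final long timeoutMillis;
    volatile boolean speculated;

    SpeculatingReadSketch(int blockFor, long sampleLatencyNanos, long timeoutMillis)
    {
        this.responses = new CountDownLatch(blockFor);
        this.sampleLatencyNanos = sampleLatencyNanos;
        this.timeoutMillis = timeoutMillis;
    }

    /** Mirrors shouldSpeculateAndMaybeWait(): bail out if overloaded, otherwise wait out the sampled latency. */
    boolean shouldSpeculateAndMaybeWait() throws InterruptedException
    {
        if (sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(timeoutMillis))
            return false; // no latency information, or we're overloaded
        return !responses.await(sampleLatencyNanos, TimeUnit.NANOSECONDS);
    }

    void maybeTryAdditionalReplicas() throws InterruptedException
    {
        if (shouldSpeculateAndMaybeWait())
        {
            // Count the retry before sending it, in case the extra replica answers immediately.
            speculated = true;
            speculativeRetries.incrementAndGet();
            // ... send the speculative read to the extra replica here ...
        }
    }

    /** Mirrors get(): on timeout, record whether a speculative retry was sent but still failed to help. */
    void get() throws InterruptedException, TimeoutException
    {
        if (!responses.await(timeoutMillis, TimeUnit.MILLISECONDS))
        {
            if (speculated)
                speculativeFailedRetries.incrementAndGet();
            throw new TimeoutException();
        }
    }
}

Counting the retry before the extra read is sent mirrors the reordering in SpeculatingReadExecutor.maybeTryAdditionalReplicas(): if the extra replica responds immediately, the statistics are already consistent with what happened.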

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 2e75e1f..7ee6386 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -53,7 +53,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
     public final ResponseResolver resolver;
-    private final SimpleCondition condition = new SimpleCondition();
+    final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
     final int blockfor;
     final List<InetAddress> endpoints;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
new file mode 100644
index 0000000..fca8eca
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ReadExecutorTest
+{
+    static Keyspace ks;
+    static ColumnFamilyStore cfs;
+    static List<InetAddress> targets;
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar"));
+        ks = Keyspace.open("Foo");
+        cfs = ks.getColumnFamilyStore("Bar");
+        targets = ImmutableList.of(InetAddress.getByName("127.0.0.255"), InetAddress.getByName("127.0.0.254"), InetAddress.getByName("127.0.0.253"));
+        cfs.sampleLatencyNanos = 0;
+    }
+
+    @Before
+    public void resetCounters() throws Throwable
+    {
+        cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount());
+        cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount());
+        cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount());
+    }
+
+    /**
+     * If speculation would have been beneficial but could not be attempted due to lack of replicas,
+     * count that it occurred
+     */
+    @Test
+    public void testUnableToSpeculate() throws Throwable
+    {
+        assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount());
+        assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount());
+        AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true);
+        executor.maybeTryAdditionalReplicas();
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadTimeoutException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
+        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
+
+        //Shouldn't increment
+        executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false);
+        executor.maybeTryAdditionalReplicas();
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadTimeoutException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
+        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
+    }
+
+    /**
+     *  Test that speculation is counted when it is attempted, and that when it succeeds
+     *  no failure is counted.
+     */
+    @Test
+    public void testSpeculateSucceeded() throws Throwable
+    {
+        assertEquals(0, cfs.metric.speculativeRetries.getCount());
+        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(0, ks.metric.speculativeRetries.getCount());
+        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
+        executor.maybeTryAdditionalReplicas();
+        new Thread()
+        {
+            @Override
+            public void run()
+            {
+                //Failures end the read promptly but don't require mock data to be supplied
+                executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+                executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+                executor.handler.condition.signalAll();
+            }
+        }.start();
+
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadFailureException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeRetries.getCount());
+        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(1, ks.metric.speculativeRetries.getCount());
+        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+
+    }
+
+    /**
+     * Test that speculation failure statistics are incremented if speculation occurs
+     * and the read still times out.
+     */
+    @Test
+    public void testSpeculateFailed() throws Throwable
+    {
+        assertEquals(0, cfs.metric.speculativeRetries.getCount());
+        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(0, ks.metric.speculativeRetries.getCount());
+        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
+        executor.maybeTryAdditionalReplicas();
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadTimeoutException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeRetries.getCount());
+        assertEquals(1, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(1, ks.metric.speculativeRetries.getCount());
+        assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
+    }
+
+    public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
+    {
+        private final long timeout;
+
+        MockSinglePartitionReadCommand()
+        {
+            this(0);
+        }
+
+        MockSinglePartitionReadCommand(long timeout)
+        {
+            super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null);
+            this.timeout = timeout;
+        }
+
+        @Override
+        public long getTimeout()
+        {
+            return timeout;
+        }
+
+        @Override
+        public MessageOut createMessage()
+        {
+            return new MessageOut(MessagingService.Verb.BATCH_REMOVE)
+            {
+                @Override
+                public int serializedSize(int version)
+                {
+                    return 0;
+                }
+            };
+        }
+
+    }
+
+}
