Repository: cassandra
Updated Branches:
  refs/heads/trunk aed682513 -> 1e2f5244e


Add diag events for read repairs

patch by Stefan Podkowinski; reviewed by Mick Semb Wever for CASSANDRA-14668


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e2f5244
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e2f5244
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e2f5244

Branch: refs/heads/trunk
Commit: 1e2f5244e5e341f32d23872104fad3b55dbf0cb0
Parents: aed6825
Author: Stefan Podkowinski <stefan.podkowin...@1und1.de>
Authored: Mon Aug 27 13:45:27 2018 +0200
Committer: Stefan Podkowinski <s.podkowin...@gmail.com>
Committed: Fri Aug 31 19:40:57 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/reads/DigestResolver.java |  30 ++-
 .../reads/repair/AbstractReadRepair.java        |   2 +
 .../reads/repair/BlockingPartitionRepair.java   |  18 ++
 .../reads/repair/PartitionRepairEvent.java      | 102 ++++++++++
 .../reads/repair/ReadRepairDiagnostics.java     |  78 ++++++++
 .../service/reads/repair/ReadRepairEvent.java   | 114 +++++++++++
 .../DiagEventsBlockingReadRepairTest.java       | 192 +++++++++++++++++++
 8 files changed, 536 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e40cf27..d2d9c86 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add diagnostic events for read repairs (CASSANDRA-14668)
  * Use consistent nowInSeconds and timestamps values within a request 
(CASSANDRA-14671)
  * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
  * Clean up Message.Request implementations (CASSANDRA-14677)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java 
b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index b2eb0c6..897892f 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -25,9 +25,10 @@ import com.google.common.base.Preconditions;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
-import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class DigestResolver extends ResponseResolver
 {
@@ -82,4 +83,31 @@ public class DigestResolver extends ResponseResolver
     {
         return dataResponse != null;
     }
+
+    /**
+     * Builds a debug snapshot with one entry per stored response: the sender, the
+     * hex-encoded digest of the response for the current command, and whether the
+     * response was a digest response.
+     *
+     * @return one {@link DigestResolverDebugResult} per response, in storage order
+     */
+    public DigestResolverDebugResult[] getDigestsByEndpoint()
+    {
+        // Read the size exactly once so the array length and the loop bound always agree.
+        // NOTE(review): whether responses can grow concurrently is not visible from here;
+        // the snapshot is harmless either way.
+        int count = responses.size();
+        DigestResolverDebugResult[] ret = new DigestResolverDebugResult[count];
+        for (int i = 0; i < count; i++)
+        {
+            MessageIn<ReadResponse> message = responses.get(i);
+            ReadResponse response = message.payload;
+            String digestHex = ByteBufferUtil.bytesToHex(response.digest(command));
+            ret[i] = new DigestResolverDebugResult(message.from, digestHex, response.isDigestResponse());
+        }
+        return ret;
+    }
+
+    /**
+     * Immutable debug view of a single read response. Instances are only created by
+     * the enclosing resolver (the constructor is private).
+     */
+    public static class DigestResolverDebugResult
+    {
+        /** Sender of the response. */
+        public final InetAddressAndPort from;
+        /** Hex encoding of the response digest. */
+        public final String digestHex;
+        /** Whether the response reported itself as a digest response. */
+        public final boolean isDigestResponse;
+
+        private DigestResolverDebugResult(InetAddressAndPort from, String digestHex, boolean isDigestResponse)
+        {
+            this.from = from;
+            this.digestHex = digestHex;
+            this.isDigestResponse = isDigestResponse;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index a1cf827..7e3f0ae 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -122,6 +122,7 @@ public abstract class AbstractReadRepair implements 
ReadRepair
             Tracing.trace("Enqueuing full data read to {}", endpoint);
             sendReadCommand(endpoint, readCallback);
         }
+        ReadRepairDiagnostics.startRepair(this, contactedEndpoints, 
digestResolver, allEndpoints);
     }
 
     public void awaitReads() throws ReadTimeoutException
@@ -167,6 +168,7 @@ public abstract class AbstractReadRepair implements 
ReadRepair
                 Tracing.trace("Enqueuing speculative full data read to {}", 
endpoint);
                 sendReadCommand(endpoint.get(), repair.readCallback);
                 ReadRepairMetrics.speculatedRead.mark();
+                ReadRepairDiagnostics.speculatedRead(this, endpoint.get(), 
candidates);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index eb402ba..8d69bef 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -161,6 +161,7 @@ public class BlockingPartitionRepair extends 
AbstractFuture<Object> implements I
 
             if (!shouldBlockOn(destination))
                 pendingRepairs.remove(destination);
+            ReadRepairDiagnostics.sendInitialRepair(this, destination, 
mutation);
         }
     }
 
@@ -226,14 +227,31 @@ public class BlockingPartitionRepair extends 
AbstractFuture<Object> implements I
             if (mutation == null)
             {
                 // the mutation is too large to send.
+                ReadRepairDiagnostics.speculatedWriteOversized(this, endpoint);
                 continue;
             }
 
             Tracing.trace("Sending speculative read-repair-mutation to {}", 
endpoint);
             sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), 
endpoint);
+            ReadRepairDiagnostics.speculatedWrite(this, endpoint, mutation);
         }
     }
 
+    /** Returns the keyspace of this repair; read by {@link PartitionRepairEvent} when an event is built. */
+    Keyspace getKeyspace()
+    {
+        return keyspace;
+    }
+
+    /** Returns the partition key of this repair; read by {@link PartitionRepairEvent} when an event is built. */
+    DecoratedKey getKey()
+    {
+        return key;
+    }
+
+    /** Returns the consistency level of this repair; read by {@link PartitionRepairEvent} when an event is built. */
+    ConsistencyLevel getConsistency()
+    {
+        return consistency;
+    }
+
     @VisibleForTesting
     protected Iterable<InetAddressAndPort> getCandidateEndpoints()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java
new file mode 100644
index 0000000..04abbcf
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Diagnostic event describing one repair mutation step of a {@link BlockingPartitionRepair}:
+ * the destination endpoint, the repair's keyspace, key and consistency level, and a
+ * textual summary of the mutation (absent when no mutation was supplied).
+ * All state is captured at construction time and never changes afterwards.
+ */
+final class PartitionRepairEvent extends DiagnosticEvent
+{
+    private final PartitionRepairEventType type;
+    @VisibleForTesting
+    final InetAddressAndPort destination;
+    @Nullable
+    private final Keyspace keyspace;
+    @Nullable
+    private final DecoratedKey key;
+    @Nullable
+    private final ConsistencyLevel consistency;
+    @Nullable
+    @VisibleForTesting
+    final String mutationSummary;
+
+    enum PartitionRepairEventType
+    {
+        SEND_INITIAL_REPAIRS,
+        SPECULATED_WRITE,
+        UPDATE_OVERSIZED
+    }
+
+    /**
+     * @param type            kind of repair step being reported
+     * @param partitionRepair repair the keyspace, key and consistency level are taken from
+     * @param destination     endpoint the step is directed at
+     * @param mutation        mutation involved, or {@code null} if there is none
+     */
+    PartitionRepairEvent(PartitionRepairEventType type, BlockingPartitionRepair partitionRepair,
+                         InetAddressAndPort destination, Mutation mutation)
+    {
+        this.type = type;
+        this.destination = destination;
+        this.keyspace = partitionRepair.getKeyspace();
+        this.consistency = partitionRepair.getConsistency();
+        this.key = partitionRepair.getKey();
+        this.mutationSummary = summarize(mutation);
+    }
+
+    /**
+     * Renders the mutation as text. A failure inside {@code Mutation.toString()} must not
+     * prevent the event from being created, so it is turned into a placeholder string.
+     *
+     * @return the summary, or {@code null} when {@code mutation} is {@code null}
+     */
+    @Nullable
+    private static String summarize(@Nullable Mutation mutation)
+    {
+        if (mutation == null)
+            return null;
+
+        try
+        {
+            return mutation.toString();
+        }
+        catch (Exception e)
+        {
+            return String.format("<Mutation.toString(): %s>", e.getMessage());
+        }
+    }
+
+    public PartitionRepairEventType getType()
+    {
+        return type;
+    }
+
+    /**
+     * Flattens the event into string-keyed, serializable values. Entries whose source
+     * field is {@code null} are left out.
+     */
+    public Map<String, Serializable> toMap()
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (keyspace != null) ret.put("keyspace", keyspace.getName());
+        if (key != null)
+        {
+            ret.put("key", key.getKey() == null ? "null" : ByteBufferUtil.bytesToHex(key.getKey()));
+            ret.put("token", key.getToken().toString());
+        }
+        if (consistency != null) ret.put("consistency", consistency.name());
+
+        ret.put("destination", destination.toString());
+
+        if (mutationSummary != null) ret.put("mutation", mutationSummary);
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
new file mode 100644
index 0000000..1117822
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import 
org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType;
+import 
org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
+
+/**
+ * Static entry points for publishing read repair diagnostic events. Every method first
+ * asks the {@link DiagnosticEventService} whether the event type is enabled and only
+ * then constructs and publishes the event.
+ */
+final class ReadRepairDiagnostics
+{
+    private static final DiagnosticEventService service = DiagnosticEventService.instance();
+
+    private ReadRepairDiagnostics()
+    {
+    }
+
+    /** Publishes a {@code START_REPAIR} event carrying the contacted endpoints and the digest resolver. */
+    static void startRepair(AbstractReadRepair readRepair, List<InetAddressAndPort> endpointDestinations,
+                            DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints)
+    {
+        if (!service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
+            return;
+
+        ReadRepairEvent event = new ReadRepairEvent(ReadRepairEventType.START_REPAIR, readRepair,
+                                                    endpointDestinations, allEndpoints, digestResolver);
+        service.publish(event);
+    }
+
+    /** Publishes a {@code SPECULATED_READ} event for the single endpoint that received the extra read. */
+    static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint,
+                               Iterable<InetAddressAndPort> allEndpoints)
+    {
+        if (!service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ))
+            return;
+
+        List<InetAddressAndPort> destinations = Collections.singletonList(endpoint);
+        List<InetAddressAndPort> candidates = Lists.newArrayList(allEndpoints);
+        service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ, readRepair,
+                                            destinations, candidates, null));
+    }
+
+    /** Publishes a {@code SEND_INITIAL_REPAIRS} event for the given destination and mutation. */
+    static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation)
+    {
+        publishPartitionEvent(PartitionRepairEventType.SEND_INITIAL_REPAIRS, partitionRepair, destination, mutation);
+    }
+
+    /** Publishes a {@code SPECULATED_WRITE} event for the given destination and mutation. */
+    static void speculatedWrite(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation)
+    {
+        publishPartitionEvent(PartitionRepairEventType.SPECULATED_WRITE, partitionRepair, destination, mutation);
+    }
+
+    /** Publishes an {@code UPDATE_OVERSIZED} event; no mutation is attached. */
+    static void speculatedWriteOversized(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination)
+    {
+        publishPartitionEvent(PartitionRepairEventType.UPDATE_OVERSIZED, partitionRepair, destination, null);
+    }
+
+    /** Shared enabled-check and publish step for all {@link PartitionRepairEvent} types. */
+    private static void publishPartitionEvent(PartitionRepairEventType type, BlockingPartitionRepair partitionRepair,
+                                              InetAddressAndPort destination, Mutation mutation)
+    {
+        if (!service.isEnabled(PartitionRepairEvent.class, type))
+            return;
+
+        service.publish(new PartitionRepairEvent(type, partitionRepair, destination, mutation));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
new file mode 100644
index 0000000..152f7e6
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import 
org.apache.cassandra.service.reads.DigestResolver.DigestResolverDebugResult;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+
+/**
+ * Diagnostic event describing a read repair step: the keyspace, table, CQL form of the
+ * command, consistency level and speculative retry kind taken from the
+ * {@link AbstractReadRepair}, plus the endpoints involved and, when a
+ * {@link DigestResolver} is supplied, the digests it reports per endpoint.
+ */
+final class ReadRepairEvent extends DiagnosticEvent
+{
+
+    private final ReadRepairEventType type;
+    private final Keyspace keyspace;
+    private final String tableName;
+    private final String cqlCommand;
+    private final ConsistencyLevel consistency;
+    private final SpeculativeRetryPolicy.Kind speculativeRetry;
+    @VisibleForTesting
+    final List<InetAddressAndPort> destinations;
+    @VisibleForTesting
+    final List<InetAddressAndPort> allEndpoints;
+    @Nullable
+    private final DigestResolverDebugResult[] digestsByEndpoint;
+
+    enum ReadRepairEventType
+    {
+        START_REPAIR,
+        SPECULATED_READ
+    }
+
+    /**
+     * @param type           kind of read repair step being reported
+     * @param readRepair     repair the table, command and consistency details are read from
+     * @param destinations   endpoints this step is directed at
+     * @param allEndpoints   endpoints reported under "allEndpoints"; omitted from the map if {@code null}
+     * @param digestResolver source of per-endpoint digests, or {@code null} if none apply
+     */
+    ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, List<InetAddressAndPort> destinations,
+                    List<InetAddressAndPort> allEndpoints, DigestResolver digestResolver)
+    {
+        this.keyspace = readRepair.cfs.keyspace;
+        this.tableName = readRepair.cfs.getTableName();
+        this.cqlCommand = readRepair.command.toCQLString();
+        this.consistency = readRepair.consistency;
+        this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind();
+        this.destinations = destinations;
+        this.allEndpoints = allEndpoints;
+        this.digestsByEndpoint = digestResolver != null ? digestResolver.getDigestsByEndpoint() : null;
+        this.type = type;
+    }
+
+    public ReadRepairEventType getType()
+    {
+        return type;
+    }
+
+    /**
+     * Flattens the event into string-keyed, serializable values. The "digestsByEndpoint"
+     * and "allEndpoints" entries are only present when the corresponding field is set.
+     */
+    public Map<String, Serializable> toMap()
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+
+        ret.put("keyspace", keyspace.getName());
+        ret.put("table", tableName);
+        ret.put("command", cqlCommand);
+        ret.put("consistency", consistency.name());
+        ret.put("speculativeRetry", speculativeRetry.name());
+
+        ret.put("endpointDestinations", endpointNames(destinations));
+
+        if (digestsByEndpoint != null)
+        {
+            HashMap<String, Serializable> digestsMap = new HashMap<>();
+            // The loop variable has its own name so it cannot be confused with the field.
+            for (DigestResolverDebugResult result : digestsByEndpoint)
+            {
+                HashMap<String, Serializable> digests = new HashMap<>();
+                digests.put("digestHex", result.digestHex);
+                digests.put("isDigestResponse", result.isDigestResponse);
+                digestsMap.put(result.from.toString(), digests);
+            }
+            ret.put("digestsByEndpoint", digestsMap);
+        }
+        if (allEndpoints != null)
+        {
+            ret.put("allEndpoints", endpointNames(allEndpoints));
+        }
+        return ret;
+    }
+
+    /**
+     * Converts endpoints to their string form, collected directly into a {@link HashSet}
+     * because the map values must be {@link Serializable}.
+     */
+    private static HashSet<String> endpointNames(List<InetAddressAndPort> endpoints)
+    {
+        return endpoints.stream()
+                        .map(InetAddressAndPort::toString)
+                        .collect(Collectors.toCollection(HashSet::new));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
new file mode 100644
index 0000000..1f07c2b
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.OverrideConfigurationLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.service.reads.ReadCallback;
+import 
org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
+
+/**
+ * Variation of {@link BlockingReadRepair} using diagnostic events instead of
+ * instrumentation for test validation.
+ *
+ * The handlers below do not record what they send directly; they subscribe to
+ * {@link ReadRepairEvent} / {@link PartitionRepairEvent} and build their view of the
+ * repair from the published events.
+ */
+public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
+{
+
+    /**
+     * Turns on diagnostic events through a configuration override before configuring the
+     * test class for the blocking read repair strategy.
+     */
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        OverrideConfigurationLoader.override((config) -> {
+            config.diagnostic_events_enabled = true;
+        });
+        configureClass(ReadRepairStrategy.BLOCKING);
+    }
+
+    /**
+     * Runs after every test. NOTE(review): presumably cleanup() drops the subscriptions the
+     * handlers registered in their constructors - confirm against DiagnosticEventService.
+     */
+    @After
+    public void unsubscribeAll()
+    {
+        DiagnosticEventService.instance().cleanup();
+    }
+
+    /**
+     * Sends two initial repairs, then a speculative write, and checks after each step that
+     * the mutation summaries seen through the published events match the expected mutations.
+     * Finally verifies that awaitRepairs only succeeds once two acks have arrived.
+     */
+    @Test
+    public void additionalMutationRequired()
+    {
+        Mutation repair1 = mutation(cell2);
+        Mutation repair2 = mutation(cell1);
+
+        // check that the correct repairs are calculated
+        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, repair1);
+        repairs.put(target2, repair2);
+
+
+        DiagnosticPartitionReadRepairHandler handler = 
createRepairHandler(repairs, 2);
+
+        // nothing has been published before any repair is sent
+        Assert.assertTrue(handler.updatesByEp.isEmpty());
+
+        // check that the correct mutations are sent
+        handler.sendInitialRepairs();
+        Assert.assertEquals(2, handler.updatesByEp.size());
+
+        Assert.assertEquals(repair1.toString(), 
handler.updatesByEp.get(target1));
+        Assert.assertEquals(repair2.toString(), 
handler.updatesByEp.get(target2));
+
+        // check that a combined mutation is speculatively sent to the 3rd target
+        handler.updatesByEp.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertEquals(1, handler.updatesByEp.size());
+        Assert.assertEquals(resolved.toString(), 
handler.updatesByEp.get(target3));
+
+        // check repairs stop blocking after receiving 2 acks
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target1);
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target3);
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+
+    /** Supplies the event-driven read repair implementation defined in this class. */
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, long queryStartNanoTime, ConsistencyLevel consistency)
+    {
+        return new DiagnosticBlockingRepairHandler(command, 
queryStartNanoTime, consistency);
+    }
+
+    /**
+     * Creates a partition repair handler whose participants are exactly the endpoints
+     * that have a mutation in {@code repairs}, at LOCAL_QUORUM.
+     */
+    private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor)
+    {
+        Set<InetAddressAndPort> participants = repairs.keySet();
+        InetAddressAndPort[] participantArray = new 
InetAddressAndPort[participants.size()];
+        participants.toArray(participantArray);
+        return new DiagnosticPartitionReadRepairHandler(ks, key, 
ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray);
+    }
+
+    /**
+     * Blocking read repair that learns its read recipients from published
+     * {@link ReadRepairEvent}s; sendReadCommand itself only remembers the callback.
+     */
+    private static class DiagnosticBlockingRepairHandler extends 
BlockingReadRepair implements InstrumentedReadRepair
+    {
+        private Set<InetAddressAndPort> recipients = Collections.emptySet();
+        private ReadCallback readCallback = null;
+
+        DiagnosticBlockingRepairHandler(ReadCommand command, long 
queryStartNanoTime, ConsistencyLevel consistency)
+        {
+            super(command, queryStartNanoTime, consistency);
+            DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, 
this::onRepairEvent);
+        }
+
+        // START_REPAIR replaces the recipient set, SPECULATED_READ adds to it; every event
+        // must list the test targets as allEndpoints and must convert to a map.
+        private void onRepairEvent(ReadRepairEvent e)
+        {
+            if (e.getType() == ReadRepairEventType.START_REPAIR) recipients = 
new HashSet<>(e.destinations);
+            else if (e.getType() == ReadRepairEventType.SPECULATED_READ) 
recipients.addAll(e.destinations);
+            Assert.assertEquals(targets, e.allEndpoints);
+            Assert.assertNotNull(e.toMap());
+        }
+
+        // Stores the callback instead of sending anything; all calls must share one callback.
+        void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+        {
+            assert readCallback == null || readCallback == callback;
+            readCallback = callback;
+        }
+
+        // Always answers with the test targets, regardless of the token.
+        Iterable<InetAddressAndPort> getCandidatesForToken(Token token)
+        {
+            return targets;
+        }
+
+        public Set<InetAddressAndPort> getReadRecipients()
+        {
+            return recipients;
+        }
+
+        public ReadCallback getReadCallback()
+        {
+            return readCallback;
+        }
+    }
+
+    /**
+     * Partition repair that records, per destination, the mutation summary carried by
+     * each published {@link PartitionRepairEvent}; sendRR is a no-op.
+     */
+    private static class DiagnosticPartitionReadRepairHandler extends 
BlockingPartitionRepair
+    {
+        private final Map<InetAddressAndPort, String> updatesByEp = new 
HashMap<>();
+
+        DiagnosticPartitionReadRepairHandler(Keyspace keyspace, DecoratedKey 
key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, 
int maxBlockFor, InetAddressAndPort[] participants)
+        {
+            super(keyspace, key, consistency, repairs, maxBlockFor, 
participants);
+            
DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, 
this::onRepairEvent);
+        }
+
+        // Keeps the latest mutation summary per destination and checks the event converts to a map.
+        private void onRepairEvent(PartitionRepairEvent e)
+        {
+            updatesByEp.put(e.destination, e.mutationSummary);
+            Assert.assertNotNull(e.toMap());
+        }
+
+        // Intentionally empty: the test observes events, it does not send messages.
+        protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort 
endpoint)
+        {
+        }
+
+        // Candidate endpoints returned by getCandidateEndpoints(); initialised to the test targets.
+        List<InetAddressAndPort> candidates = targets;
+
+        protected List<InetAddressAndPort> getCandidateEndpoints()
+        {
+            return candidates;
+        }
+
+        // Treats exactly the test targets as local endpoints.
+        @Override
+        protected boolean isLocal(InetAddressAndPort endpoint)
+        {
+            return targets.contains(endpoint);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to