Repository: cassandra Updated Branches: refs/heads/trunk aed682513 -> 1e2f5244e
Add diag events for read repairs patch by Stefan Podkowinski; reviewed by Mick Semb Wever for CASSANDRA-14668 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e2f5244 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e2f5244 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e2f5244 Branch: refs/heads/trunk Commit: 1e2f5244e5e341f32d23872104fad3b55dbf0cb0 Parents: aed6825 Author: Stefan Podkowinski <stefan.podkowin...@1und1.de> Authored: Mon Aug 27 13:45:27 2018 +0200 Committer: Stefan Podkowinski <s.podkowin...@gmail.com> Committed: Fri Aug 31 19:40:57 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/reads/DigestResolver.java | 30 ++- .../reads/repair/AbstractReadRepair.java | 2 + .../reads/repair/BlockingPartitionRepair.java | 18 ++ .../reads/repair/PartitionRepairEvent.java | 102 ++++++++++ .../reads/repair/ReadRepairDiagnostics.java | 78 ++++++++ .../service/reads/repair/ReadRepairEvent.java | 114 +++++++++++ .../DiagEventsBlockingReadRepairTest.java | 192 +++++++++++++++++++ 8 files changed, 536 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e40cf27..d2d9c86 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add diagnostic events for read repairs (CASSANDRA-14668) * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671) * Add sampler for query time and expose with nodetool (CASSANDRA-14436) * Clean up Message.Request implementations (CASSANDRA-14677) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java index b2eb0c6..897892f 100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@ -25,9 +25,10 @@ import com.google.common.base.Preconditions; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.service.reads.repair.ReadRepair; -import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.utils.ByteBufferUtil; public class DigestResolver extends ResponseResolver { @@ -82,4 +83,31 @@ public class DigestResolver extends ResponseResolver { return dataResponse != null; } + + public DigestResolverDebugResult[] getDigestsByEndpoint() + { + DigestResolverDebugResult[] ret = new DigestResolverDebugResult[responses.size()]; + for (int i = 0; i < responses.size(); i++) + { + MessageIn<ReadResponse> message = responses.get(i); + ReadResponse response = message.payload; + String digestHex = ByteBufferUtil.bytesToHex(response.digest(command)); + ret[i] = new DigestResolverDebugResult(message.from, digestHex, message.payload.isDigestResponse()); + } + return ret; + } + + public static class DigestResolverDebugResult + { + public InetAddressAndPort from; + public String digestHex; + public boolean isDigestResponse; + + private DigestResolverDebugResult(InetAddressAndPort from, String digestHex, boolean isDigestResponse) + { + this.from = from; + this.digestHex = digestHex; + this.isDigestResponse = isDigestResponse; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index a1cf827..7e3f0ae 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -122,6 +122,7 @@ public abstract class AbstractReadRepair implements ReadRepair Tracing.trace("Enqueuing full data read to {}", endpoint); sendReadCommand(endpoint, readCallback); } + ReadRepairDiagnostics.startRepair(this, contactedEndpoints, digestResolver, allEndpoints); } public void awaitReads() throws ReadTimeoutException @@ -167,6 +168,7 @@ public abstract class AbstractReadRepair implements ReadRepair Tracing.trace("Enqueuing speculative full data read to {}", endpoint); sendReadCommand(endpoint.get(), repair.readCallback); ReadRepairMetrics.speculatedRead.mark(); + ReadRepairDiagnostics.speculatedRead(this, endpoint.get(), candidates); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java index eb402ba..8d69bef 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java @@ -161,6 +161,7 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I if (!shouldBlockOn(destination)) pendingRepairs.remove(destination); + ReadRepairDiagnostics.sendInitialRepair(this, destination, mutation); } } @@ -226,14 +227,31 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I if (mutation == null) { // the mutation is too large to send. + ReadRepairDiagnostics.speculatedWriteOversized(this, endpoint); continue; } Tracing.trace("Sending speculative read-repair-mutation to {}", endpoint); sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), endpoint); + ReadRepairDiagnostics.speculatedWrite(this, endpoint, mutation); } } + Keyspace getKeyspace() + { + return keyspace; + } + + DecoratedKey getKey() + { + return key; + } + + ConsistencyLevel getConsistency() + { + return consistency; + } + @VisibleForTesting protected Iterable<InetAddressAndPort> getCandidateEndpoints() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java new file mode 100644 index 0000000..04abbcf --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionRepairEvent.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.utils.ByteBufferUtil; + +final class PartitionRepairEvent extends DiagnosticEvent +{ + private final PartitionRepairEventType type; + @VisibleForTesting + final InetAddressAndPort destination; + @Nullable + private final Keyspace keyspace; + @Nullable + private final DecoratedKey key; + @Nullable + private final ConsistencyLevel consistency; + @Nullable + @VisibleForTesting + String mutationSummary; + + enum PartitionRepairEventType + { + SEND_INITIAL_REPAIRS, + SPECULATED_WRITE, + UPDATE_OVERSIZED + } + + PartitionRepairEvent(PartitionRepairEventType type, BlockingPartitionRepair partitionRepair, + InetAddressAndPort destination, Mutation mutation) + { + this.type = type; + this.destination = destination; + this.keyspace = partitionRepair.getKeyspace(); + this.consistency = partitionRepair.getConsistency(); + this.key = partitionRepair.getKey(); + if (mutation != null) + { + try + { + this.mutationSummary = mutation.toString(); + } + catch (Exception e) + { + this.mutationSummary = String.format("<Mutation.toString(): %s>", e.getMessage()); + } + } + } + + public PartitionRepairEventType getType() + { + return type; + } + + public Map<String, Serializable> toMap() + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (keyspace != null) ret.put("keyspace", keyspace.getName()); + if (key != null) + { + ret.put("key", key.getKey() == null ? "null" : ByteBufferUtil.bytesToHex(key.getKey())); + ret.put("token", key.getToken().toString()); + } + if (consistency != null) ret.put("consistency", consistency.name()); + + ret.put("destination", destination.toString()); + + if (mutationSummary != null) ret.put("mutation", mutationSummary); + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java new file mode 100644 index 0000000..1117822 --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.Collections; +import java.util.List; + +import com.google.common.collect.Lists; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.reads.DigestResolver; +import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType; +import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType; + +final class ReadRepairDiagnostics +{ + private static final DiagnosticEventService service = DiagnosticEventService.instance(); + + private ReadRepairDiagnostics() + { + } + + static void startRepair(AbstractReadRepair readRepair, List<InetAddressAndPort> endpointDestinations, + DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints) + { + if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR)) + service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR, + readRepair, endpointDestinations, allEndpoints, digestResolver)); + } + + static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint, + Iterable<InetAddressAndPort> allEndpoints) + { + if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ)) + service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ, + readRepair, Collections.singletonList(endpoint), + Lists.newArrayList(allEndpoints), null)); + } + + static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation) + { + if (service.isEnabled(PartitionRepairEvent.class, PartitionRepairEventType.SEND_INITIAL_REPAIRS)) + service.publish(new PartitionRepairEvent(PartitionRepairEventType.SEND_INITIAL_REPAIRS, partitionRepair, + destination, mutation)); + } + + static void speculatedWrite(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation) + { + if (service.isEnabled(PartitionRepairEvent.class, PartitionRepairEventType.SPECULATED_WRITE)) + service.publish(new PartitionRepairEvent(PartitionRepairEventType.SPECULATED_WRITE, partitionRepair, + destination, mutation)); + } + + static void speculatedWriteOversized(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination) + { + if (service.isEnabled(PartitionRepairEvent.class, PartitionRepairEventType.UPDATE_OVERSIZED)) + service.publish(new PartitionRepairEvent(PartitionRepairEventType.UPDATE_OVERSIZED, partitionRepair, + destination, null)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java new file mode 100644 index 0000000..152f7e6 --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.reads.DigestResolver; +import org.apache.cassandra.service.reads.DigestResolver.DigestResolverDebugResult; +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; + +final class ReadRepairEvent extends DiagnosticEvent +{ + + private final ReadRepairEventType type; + private final Keyspace keyspace; + private final String tableName; + private final String cqlCommand; + private final ConsistencyLevel consistency; + private final SpeculativeRetryPolicy.Kind speculativeRetry; + @VisibleForTesting + final List<InetAddressAndPort> destinations; + @VisibleForTesting + final List<InetAddressAndPort> allEndpoints; + @Nullable + private final DigestResolverDebugResult[] digestsByEndpoint; + + enum ReadRepairEventType + { + START_REPAIR, + SPECULATED_READ + } + + ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, List<InetAddressAndPort> destinations, + List<InetAddressAndPort> allEndpoints, DigestResolver digestResolver) + { + this.keyspace = readRepair.cfs.keyspace; + this.tableName = readRepair.cfs.getTableName(); + this.cqlCommand = readRepair.command.toCQLString(); + this.consistency = readRepair.consistency; + this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind(); + this.destinations = destinations; + this.allEndpoints = allEndpoints; + this.digestsByEndpoint = digestResolver != null ? digestResolver.getDigestsByEndpoint() : null; + this.type = type; + } + + public ReadRepairEventType getType() + { + return type; + } + + public Map<String, Serializable> toMap() + { + HashMap<String, Serializable> ret = new HashMap<>(); + + ret.put("keyspace", keyspace.getName()); + ret.put("table", tableName); + ret.put("command", cqlCommand); + ret.put("consistency", consistency.name()); + ret.put("speculativeRetry", speculativeRetry.name()); + + Set<String> eps = destinations.stream().map(InetAddressAndPort::toString).collect(Collectors.toSet()); + ret.put("endpointDestinations", new HashSet<>(eps)); + + if (digestsByEndpoint != null) + { + HashMap<String, Serializable> digestsMap = new HashMap<>(); + for (DigestResolverDebugResult digestsByEndpoint : digestsByEndpoint) + { + HashMap<String, Serializable> digests = new HashMap<>(); + digests.put("digestHex", digestsByEndpoint.digestHex); + digests.put("isDigestResponse", digestsByEndpoint.isDigestResponse); + digestsMap.put(digestsByEndpoint.from.toString(), digests); + } + ret.put("digestsByEndpoint", digestsMap); + } + if (allEndpoints != null) + { + eps = allEndpoints.stream().map(InetAddressAndPort::toString).collect(Collectors.toSet()); + ret.put("allEndpoints", new HashSet<>(eps)); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2f5244/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java new file mode 100644 index 0000000..1f07c2b --- /dev/null +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.OverrideConfigurationLoader; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.service.reads.ReadCallback; +import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType; + +/** + * Variation of {@link BlockingReadRepair} using diagnostic events instead of instrumentation for test validation. + */ +public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest +{ + + @BeforeClass + public static void setUpClass() throws Throwable + { + OverrideConfigurationLoader.override((config) -> { + config.diagnostic_events_enabled = true; + }); + configureClass(ReadRepairStrategy.BLOCKING); + } + + @After + public void unsubscribeAll() + { + DiagnosticEventService.instance().cleanup(); + } + + @Test + public void additionalMutationRequired() + { + Mutation repair1 = mutation(cell2); + Mutation repair2 = mutation(cell1); + + // check that the correct repairs are calculated + Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); + repairs.put(target1, repair1); + repairs.put(target2, repair2); + + + DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2); + + Assert.assertTrue(handler.updatesByEp.isEmpty()); + + // check that the correct mutations are sent + handler.sendInitialRepairs(); + Assert.assertEquals(2, handler.updatesByEp.size()); + + Assert.assertEquals(repair1.toString(), handler.updatesByEp.get(target1)); + Assert.assertEquals(repair2.toString(), handler.updatesByEp.get(target2)); + + // check that a combined mutation is speculatively sent to the 3rd target + handler.updatesByEp.clear(); + handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS); + Assert.assertEquals(1, handler.updatesByEp.size()); + Assert.assertEquals(resolved.toString(), handler.updatesByEp.get(target3)); + + // check repairs stop blocking after receiving 2 acks + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + handler.ack(target1); + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + handler.ack(target3); + Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + } + + public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + { + return new DiagnosticBlockingRepairHandler(command, queryStartNanoTime, consistency); + } + + private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor) + { + Set<InetAddressAndPort> participants = repairs.keySet(); + InetAddressAndPort[] participantArray = new InetAddressAndPort[participants.size()]; + participants.toArray(participantArray); + return new DiagnosticPartitionReadRepairHandler(ks, key, ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray); + } + + private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair + { + private Set<InetAddressAndPort> recipients = Collections.emptySet(); + private ReadCallback readCallback = null; + + DiagnosticBlockingRepairHandler(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + { + super(command, queryStartNanoTime, consistency); + DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, this::onRepairEvent); + } + + private void onRepairEvent(ReadRepairEvent e) + { + if (e.getType() == ReadRepairEventType.START_REPAIR) recipients = new HashSet<>(e.destinations); + else if (e.getType() == ReadRepairEventType.SPECULATED_READ) recipients.addAll(e.destinations); + Assert.assertEquals(targets, e.allEndpoints); + Assert.assertNotNull(e.toMap()); + } + + void sendReadCommand(InetAddressAndPort to, ReadCallback callback) + { + assert readCallback == null || readCallback == callback; + readCallback = callback; + } + + Iterable<InetAddressAndPort> getCandidatesForToken(Token token) + { + return targets; + } + + public Set<InetAddressAndPort> getReadRecipients() + { + return recipients; + } + + public ReadCallback getReadCallback() + { + return readCallback; + } + } + + private static class DiagnosticPartitionReadRepairHandler extends BlockingPartitionRepair + { + private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>(); + + DiagnosticPartitionReadRepairHandler(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants) + { + super(keyspace, key, consistency, repairs, maxBlockFor, participants); + DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent); + } + + private void onRepairEvent(PartitionRepairEvent e) + { + updatesByEp.put(e.destination, e.mutationSummary); + Assert.assertNotNull(e.toMap()); + } + + protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort endpoint) + { + } + + List<InetAddressAndPort> candidates = targets; + + protected List<InetAddressAndPort> getCandidateEndpoints() + { + return candidates; + } + + @Override + protected boolean isLocal(InetAddressAndPort endpoint) + { + return targets.contains(endpoint); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org