http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SimpleStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java index 545ad28..7a000b7 100644 --- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java +++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java @@ -21,9 +21,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.Token; @@ -36,34 +36,41 @@ import org.apache.cassandra.dht.Token; */ public class SimpleStrategy extends AbstractReplicationStrategy { + private final ReplicationFactor rf; + public SimpleStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) { super(keyspaceName, tokenMetadata, snitch, configOptions); + this.rf = ReplicationFactor.fromString(this.configOptions.get("replication_factor")); } - public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - int replicas = getReplicationFactor(); - ArrayList<Token> tokens = metadata.sortedTokens(); - List<InetAddressAndPort> endpoints = new ArrayList<InetAddressAndPort>(replicas); + ArrayList<Token> ring = metadata.sortedTokens(); + if (ring.isEmpty()) + return EndpointsForRange.empty(new Range<>(metadata.partitioner.getMinimumToken(), metadata.partitioner.getMinimumToken())); + + Token replicaEnd = TokenMetadata.firstToken(ring, token); + Token replicaStart = metadata.getPredecessor(replicaEnd); + Range<Token> replicaRange = new Range<>(replicaStart, 
replicaEnd); + Iterator<Token> iter = TokenMetadata.ringIterator(ring, token, false); - if (tokens.isEmpty()) - return endpoints; + EndpointsForRange.Builder replicas = EndpointsForRange.builder(replicaRange, rf.allReplicas); // Add the token at the index by default - Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false); - while (endpoints.size() < replicas && iter.hasNext()) + while (replicas.size() < rf.allReplicas && iter.hasNext()) { - InetAddressAndPort ep = metadata.getEndpoint(iter.next()); - if (!endpoints.contains(ep)) - endpoints.add(ep); + Token tk = iter.next(); + InetAddressAndPort ep = metadata.getEndpoint(tk); + if (!replicas.containsEndpoint(ep)) + replicas.add(new Replica(ep, replicaRange, replicas.size() < rf.fullReplicas)); } - return endpoints; + return replicas.build(); } - public int getReplicationFactor() + public ReplicationFactor getReplicationFactor() { - return Integer.parseInt(this.configOptions.get("replication_factor")); + return rf; } public void validateOptions() throws ConfigurationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SystemReplicas.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java new file mode 100644 index 0000000..13a9d74 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.locator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +public class SystemReplicas +{ + private static final Map<InetAddressAndPort, Replica> systemReplicas = new ConcurrentHashMap<>(); + public static final Range<Token> FULL_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), + DatabaseDescriptor.getPartitioner().getMinimumToken()); + + private static Replica createSystemReplica(InetAddressAndPort endpoint) + { + return new Replica(endpoint, FULL_RANGE, true); + } + + /** + * There are a few places where a system function borrows write path functionality, but doesn't otherwise + * fit into normal replication strategies (ie: hints and batchlog). So here we provide a replica instance + * @param endpoint + * @return + */ + public static Replica getSystemReplica(InetAddressAndPort endpoint) + { + return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica); + } + + public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints) + { + List<Replica> replicas = new ArrayList<>(endpoints.size()); + for (InetAddressAndPort endpoint: endpoints) + { + replicas.add(getSystemReplica(endpoint)); + } + return replicas; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 46c191f..4ab34db 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -27,6 +27,7 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +88,7 @@ public class TokenMetadata // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) private final Set<InetAddressAndPort> leavingEndpoints = new HashSet<>(); // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} + // NOTE: this may contain ranges that conflict with those implied by sortedTokens when a range is changing its transient status private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>(); // nodes which are migrating to the new tokens in the ring @@ -733,24 +735,20 @@ public class TokenMetadata return sortedTokens; } - public Multimap<Range<Token>, InetAddressAndPort> getPendingRangesMM(String keyspaceName) + public EndpointsByRange getPendingRangesMM(String keyspaceName) { - Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create(); + EndpointsByRange.Mutable byRange = new EndpointsByRange.Mutable(); PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); if (pendingRangeMaps != null) { - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : pendingRangeMaps) + for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : pendingRangeMaps) { - Range<Token> range = entry.getKey(); - for (InetAddressAndPort address : entry.getValue()) - { - map.put(range, address); - } + byRange.putAll(entry.getKey(), entry.getValue(), Conflict.ALL); } } - return map; + return byRange.asImmutableView(); } /** a mutable map may be returned but caller should not modify it */ @@ -759,17 +757,18 @@ public class TokenMetadata return 
this.pendingRanges.get(keyspaceName); } - public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddressAndPort endpoint) + public RangesAtEndpoint getPendingRanges(String keyspaceName, InetAddressAndPort endpoint) { - List<Range<Token>> ranges = new ArrayList<>(); - for (Map.Entry<Range<Token>, InetAddressAndPort> entry : getPendingRangesMM(keyspaceName).entries()) + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint); + for (Map.Entry<Range<Token>, Replica> entry : getPendingRangesMM(keyspaceName).flattenEntries()) { - if (entry.getValue().equals(endpoint)) + Replica replica = entry.getValue(); + if (replica.endpoint().equals(endpoint)) { - ranges.add(entry.getKey()); + builder.add(replica); } } - return ranges; + return builder.build(); } /** @@ -858,25 +857,27 @@ public class TokenMetadata { PendingRangeMaps newPendingRanges = new PendingRangeMaps(); - Multimap<InetAddressAndPort, Range<Token>> addressRanges = strategy.getAddressRanges(metadata); + RangesByEndpoint addressRanges = strategy.getAddressReplicas(metadata); // Copy of metadata reflecting the situation after all leave operations are finished. TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints); // get all ranges that will be affected by leaving nodes - Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); + Set<Range<Token>> removeAffectedRanges = new HashSet<>(); for (InetAddressAndPort endpoint : leavingEndpoints) - affectedRanges.addAll(addressRanges.get(endpoint)); + removeAffectedRanges.addAll(addressRanges.get(endpoint).ranges()); // for each of those ranges, find what new nodes will be responsible for the range when // all leaving nodes are gone. 
- for (Range<Token> range : affectedRanges) + for (Range<Token> range : removeAffectedRanges) { - Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); - Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - for (InetAddressAndPort address : Sets.difference(newEndpoints, currentEndpoints)) + EndpointsForRange currentReplicas = strategy.calculateNaturalReplicas(range.right, metadata); + EndpointsForRange newReplicas = strategy.calculateNaturalReplicas(range.right, allLeftMetadata); + for (Replica replica : newReplicas) { - newPendingRanges.addPendingRange(range, address); + if (currentReplicas.endpoints().contains(replica.endpoint())) + continue; + newPendingRanges.addPendingRange(range, replica); } } @@ -891,9 +892,9 @@ public class TokenMetadata Collection<Token> tokens = bootstrapAddresses.get(endpoint); allLeftMetadata.updateNormalTokens(tokens, endpoint); - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint)) { - newPendingRanges.addPendingRange(range, endpoint); + newPendingRanges.addPendingRange(replica.range(), replica); } allLeftMetadata.removeEndpoint(endpoint); } @@ -906,38 +907,43 @@ public class TokenMetadata for (Pair<Token, InetAddressAndPort> moving : movingEndpoints) { //Calculate all the ranges which will could be affected. This will include the ranges before and after the move. 
- Set<Range<Token>> moveAffectedRanges = new HashSet<>(); + Set<Replica> moveAffectedReplicas = new HashSet<>(); InetAddressAndPort endpoint = moving.right; // address of the moving node //Add ranges before the move - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint)) { - moveAffectedRanges.add(range); + moveAffectedReplicas.add(replica); } allLeftMetadata.updateNormalToken(moving.left, endpoint); //Add ranges after the move - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint)) { - moveAffectedRanges.add(range); + moveAffectedReplicas.add(replica); } - for(Range<Token> range : moveAffectedRanges) + for (Replica replica : moveAffectedReplicas) { - Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); - Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + Set<InetAddressAndPort> currentEndpoints = strategy.calculateNaturalReplicas(replica.range().right, metadata).endpoints(); + Set<InetAddressAndPort> newEndpoints = strategy.calculateNaturalReplicas(replica.range().right, allLeftMetadata).endpoints(); Set<InetAddressAndPort> difference = Sets.difference(newEndpoints, currentEndpoints); - for(final InetAddressAndPort address : difference) + for (final InetAddressAndPort address : difference) { - Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address); - Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address); - //We want to get rid of any ranges which the node is currently getting. 
- newRanges.removeAll(oldRanges); + RangesAtEndpoint newReplicas = strategy.getAddressReplicas(allLeftMetadata, address); + RangesAtEndpoint oldReplicas = strategy.getAddressReplicas(metadata, address); - for(Range<Token> newRange : newRanges) + // Filter out the things that are already replicated + newReplicas = newReplicas.filter(r -> !oldReplicas.contains(r)); + for (Replica newReplica : newReplicas) { - for(Range<Token> pendingRange : newRange.subtractAll(oldRanges)) + // for correctness on write, we need to treat ranges that are becoming full differently + // to those that are presently transient; however reads must continue to use the current view + // for ranges that are becoming transient. We could choose to ignore them here, but it's probably + // cleaner to ensure this is dealt with at point of use, where we can make a conscious decision + // about which to use + for (Replica pendingReplica : newReplica.subtractSameReplication(oldReplicas)) { - newPendingRanges.addPendingRange(pendingRange, address); + newPendingRanges.addPendingRange(pendingReplica.range(), pendingReplica); } } } @@ -1206,11 +1212,11 @@ public class TokenMetadata return sb.toString(); } - public Collection<InetAddressAndPort> pendingEndpointsFor(Token token, String keyspaceName) + public EndpointsForToken pendingEndpointsForToken(Token token, String keyspaceName) { PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); if (pendingRangeMaps == null) - return Collections.emptyList(); + return EndpointsForToken.empty(token); return pendingRangeMaps.pendingEndpointsFor(token); } @@ -1218,9 +1224,15 @@ public class TokenMetadata /** * @deprecated retained for benefit of old tests */ - public Collection<InetAddressAndPort> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddressAndPort> naturalEndpoints) + public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural) { - return 
ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName))); + EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName); + if (Endpoints.haveConflicts(natural, pending)) + { + natural = Endpoints.resolveConflictsInNatural(natural, pending); + pending = Endpoints.resolveConflictsInPending(natural, pending); + } + return Endpoints.concat(natural, pending); } /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 9e8d542..5a90804 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -102,6 +102,8 @@ public class KeyspaceMetrics public final Counter speculativeFailedRetries; /** Needed to speculate, but didn't have enough replicas **/ public final Counter speculativeInsufficientReplicas; + /** Needed to write to a transient replica to satisfy quorum **/ + public final Counter speculativeWrites; /** Number of started repairs as coordinator on this keyspace */ public final Counter repairsStarted; /** Number of completed repairs as coordinator on this keyspace */ @@ -268,41 +270,12 @@ public class KeyspaceMetrics writeFailedIdealCL = Metrics.counter(factory.createMetricName("WriteFailedIdealCL")); idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite"); - speculativeRetries = createKeyspaceCounter("SpeculativeRetries", new MetricValue() - { - public Long getValue(TableMetrics metric) - { - return metric.speculativeRetries.getCount(); - } - }); - speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", new MetricValue() - { - public Long 
getValue(TableMetrics metric) - { - return metric.speculativeFailedRetries.getCount(); - } - }); - speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", new MetricValue() - { - public Long getValue(TableMetrics metric) - { - return metric.speculativeInsufficientReplicas.getCount(); - } - }); - repairsStarted = createKeyspaceCounter("RepairJobsStarted", new MetricValue() - { - public Long getValue(TableMetrics metric) - { - return metric.repairsStarted.getCount(); - } - }); - repairsCompleted = createKeyspaceCounter("RepairJobsCompleted", new MetricValue() - { - public Long getValue(TableMetrics metric) - { - return metric.repairsCompleted.getCount(); - } - }); + speculativeRetries = createKeyspaceCounter("SpeculativeRetries", metric -> metric.speculativeRetries.getCount()); + speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", metric -> metric.speculativeFailedRetries.getCount()); + speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", metric -> metric.speculativeInsufficientReplicas.getCount()); + speculativeWrites = createKeyspaceCounter("SpeculativeWrites", metric -> metric.speculativeWrites.getCount()); + repairsStarted = createKeyspaceCounter("RepairJobsStarted", metric -> metric.repairsStarted.getCount()); + repairsCompleted = createKeyspaceCounter("RepairJobsCompleted", metric -> metric.repairsCompleted.getCount()); repairTime = Metrics.timer(factory.createMetricName("RepairTime")); repairPrepareTime = Metrics.timer(factory.createMetricName("RepairPrepareTime")); anticompactionTime = Metrics.timer(factory.createMetricName("AntiCompactionTime")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java 
b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java index fe7673d..3d00b12 100644 --- a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java @@ -37,6 +37,7 @@ public class ReadRepairMetrics @Deprecated public static final Meter attempted = Metrics.meter(factory.createMetricName("Attempted")); + // Incremented when additional requests were sent during blocking read repair due to unavailable or slow nodes public static final Meter speculatedRead = Metrics.meter(factory.createMetricName("SpeculatedRead")); public static final Meter speculatedWrite = Metrics.meter(factory.createMetricName("SpeculatedWrite")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 49603ba..53ebcb0 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -214,6 +214,9 @@ public class TableMetrics public final Counter speculativeInsufficientReplicas; public final Gauge<Long> speculativeSampleLatencyNanos; + public final Counter speculativeWrites; + public final Gauge<Long> speculativeWriteLatencyNanos; + public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read"); public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write"); public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range"); @@ -239,7 +242,7 @@ public class TableMetrics Keyspace k = Schema.instance.getKeyspaceInstance(keyspace); if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName())) continue; - if 
(k.getReplicationStrategy().getReplicationFactor() < 2) + if (k.getReplicationStrategy().getReplicationFactor().allReplicas < 2) continue; for (ColumnFamilyStore cf : k.getColumnFamilyStores()) @@ -825,13 +828,11 @@ public class TableMetrics speculativeRetries = createTableCounter("SpeculativeRetries"); speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries"); speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas"); - speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", new Gauge<Long>() - { - public Long getValue() - { - return cfs.sampleLatencyNanos; - } - }); + speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", () -> cfs.sampleReadLatencyNanos); + + speculativeWrites = createTableCounter("SpeculativeWrites"); + speculativeWriteLatencyNanos = createTableGauge("SpeculativeWriteLatencyNanos", () -> cfs.transientWriteLatencyNanos); + keyCacheHitRate = Metrics.register(factory.createMetricName("KeyCacheHitRate"), aliasFactory.createMetricName("KeyCacheHitRate"), new RatioGauge() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/IAsyncCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java index 251d263..253b412 100644 --- a/src/java/org/apache/cassandra/net/IAsyncCallback.java +++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java @@ -21,6 +21,7 @@ import com.google.common.base.Predicate; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; /** * implementors of IAsyncCallback need to make sure that any public methods @@ -30,13 +31,9 @@ import org.apache.cassandra.locator.InetAddressAndPort; */ public interface IAsyncCallback<T> { - Predicate<InetAddressAndPort> 
isAlive = new Predicate<InetAddressAndPort>() - { - public boolean apply(InetAddressAndPort endpoint) - { - return FailureDetector.instance.isAlive(endpoint); - } - }; + final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive; + + final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint()); /** * @param msg response received. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c8fe3b7..bd290a1 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -90,6 +90,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.ConnectionMetrics; import org.apache.cassandra.metrics.DroppedMessageMetrics; @@ -604,8 +605,9 @@ public final class MessagingService implements MessagingServiceMBean if (expiredCallbackInfo.shouldHint()) { - Mutation mutation = ((WriteCallbackInfo) expiredCallbackInfo).mutation(); - return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null); + WriteCallbackInfo writeCallbackInfo = ((WriteCallbackInfo) expiredCallbackInfo); + Mutation mutation = writeCallbackInfo.mutation(); + return StorageProxy.submitHint(mutation, writeCallbackInfo.getReplica(), null); } return null; @@ -961,7 +963,7 @@ public final class MessagingService implements MessagingServiceMBean return verbHandlers.get(type); } - public int addCallback(IAsyncCallback cb, MessageOut 
message, InetAddressAndPort to, long timeout, boolean failureCallback) + public int addWriteCallback(IAsyncCallback cb, MessageOut message, InetAddressAndPort to, long timeout, boolean failureCallback) { assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel int messageId = nextId(); @@ -970,12 +972,12 @@ public final class MessagingService implements MessagingServiceMBean return messageId; } - public int addCallback(IAsyncCallback cb, - MessageOut<?> message, - InetAddressAndPort to, - long timeout, - ConsistencyLevel consistencyLevel, - boolean allowHints) + public int addWriteCallback(IAsyncCallback cb, + MessageOut<?> message, + Replica to, + long timeout, + ConsistencyLevel consistencyLevel, + boolean allowHints) { assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION @@ -1024,7 +1026,7 @@ public final class MessagingService implements MessagingServiceMBean */ public int sendRR(MessageOut message, InetAddressAndPort to, IAsyncCallback cb, long timeout, boolean failureCallback) { - int id = addCallback(cb, message, to, timeout, failureCallback); + int id = addWriteCallback(cb, message, to, timeout, failureCallback); updateBackPressureOnSend(to, cb, message); sendOneWay(failureCallback ? message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE) : message, id, to); return id; @@ -1042,14 +1044,14 @@ public final class MessagingService implements MessagingServiceMBean * suggest that a timeout occurred to the invoker of the send(). 
* @return an reference to message id used to match with the result */ - public int sendRR(MessageOut<?> message, - InetAddressAndPort to, - AbstractWriteResponseHandler<?> handler, - boolean allowHints) - { - int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints); - updateBackPressureOnSend(to, handler, message); - sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE), id, to); + public int sendWriteRR(MessageOut<?> message, + Replica to, + AbstractWriteResponseHandler<?> handler, + boolean allowHints) + { + int id = addWriteCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel(), allowHints); + updateBackPressureOnSend(to.endpoint(), handler, message); + sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE), id, to.endpoint()); return id; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/WriteCallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java index 41ac31b..c54e7dc 100644 --- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java +++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java @@ -21,7 +21,7 @@ package org.apache.cassandra.net; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.utils.FBUtilities; @@ -30,24 +30,31 @@ public class WriteCallbackInfo extends CallbackInfo { // either a Mutation, or a Paxos Commit (MessageOut) private final Object mutation; + private final Replica replica; - public 
WriteCallbackInfo(InetAddressAndPort target, + public WriteCallbackInfo(Replica replica, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel, boolean allowHints) { - super(target, callback, serializer, true); + super(replica.endpoint(), callback, serializer, true); assert message != null; this.mutation = shouldHint(allowHints, message, consistencyLevel); //Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477) assert (!target.equals(FBUtilities.getBroadcastAddressAndPort())); + this.replica = replica; } public boolean shouldHint() { - return mutation != null && StorageProxy.shouldHint(target); + return mutation != null && StorageProxy.shouldHint(replica); + } + + public Replica getReplica() + { + return replica; } public Mutation mutation() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AbstractSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java new file mode 100644 index 0000000..124baa1 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.List; + +import com.google.common.util.concurrent.AbstractFuture; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +public abstract class AbstractSyncTask extends AbstractFuture<SyncStat> implements Runnable +{ + protected abstract void startSync(List<Range<Token>> rangesToStream); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java index 2ca524f..eaf890a 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java @@ -18,12 +18,14 @@ package org.apache.cassandra.repair; +import java.util.Collections; import java.util.List; import java.util.UUID; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamEvent; @@ -54,8 +56,9 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) - // request ranges 
from the remote node - .requestRanges(fetchFrom, desc.keyspace, rangesToFetch, desc.columnFamily); + // request ranges from the remote node, see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here + .requestRanges(fetchFrom, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToFetch), + RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); plan.execute(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java index e24d854..2b171c9 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java @@ -30,6 +30,7 @@ import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask { @@ -37,6 +38,10 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp { super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind); } + public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, TreeResponse from, PreviewKind previewKind) + { + this(desc, to.endpoint, from.endpoint, MerkleTrees.difference(to.trees, from.trees), previewKind); + } public void startSync(List<Range<Token>> rangesToFetch) { @@ -46,6 +51,7 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp Tracing.traceRepair(message); MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode); } + public void syncComplete(boolean 
success, List<SessionSummary> summaries) { if (success) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java index 4d38e8a..35474af 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java @@ -21,8 +21,6 @@ package org.apache.cassandra.repair; import java.util.List; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.AbstractFuture; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +31,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; -public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implements Runnable +public abstract class AsymmetricSyncTask extends AbstractSyncTask { private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class); protected final RepairJobDesc desc; @@ -44,9 +42,9 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem private long startTime = Long.MIN_VALUE; protected volatile SyncStat stat; - public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) { + assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom; this.desc = desc; this.fetchFrom = fetchFrom; this.fetchingNode = fetchingNode; @@ -55,6 +53,7 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size()); this.previewKind = previewKind; } + public void run() { startTime = 
System.currentTimeMillis(); @@ -79,7 +78,4 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem if (startTime != Long.MIN_VALUE) Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } - - - public abstract void startSync(List<Range<Token>> rangesToFetch); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/CommonRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java new file mode 100644 index 0000000..928e570 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/CommonRange.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; + +/** + * Groups ranges with identical endpoints/transient endpoints + */ +public class CommonRange +{ + public final ImmutableSet<InetAddressAndPort> endpoints; + public final ImmutableSet<InetAddressAndPort> transEndpoints; + public final Collection<Range<Token>> ranges; + + public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges) + { + Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty()); + Preconditions.checkArgument(transEndpoints != null); + Preconditions.checkArgument(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints"); + Preconditions.checkArgument(ranges != null && !ranges.isEmpty()); + + this.endpoints = ImmutableSet.copyOf(endpoints); + this.transEndpoints = ImmutableSet.copyOf(transEndpoints); + this.ranges = new ArrayList(ranges); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CommonRange that = (CommonRange) o; + + if (!endpoints.equals(that.endpoints)) return false; + if (!transEndpoints.equals(that.transEndpoints)) return false; + return ranges.equals(that.ranges); + } + + public int hashCode() + { + int result = endpoints.hashCode(); + result = 31 * result + transEndpoints.hashCode(); + result = 31 * result + ranges.hashCode(); + return result; + } + + public String toString() + { + return "CommonRange{" + + "endpoints=" + endpoints + + ", transEndpoints=" + transEndpoints + + ", ranges=" + ranges + + '}'; + } +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java index 8aa4381..bc614dc 100644 --- a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java +++ b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java @@ -25,8 +25,7 @@ import java.util.concurrent.ExecutorService; import com.google.common.util.concurrent.ListenableFuture; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.RangesAtEndpoint; /** * Keyspace level hook for repair. @@ -38,5 +37,8 @@ public interface KeyspaceRepairManager * been notified that the repair session has been completed, the data associated with the given session id must * not be combined with repaired or unrepaired data, or data from other repair sessions. 
*/ - ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor); + ListenableFuture prepareIncrementalRepair(UUID sessionID, + Collection<ColumnFamilyStore> tables, + RangesAtEndpoint tokenRanges, + ExecutorService executor); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java deleted file mode 100644 index d7e0387..0000000 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.repair; - -import java.util.List; -import java.util.UUID; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamEvent; -import org.apache.cassandra.streaming.StreamEventHandler; -import org.apache.cassandra.streaming.StreamPlan; -import org.apache.cassandra.streaming.StreamState; -import org.apache.cassandra.streaming.StreamOperation; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.FBUtilities; - -/** - * LocalSyncTask performs streaming between local(coordinator) node and remote replica. 
- */ -public class LocalSyncTask extends SyncTask implements StreamEventHandler -{ - private final TraceState state = Tracing.instance.get(); - - private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); - - private final UUID pendingRepair; - private final boolean pullRepair; - - public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind) - { - super(desc, r1, r2, previewKind); - this.pendingRepair = pendingRepair; - this.pullRepair = pullRepair; - } - - @VisibleForTesting - StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences) - { - StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) - .listeners(this) - .flushBeforeTransfer(pendingRepair == null) - .requestRanges(dst, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node - if (!pullRepair) - { - // send ranges to the remote node if we are not performing a pull repair - plan.transferRanges(dst, desc.keyspace, differences, desc.columnFamily); - } - - return plan; - } - - /** - * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback - * that will be called out of band once the streams complete. - */ - @Override - protected void startSync(List<Range<Token>> differences) - { - InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding - InetAddressAndPort dst = r2.endpoint.equals(local) ? 
r1.endpoint : r2.endpoint; - - String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); - Tracing.traceRepair(message); - - createStreamPlan(dst, differences).execute(); - } - - public void handleStreamEvent(StreamEvent event) - { - if (state == null) - return; - switch (event.eventType) - { - case STREAM_PREPARED: - StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; - state.trace("Streaming session with {} prepared", spe.session.peer); - break; - case STREAM_COMPLETE: - StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; - state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); - break; - case FILE_PROGRESS: - ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; - state.trace("{}/{} ({}%) {} idx:{}{}", - new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes), - FBUtilities.prettyPrintMemory(pi.totalBytes), - pi.currentBytes * 100 / pi.totalBytes, - pi.direction == ProgressInfo.Direction.OUT ? 
"sent to" : "received from", - pi.sessionIndex, - pi.peer }); - } - } - - public void onSuccess(StreamState result) - { - String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); - Tracing.traceRepair(message); - set(stat.withSummaries(result.createSummaries())); - finished(); - } - - public void onFailure(Throwable t) - { - setException(t); - finished(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java deleted file mode 100644 index 0a47f73..0000000 --- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.repair; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.RepairException; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.messages.SyncRequest; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.streaming.SessionSummary; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.FBUtilities; - -/** - * RemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node - * to repair(stream) data with other replica. - * - * When RemoteSyncTask receives SyncComplete from remote node, task completes. - */ -public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask -{ - private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class); - - public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) - { - super(desc, r1, r2, previewKind); - } - - @Override - protected void startSync(List<Range<Token>> differences) - { - InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind); - String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); - Tracing.traceRepair(message); - MessagingService.instance().sendOneWay(request.createMessage(), request.src); - } - - public void syncComplete(boolean success, List<SessionSummary> summaries) - { - if (success) - { - set(stat.withSummaries(summaries)); - } - else - { - setException(new RepairException(desc, previewKind, 
String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); - } - finished(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 48973d2..d38435b 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -64,7 +64,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind, boolean optimiseStreams) { this.session = session; - this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges()); + this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.commonRange.ranges); this.taskExecutor = session.taskExecutor; this.parallelismDegree = session.parallelismDegree; this.isIncremental = isIncremental; @@ -83,7 +83,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable Keyspace ks = Keyspace.open(desc.keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily); cfs.metric.repairsStarted.inc(); - List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.endpoints); + List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.commonRange.endpoints); allEndpoints.add(FBUtilities.getBroadcastAddressAndPort()); ListenableFuture<List<TreeResponse>> validations; @@ -160,13 +160,18 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable }, taskExecutor); } + private boolean isTransient(InetAddressAndPort ep) + { + return session.commonRange.transEndpoints.contains(ep); + } + private 
AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing() { return trees -> { InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); - List<SyncTask> syncTasks = new ArrayList<>(); + List<AbstractSyncTask> syncTasks = new ArrayList<>(); // We need to difference all trees one against another for (int i = 0; i < trees.size() - 1; ++i) { @@ -174,17 +179,29 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable for (int j = i + 1; j < trees.size(); ++j) { TreeResponse r2 = trees.get(j); - SyncTask task; + + if (isTransient(r1.endpoint) && isTransient(r2.endpoint)) + continue; + + AbstractSyncTask task; if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind); + InetAddressAndPort remote = r1.endpoint.equals(local) ? r2.endpoint : r1.endpoint; + task = new SymmetricLocalSyncTask(desc, r1, r2, isTransient(remote), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind); + } + else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) + { + TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2; + TreeResponse streamTo = isTransient(r1.endpoint) ? r2: r1; + task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind); + session.waitForSync(Pair.create(desc, new NodePair(streamTo.endpoint, streamFrom.endpoint)), (AsymmetricRemoteSyncTask) task); } else { - task = new RemoteSyncTask(desc, r1, r2, session.previewKind); - // RemoteSyncTask expects SyncComplete message sent back. + task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind); + // SymmetricRemoteSyncTask expects SyncComplete message sent back. // Register task to RepairSession to receive response. 
- session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task); + session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (SymmetricRemoteSyncTask) task); } syncTasks.add(task); taskExecutor.submit(task); @@ -200,7 +217,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); - List<AsymmetricSyncTask> syncTasks = new ArrayList<>(); + List<AbstractSyncTask> syncTasks = new ArrayList<>(); // We need to difference all trees one against another DifferenceHolder diffHolder = new DifferenceHolder(trees); @@ -215,6 +232,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable for (int i = 0; i < trees.size(); i++) { InetAddressAndPort address = trees.get(i).endpoint; + + // we don't stream to transient replicas + if (isTransient(address)) + continue; + HostDifferences streamsFor = reducedDifferences.get(address); if (streamsFor != null) { @@ -373,4 +395,4 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable } return Futures.allAsList(tasks); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 90c0146..8d3cd54 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair; import java.io.IOException; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ExecutorService; @@ -29,8 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -38,8 +36,9 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.*; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.Replica; import org.apache.commons.lang3.time.DurationFormatUtils; -import org.junit.internal.runners.statements.Fail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +73,6 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.progress.ProgressEvent; @@ -141,46 +139,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti recordFailure(message, completionMessage); } - @VisibleForTesting - static class CommonRange - { - public final Set<InetAddressAndPort> endpoints; - public final Collection<Range<Token>> ranges; - - public CommonRange(Set<InetAddressAndPort> endpoints, Collection<Range<Token>> ranges) - { - Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty()); - Preconditions.checkArgument(ranges != null && !ranges.isEmpty()); - this.endpoints = endpoints; - this.ranges = ranges; - } - - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - CommonRange that = 
(CommonRange) o; - - if (!endpoints.equals(that.endpoints)) return false; - return ranges.equals(that.ranges); - } - - public int hashCode() - { - int result = endpoints.hashCode(); - result = 31 * result + ranges.hashCode(); - return result; - } - - public String toString() - { - return "CommonRange{" + - "endpoints=" + endpoints + - ", ranges=" + ranges + - '}'; - } - } protected void runMayThrow() throws Exception { @@ -238,20 +196,24 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti Set<InetAddressAndPort> allNeighbors = new HashSet<>(); List<CommonRange> commonRanges = new ArrayList<>(); - //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent + //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent //calculation multiple times - Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace); + // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica + Iterable<Range<Token>> keyspaceLocalRanges = storageService + .getLocalReplicas(keyspace) + .filter(Replica::isFull) + .ranges(); try { for (Range<Token> range : options.getRanges()) { - Set<InetAddressAndPort> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, - options.getDataCenters(), - options.getHosts()); + EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, + options.getDataCenters(), + options.getHosts()); addRangeToNeighbors(commonRanges, range, neighbors); - allNeighbors.addAll(neighbors); + allNeighbors.addAll(neighbors.endpoints()); } progress.incrementAndGet(); @@ -387,11 +349,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti for (CommonRange commonRange: commonRanges) { Set<InetAddressAndPort> endpoints = 
ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains)); + Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains)); + Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints"); // this node is implicitly a participant in this repair, so a single endpoint is ok here if (!endpoints.isEmpty()) { - filtered.add(new CommonRange(endpoints, commonRange.ranges)); + filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges)); } } Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair"); @@ -514,14 +478,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti // we do endpoint filtering at the start of an incremental repair, // so repair sessions shouldn't also be checking liveness boolean force = options.isForcedRepair() && !isIncremental; - for (CommonRange cr : commonRanges) + for (CommonRange commonRange : commonRanges) { - logger.info("Starting RepairSession for {}", cr); + logger.info("Starting RepairSession for {}", commonRange); RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, - cr.ranges, + commonRange, keyspace, options.getParallelism(), - cr.endpoints, isIncremental, options.isPullRepair(), force, @@ -559,7 +522,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti public void onSuccess(RepairSessionResult result) { String message = String.format("Repair session %s for range %s finished", session.getId(), - session.getRanges().toString()); + session.ranges().toString()); logger.info(message); fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS, progress.incrementAndGet(), @@ -572,7 +535,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti StorageMetrics.repairExceptions.inc(); String message = String.format("Repair 
session %s for range %s failed with error %s", - session.getId(), session.getRanges().toString(), t.getMessage()); + session.getId(), session.ranges().toString(), t.getMessage()); logger.error(message, t); fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.incrementAndGet(), @@ -684,13 +647,15 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti ImmutableList.of(failureMessage, completionMessage)); } - private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddressAndPort> neighbors) + private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors) { + Set<InetAddressAndPort> endpoints = neighbors.endpoints(); + Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints(); for (int i = 0; i < neighborRangeList.size(); i++) { CommonRange cr = neighborRangeList.get(i); - if (cr.endpoints.containsAll(neighbors)) + if (cr.endpoints.containsAll(endpoints) && cr.transEndpoints.containsAll(transEndpoints)) { cr.ranges.add(range); return; @@ -699,7 +664,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti List<Range<Token>> ranges = new ArrayList<>(); ranges.add(range); - neighborRangeList.add(new CommonRange(neighbors, ranges)); + neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges)); } private Thread createQueryThread(final int cmd, final UUID sessionId) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index ec06f37..2ff60ec 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -55,8 +55,8 @@ import 
org.apache.cassandra.utils.Pair; * validationComplete()). * </li> * <li>Synchronization phase: once all trees are received, the job compares each tree with - * all the other using a so-called {@link SyncTask}. If there is difference between 2 trees, the - * concerned SyncTask will start a streaming of the difference between the 2 endpoint concerned. + * all the other using a so-called {@link SymmetricSyncTask}. If there is difference between 2 trees, the + * concerned SymmetricSyncTask will start a streaming of the difference between the 2 endpoint concerned. * </li> * </ol> * The job is done once all its SyncTasks are done (i.e. have either computed no differences @@ -74,7 +74,7 @@ import org.apache.cassandra.utils.Pair; * we still first send a message to each node to flush and snapshot data so each merkle tree * creation is still done on similar data, even if the actual creation is not * done simulatneously). If not sequential, all merkle tree are requested in parallel. - * Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle + * Similarly, if a job is sequential, it will handle one SymmetricSyncTask at a time, but will handle * all of them in parallel otherwise. */ public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber, @@ -94,8 +94,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public final boolean skippedReplicas; /** Range to repair */ - public final Collection<Range<Token>> ranges; - public final Set<InetAddressAndPort> endpoints; + public final CommonRange commonRange; public final boolean isIncremental; public final PreviewKind previewKind; @@ -114,23 +113,20 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement /** * Create new repair session. 
- * * @param parentRepairSession the parent sessions id * @param id this sessions id - * @param ranges ranges to repair + * @param commonRange ranges to repair * @param keyspace name of keyspace * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees - * @param endpoints the data centers that should be part of the repair; null for all DCs * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption) * @param force true if the repair should ignore dead endpoints (instead of failing) * @param cfnames names of columnfamilies */ public RepairSession(UUID parentRepairSession, UUID id, - Collection<Range<Token>> ranges, + CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, - Set<InetAddressAndPort> endpoints, boolean isIncremental, boolean pullRepair, boolean force, @@ -145,7 +141,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.parallelismDegree = parallelismDegree; this.keyspace = keyspace; this.cfnames = cfnames; - this.ranges = ranges; //If force then filter out dead endpoints boolean forceSkippedReplicas = false; @@ -153,7 +148,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement { logger.debug("force flag set, removing dead endpoints"); final Set<InetAddressAndPort> removeCandidates = new HashSet<>(); - for (final InetAddressAndPort endpoint : endpoints) + for (final InetAddressAndPort endpoint : commonRange.endpoints) { if (!FailureDetector.instance.isAlive(endpoint)) { @@ -166,12 +161,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement // we shouldn't be recording a successful repair if // any replicas are excluded from the repair forceSkippedReplicas = true; - endpoints = new HashSet<>(endpoints); - endpoints.removeAll(removeCandidates); + Set<InetAddressAndPort> filteredEndpoints = new 
HashSet<>(commonRange.endpoints); + filteredEndpoints.removeAll(removeCandidates); + commonRange = new CommonRange(filteredEndpoints, commonRange.transEndpoints, commonRange.ranges); } } - this.endpoints = endpoints; + this.commonRange = commonRange; this.isIncremental = isIncremental; this.previewKind = previewKind; this.pullRepair = pullRepair; @@ -184,9 +180,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement return id; } - public Collection<Range<Token>> getRanges() + public Collection<Range<Token>> ranges() + { + return commonRange.ranges; + } + + public Collection<InetAddressAndPort> endpoints() { - return ranges; + return commonRange.endpoints; } public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task) @@ -247,7 +248,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement { StringBuilder sb = new StringBuilder(); sb.append(FBUtilities.getBroadcastAddressAndPort()); - for (InetAddressAndPort ep : endpoints) + for (InetAddressAndPort ep : commonRange.endpoints) sb.append(", ").append(ep); return sb.toString(); } @@ -266,18 +267,18 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement if (terminated) return; - logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames)); - Tracing.traceRepair("Syncing range {}", ranges); + logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), commonRange, keyspace, Arrays.toString(cfnames)); + Tracing.traceRepair("Syncing range {}", commonRange); if (!previewKind.isPreview()) { - SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints); + SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, commonRange); } - if (endpoints.isEmpty()) + if 
(commonRange.endpoints.isEmpty()) { - logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", ranges)); + logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", commonRange)); Tracing.traceRepair(message); - set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList(), skippedReplicas)); + set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), skippedReplicas)); if (!previewKind.isPreview()) { SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message)); @@ -286,7 +287,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement } // Checking all nodes are live - for (InetAddressAndPort endpoint : endpoints) + for (InetAddressAndPort endpoint : commonRange.endpoints) { if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas) { @@ -318,8 +319,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement { // this repair session is completed logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully"); - Tracing.traceRepair("Completed sync of range {}", ranges); - set(new RepairSessionResult(id, keyspace, ranges, results, skippedReplicas)); + Tracing.traceRepair("Completed sync of range {}", commonRange); + set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, skippedReplicas)); taskExecutor.shutdown(); // mark this session as terminated @@ -372,7 +373,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public void convict(InetAddressAndPort endpoint, double phi) { - if (!endpoints.contains(endpoint)) + if (!commonRange.endpoints.contains(endpoint)) return; // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high 
cost. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 5d2b2ec..e9cba89 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Collection; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,9 +79,12 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary - .requestRanges(dest, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node + // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here + .requestRanges(dest, desc.keyspace, RangesAtEndpoint.toDummyList(ranges), + RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node if (!asymmetric) - sp.transferRanges(dest, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node + // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here + sp.transferRanges(dest, desc.keyspace, RangesAtEndpoint.toDummyList(ranges), desc.columnFamily); // send ranges to the remote node return sp; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, 
e-mail: commits-h...@cassandra.apache.org