http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 9467c9a..7f4ae14 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.MatchResult; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -41,9 +42,12 @@ import javax.management.openmbean.TabularDataSupport; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.*; import com.google.common.util.concurrent.*; +import org.apache.cassandra.dht.RangeStreamer.FetchReplica; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -110,6 +114,8 @@ import org.apache.cassandra.utils.progress.ProgressEventType; import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor; import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Iterables.tryFind; import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; @@ -164,9 +170,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return isShutdown; } - public Collection<Range<Token>> 
getLocalRanges(String keyspaceName) + public RangesAtEndpoint getLocalReplicas(String keyspaceName) { - return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); + return getReplicasForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); } public List<Range<Token>> getLocalAndPendingRanges(String ks) @@ -174,9 +180,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort(); Keyspace keyspace = Keyspace.open(ks); List<Range<Token>> ranges = new ArrayList<>(); - ranges.addAll(keyspace.getReplicationStrategy().getAddressRanges().get(broadcastAddress)); - ranges.addAll(getTokenMetadata().getPendingRanges(ks, broadcastAddress)); - return Range.normalize(ranges); + for (Replica r : keyspace.getReplicationStrategy().getAddressReplicas(broadcastAddress)) + ranges.add(r.range()); + for (Replica r : getTokenMetadata().getPendingRanges(ks, broadcastAddress)) + ranges.add(r.range()); + return ranges; } public Collection<Range<Token>> getPrimaryRanges(String keyspace) @@ -1225,11 +1233,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (keyspace == null) { for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) - streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName)); + streamer.addRanges(keyspaceName, getLocalReplicas(keyspaceName)); } else if (tokens == null) { - streamer.addRanges(keyspace, getLocalRanges(keyspace)); + streamer.addRanges(keyspace, getLocalReplicas(keyspace)); } else { @@ -1251,14 +1259,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // Ensure all specified ranges are actually ranges owned by this host - Collection<Range<Token>> localRanges = getLocalRanges(keyspace); + RangesAtEndpoint localReplicas = getLocalReplicas(keyspace); + RangesAtEndpoint.Builder streamRanges = new 
RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort(), ranges.size()); for (Range<Token> specifiedRange : ranges) { boolean foundParentRange = false; - for (Range<Token> localRange : localRanges) + for (Replica localReplica : localReplicas) { - if (localRange.contains(specifiedRange)) + if (localReplica.contains(specifiedRange)) { + streamRanges.add(localReplica.decorateSubrange(specifiedRange)); foundParentRange = true; break; } @@ -1292,7 +1302,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE streamer.addSourceFilter(new RangeStreamer.WhitelistedSourcesFilter(sources)); } - streamer.addRanges(keyspace, ranges); + streamer.addRanges(keyspace, streamRanges.build()); } StreamResultFuture resultFuture = streamer.fetchAsync(); @@ -1700,9 +1710,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { /* All the ranges for the tokens */ Map<List<String>, List<String>> map = new HashMap<>(); - for (Map.Entry<Range<Token>,List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet()) + for (Map.Entry<Range<Token>, EndpointsForRange> entry : getRangeToAddressMap(keyspace).entrySet()) { - map.put(entry.getKey().asList(), stringify(entry.getValue(), withPort)); + map.put(entry.getKey().asList(), Replicas.stringify(entry.getValue(), withPort)); } return map; } @@ -1753,12 +1763,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { /* All the ranges for the tokens */ Map<List<String>, List<String>> map = new HashMap<>(); - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet()) + for (Map.Entry<Range<Token>, EndpointsForRange> entry : getRangeToAddressMap(keyspace).entrySet()) { List<String> rpcaddrs = new ArrayList<>(entry.getValue().size()); - for (InetAddressAndPort endpoint: entry.getValue()) + for (Replica replicas: entry.getValue()) { - rpcaddrs.add(getNativeaddress(endpoint, withPort)); + 
rpcaddrs.add(getNativeaddress(replicas.endpoint(), withPort)); } map.put(entry.getKey().asList(), rpcaddrs); } @@ -1783,38 +1793,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE keyspace = Schema.instance.getNonLocalStrategyKeyspaces().get(0); Map<List<String>, List<String>> map = new HashMap<>(); - for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet()) + for (Map.Entry<Range<Token>, EndpointsForRange> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet()) { - List<InetAddressAndPort> l = new ArrayList<>(entry.getValue()); - map.put(entry.getKey().asList(), stringify(l, withPort)); + map.put(entry.getKey().asList(), Replicas.stringify(entry.getValue(), withPort)); } return map; } - public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace) + public EndpointsByRange getRangeToAddressMap(String keyspace) { return getRangeToAddressMap(keyspace, tokenMetadata.sortedTokens()); } - public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMapInLocalDC(String keyspace) + public EndpointsByRange getRangeToAddressMapInLocalDC(String keyspace) { - Predicate<InetAddressAndPort> isLocalDC = new Predicate<InetAddressAndPort>() - { - public boolean apply(InetAddressAndPort address) - { - return isLocalDC(address); - } - }; + Predicate<Replica> isLocalDC = replica -> isLocalDC(replica.endpoint()); - Map<Range<Token>, List<InetAddressAndPort>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC()); - Map<Range<Token>, List<InetAddressAndPort>> filteredMap = Maps.newHashMap(); - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : origMap.entrySet()) + EndpointsByRange origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC()); + Map<Range<Token>, EndpointsForRange> filteredMap = Maps.newHashMap(); + for (Map.Entry<Range<Token>, EndpointsForRange> entry : origMap.entrySet()) { - 
List<InetAddressAndPort> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC)); + EndpointsForRange endpointsInLocalDC = entry.getValue().filter(isLocalDC); filteredMap.put(entry.getKey(), endpointsInLocalDC); } - return filteredMap; + return new EndpointsByRange(filteredMap); } private List<Token> getTokensInLocalDC() @@ -1836,7 +1839,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return remoteDC.equals(localDC); } - private Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens) + private EndpointsByRange getRangeToAddressMap(String keyspace, List<Token> sortedTokens) { // some people just want to get a visual representation of things. Allow null and set it to the first // non-system keyspace. @@ -1917,13 +1920,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE List<TokenRange> ranges = new ArrayList<>(); Token.TokenFactory tf = getTokenFactory(); - Map<Range<Token>, List<InetAddressAndPort>> rangeToAddressMap = + EndpointsByRange rangeToAddressMap = includeOnlyLocalDC ? getRangeToAddressMapInLocalDC(keyspace) : getRangeToAddressMap(keyspace); - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : rangeToAddressMap.entrySet()) - ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue(), withPort)); + for (Map.Entry<Range<Token>, EndpointsForRange> entry : rangeToAddressMap.entrySet()) + ranges.add(TokenRange.create(tf, entry.getKey(), ImmutableList.copyOf(entry.getValue().endpoints()), withPort)); return ranges; } @@ -2010,14 +2013,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param ranges * @return mapping of ranges to the replicas responsible for them. 
*/ - private Map<Range<Token>, List<InetAddressAndPort>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges) + private EndpointsByRange constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges) { - Map<Range<Token>, List<InetAddressAndPort>> rangeToEndpointMap = new HashMap<>(ranges.size()); + Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new HashMap<>(ranges.size()); for (Range<Token> range : ranges) { - rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right)); + rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right)); } - return rangeToEndpointMap; + return new EndpointsByRange(rangeToEndpointMap); } public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) @@ -2735,32 +2738,56 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * Finds living endpoints responsible for the given ranges * * @param keyspaceName the keyspace ranges belong to - * @param ranges the ranges to find sources for + * @param leavingReplicas the ranges to find sources for * @return multimap of addresses to ranges the address is responsible for */ - private Multimap<InetAddressAndPort, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges) + private Multimap<InetAddressAndPort, FetchReplica> getNewSourceReplicas(String keyspaceName, Set<LeavingReplica> leavingReplicas) { InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort(); - Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap()); - Multimap<InetAddressAndPort, Range<Token>> sourceRanges = HashMultimap.create(); + EndpointsByRange rangeReplicas = 
Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap()); + Multimap<InetAddressAndPort, FetchReplica> sourceRanges = HashMultimap.create(); IFailureDetector failureDetector = FailureDetector.instance; + logger.debug("Getting new source replicas for {}", leavingReplicas); + // find alive sources for our new ranges - for (Range<Token> range : ranges) - { - Collection<InetAddressAndPort> possibleRanges = rangeAddresses.get(range); + for (LeavingReplica leaver : leavingReplicas) + { + //We need this to find the replicas from before leaving to supply the data + Replica leavingReplica = leaver.leavingReplica; + //We need this to know what to fetch and what the transient status is + Replica ourReplica = leaver.ourReplica; + //If we are going to be a full replica only consider full replicas + Predicate<Replica> replicaFilter = ourReplica.isFull() ? Replica::isFull : Predicates.alwaysTrue(); + Predicate<Replica> notSelf = replica -> !replica.endpoint().equals(myAddress); + EndpointsForRange possibleReplicas = rangeReplicas.get(leavingReplica.range()); + logger.info("Possible replicas for newReplica {} are {}", ourReplica, possibleReplicas); IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - List<InetAddressAndPort> sources = snitch.getSortedListByProximity(myAddress, possibleRanges); + EndpointsForRange sortedPossibleReplicas = snitch.sortedByProximity(myAddress, possibleReplicas); + logger.info("Sorted possible replicas starts as {}", sortedPossibleReplicas); + Optional<Replica> myCurrentReplica = tryFind(possibleReplicas, replica -> replica.endpoint().equals(myAddress)).toJavaUtil(); - assert (!sources.contains(myAddress)); + boolean transientToFull = myCurrentReplica.isPresent() && myCurrentReplica.get().isTransient() && ourReplica.isFull(); + assert !sortedPossibleReplicas.endpoints().contains(myAddress) || transientToFull : String.format("My address %s, sortedPossibleReplicas %s, myCurrentReplica %s, 
myNewReplica %s", myAddress, sortedPossibleReplicas, myCurrentReplica, ourReplica); - for (InetAddressAndPort source : sources) + //Originally this didn't log if it couldn't restore replication and that seems wrong + boolean foundLiveReplica = false; + for (Replica possibleReplica : sortedPossibleReplicas.filter(Predicates.and(replicaFilter, notSelf))) { - if (failureDetector.isAlive(source)) + if (failureDetector.isAlive(possibleReplica.endpoint())) { - sourceRanges.put(source, range); + foundLiveReplica = true; + sourceRanges.put(possibleReplica.endpoint(), new FetchReplica(ourReplica, possibleReplica)); break; } + else + { + logger.debug("Skipping down replica {}", possibleReplica); + } + } + if (!foundLiveReplica) + { + logger.warn("Didn't find live replica to restore replication for " + ourReplica); } } return sourceRanges; @@ -2793,6 +2820,49 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } + private static class LeavingReplica + { + //The node that is leaving + private final Replica leavingReplica; + + //Our range and transient status + private final Replica ourReplica; + + public LeavingReplica(Replica leavingReplica, Replica ourReplica) + { + Preconditions.checkNotNull(leavingReplica); + Preconditions.checkNotNull(ourReplica); + this.leavingReplica = leavingReplica; + this.ourReplica = ourReplica; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LeavingReplica that = (LeavingReplica) o; + + if (!leavingReplica.equals(that.leavingReplica)) return false; + return ourReplica.equals(that.ourReplica); + } + + public int hashCode() + { + int result = leavingReplica.hashCode(); + result = 31 * result + ourReplica.hashCode(); + return result; + } + + public String toString() + { + return "LeavingReplica{" + + "leavingReplica=" + leavingReplica + + ", ourReplica=" + ourReplica + + '}'; + } + } + /** * Called when an endpoint is removed 
from the ring. This function checks * whether this node becomes responsible for new ranges as a @@ -2805,38 +2875,52 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ private void restoreReplicaCount(InetAddressAndPort endpoint, final InetAddressAndPort notifyEndpoint) { - Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create(); + Map<String, Multimap<InetAddressAndPort, FetchReplica>> replicasToFetch = new HashMap<>(); InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort(); for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { - Multimap<Range<Token>, InetAddressAndPort> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint); - Set<Range<Token>> myNewRanges = new HashSet<>(); - for (Map.Entry<Range<Token>, InetAddressAndPort> entry : changedRanges.entries()) - { - if (entry.getValue().equals(myAddress)) - myNewRanges.add(entry.getKey()); - } - Multimap<InetAddressAndPort, Range<Token>> sourceRanges = getNewSourceRanges(keyspaceName, myNewRanges); - for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet()) + logger.debug("Restoring replica count for keyspace {}", keyspaceName); + EndpointsByReplica changedReplicas = getChangedReplicasForLeaving(keyspaceName, endpoint, tokenMetadata, Keyspace.open(keyspaceName).getReplicationStrategy()); + Set<LeavingReplica> myNewReplicas = new HashSet<>(); + for (Map.Entry<Replica, Replica> entry : changedReplicas.flattenEntries()) { - rangesToFetch.put(keyspaceName, entry); + Replica replica = entry.getValue(); + if (replica.endpoint().equals(myAddress)) + { + //Maybe we don't technically need to fetch transient data from somewhere + //but it's probably not a lot and it probably makes things a hair more resilient to people + //not running repair when they should. 
+ myNewReplicas.add(new LeavingReplica(entry.getKey(), entry.getValue())); + } } + logger.debug("Changed replicas for leaving {}, myNewReplicas {}", changedReplicas, myNewReplicas); + replicasToFetch.put(keyspaceName, getNewSourceReplicas(keyspaceName, myNewReplicas)); } StreamPlan stream = new StreamPlan(StreamOperation.RESTORE_REPLICA_COUNT); - for (String keyspaceName : rangesToFetch.keySet()) - { - for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) - { - InetAddressAndPort source = entry.getKey(); - Collection<Range<Token>> ranges = entry.getValue(); + replicasToFetch.forEach((keyspaceName, sources) -> { + logger.debug("Requesting keyspace {} sources", keyspaceName); + sources.asMap().forEach((sourceAddress, fetchReplicas) -> { + logger.debug("Source and our replicas are {}", fetchReplicas); + //Remember whether this node is providing the full or transient replicas for this range. We are going + //to pass streaming the local instance of Replica for the range which doesn't tell us anything about the source + //By encoding it as two separate sets we retain this information about the source. 
+ RangesAtEndpoint full = fetchReplicas.stream() + .filter(f -> f.remote.isFull()) + .map(f -> f.local) + .collect(RangesAtEndpoint.collector(myAddress)); + RangesAtEndpoint transientReplicas = fetchReplicas.stream() + .filter(f -> f.remote.isTransient()) + .map(f -> f.local) + .collect(RangesAtEndpoint.collector(myAddress)); if (logger.isDebugEnabled()) - logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", ")); - stream.requestRanges(source, keyspaceName, ranges); - } - } + logger.debug("Requesting from {} full replicas {} transient replicas {}", sourceAddress, StringUtils.join(full, ", "), StringUtils.join(transientReplicas, ", ")); + + stream.requestRanges(sourceAddress, keyspaceName, full, transientReplicas); + }); + }); StreamResultFuture future = stream.execute(); Futures.addCallback(future, new FutureCallback<StreamState>() { @@ -2854,21 +2938,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE }); } + /** + * This is used in three contexts, graceful decommission, and restoreReplicaCount/removeNode. + * Graceful decommission should never lose data and it's going to be important that transient data + * is streamed to at least one other node from this one for each range. + * + * For ranges this node replicates its removal should cause a new replica to be selected either as transient or full + * for every range. So I believe the current code doesn't have to do anything special because it will engage in streaming + * for every range it replicates to at least one other node and that should propagate the transient data that was here. + * When I graphed this out on paper the result of removal looked correct and there are no issues such as + * this node needing to create a full replica for a range it transiently replicates because what is created is just another + * transient replica to replace this node.
+ * @param keyspaceName + * @param endpoint + * @return + */ // needs to be modified to accept either a keyspace or ARS. - private Multimap<Range<Token>, InetAddressAndPort> getChangedRangesForLeaving(String keyspaceName, InetAddressAndPort endpoint) + static EndpointsByReplica getChangedReplicasForLeaving(String keyspaceName, InetAddressAndPort endpoint, TokenMetadata tokenMetadata, AbstractReplicationStrategy strat) { // First get all ranges the leaving endpoint is responsible for - Collection<Range<Token>> ranges = getRangesForEndpoint(keyspaceName, endpoint); + RangesAtEndpoint replicas = strat.getAddressReplicas(endpoint); if (logger.isDebugEnabled()) - logger.debug("Node {} ranges [{}]", endpoint, StringUtils.join(ranges, ", ")); + logger.debug("Node {} replicas [{}]", endpoint, StringUtils.join(replicas, ", ")); - Map<Range<Token>, List<InetAddressAndPort>> currentReplicaEndpoints = new HashMap<>(ranges.size()); + Map<Replica, EndpointsForRange> currentReplicaEndpoints = Maps.newHashMapWithExpectedSize(replicas.size()); // Find (for each range) all nodes that store replicas for these ranges as well TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); // don't do this in the loop! 
#7758 - for (Range<Token> range : ranges) - currentReplicaEndpoints.put(range, Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, metadata)); + for (Replica replica : replicas) + currentReplicaEndpoints.put(replica, strat.calculateNaturalReplicas(replica.range().right, metadata)); TokenMetadata temp = tokenMetadata.cloneAfterAllLeft(); @@ -2877,26 +2976,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (temp.isMember(endpoint)) temp.removeEndpoint(endpoint); - Multimap<Range<Token>, InetAddressAndPort> changedRanges = HashMultimap.create(); + EndpointsByReplica.Mutable changedRanges = new EndpointsByReplica.Mutable(); // Go through the ranges and for each range check who will be // storing replicas for these ranges when the leaving endpoint // is gone. Whoever is present in newReplicaEndpoints list, but // not in the currentReplicaEndpoints list, will be needing the // range. - for (Range<Token> range : ranges) - { - Collection<InetAddressAndPort> newReplicaEndpoints = Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp); - newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range)); + for (Replica replica : replicas) + { + EndpointsForRange newReplicaEndpoints = strat.calculateNaturalReplicas(replica.range().right, temp); + newReplicaEndpoints = newReplicaEndpoints.filter(newReplica -> { + Optional<Replica> currentReplicaOptional = + tryFind(currentReplicaEndpoints.get(replica), + currentReplica -> newReplica.endpoint().equals(currentReplica.endpoint()) + ).toJavaUtil(); + //If it is newly replicating then yes we must do something to get the data there + if (!currentReplicaOptional.isPresent()) + return true; + + Replica currentReplica = currentReplicaOptional.get(); + //This transition requires streaming to occur + //Full -> transient is handled by nodetool cleanup + //transient -> transient and full -> full don't require any action 
+ if (currentReplica.isTransient() && newReplica.isFull()) + return true; + return false; + }); + if (logger.isDebugEnabled()) if (newReplicaEndpoints.isEmpty()) - logger.debug("Range {} already in all replicas", range); + logger.debug("Replica {} already in all replicas", replica); else - logger.debug("Range {} will be responsibility of {}", range, StringUtils.join(newReplicaEndpoints, ", ")); - changedRanges.putAll(range, newReplicaEndpoints); + logger.debug("Replica {} will be responsibility of {}", replica, StringUtils.join(newReplicaEndpoints, ", ")); + changedRanges.putAll(replica, newReplicaEndpoints, Conflict.NONE); } - return changedRanges; + return changedRanges.asImmutableView(); } public void onJoin(InetAddressAndPort endpoint, EndpointState epState) @@ -3602,10 +3718,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } else { - option.getRanges().addAll(getLocalRanges(keyspace)); + Iterables.addAll(option.getRanges(), getLocalReplicas(keyspace).filter(Replica::isFull).ranges()); } } - if (option.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2) + if (option.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor().allReplicas < 2) return 0; int cmd = nextRepairCommand.incrementAndGet(); @@ -3703,7 +3819,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * Get the "primary ranges" for the specified keyspace and endpoint. * "Primary ranges" are the ranges that the node is responsible for storing replica primarily. * The node that stores replica primarily is defined as the first node returned - * by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}. + * by {@link AbstractReplicationStrategy#calculateNaturalReplicas}. * * @param keyspace Keyspace name to check primary ranges * @param ep endpoint we are interested in. 
@@ -3716,9 +3832,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); for (Token token : metadata.sortedTokens()) { - List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata); - if (endpoints.size() > 0 && endpoints.get(0).equals(ep)) + EndpointsForRange replicas = strategy.calculateNaturalReplicas(token, metadata); + if (replicas.size() > 0 && replicas.get(0).endpoint().equals(ep)) + { + Preconditions.checkState(replicas.get(0).isFull()); primaryRanges.add(new Range<>(metadata.getPredecessor(token), token)); + } } return primaryRanges; } @@ -3741,12 +3860,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>(); for (Token token : metadata.sortedTokens()) { - List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata); - for (InetAddressAndPort endpoint : endpoints) + EndpointsForRange replicas = strategy.calculateNaturalReplicas(token, metadata); + for (Replica replica : replicas) { - if (localDcNodes.contains(endpoint)) + if (localDcNodes.contains(replica.endpoint())) { - if (endpoint.equals(referenceEndpoint)) + if (replica.endpoint().equals(referenceEndpoint)) { localDCPrimaryRanges.add(new Range<>(metadata.getPredecessor(token), token)); } @@ -3763,9 +3882,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param ep endpoint we are interested in. * @return ranges for the specified endpoint. 
*/ - Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddressAndPort ep) + RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, InetAddressAndPort ep) { - return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressRanges().get(ep); + return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep); } /** @@ -3806,40 +3925,53 @@ public class StorageService extends NotificationBroadcasterSupport implements IE @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key) { - KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName); - if (ksMetaData == null) - throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'"); - - TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf); - if (metadata == null) - throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); - - return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))).stream().map(i -> i.address).collect(toList()); + EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, cf, key); + List<InetAddress> inetList = new ArrayList<>(replicas.size()); + replicas.forEach(r -> inetList.add(r.endpoint().address)); + return inetList; } public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key) { - KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName); - if (ksMetaData == null) - throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'"); - - TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf); - if (metadata == null) - throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); - - return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))), true); + return 
Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, key), true); } @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key) { - return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)).stream().map(i -> i.address).collect(toList()); + EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)); + List<InetAddress> inetList = new ArrayList<>(replicas.size()); + replicas.forEach(r -> inetList.add(r.endpoint().address)); + return inetList; } public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key) { - return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)), true); + return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)), true); + } + + public List<String> getReplicas(String keyspaceName, String cf, String key) + { + List<String> res = new ArrayList<>(); + for (Replica replica : getNaturalReplicasForToken(keyspaceName, cf, key)) + { + res.add(replica.toString()); + } + return res; + } + + public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, String cf, String key) + { + KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName); + if (ksMetaData == null) + throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'"); + + TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf); + if (metadata == null) + throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); + + return getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))); } /** @@ -3850,17 +3982,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param pos position for which we need to find the endpoint * @return the endpoint responsible for this token */ - public 
List<InetAddressAndPort> getNaturalEndpoints(String keyspaceName, RingPosition pos) + public static EndpointsForToken getNaturalReplicasForToken(String keyspaceName, RingPosition pos) { - return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalEndpoints(pos); + return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos); } /** * Returns the endpoints currently responsible for storing the token plus pending ones */ - public Iterable<InetAddressAndPort> getNaturalAndPendingEndpoints(String keyspaceName, Token token) + public EndpointsForToken getNaturalAndPendingReplicasForToken(String keyspaceName, Token token) { - return Iterables.concat(getNaturalEndpoints(keyspaceName, token), tokenMetadata.pendingEndpointsFor(token, keyspaceName)); + // TODO: race condition to fetch these. implications?? + EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, token); + EndpointsForToken pending = tokenMetadata.pendingEndpointsForToken(token, keyspaceName); + if (Endpoints.haveConflicts(natural, pending)) + { + natural = Endpoints.resolveConflictsInNatural(natural, pending); + pending = Endpoints.resolveConflictsInPending(natural, pending); + } + return Endpoints.concat(natural, pending); } /** @@ -3868,19 +4008,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * specified key i.e for replication.
* * @param keyspace keyspace name also known as keyspace - * @param key key for which we need to find the endpoint - * @return the endpoint responsible for this key + * @param pos position for which we need to find the endpoint */ - public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key) - { - return getLiveNaturalEndpoints(keyspace, tokenMetadata.decorateKey(key)); - } - - public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos) + public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, RingPosition pos) { - List<InetAddressAndPort> liveEps = new ArrayList<>(); - getLiveNaturalEndpoints(keyspace, pos, liveEps); - return liveEps; + return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken()); } /** @@ -3889,17 +4021,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @param keyspace keyspace name also known as keyspace * @param pos position for which we need to find the endpoint - * @param liveEps the list of endpoints to mutate */ - public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, List<InetAddressAndPort> liveEps) + public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, RingPosition pos) { - List<InetAddressAndPort> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos); - - for (InetAddressAndPort endpoint : endpoints) - { - if (FailureDetector.instance.isAlive(endpoint)) - liveEps.add(endpoint); - } + EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(pos); + return replicas.filter(r -> FailureDetector.instance.isAlive(r.endpoint())); } public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception @@ -4019,13 +4145,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { NetworkTopologyStrategy strategy = 
(NetworkTopologyStrategy) keyspace.getReplicationStrategy(); - rf = strategy.getReplicationFactor(dc); + rf = strategy.getReplicationFactor(dc).allReplicas; numNodes = metadata.getTopology().getDatacenterEndpoints().get(dc).size(); } else { numNodes = metadata.getAllEndpoints().size(); - rf = keyspace.getReplicationStrategy().getReplicationFactor(); + rf = keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; } if (numNodes <= rf) @@ -4033,6 +4159,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE + keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")." + " Perform a forceful decommission to ignore."); } + // TODO: do we care about fixing transient/full self-movements here? probably if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddressAndPort()).size() > 0) throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); } @@ -4095,11 +4222,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException { - Map<String, Multimap<Range<Token>, InetAddressAndPort>> rangesToStream = new HashMap<>(); + Map<String, EndpointsByReplica> rangesToStream = new HashMap<>(); for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { - Multimap<Range<Token>, InetAddressAndPort> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); + EndpointsByReplica rangesMM = getChangedReplicasForLeaving(keyspaceName, FBUtilities.getBroadcastAddressAndPort(), tokenMetadata, Keyspace.open(keyspaceName).getReplicationStrategy()); if (logger.isDebugEnabled()) logger.debug("Ranges needing transfer are [{}]", StringUtils.join(rangesMM.keySet(), ",")); @@ -4135,20 +4262,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return 
HintsService.instance.transferHints(this::getPreferredHintsStreamTarget); } + private static EndpointsForRange getStreamCandidates(Collection<InetAddressAndPort> endpoints) + { + endpoints = endpoints.stream() + .filter(endpoint -> FailureDetector.instance.isAlive(endpoint) && !FBUtilities.getBroadcastAddressAndPort().equals(endpoint)) + .collect(Collectors.toList()); + + return EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints)); + } /** * Find the best target to stream hints to. Currently the closest peer according to the snitch */ private UUID getPreferredHintsStreamTarget() { - List<InetAddressAndPort> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); - candidates.remove(FBUtilities.getBroadcastAddressAndPort()); - for (Iterator<InetAddressAndPort> iter = candidates.iterator(); iter.hasNext(); ) - { - InetAddressAndPort address = iter.next(); - if (!FailureDetector.instance.isAlive(address)) - iter.remove(); - } + Set<InetAddressAndPort> endpoints = StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints(); + EndpointsForRange candidates = getStreamCandidates(endpoints); if (candidates.isEmpty()) { logger.warn("Unable to stream hints since no live endpoints seen"); @@ -4157,8 +4286,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE else { // stream to the closest peer as chosen by the snitch - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddressAndPort(), candidates); - InetAddressAndPort hintsDestinationHost = candidates.get(0); + candidates = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), candidates); + InetAddressAndPort hintsDestinationHost = candidates.get(0).endpoint(); return tokenMetadata.getHostId(hintsDestinationHost); } } @@ -4207,6 +4336,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // 
checking if data is moving to this node for (String keyspaceName : keyspacesToProcess) { + // TODO: do we care about fixing transient/full self-movements here? if (tokenMetadata.getPendingRanges(keyspaceName, localAddress).size() > 0) throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); } @@ -4218,7 +4348,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE setMode(Mode.MOVING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true); Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS); - RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), keyspacesToProcess); + RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), keyspacesToProcess, tokenMetadata); + relocator.calculateToFromStreams(); if (relocator.streamsNeeded()) { @@ -4243,131 +4374,191 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next()); } - private class RangeRelocator + @VisibleForTesting + public static class RangeRelocator { private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION); - - private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames) - { - calculateToFromStreams(tokens, keyspaceNames); - } - - private void calculateToFromStreams(Collection<Token> newTokens, List<String> keyspaceNames) - { - InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled(); - // clone to avoid concurrent modification in calculateNaturalEndpoints - TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap(); - - for (String keyspace : keyspaceNames) + private final InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort(); + private final TokenMetadata tokenMetaCloneAllSettled; + // clone to avoid concurrent modification in calculateNaturalReplicas + private final TokenMetadata tokenMetaClone; + private final Collection<Token> tokens; + private final List<String> keyspaceNames; + + + private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd) + { + this.tokens = tokens; + this.keyspaceNames = keyspaceNames; + this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled(); + // clone to avoid concurrent modification in calculateNaturalReplicas + this.tokenMetaClone = tmd.cloneOnlyTokenMap(); + } + + @VisibleForTesting + public RangeRelocator() + { + this.tokens = null; + this.keyspaceNames = null; + this.tokenMetaCloneAllSettled = null; + this.tokenMetaClone = null; + } + + /** + * Wrapper that supplies accessors to the real implementations of the various dependencies for this method + */ + private Multimap<InetAddressAndPort, FetchReplica> calculateRangesToFetchWithPreferredEndpoints(AbstractReplicationStrategy strategy, RangesAtEndpoint fetchRanges, String keyspace) + { + EndpointsByReplica preferredEndpoints = + RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> DatabaseDescriptor.getEndpointSnitch().sortedByProximity(address, replicas), + strategy, + fetchRanges, + useStrictConsistency, + tokenMetaClone, + tokenMetaCloneAllSettled, + RangeStreamer.ALIVE_PREDICATE, + keyspace, + Collections.emptyList()); + return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints); + } + + /** + * calculating endpoints to stream current ranges to if needed + * in some situations node will handle current ranges as part of the new ranges + **/ + public RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges, + AbstractReplicationStrategy strat, + TokenMetadata tmdBefore, + TokenMetadata tmdAfter) + { + RangesByEndpoint.Mutable endpointRanges = new 
RangesByEndpoint.Mutable(); + for (Replica toStream : streamRanges) { - // replication strategy of the current keyspace - AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); - Multimap<InetAddressAndPort, Range<Token>> endpointToRanges = strategy.getAddressRanges(); - - logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace); - for (Token newToken : newTokens) + //If the range we are sending is full only send it to the new full replica + //There will also be a new transient replica we need to send the data to, but not + //the repaired data + EndpointsForRange currentEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore); + EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter); + logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, currentEndpoints, newEndpoints); + + for (Replica current : currentEndpoints) { - // getting collection of the currently used ranges by this keyspace - Collection<Range<Token>> currentRanges = endpointToRanges.get(localAddress); - // collection of ranges which this node will serve after move to the new token - Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress); - - // ring ranges and endpoints associated with them - // this used to determine what nodes should we ping about range data - Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone); - - // calculated parts of the ranges to request/stream from/to nodes in the ring - Pair<Set<Range<Token>>, Set<Range<Token>>> rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges); - - /** - * In this loop we are going through all ranges "to fetch" and determining - * nodes in the ring responsible for data we are interested in - */ - Multimap<Range<Token>, InetAddressAndPort> 
rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create(); - for (Range<Token> toFetch : rangesPerKeyspace.right) + for (Replica updated : newEndpoints) { - for (Range<Token> range : rangeAddresses.keySet()) + if (current.endpoint().equals(updated.endpoint())) { - if (range.contains(toFetch)) - { - List<InetAddressAndPort> endpoints = null; + //Nothing to do + if (current.equals(updated)) + break; - if (useStrictConsistency) + //In these two (really three) cases the existing data is sufficient and we should subtract whatever is already replicated + if (current.isFull() == updated.isFull() || current.isFull()) + { + //First subtract what we already have + Set<Range<Token>> subsToStream = toStream.range().subtract(current.range()); + //Now we only stream what is still replicated + subsToStream = subsToStream.stream().flatMap(range -> range.intersectionWith(updated.range()).stream()).collect(Collectors.toSet()); + for (Range<Token> subrange : subsToStream) { - Set<InetAddressAndPort> oldEndpoints = Sets.newHashSet(rangeAddresses.get(range)); - Set<InetAddressAndPort> newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled)); - - //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. 
- //So we need to be careful to only be strict when endpoints == RF - if (oldEndpoints.size() == strategy.getReplicationFactor()) + //Only stream what intersects with what is in the new world + Set<Range<Token>> intersections = subrange.intersectionWith(updated.range()); + for (Range<Token> intersection : intersections) { - oldEndpoints.removeAll(newEndpoints); - - //No relocation required - if (oldEndpoints.isEmpty()) - continue; - - assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size(); + endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection)); } - - endpoints = Lists.newArrayList(oldEndpoints.iterator().next()); } - else + } + else + { + for (Range<Token> intersection : toStream.range().intersectionWith(updated.range())) { - endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range)); + endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection)); } - - // storing range and preferred endpoint set - rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints); } } + } + } - Collection<InetAddressAndPort> addressList = rangesToFetchWithPreferredEndpoints.get(toFetch); - if (addressList == null || addressList.isEmpty()) - continue; - - if (useStrictConsistency) + for (Replica updated : newEndpoints) + { + if (!currentEndpoints.byEndpoint().containsKey(updated.endpoint())) + { + // Completely new range for this endpoint + if (toStream.isTransient() && updated.isFull()) { - if (addressList.size() > 1) - throw new IllegalStateException("Multiple strict sources found for " + toFetch); - - InetAddressAndPort sourceIp = addressList.iterator().next(); - if (Gossiper.instance.isEnabled() && !Gossiper.instance.getEndpointStateForEndpoint(sourceIp).isAlive()) - throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). 
If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false"); + throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", updated, toStream)); + } + for (Range<Token> intersection : updated.range().intersectionWith(toStream.range())) + { + endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection)); } } + } + } + return endpointRanges.asImmutableView(); + } - // calculating endpoints to stream current ranges to if needed - // in some situations node will handle current ranges as part of the new ranges - Multimap<InetAddressAndPort, Range<Token>> endpointRanges = HashMultimap.create(); - for (Range<Token> toStream : rangesPerKeyspace.left) + private void calculateToFromStreams() + { + logger.debug("Current tmd " + tokenMetaClone); + logger.debug("Updated tmd " + tokenMetaCloneAllSettled); + for (String keyspace : keyspaceNames) + { + // replication strategy of the current keyspace + AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); + // getting collection of the currently used ranges by this keyspace + RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress); + + logger.info("Calculating ranges to stream and request for keyspace {}", keyspace); + //From what I have seen we only ever call this with a single token from StorageService.move(Token) + for (Token newToken : tokens) + { + Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress); + if (currentTokens.size() > 1 || currentTokens.isEmpty()) { - Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone)); - Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaCloneAllSettled)); - logger.debug("Range: {} Current endpoints: {} New endpoints: {}", toStream, 
currentEndpoints, newEndpoints); - for (InetAddressAndPort address : Sets.difference(newEndpoints, currentEndpoints)) - { - logger.debug("Range {} has new owner {}", toStream, address); - endpointRanges.put(address, toStream); - } + throw new AssertionError("Unexpected current tokens: " + currentTokens); } + // collection of ranges which this node will serve after move to the new token + RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress); + + // calculated parts of the ranges to request/stream from/to nodes in the ring + Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress)); + //In the single node token move there is nothing to do and Range subtraction is broken + //so it's easier to just identify this case up front. + if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort() +)).size() > 1) + { + streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas); + } + + Multimap<InetAddressAndPort, FetchReplica> workMap = calculateRangesToFetchWithPreferredEndpoints(strategy, streamAndFetchOwnRanges.right, keyspace); + + RangesByEndpoint endpointRanges = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled); + + logger.info("Endpoint ranges to stream to " + endpointRanges); + // stream ranges for (InetAddressAndPort address : endpointRanges.keySet()) { logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address); - streamPlan.transferRanges(address, keyspace, endpointRanges.get(address)); + RangesAtEndpoint ranges = endpointRanges.get(address); + streamPlan.transferRanges(address, keyspace, ranges); } // stream requests - Multimap<InetAddressAndPort, Range<Token>> workMap 
= RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, FailureDetector.instance, useStrictConsistency); - for (InetAddressAndPort address : workMap.keySet()) - { + workMap.asMap().forEach((address, sourceAndOurReplicas) -> { + RangesAtEndpoint full = sourceAndOurReplicas.stream() + .filter(pair -> pair.remote.isFull()) + .map(pair -> pair.local) + .collect(RangesAtEndpoint.collector(localAddress)); + RangesAtEndpoint transientReplicas = sourceAndOurReplicas.stream() + .filter(pair -> pair.remote.isTransient()) + .map(pair -> pair.local) + .collect(RangesAtEndpoint.collector(localAddress)); logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address); - streamPlan.requestRanges(address, keyspace, workMap.get(address)); - } + streamPlan.requestRanges(address, keyspace, full, transientReplicas); + }); logger.debug("Keyspace {}: work map {}.", keyspace, workMap); } @@ -4486,14 +4677,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { // if the replication factor is 1 the data is lost so we shouldn't wait for confirmation - if (Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor() == 1) + if (Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor().allReplicas == 1) continue; // get all ranges that change ownership (that is, a node needs // to take responsibility for new range) - Multimap<Range<Token>, InetAddressAndPort> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint); + EndpointsByReplica changedRanges = getChangedReplicasForLeaving(keyspaceName, endpoint, tokenMetadata, Keyspace.open(keyspaceName).getReplicationStrategy()); IFailureDetector failureDetector = FailureDetector.instance; - for (InetAddressAndPort ep : changedRanges.values()) + for (InetAddressAndPort ep : transform(changedRanges.flattenValues(), Replica::endpoint)) { 
if (failureDetector.isAlive(ep)) replicatingNodes.add(ep); @@ -4903,15 +5094,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Collection<Collection<InetAddressAndPort>> endpointsGroupedByDc = new ArrayList<>(); // mapping of dc's to nodes, use sorted map so that we get dcs sorted - SortedMap<String, Collection<InetAddressAndPort>> sortedDcsToEndpoints = new TreeMap<>(); - sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap()); + SortedMap<String, Collection<InetAddressAndPort>> sortedDcsToEndpoints = new TreeMap<>(metadata.getTopology().getDatacenterEndpoints().asMap()); for (Collection<InetAddressAndPort> endpoints : sortedDcsToEndpoints.values()) endpointsGroupedByDc.add(endpoints); Map<Token, Float> tokenOwnership = tokenMetadata.partitioner.describeOwnership(tokenMetadata.sortedTokens()); LinkedHashMap<InetAddressAndPort, Float> finalOwnership = Maps.newLinkedHashMap(); - Multimap<InetAddressAndPort, Range<Token>> endpointToRanges = strategy.getAddressRanges(); + RangesByEndpoint endpointToRanges = strategy.getAddressReplicas(); // calculate ownership per dc for (Collection<InetAddressAndPort> endpoints : endpointsGroupedByDc) { @@ -4919,10 +5109,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (InetAddressAndPort endpoint : endpoints) { float ownership = 0.0f; - for (Range<Token> range : endpointToRanges.get(endpoint)) + for (Replica replica : endpointToRanges.get(endpoint)) { - if (tokenOwnership.containsKey(range.right)) - ownership += tokenOwnership.get(range.right); + if (tokenOwnership.containsKey(replica.range().right)) + ownership += tokenOwnership.get(replica.range().right); } finalOwnership.put(endpoint, ownership); } @@ -4974,9 +5164,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE UUID hostId = entry.getValue(); InetAddressAndPort endpoint = entry.getKey(); result.put(endpoint.toString(withPort), - 
coreViewStatus.containsKey(hostId) - ? coreViewStatus.get(hostId) - : "UNKNOWN"); + coreViewStatus.getOrDefault(hostId, "UNKNOWN")); } return Collections.unmodifiableMap(result); @@ -5079,69 +5267,63 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** - * Seed data to the endpoints that will be responsible for it at the future + * Send data to the endpoints that will be responsible for it in the future * * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each * @return async Future for whether stream was success */ - private Future<StreamState> streamRanges(Map<String, Multimap<Range<Token>, InetAddressAndPort>> rangesToStreamByKeyspace) + private Future<StreamState> streamRanges(Map<String, EndpointsByReplica> rangesToStreamByKeyspace) { // First, we build a list of ranges to stream to each host, per table - Map<String, Map<InetAddressAndPort, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>(); + Map<String, RangesByEndpoint> sessionsToStreamByKeyspace = new HashMap<>(); - for (Map.Entry<String, Multimap<Range<Token>, InetAddressAndPort>> entry : rangesToStreamByKeyspace.entrySet()) + for (Map.Entry<String, EndpointsByReplica> entry : rangesToStreamByKeyspace.entrySet()) { String keyspace = entry.getKey(); - Multimap<Range<Token>, InetAddressAndPort> rangesWithEndpoints = entry.getValue(); + EndpointsByReplica rangesWithEndpoints = entry.getValue(); if (rangesWithEndpoints.isEmpty()) continue; + //Description is always Unbootstrap? Is that right? 
Map<InetAddressAndPort, Set<Range<Token>>> transferredRangePerKeyspace = SystemKeyspace.getTransferredRanges("Unbootstrap", keyspace, StorageService.instance.getTokenMetadata().partitioner); - Map<InetAddressAndPort, List<Range<Token>>> rangesPerEndpoint = new HashMap<>(); - for (Map.Entry<Range<Token>, InetAddressAndPort> endPointEntry : rangesWithEndpoints.entries()) + RangesByEndpoint.Mutable replicasPerEndpoint = new RangesByEndpoint.Mutable(); + for (Map.Entry<Replica, Replica> endPointEntry : rangesWithEndpoints.flattenEntries()) { - Range<Token> range = endPointEntry.getKey(); - InetAddressAndPort endpoint = endPointEntry.getValue(); - - Set<Range<Token>> transferredRanges = transferredRangePerKeyspace.get(endpoint); - if (transferredRanges != null && transferredRanges.contains(range)) + Replica local = endPointEntry.getKey(); + Replica remote = endPointEntry.getValue(); + Set<Range<Token>> transferredRanges = transferredRangePerKeyspace.get(remote.endpoint()); + if (transferredRanges != null && transferredRanges.contains(local.range())) { - logger.debug("Skipping transferred range {} of keyspace {}, endpoint {}", range, keyspace, endpoint); + logger.debug("Skipping transferred range {} of keyspace {}, endpoint {}", local, keyspace, remote); continue; } - List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint); - if (curRanges == null) - { - curRanges = new LinkedList<>(); - rangesPerEndpoint.put(endpoint, curRanges); - } - curRanges.add(range); + replicasPerEndpoint.put(remote.endpoint(), remote.decorateSubrange(local.range())); } - sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint); + sessionsToStreamByKeyspace.put(keyspace, replicasPerEndpoint.asImmutableView()); } StreamPlan streamPlan = new StreamPlan(StreamOperation.DECOMMISSION); - // Vinculate StreamStateStore to current StreamPlan to update transferred ranges per StreamSession + // Vinculate StreamStateStore to current StreamPlan to update transferred ranges per StreamSession 
streamPlan.listeners(streamStateStore); - for (Map.Entry<String, Map<InetAddressAndPort, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet()) + for (Map.Entry<String, RangesByEndpoint> entry : sessionsToStreamByKeyspace.entrySet()) { String keyspaceName = entry.getKey(); - Map<InetAddressAndPort, List<Range<Token>>> rangesPerEndpoint = entry.getValue(); + RangesByEndpoint replicasPerEndpoint = entry.getValue(); - for (Map.Entry<InetAddressAndPort, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet()) + for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> rangesEntry : replicasPerEndpoint.asMap().entrySet()) { - List<Range<Token>> ranges = rangesEntry.getValue(); + RangesAtEndpoint replicas = rangesEntry.getValue(); InetAddressAndPort newEndpoint = rangesEntry.getKey(); // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - streamPlan.transferRanges(newEndpoint, keyspaceName, ranges); + streamPlan.transferRanges(newEndpoint, keyspaceName, replicas); } } return streamPlan.execute(); @@ -5151,53 +5333,109 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * Calculate pair of ranges to stream/fetch for given two range collections * (current ranges for keyspace and ranges after move to new token) * + * With transient replication the added wrinkle is that if a range transitions from full to transient then + * we need to stream the range despite the fact that we are retaining it as transient. Some replica + * somewhere needs to transition from transient to full and we will be the source. + * + * If the range is transient and is transitioning to full then always fetch even if the range was already transient + * since a transiently replicated range obviously needs to fetch data to become full. + * + * This is why there is a continue after checking for intersection because intersection is not sufficient reason + * to do the subtraction since we might need to stream/fetch data anyways. 
+ * * @param current collection of the ranges by current token * @param updated collection of the ranges after token is changed * @return pair of ranges to stream/fetch for given current and updated range collections */ - public Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Collection<Range<Token>> current, Collection<Range<Token>> updated) + public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint current, RangesAtEndpoint updated) { - Set<Range<Token>> toStream = new HashSet<>(); - Set<Range<Token>> toFetch = new HashSet<>(); - + // FIXME: transient replication + // this should always be the local node, except for tests TODO: assert this + RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(current.endpoint()); + RangesAtEndpoint.Builder toFetch = RangesAtEndpoint.builder(current.endpoint()); - for (Range<Token> r1 : current) + logger.debug("Calculating toStream"); + for (Replica r1 : current) { boolean intersect = false; - for (Range<Token> r2 : updated) + RangesAtEndpoint.Mutable remainder = null; + for (Replica r2 : updated) { - if (r1.intersects(r2)) + logger.debug("Comparing {} and {}", r1, r2); + //If we will end up transiently replicating send the entire thing and don't subtract + if (r1.intersectsOnRange(r2) && !(r1.isFull() && r2.isTransient())) { - // adding difference ranges to fetch from a ring - toStream.addAll(r1.subtract(r2)); + RangesAtEndpoint.Mutable oldRemainder = remainder; + remainder = new RangesAtEndpoint.Mutable(current.endpoint()); + if (oldRemainder != null) + { + for (Replica replica : oldRemainder) + { + remainder.addAll(replica.subtractIgnoreTransientStatus(r2.range())); + } + } + else + { + remainder.addAll(r1.subtractIgnoreTransientStatus(r2.range())); + } + logger.debug(" Intersects adding {}", remainder); intersect = true; } } if (!intersect) { - toStream.add(r1); // should seed whole old range + logger.debug(" Doesn't intersect adding {}", r1); + 
toStream.add(r1); // should stream whole old range + } + else + { + toStream.addAll(remainder); } } - for (Range<Token> r2 : updated) + logger.debug("Calculating toFetch"); + for (Replica r2 : updated) { boolean intersect = false; - for (Range<Token> r1 : current) + RangesAtEndpoint.Mutable remainder = null; + for (Replica r1 : current) { - if (r2.intersects(r1)) + logger.info("Comparing {} and {}", r2, r1); + //Transitioning from transient to full means fetch everything so intersection doesn't matter. + if (r2.intersectsOnRange(r1) && !(r1.isTransient() && r2.isFull())) { - // adding difference ranges to fetch from a ring - toFetch.addAll(r2.subtract(r1)); + RangesAtEndpoint.Mutable oldRemainder = remainder; + remainder = new RangesAtEndpoint.Mutable(current.endpoint()); + if (oldRemainder != null) + { + for (Replica replica : oldRemainder) + { + remainder.addAll(replica.subtractIgnoreTransientStatus(r1.range())); + } + } + else + { + remainder.addAll(r2.subtractIgnoreTransientStatus(r1.range())); + } + logger.debug(" Intersects adding {}", remainder); intersect = true; } } if (!intersect) { + logger.debug(" Doesn't intersect adding {}", r2); toFetch.add(r2); // should fetch whole old range } + else + { + toFetch.addAll(remainder); + } } - return Pair.create(toStream, toFetch); + logger.debug("To stream {}", toStream); + logger.debug("To fetch {}", toFetch); + + return Pair.create(toStream.build(), toFetch.build()); } public void bulkLoad(String directory) @@ -5233,10 +5471,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE this.keyspace = keyspace; try { - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : StorageService.instance.getRangeToAddressMap(keyspace).entrySet()) + for (Map.Entry<Range<Token>, EndpointsForRange> entry : StorageService.instance.getRangeToAddressMap(keyspace).entrySet()) { Range<Token> range = entry.getKey(); - for (InetAddressAndPort endpoint : entry.getValue()) + EndpointsForRange replicas 
= entry.getValue(); + Replicas.temporaryAssertFull(replicas); + for (InetAddressAndPort endpoint : replicas.endpoints()) addRangeForEndpoint(range, endpoint); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 0f4c7dd..4e6295a 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -217,6 +217,8 @@ public interface StorageServiceMBean extends NotificationEmitter @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key); public List<String> getNaturalEndpointsWithPort(String keysapceName, ByteBuffer key); + public List<String> getReplicas(String keyspaceName, String cf, String key); + /** * @deprecated use {@link #takeSnapshot(String tag, Map options, String... entities)} instead. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 65efeff..a07aae6 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -17,18 +17,15 @@ */ package org.apache.cassandra.service; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.ReplicaLayout; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; -import 
org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; /** @@ -42,26 +39,18 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); - public WriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints, - Collection<InetAddressAndPort> pendingEndpoints, - ConsistencyLevel consistencyLevel, - Keyspace keyspace, + public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime) { - super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime); + super(replicaLayout, callback, writeType, queryStartNanoTime); responses = totalBlockFor(); } - public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime) - { - this(Arrays.asList(endpoint), Collections.<InetAddressAndPort>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime); - } - - public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, long queryStartNanoTime) + public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, WriteType writeType, long queryStartNanoTime) { - this(endpoint, writeType, null, queryStartNanoTime); + this(replicaLayout, null, writeType, queryStartNanoTime); } public void response(MessageIn<T> m) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org