http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java index cb2ea46..c63f4f3 100644 --- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java @@ -20,10 +20,12 @@ package org.apache.cassandra.locator; import java.util.*; import java.util.Map.Entry; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Datacenters; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.TokenMetadata.Topology; @@ -49,14 +51,17 @@ import com.google.common.collect.Multimap; */ public class NetworkTopologyStrategy extends AbstractReplicationStrategy { - private final Map<String, Integer> datacenters; + private final Map<String, ReplicationFactor> datacenters; + private final ReplicationFactor aggregateRf; private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class); public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException { super(keyspaceName, tokenMetadata, snitch, configOptions); - Map<String, Integer> newDatacenters = new HashMap<String, Integer>(); + int replicas = 0; + int trans = 0; + Map<String, ReplicationFactor> newDatacenters = new HashMap<>(); if (configOptions != null) { for (Entry<String, String> entry : configOptions.entrySet()) @@ -64,12 +69,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy String dc 
= entry.getKey(); if (dc.equalsIgnoreCase("replication_factor")) throw new ConfigurationException("replication_factor is an option for SimpleStrategy, not NetworkTopologyStrategy"); - Integer replicas = Integer.valueOf(entry.getValue()); - newDatacenters.put(dc, replicas); + ReplicationFactor rf = ReplicationFactor.fromString(entry.getValue()); + replicas += rf.allReplicas; + trans += rf.transientReplicas(); + newDatacenters.put(dc, rf); } } datacenters = Collections.unmodifiableMap(newDatacenters); + aggregateRf = ReplicationFactor.withTransient(replicas, trans); logger.info("Configured datacenter replicas are {}", FBUtilities.toString(datacenters)); } @@ -79,7 +87,8 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy private static final class DatacenterEndpoints { /** List accepted endpoints get pushed into. */ - Set<InetAddressAndPort> endpoints; + EndpointsForRange.Mutable replicas; + /** * Racks encountered so far. Replicas are put into separate racks while possible. * For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure @@ -90,41 +99,51 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy /** Number of replicas left to fill from this DC. */ int rfLeft; int acceptableRackRepeats; + int transients; - DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddressAndPort> endpoints, Set<Pair<String, String>> racks) + DatacenterEndpoints(ReplicationFactor rf, int rackCount, int nodeCount, EndpointsForRange.Mutable replicas, Set<Pair<String, String>> racks) { - this.endpoints = endpoints; + this.replicas = replicas; this.racks = racks; // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF. 
- this.rfLeft = Math.min(rf, nodeCount); + this.rfLeft = Math.min(rf.allReplicas, nodeCount); // If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack, // and the difference is to be filled by the first encountered nodes. - acceptableRackRepeats = rf - rackCount; + acceptableRackRepeats = rf.allReplicas - rackCount; + + // if we have fewer replicas than rf calls for, reduce transients accordingly + int reduceTransients = rf.allReplicas - this.rfLeft; + transients = Math.max(rf.transientReplicas() - reduceTransients, 0); + ReplicationFactor.validate(rfLeft, transients); } /** - * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful. + * Attempts to add an endpoint to the replicas for this datacenter, adding to the replicas set if successful. * Returns true if the endpoint was added, and this datacenter does not require further replicas. */ - boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location) + boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location, Range<Token> replicatedRange) { if (done()) return false; + if (replicas.endpoints().contains(ep)) + // Cannot repeat a node. + return false; + + Replica replica = new Replica(ep, replicatedRange, rfLeft > transients); + if (racks.add(location)) { // New rack. --rfLeft; - boolean added = endpoints.add(ep); - assert added; + replicas.add(replica, Conflict.NONE); return done(); } if (acceptableRackRepeats <= 0) // There must be rfLeft distinct racks left, do not add any more rack repeats. return false; - if (!endpoints.add(ep)) - // Cannot repeat a node. - return false; + + replicas.add(replica, Conflict.NONE); // Added a node that is from an already met rack to match RF when there aren't enough racks. 
--acceptableRackRepeats; --rfLeft; @@ -141,10 +160,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy /** * calculate endpoints in one pass through the tokens by tracking our progress in each DC. */ - public List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata) + public EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata) { // we want to preserve insertion order so that the first added endpoint becomes primary - Set<InetAddressAndPort> replicas = new LinkedHashSet<>(); + ArrayList<Token> sortedTokens = tokenMetadata.sortedTokens(); + Token replicaEnd = TokenMetadata.firstToken(sortedTokens, searchToken); + Token replicaStart = tokenMetadata.getPredecessor(replicaEnd); + Range<Token> replicatedRange = new Range<>(replicaStart, replicaEnd); + + EndpointsForRange.Mutable builder = new EndpointsForRange.Mutable(replicatedRange); Set<Pair<String, String>> seenRacks = new HashSet<>(); Topology topology = tokenMetadata.getTopology(); @@ -158,31 +182,31 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2); // Create a DatacenterEndpoints object for each non-empty DC. 
- for (Map.Entry<String, Integer> en : datacenters.entrySet()) + for (Map.Entry<String, ReplicationFactor> en : datacenters.entrySet()) { String dc = en.getKey(); - int rf = en.getValue(); + ReplicationFactor rf = en.getValue(); int nodeCount = sizeOrZero(allEndpoints.get(dc)); - if (rf <= 0 || nodeCount <= 0) + if (rf.allReplicas <= 0 || nodeCount <= 0) continue; - DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks); + DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, builder, seenRacks); dcs.put(dc, dcEndpoints); ++dcsToFill; } - Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false); + Iterator<Token> tokenIter = TokenMetadata.ringIterator(sortedTokens, searchToken, false); while (dcsToFill > 0 && tokenIter.hasNext()) { Token next = tokenIter.next(); InetAddressAndPort ep = tokenMetadata.getEndpoint(next); Pair<String, String> location = topology.getLocation(ep); DatacenterEndpoints dcEndpoints = dcs.get(location.left); - if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location)) + if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location, replicatedRange)) --dcsToFill; } - return new ArrayList<>(replicas); + return builder.asImmutableView(); } private int sizeOrZero(Multimap<?, ?> collection) @@ -195,18 +219,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy return collection != null ? collection.size() : 0; } - public int getReplicationFactor() + public ReplicationFactor getReplicationFactor() { - int total = 0; - for (int repFactor : datacenters.values()) - total += repFactor; - return total; + return aggregateRf; } - public int getReplicationFactor(String dc) + public ReplicationFactor getReplicationFactor(String dc) { - Integer replicas = datacenters.get(dc); - return replicas == null ? 
0 : replicas; + ReplicationFactor replicas = datacenters.get(dc); + return replicas == null ? ReplicationFactor.ZERO : replicas; } public Set<String> getDatacenters()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java index 93e629e..449c51e 100644 --- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java @@ -21,9 +21,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.Token; @@ -36,27 +36,32 @@ import org.apache.cassandra.dht.Token; */ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy { + private final ReplicationFactor rf; public OldNetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) { super(keyspaceName, tokenMetadata, snitch, configOptions); + this.rf = ReplicationFactor.fromString(this.configOptions.get("replication_factor")); } - public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - int replicas = getReplicationFactor(); - List<InetAddressAndPort> endpoints = new ArrayList<>(replicas); ArrayList<Token> tokens = metadata.sortedTokens(); - if (tokens.isEmpty()) - return endpoints; + return EndpointsForRange.empty(new Range<>(metadata.partitioner.getMinimumToken(), metadata.partitioner.getMinimumToken())); Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false); Token primaryToken = iter.next(); - 
endpoints.add(metadata.getEndpoint(primaryToken)); + Token previousToken = metadata.getPredecessor(primaryToken); + Range<Token> tokenRange = new Range<>(previousToken, primaryToken); + + EndpointsForRange.Builder replicas = EndpointsForRange.builder(tokenRange, rf.allReplicas); + + assert !rf.hasTransientReplicas() : "support transient replicas"; + replicas.add(new Replica(metadata.getEndpoint(primaryToken), previousToken, primaryToken, true)); boolean bDataCenter = false; boolean bOtherRack = false; - while (endpoints.size() < replicas && iter.hasNext()) + while (replicas.size() < rf.allReplicas && iter.hasNext()) { // First try to find one in a different data center Token t = iter.next(); @@ -65,7 +70,7 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy // If we have already found something in a diff datacenter no need to find another if (!bDataCenter) { - endpoints.add(metadata.getEndpoint(t)); + replicas.add(new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true)); bDataCenter = true; } continue; @@ -77,7 +82,7 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy // If we have already found something in a diff rack no need to find another if (!bOtherRack) { - endpoints.add(metadata.getEndpoint(t)); + replicas.add(new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true)); bOtherRack = true; } } @@ -86,23 +91,24 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy // If we found N number of nodes we are good. This loop wil just exit. Otherwise just // loop through the list and add until we have N nodes. 
- if (endpoints.size() < replicas) + if (replicas.size() < rf.allReplicas) { iter = TokenMetadata.ringIterator(tokens, token, false); - while (endpoints.size() < replicas && iter.hasNext()) + while (replicas.size() < rf.allReplicas && iter.hasNext()) { Token t = iter.next(); - if (!endpoints.contains(metadata.getEndpoint(t))) - endpoints.add(metadata.getEndpoint(t)); + Replica replica = new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true); + if (!replicas.containsEndpoint(replica.endpoint())) + replicas.add(replica); } } - return endpoints; + return replicas.build(); } - public int getReplicationFactor() + public ReplicationFactor getReplicationFactor() { - return Integer.parseInt(this.configOptions.get("replication_factor")); + return rf; } public void validateOptions() throws ConfigurationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/PendingRangeMaps.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java index 92307a3..b8b7bc6 100644 --- a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java +++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java @@ -23,166 +23,147 @@ package org.apache.cassandra.locator; import com.google.common.collect.Iterators; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; import java.util.*; -public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddressAndPort>>> +public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> { - private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class); - /** * We have for NavigableMap to be able to 
search for ranges containing a token efficiently. * * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges. */ // ascendingMap will sort the ranges by the ascending order of right token - final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap; + private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMap; + /** * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap. */ - static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>() - { - @Override - public int compare(Range<Token> o1, Range<Token> o2) - { - int res = o1.right.compareTo(o2.right); - if (res != 0) - return res; + private static final Comparator<Range<Token>> ascendingComparator = (o1, o2) -> { + int res = o1.right.compareTo(o2.right); + if (res != 0) + return res; - return o2.left.compareTo(o1.left); - } - }; + return o2.left.compareTo(o1.left); + }; // ascendingMap will sort the ranges by the descending order of left token - final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap; + private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMap; + /** * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap. */ - static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>() - { - @Override - public int compare(Range<Token> o1, Range<Token> o2) - { - int res = o2.left.compareTo(o1.left); - if (res != 0) - return res; + private static final Comparator<Range<Token>> descendingComparator = (o1, o2) -> { + int res = o2.left.compareTo(o1.left); + if (res != 0) + return res; - // if left tokens are same, sort by the descending of the right tokens. 
- return o2.right.compareTo(o1.right); - } - }; + // if left tokens are same, sort by the descending of the right tokens. + return o2.right.compareTo(o1.right); + }; // these two maps are for warp around ranges. - final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMapForWrapAround; + private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMapForWrapAround; + /** * for wrap around range (begin, end], which begin > end. * Sorting end ascending, if ends are same, sorting begin ascending, * so that token (end, end) will come before (begin, end] with the same end, and (begin, end] will be selected in * the tailMap. */ - static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>() - { - @Override - public int compare(Range<Token> o1, Range<Token> o2) - { - int res = o1.right.compareTo(o2.right); - if (res != 0) - return res; + private static final Comparator<Range<Token>> ascendingComparatorForWrapAround = (o1, o2) -> { + int res = o1.right.compareTo(o2.right); + if (res != 0) + return res; - return o1.left.compareTo(o2.left); - } + return o1.left.compareTo(o2.left); }; - final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMapForWrapAround; + private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMapForWrapAround; + /** * for wrap around ranges, which begin > end. * Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin, * and (begin, end) won't be selected in the tailMap. 
*/ - static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>() - { - @Override - public int compare(Range<Token> o1, Range<Token> o2) - { - int res = o2.left.compareTo(o1.left); - if (res != 0) - return res; - return o1.right.compareTo(o2.right); - } + private static final Comparator<Range<Token>> descendingComparatorForWrapAround = (o1, o2) -> { + int res = o2.left.compareTo(o1.left); + if (res != 0) + return res; + return o1.right.compareTo(o2.right); }; public PendingRangeMaps() { - this.ascendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparator); - this.descendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparator); - this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparatorForWrapAround); - this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparatorForWrapAround); + this.ascendingMap = new TreeMap<>(ascendingComparator); + this.descendingMap = new TreeMap<>(descendingComparator); + this.ascendingMapForWrapAround = new TreeMap<>(ascendingComparatorForWrapAround); + this.descendingMapForWrapAround = new TreeMap<>(descendingComparatorForWrapAround); } static final void addToMap(Range<Token> range, - InetAddressAndPort address, - NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap, - NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap) + Replica replica, + NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMap, + NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMap) { - List<InetAddressAndPort> addresses = ascendingMap.get(range); - if (addresses == null) + EndpointsForRange.Mutable replicas = ascendingMap.get(range); + if (replicas == null) { - addresses = new ArrayList<>(1); - ascendingMap.put(range, addresses); - descendingMap.put(range, addresses); + replicas = new EndpointsForRange.Mutable(range,1); + 
ascendingMap.put(range, replicas); + descendingMap.put(range, replicas); } - addresses.add(address); + replicas.add(replica, Conflict.DUPLICATE); } - public void addPendingRange(Range<Token> range, InetAddressAndPort address) + public void addPendingRange(Range<Token> range, Replica replica) { if (Range.isWrapAround(range.left, range.right)) { - addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround); + addToMap(range, replica, ascendingMapForWrapAround, descendingMapForWrapAround); } else { - addToMap(range, address, ascendingMap, descendingMap); + addToMap(range, replica, ascendingMap, descendingMap); } } - static final void addIntersections(Set<InetAddressAndPort> endpointsToAdd, - NavigableMap<Range<Token>, List<InetAddressAndPort>> smallerMap, - NavigableMap<Range<Token>, List<InetAddressAndPort>> biggerMap) + static final void addIntersections(EndpointsForToken.Builder replicasToAdd, + NavigableMap<Range<Token>, EndpointsForRange.Mutable> smallerMap, + NavigableMap<Range<Token>, EndpointsForRange.Mutable> biggerMap) { // find the intersection of two sets for (Range<Token> range : smallerMap.keySet()) { - List<InetAddressAndPort> addresses = biggerMap.get(range); - if (addresses != null) + EndpointsForRange.Mutable replicas = biggerMap.get(range); + if (replicas != null) { - endpointsToAdd.addAll(addresses); + replicasToAdd.addAll(replicas); } } } - public Collection<InetAddressAndPort> pendingEndpointsFor(Token token) + public EndpointsForToken pendingEndpointsFor(Token token) { - Set<InetAddressAndPort> endpoints = new HashSet<>(); + EndpointsForToken.Builder replicas = EndpointsForToken.builder(token); - Range searchRange = new Range(token, token); + Range<Token> searchRange = new Range<>(token, token); // search for non-wrap-around maps - NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingTailMap = ascendingMap.tailMap(searchRange, true); - NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingTailMap = 
descendingMap.tailMap(searchRange, false); + NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingTailMap = ascendingMap.tailMap(searchRange, true); + NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingTailMap = descendingMap.tailMap(searchRange, false); // add intersections of two maps if (ascendingTailMap.size() < descendingTailMap.size()) { - addIntersections(endpoints, ascendingTailMap, descendingTailMap); + addIntersections(replicas, ascendingTailMap, descendingTailMap); } else { - addIntersections(endpoints, descendingTailMap, ascendingTailMap); + addIntersections(replicas, descendingTailMap, ascendingTailMap); } // search for wrap-around sets @@ -190,29 +171,29 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false); // add them since they are all necessary. - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : ascendingTailMap.entrySet()) + for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : ascendingTailMap.entrySet()) { - endpoints.addAll(entry.getValue()); + replicas.addAll(entry.getValue()); } - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : descendingTailMap.entrySet()) + for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : descendingTailMap.entrySet()) { - endpoints.addAll(entry.getValue()); + replicas.addAll(entry.getValue()); } - return endpoints; + return replicas.build(); } public String printPendingRanges() { StringBuilder sb = new StringBuilder(); - for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : this) + for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : this) { Range<Token> range = entry.getKey(); - for (InetAddressAndPort address : entry.getValue()) + for (Replica replica : entry.getValue()) { - sb.append(address).append(':').append(range); + sb.append(replica).append(':').append(range); sb.append(System.getProperty("line.separator")); } } @@ 
-221,7 +202,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I } @Override - public Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator() + public Iterator<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> iterator() { return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java new file mode 100644 index 0000000..74828ad --- /dev/null +++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.locator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +import static org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict.*; + +/** + * A ReplicaCollection for Ranges occurring at an endpoint. All Replica will be for the same endpoint, + * and must be unique Ranges (though overlapping ranges are presently permitted, these should probably not be permitted to occur) + */ +public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint> +{ + private static final Map<Range<Token>, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>()); + + private final InetAddressAndPort endpoint; + private volatile Map<Range<Token>, Replica> byRange; + private volatile RangesAtEndpoint fullRanges; + private volatile RangesAtEndpoint transRanges; + + private RangesAtEndpoint(InetAddressAndPort endpoint, List<Replica> list, boolean isSnapshot) + { + this(endpoint, list, isSnapshot, null); + } + private RangesAtEndpoint(InetAddressAndPort endpoint, List<Replica> list, boolean isSnapshot, Map<Range<Token>, Replica> byRange) + { + super(list, isSnapshot); + this.endpoint = endpoint; + this.byRange = byRange; + assert endpoint != null; + } + + public InetAddressAndPort endpoint() + { + return endpoint; + } + + @Override + public Set<InetAddressAndPort> endpoints() + { + return Collections.unmodifiableSet(list.isEmpty() + ? 
Collections.emptySet() + : Collections.singleton(endpoint) + ); + } + + public Set<Range<Token>> ranges() + { + return byRange().keySet(); + } + + public Map<Range<Token>, Replica> byRange() + { + Map<Range<Token>, Replica> map = byRange; + if (map == null) + byRange = map = buildByRange(list); + return map; + } + + @Override + protected RangesAtEndpoint snapshot(List<Replica> subList) + { + if (subList.isEmpty()) return empty(endpoint); + return new RangesAtEndpoint(endpoint, subList, true); + } + + @Override + public RangesAtEndpoint self() + { + return this; + } + + @Override + public ReplicaCollection.Mutable<RangesAtEndpoint> newMutable(int initialCapacity) + { + return new Mutable(endpoint, initialCapacity); + } + + @Override + public boolean contains(Replica replica) + { + return replica != null + && Objects.equals( + byRange().get(replica.range()), + replica); + } + + public RangesAtEndpoint full() + { + RangesAtEndpoint coll = fullRanges; + if (fullRanges == null) + fullRanges = coll = filter(Replica::isFull); + return coll; + } + + public RangesAtEndpoint trans() + { + RangesAtEndpoint coll = transRanges; + if (transRanges == null) + transRanges = coll = filter(Replica::isTransient); + return coll; + } + + public Collection<Range<Token>> fullRanges() + { + return full().ranges(); + } + + public Collection<Range<Token>> transientRanges() + { + return trans().ranges(); + } + + public boolean contains(Range<Token> range, boolean isFull) + { + Replica replica = byRange().get(range); + return replica != null && replica.isFull() == isFull; + } + + private static Map<Range<Token>, Replica> buildByRange(List<Replica> list) + { + // TODO: implement a delegating map that uses our superclass' list, and is immutable + Map<Range<Token>, Replica> byRange = new LinkedHashMap<>(list.size()); + for (Replica replica : list) + { + Replica prev = byRange.put(replica.range(), replica); + assert prev == null : "duplicate range in RangesAtEndpoint: " + prev + " and " + replica; 
+ } + + return Collections.unmodifiableMap(byRange); + } + + public static Collector<Replica, Builder, RangesAtEndpoint> collector(InetAddressAndPort endpoint) + { + return collector(ImmutableSet.of(), () -> new Builder(endpoint)); + } + + public static class Mutable extends RangesAtEndpoint implements ReplicaCollection.Mutable<RangesAtEndpoint> + { + boolean hasSnapshot; + public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); } + public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + + public void add(Replica replica, Conflict ignoreConflict) + { + if (hasSnapshot) throw new IllegalStateException(); + Preconditions.checkNotNull(replica); + if (!Objects.equals(super.endpoint, replica.endpoint())) + throw new IllegalArgumentException("Replica " + replica + " has incorrect endpoint (expected " + super.endpoint + ")"); + + Replica prev = super.byRange.put(replica.range(), replica); + if (prev != null) + { + super.byRange.put(replica.range(), prev); // restore prev + switch (ignoreConflict) + { + case DUPLICATE: + if (prev.equals(replica)) + break; + case NONE: + throw new IllegalArgumentException("Conflicting replica added (expected unique ranges): " + replica + "; existing: " + prev); + case ALL: + } + return; + } + + list.add(replica); + } + + @Override + public Map<Range<Token>, Replica> byRange() + { + // our internal map is modifiable, but it is unsafe to modify the map externally + // it would be possible to implement a safe modifiable map, but it is probably not valuable + return Collections.unmodifiableMap(super.byRange()); + } + + public RangesAtEndpoint get(boolean isSnapshot) + { + return new RangesAtEndpoint(super.endpoint, super.list, isSnapshot, Collections.unmodifiableMap(super.byRange)); + } + + public RangesAtEndpoint asImmutableView() + { + return get(false); + } + + public RangesAtEndpoint asSnapshot() + { + hasSnapshot = true; + return get(true); + } + } + 
+ public static class Builder extends ReplicaCollection.Builder<RangesAtEndpoint, Mutable, RangesAtEndpoint.Builder> + { + public Builder(InetAddressAndPort endpoint) { this(endpoint, 0); } + public Builder(InetAddressAndPort endpoint, int capacity) { super(new Mutable(endpoint, capacity)); } + } + + public static RangesAtEndpoint.Builder builder(InetAddressAndPort endpoint) + { + return new RangesAtEndpoint.Builder(endpoint); + } + + public static RangesAtEndpoint.Builder builder(InetAddressAndPort endpoint, int capacity) + { + return new RangesAtEndpoint.Builder(endpoint, capacity); + } + + public static RangesAtEndpoint empty(InetAddressAndPort endpoint) + { + return new RangesAtEndpoint(endpoint, EMPTY_LIST, true, EMPTY_MAP); + } + + public static RangesAtEndpoint of(Replica replica) + { + ArrayList<Replica> one = new ArrayList<>(1); + one.add(replica); + return new RangesAtEndpoint(replica.endpoint(), one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.range(), replica))); + } + + public static RangesAtEndpoint of(Replica ... replicas) + { + return copyOf(Arrays.asList(replicas)); + } + + public static RangesAtEndpoint copyOf(List<Replica> replicas) + { + if (replicas.isEmpty()) + throw new IllegalArgumentException("Must specify a non-empty collection of replicas"); + return builder(replicas.get(0).endpoint(), replicas.size()).addAll(replicas).build(); + } + + + /** + * Use of this method to synthesize Replicas is almost always wrong. In repair it turns out the concerns of transient + * vs non-transient are handled at a higher level, but eventually repair needs to ask streaming to actually move + * the data and at that point it doesn't have a great handle on what the replicas are and it doesn't really matter. + * + * Streaming expects to be given Replicas with each replica indicating what type of data (transient or not transient) + * should be sent. 
+ * + * So in this one instance we can lie to streaming and pretend all the replicas are full and use a dummy address + * and it doesn't matter because streaming doesn't rely on the address for anything other than debugging and full + * is a valid value for transientness because streaming is selecting candidate tables from the repair/unrepaired + * set already. + * @param ranges + * @return + */ + @VisibleForTesting + public static RangesAtEndpoint toDummyList(Collection<Range<Token>> ranges) + { + InetAddressAndPort dummy; + try + { + dummy = InetAddressAndPort.getByNameOverrideDefaults("0.0.0.0", 0); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + + //For repair we are less concerned with full vs transient since repair is already dealing with those concerns. + //Always say full and then if the repair is incremental or not will determine what is streamed. + return ranges.stream() + .map(range -> new Replica(dummy, range, true)) + .collect(collector(dummy)); + } + + /** + * @return concatenate two DISJOINT collections together + */ + public static RangesAtEndpoint concat(RangesAtEndpoint replicas, RangesAtEndpoint extraReplicas) + { + return AbstractReplicaCollection.concat(replicas, extraReplicas, NONE); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/RangesByEndpoint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java new file mode 100644 index 0000000..698b133 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +import java.util.Collections; +import java.util.Map; + +public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, RangesAtEndpoint> +{ + public RangesByEndpoint(Map<InetAddressAndPort, RangesAtEndpoint> map) + { + super(map); + } + + public RangesAtEndpoint get(InetAddressAndPort endpoint) + { + Preconditions.checkNotNull(endpoint); + return map.getOrDefault(endpoint, RangesAtEndpoint.empty(endpoint)); + } + + public static class Mutable extends ReplicaMultimap.Mutable<InetAddressAndPort, RangesAtEndpoint.Mutable> + { + @Override + protected RangesAtEndpoint.Mutable newMutable(InetAddressAndPort endpoint) + { + return new RangesAtEndpoint.Mutable(endpoint); + } + + public RangesByEndpoint asImmutableView() + { + return new RangesByEndpoint(Collections.unmodifiableMap(Maps.transformValues(map, RangesAtEndpoint.Mutable::asImmutableView))); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Replica.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Replica.java b/src/java/org/apache/cassandra/locator/Replica.java new file mode 100644 index 0000000..37b6050 --- /dev/null +++ 
b/src/java/org/apache/cassandra/locator/Replica.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.cassandra.locator;

import java.util.Objects;
import java.util.Set;

import com.google.common.base.Preconditions;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.utils.FBUtilities;

/**
 * A Replica represents an owning node for a copy of a portion of the token ring.
 *
 * It consists of:
 *  - the logical token range that is being replicated (i.e. for the first logical replica only, this will be equal
 *    to one of its owned portions of the token ring; all other replicas will have this token range also)
 *  - an endpoint (IP and port)
 *  - whether the range is replicated in full, or transiently (CASSANDRA-14404)
 *
 * In general, it is preferable to use a Replica rather than a {@code Range<Token>}, particularly when users of the
 * concept depend on knowledge of the full/transient status of the copy.
 *
 * That means you should avoid unwrapping and rewrapping these things and think hard about subtraction
 * and such and what the result is with respect to transientness. Definitely avoid creating fake Replicas with
 * misinformation about endpoints, ranges, or transientness.
 */
public final class Replica implements Comparable<Replica>
{
    private final Range<Token> range;
    private final InetAddressAndPort endpoint;
    private final boolean full;

    /**
     * @param endpoint the node holding this copy; must not be null
     * @param range    the token range being replicated; must not be null
     * @param full     true for a full replica, false for a transient one
     * @throws NullPointerException if {@code endpoint} or {@code range} is null
     */
    public Replica(InetAddressAndPort endpoint, Range<Token> range, boolean full)
    {
        Preconditions.checkNotNull(endpoint);
        Preconditions.checkNotNull(range);
        this.endpoint = endpoint;
        this.range = range;
        this.full = full;
    }

    /**
     * Convenience constructor building the range {@code (start, end]} from its two tokens.
     */
    public Replica(InetAddressAndPort endpoint, Token start, Token end, boolean full)
    {
        this(endpoint, new Range<>(start, end), full);
    }

    /**
     * Two replicas are equal when their endpoint, range and full/transient status all match.
     */
    @Override
    public boolean equals(Object o)
    {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Replica replica = (Replica) o;
        return full == replica.full &&
               Objects.equals(endpoint, replica.endpoint) &&
               Objects.equals(range, replica.range);
    }

    /**
     * Orders by range, then endpoint, then transient before full (consistent with {@link #equals}).
     */
    @Override
    public int compareTo(Replica o)
    {
        int c = range.compareTo(o.range);
        if (c == 0)
            c = endpoint.compareTo(o.endpoint);
        if (c == 0)
            c = Boolean.compare(full, o.full);
        return c;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(endpoint, range, full);
    }

    @Override
    public String toString()
    {
        return (full ? "Full" : "Transient") + '(' + endpoint() + ',' + range + ')';
    }

    public final InetAddressAndPort endpoint()
    {
        return endpoint;
    }

    /**
     * @return true iff this replica's endpoint is this node's broadcast address and port
     */
    public boolean isLocal()
    {
        return endpoint.equals(FBUtilities.getBroadcastAddressAndPort());
    }

    public Range<Token> range()
    {
        return range;
    }

    public boolean isFull()
    {
        return full;
    }

    public final boolean isTransient()
    {
        return !isFull();
    }

    /**
     * This is used exclusively in TokenMetadata to check if a portion of a range is already replicated
     * by an endpoint so that we only mark as pending the portion that is either not replicated sufficiently (transient
     * when we need full) or at all.
     *
     * If it's not replicated at all it needs to be pending because there is no data.
     * If it's replicated but only transiently and we need to replicate it fully it must be marked as pending until it
     * is available fully otherwise a read might treat this replica as full and not read from a full replica that has
     * the data.
     *
     * @param toSubtract ranges to remove; only entries with the same full/transient status as this replica are used
     * @return the remaining sub-ranges of this replica, each carrying this replica's endpoint and status
     */
    public RangesAtEndpoint subtractSameReplication(RangesAtEndpoint toSubtract)
    {
        Set<Range<Token>> subtractedRanges = range().subtractAll(toSubtract.filter(r -> r.isFull() == isFull()).ranges());
        RangesAtEndpoint.Builder result = RangesAtEndpoint.builder(endpoint, subtractedRanges.size());
        for (Range<Token> range : subtractedRanges)
        {
            result.add(decorateSubrange(range));
        }
        return result.build();
    }

    /**
     * Don't use this method and ignore transient status unless you are explicitly handling it outside this method.
     *
     * This helper method is used by StorageService.calculateStreamAndFetchRanges to perform subtraction.
     * It ignores transient status because it's already being handled in calculateStreamAndFetchRanges.
     *
     * @param subtract the range to remove from this replica's range
     * @return the remaining sub-ranges, each carrying this replica's endpoint and full/transient status
     */
    public RangesAtEndpoint subtractIgnoreTransientStatus(Range<Token> subtract)
    {
        Set<Range<Token>> ranges = this.range.subtract(subtract);
        RangesAtEndpoint.Builder result = RangesAtEndpoint.builder(endpoint, ranges.size());
        for (Range<Token> subrange : ranges)
            result.add(decorateSubrange(subrange));
        return result.build();
    }

    public boolean contains(Range<Token> that)
    {
        return range().contains(that);
    }

    /**
     * @return true iff the ranges intersect; endpoints and full/transient status are not compared
     */
    public boolean intersectsOnRange(Replica replica)
    {
        return range().intersects(replica.range());
    }

    /**
     * @param subrange a range contained in this replica's range
     * @return a Replica for {@code subrange} with this replica's endpoint and full/transient status
     * @throws IllegalArgumentException if {@code subrange} is not contained in this replica's range
     */
    public Replica decorateSubrange(Range<Token> subrange)
    {
        Preconditions.checkArgument(range.contains(subrange));
        return new Replica(endpoint(), subrange, isFull());
    }

    public static Replica fullReplica(InetAddressAndPort endpoint, Range<Token> range)
    {
        return new Replica(endpoint, range, true);
    }

    public static Replica fullReplica(InetAddressAndPort endpoint, Token start, Token end)
    {
        return fullReplica(endpoint, new Range<>(start, end));
    }

    public static Replica transientReplica(InetAddressAndPort endpoint, Range<Token> range)
    {
        return new Replica(endpoint, range, false);
    }

    public static Replica transientReplica(InetAddressAndPort endpoint, Token start, Token end)
    {
        return transientReplica(endpoint, new Range<>(start, end));
    }

}

+ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaCollection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java new file mode 100644 index 0000000..6833f4b --- /dev/null +++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.cassandra.locator;

import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
 * A collection like class for Replica objects. Represents both a well defined order on the contained Replica objects,
 * and efficient methods for accessing the contained Replicas, directly and as a projection onto their endpoints and ranges.
 *
 * @param <C> the concrete collection type, returned by the derivation methods (filter, subList, sorted)
 */
public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Iterable<Replica>
{
    /**
     * @return a Set of the endpoints of the contained Replicas.
     * Iteration order is maintained where there is a 1:1 relationship between endpoint and Replica
     * Typically this collection offers O(1) access methods, and this is true for all but ReplicaList.
     */
    public abstract Set<InetAddressAndPort> endpoints();

    /**
     * @param i a value in the range [0..size())
     * @return the i'th Replica, in our iteration order
     */
    public abstract Replica get(int i);

    /**
     * @return the number of Replica contained
     */
    public abstract int size();

    /**
     * @return true iff size() == 0
     */
    public abstract boolean isEmpty();

    /**
     * @return true iff a Replica in this collection is equal to the provided Replica.
     * Typically this method is expected to take O(1) time, and this is true for all but ReplicaList.
     */
    public abstract boolean contains(Replica replica);

    /**
     * @return an *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
     * An effort will be made to either return ourself, or a subList, where possible.
     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
     */
    public abstract C filter(Predicate<Replica> predicate);

    /**
     * @return an *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
     * An effort will be made to either return ourself, or a subList, where possible.
     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
     * Only the first maxSize items will be returned.
     */
    public abstract C filter(Predicate<Replica> predicate, int maxSize);

    /**
     * @return an *eagerly constructed* copy of this collection containing the Replica at positions [start..end);
     * An effort will be made to either return ourself, or a subList, where possible.
     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
     */
    public abstract C subList(int start, int end);

    /**
     * @return an *eagerly constructed* copy of this collection containing the Replica re-ordered according to this comparator
     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
     */
    public abstract C sorted(Comparator<Replica> comparator);

    public abstract Iterator<Replica> iterator();
    public abstract Stream<Replica> stream();

    public abstract boolean equals(Object o);
    public abstract int hashCode();
    public abstract String toString();

    /**
     * A mutable extension of a ReplicaCollection. This is append-only, so it is safe to select a subList,
     * or at any time take an asImmutableView() snapshot.
     */
    public interface Mutable<C extends ReplicaCollection<C>> extends ReplicaCollection<C>
    {
        /**
         * @return an Immutable clone that mirrors any modifications to this Mutable instance.
         */
        C asImmutableView();

        /**
         * @return an Immutable clone that assumes this Mutable will never be modified again.
         * If this is not true, behaviour is undefined.
         */
        C asSnapshot();

        /**
         * Which conflicting additions (as defined by C's semantics) {@link #add(Replica, Conflict)} should
         * silently ignore instead of rejecting:
         * <ul>
         *   <li>NONE - ignore none: any conflict is an error</li>
         *   <li>DUPLICATE - ignore only an addition equal to the replica already present</li>
         *   <li>ALL - ignore every conflict</li>
         * </ul>
         * In RangesAtEndpoint.Mutable, an ignored conflict leaves the existing replica in place and the error is an
         * IllegalArgumentException.
         */
        enum Conflict { NONE, DUPLICATE, ALL }

        /**
         * @param replica add this replica to the end of the collection
         * @param ignoreConflict which conflicting additions (as defined by C's semantics) to ignore rather than reject;
         *                       {@link Conflict#NONE} rejects every conflict
         */
        void add(Replica replica, Conflict ignoreConflict);

        /** Equivalent to {@code add(replica, Conflict.NONE)}. */
        default public void add(Replica replica)
        {
            add(replica, Conflict.NONE);
        }

        /** Adds each replica in iteration order using the given conflict policy. */
        default public void addAll(Iterable<Replica> replicas, Conflict ignoreConflicts)
        {
            for (Replica replica : replicas)
                add(replica, ignoreConflicts);
        }

        /** Equivalent to {@code addAll(replicas, Conflict.NONE)}. */
        default public void addAll(Iterable<Replica> replicas)
        {
            addAll(replicas, Conflict.NONE);
        }
    }

    /**
     * A single-use fluent builder over a Mutable: build() takes a snapshot and detaches the builder,
     * after which any further use fails with IllegalStateException.
     *
     * @param <C> the immutable collection type produced by build()
     * @param <M> the Mutable type backing this builder
     * @param <B> the concrete builder type, returned by the fluent add methods
     */
    public static class Builder<C extends ReplicaCollection<C>, M extends Mutable<C>, B extends Builder<C, M, B>>
    {
        Mutable<C> mutable;
        public Builder(Mutable<C> mutable) { this.mutable = mutable; }

        public int size() { return mutable().size(); }
        public B add(Replica replica) { mutable().add(replica); return self(); }
        public B add(Replica replica, Conflict ignoreConflict) { mutable().add(replica, ignoreConflict); return self(); }
        public B addAll(Iterable<Replica> replica) { mutable().addAll(replica); return self(); }
        public B addAll(Iterable<Replica> replica, Conflict ignoreConflict) { mutable().addAll(replica, ignoreConflict); return self(); }

        /**
         * @return a snapshot of everything added so far; this builder cannot be used afterwards
         * @throws IllegalStateException if build() has already been called
         */
        public C build()
        {
            C result = mutable().asSnapshot();
            // asSnapshot() assumes the Mutable is never modified again, so detach it from this builder
            mutable = null;
            return result;
        }

        /** Returns the backing Mutable, failing with a clear message once build() has detached it. */
        private Mutable<C> mutable()
        {
            if (mutable == null)
                throw new IllegalStateException("Builder has already been built");
            return mutable;
        }

        /** Fluent-return helper; the cast holds because B is, by contract, the concrete subclass type. */
        @SuppressWarnings("unchecked")
        private B self()
        {
            return (B) this;
        }
    }

}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java new file mode 100644 index 0000000..946a7f8 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.cassandra.locator;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.utils.FBUtilities;

import static com.google.common.collect.Iterables.any;

/**
 * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
 * for building the relevant layout.
 *
 * Constitutes:
 *  - the 'natural' replicas replicating the range or token relevant for the operation
 *  - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
 *  - the 'selected' replicas, those that should be targeted for any operation
 *  - 'all' replicas represents natural+pending
 *
 * @param <E> the type of Endpoints this ReplicaLayout holds (either EndpointsForToken or EndpointsForRange)
 * @param <L> the type of itself, including its type parameters, for return type of modifying methods
 */
public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
{
    // natural+pending; computed on demand by all() when not supplied to the constructor
    private volatile E all;
    protected final E natural;
    protected final E pending;
    protected final E selected;

    protected final Keyspace keyspace;
    protected final ConsistencyLevel consistencyLevel;

    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
    {
        this(keyspace, consistencyLevel, natural, pending, selected, null);
    }

    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
    {
        assert selected != null;
        assert pending == null || !Endpoints.haveConflicts(natural, pending);
        this.keyspace = keyspace;
        this.consistencyLevel = consistencyLevel;
        this.natural = natural;
        this.pending = pending;
        this.selected = selected;
        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
        if (all == null && pending == null)
            all = natural;
        this.all = all;
    }

    /**
     * @return the natural replica hosted on {@code endpoint}, looked up via natural.byEndpoint()
     *         (presumably null when the endpoint is not a natural replica)
     */
    public Replica getReplicaFor(InetAddressAndPort endpoint)
    {
        return natural.byEndpoint().get(endpoint);
    }

    public E natural()
    {
        return natural;
    }

    /**
     * @return natural+pending, concatenated on first use when not supplied at construction
     */
    public E all()
    {
        E result = all;
        if (result == null)
            all = result = Endpoints.concat(natural, pending);
        return result;
    }

    public E selected()
    {
        return selected;
    }

    /**
     * @return the pending replicas - will be null for read layouts
     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
     */
    public E pending()
    {
        return pending;
    }

    /**
     * @return the number of responses required: consistencyLevel.blockFor for read layouts (no pending),
     *         otherwise consistencyLevel.blockForWrite, which also takes the pending replicas into account
     */
    public int blockFor()
    {
        return pending == null
               ? consistencyLevel.blockFor(keyspace)
               : consistencyLevel.blockForWrite(keyspace, pending);
    }

    public Keyspace keyspace()
    {
        return keyspace;
    }

    public ConsistencyLevel consistencyLevel()
    {
        return consistencyLevel;
    }

    abstract public L withSelected(E replicas);

    abstract public L withConsistencyLevel(ConsistencyLevel cl);

    /**
     * @return a copy of this layout whose selected replicas are the natural replicas not already selected;
     *         restricted to the local datacenter when the consistency level is datacenter-local and the
     *         keyspace uses NetworkTopologyStrategy
     */
    public L forNaturalUncontacted()
    {
        E more;
        if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
        {
            IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
            String localDC = DatabaseDescriptor.getLocalDataCenter();

            more = natural.filter(replica -> !selected.contains(replica) &&
                                             snitch.getDatacenter(replica).equals(localDC));
        }
        else
        {
            more = natural.filter(replica -> !selected.contains(replica));
        }

        return withSelected(more);
    }

    public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
    {
        public final AbstractBounds<PartitionPosition> range;

        @VisibleForTesting
        public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
        {
            // Range queries do not contact pending replicas
            super(keyspace, consistencyLevel, natural, null, selected);
            this.range = range;
        }

        @Override
        public ForRange withSelected(EndpointsForRange newSelected)
        {
            return new ForRange(keyspace, consistencyLevel, range, natural, newSelected);
        }

        @Override
        public ForRange withConsistencyLevel(ConsistencyLevel cl)
        {
            return new ForRange(keyspace, cl, range, natural, selected);
        }
    }

    public static class ForToken extends ReplicaLayout<EndpointsForToken, ForToken>
    {
        public final Token token;

        @VisibleForTesting
        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected)
        {
            super(keyspace, consistencyLevel, natural, pending, selected);
            this.token = token;
        }

        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
        {
            super(keyspace, consistencyLevel, natural, pending, selected, all);
            this.token = token;
        }

        @Override
        public ForToken withSelected(EndpointsForToken newSelected)
        {
            return new ForToken(keyspace, consistencyLevel, token, natural, pending, newSelected);
        }

        @Override
        public ForToken withConsistencyLevel(ConsistencyLevel cl)
        {
            return new ForToken(keyspace, cl, token, natural, pending, selected);
        }
    }

    public static class ForPaxos extends ForToken
    {
        private final int requiredParticipants;

        private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int requiredParticipants, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
        {
            super(keyspace, consistencyLevel, token, natural, pending, selected, all);
            this.requiredParticipants = requiredParticipants;
        }

        public int getRequiredParticipants()
        {
            return requiredParticipants;
        }
    }

    /**
     * @return a CL.ONE layout whose natural, selected and all replicas are just {@code replica}, with no pending
     */
    public static ForToken forSingleReplica(Keyspace keyspace, Token token, Replica replica)
    {
        EndpointsForToken singleReplica = EndpointsForToken.of(token, replica);
        return new ForToken(keyspace, ConsistencyLevel.ONE, token, singleReplica, EndpointsForToken.empty(token), singleReplica, singleReplica);
    }

    /**
     * @return a CL.ONE range layout whose natural and selected replicas are just {@code replica}
     */
    public static ForRange forSingleReplica(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
    {
        EndpointsForRange singleReplica = EndpointsForRange.of(replica);
        return new ForRange(keyspace, ConsistencyLevel.ONE, range, singleReplica, singleReplica);
    }

    public static ForToken forCounterWrite(Keyspace keyspace, Token token, Replica replica)
    {
        return forSingleReplica(keyspace, token, replica);
    }

    /**
     * Builds a write layout over the given batchlog endpoints at the partitioner's minimum token,
     * using CL.ONE when there is a single endpoint and CL.TWO otherwise.
     */
    public static ForToken forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
    {
        // A single case we write not for range or token, but multiple mutations to many tokens
        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
        EndpointsForToken natural = EndpointsForToken.copyOf(token, SystemReplicas.getSystemReplicas(endpoints));
        EndpointsForToken pending = EndpointsForToken.empty(token);
        ConsistencyLevel consistencyLevel = natural.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;

        return forWriteWithDownNodes(keyspace, consistencyLevel, token, natural, pending);
    }

    /**
     * Write layout for {@code token} that treats every replica as alive (no liveness filtering).
     */
    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token) throws UnavailableException
    {
        return forWrite(keyspace, consistencyLevel, token, Predicates.alwaysTrue());
    }

    /**
     * Write layout for {@code token} using the natural and pending replicas currently known to StorageService.
     */
    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
    {
        EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), token);
        EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName());
        return forWrite(keyspace, consistencyLevel, token, natural, pending, isAlive);
    }

    /**
     * Write layout over the given natural and pending replicas that treats every replica as alive.
     */
    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending) throws UnavailableException
    {
        return forWrite(keyspace, consistencyLevel, token, natural, pending, Predicates.alwaysTrue());
    }

    /**
     * Write layout over the given natural and pending replicas, after resolving any endpoints present in both.
     * With no transient replicas, every live replica is selected; otherwise selection is delegated to the
     * blockFor-aware overload below.
     */
    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
    {
        if (Endpoints.haveConflicts(natural, pending))
        {
            natural = Endpoints.resolveConflictsInNatural(natural, pending);
            pending = Endpoints.resolveConflictsInPending(natural, pending);
        }

        if (!any(natural, Replica::isTransient) && !any(pending, Replica::isTransient))
        {
            EndpointsForToken selected = Endpoints.concat(natural, pending).filter(r -> isAlive.test(r.endpoint()));
            return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected);
        }

        return forWrite(keyspace, consistencyLevel, token, consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive);
    }

    /**
     * Builds the layout for a Paxos (LWT) operation on {@code key}.
     *
     * @throws UnavailableException if fewer than a majority of participants are alive, or if more than one
     *                              pending endpoint exists
     */
    public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
    {
        Token tk = key.getToken();
        EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), tk);
        EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspace.getName());
        if (Endpoints.haveConflicts(natural, pending))
        {
            natural = Endpoints.resolveConflictsInNatural(natural, pending);
            pending = Endpoints.resolveConflictsInPending(natural, pending);
        }

        // TODO CASSANDRA-14547
        Replicas.temporaryAssertFull(natural);
        Replicas.temporaryAssertFull(pending);

        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
        {
            // Restrict natural and pending to node in the local DC only
            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
            String localDc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
            Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));

            natural = natural.filter(isLocalDc);
            pending = pending.filter(isLocalDc);
        }

        int participants = pending.size() + natural.size();
        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833

        EndpointsForToken all = Endpoints.concat(natural, pending);
        EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive);
        if (selected.size() < requiredParticipants)
            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, selected.size());

        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
        // Note that we fake an impossible number of required nodes in the unavailable exception
        // to nail home the point that it's an impossible operation no matter how many nodes are live.
        if (pending.size() > 1)
            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pending.size()),
                                           consistencyForPaxos,
                                           participants + 1,
                                           selected.size());

        return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, key.getToken(), requiredParticipants, natural, pending, selected, all);
    }

    /**
     * We want to send mutations to as many full replicas as we can, and just as many transient replicas
     * as we need to meet blockFor.
     */
    @VisibleForTesting
    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int blockFor, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws UnavailableException
    {
        EndpointsForToken all = Endpoints.concat(natural, pending);
        EndpointsForToken selected = all
                                     .select()
                                     .add(r -> r.isFull() && livePredicate.test(r.endpoint()))
                                     .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), blockFor)
                                     .get();

        consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, pending);

        return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected, all);
    }

    /**
     * Read layout for {@code token}: natural replicas are the live replicas sorted by StorageProxy, selected
     * is what the consistency level chooses to query (all of them when speculative retry is ALWAYS).
     */
    public static ForToken forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
    {
        EndpointsForToken natural = StorageProxy.getLiveSortedReplicasForToken(keyspace, token);
        EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));

        // Throw UAE early if we don't have enough replicas.
        consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected);

        return new ForToken(keyspace, consistencyLevel, token, natural, null, selected);
    }

    public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
    {
        return new ForRange(keyspace, consistencyLevel, range, natural, selected);
    }

    @Override
    public String toString()
    {
        // every label is preceded by a space so adjacent values do not run together
        return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + keyspace + " natural: " + natural + " pending: " + pending + " selected: " + selected + " ]";
    }
}

+ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaMultimap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaMultimap.java b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java new file mode 100644 index 0000000..3e3fcb4 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+/**
+ * Associates each key with a collection of {@link Replica}s, backed by a plain {@link Map}.
+ *
+ * @param <K> key type
+ * @param <C> replica collection type stored per key
+ */
+public abstract class ReplicaMultimap<K, C extends ReplicaCollection<?>>
+{
+    /** Backing storage; handed out directly by {@link #asMap()}, {@link #keySet()} and {@link #entrySet()}. */
+    final Map<K, C> map;
+
+    ReplicaMultimap(Map<K, C> map)
+    {
+        this.map = map;
+    }
+
+    /** Returns the collection for {@code key}; absent-key behaviour is defined by each subclass. */
+    public abstract C get(K key);
+
+    /** Returns the collection stored under {@code key}, or {@code null} when nothing is stored. */
+    public C getIfPresent(K key)
+    {
+        return map.get(key);
+    }
+
+    /**
+     * Mutable variant backed by a {@link HashMap}. Each key's collection is created lazily
+     * via {@link #newMutable} the first time that key is requested.
+     */
+    public static abstract class Mutable
+            <K, MutableCollection extends ReplicaCollection.Mutable<?>>
+            extends ReplicaMultimap<K, MutableCollection>
+    {
+        /** Creates the empty collection to be stored for a key seen for the first time. */
+        protected abstract MutableCollection newMutable(K key);
+
+        Mutable()
+        {
+            super(new HashMap<>());
+        }
+
+        /**
+         * Returns the collection for {@code key}. If none is stored yet, one is created with
+         * {@link #newMutable} and stored first.
+         *
+         * @throws NullPointerException if {@code key} is null
+         */
+        public MutableCollection get(K key)
+        {
+            Preconditions.checkNotNull(key);
+            return map.computeIfAbsent(key, this::newMutable);
+        }
+
+        /**
+         * Appends {@code replica} to the collection for {@code key}, creating that collection on demand.
+         *
+         * @throws NullPointerException if {@code key} or {@code replica} is null
+         */
+        public void put(K key, Replica replica)
+        {
+            Preconditions.checkNotNull(key);
+            Preconditions.checkNotNull(replica);
+            MutableCollection replicasForKey = get(key);
+            replicasForKey.add(replica);
+        }
+    }
+
+    /** Concatenates all per-key collections into a single iterable over their replicas. */
+    public Iterable<Replica> flattenValues()
+    {
+        return Iterables.concat(map.values());
+    }
+
+    /**
+     * Returns one (key, replica) pair per replica. The stream is built afresh each time
+     * {@code iterator()} is called on the returned iterable.
+     */
+    public Iterable<Map.Entry<K, Replica>> flattenEntries()
+    {
+        return () -> {
+            Stream<Map.Entry<K, C>> perKey = map.entrySet().stream();
+            Stream<Map.Entry<K, Replica>> perReplica = perKey.flatMap(keyed -> {
+                K owner = keyed.getKey();
+                return keyed.getValue()
+                            .stream()
+                            .map(replica -> (Map.Entry<K, Replica>) new AbstractMap.SimpleImmutableEntry<>(owner, replica));
+            });
+            return perReplica.iterator();
+        };
+    }
+
+    /** Returns true when no key is stored. */
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    /** Returns true when {@code key} has a stored collection. */
+    public boolean containsKey(Object key)
+    {
+        return map.containsKey(key);
+    }
+
+    /** Returns the backing map's key set (not a copy). */
+    public Set<K> keySet()
+    {
+        return map.keySet();
+    }
+
+    /** Returns the backing map's entry set (not a copy). */
+    public Set<Map.Entry<K, C>> entrySet()
+    {
+        return map.entrySet();
+    }
+
+    /** Returns the backing map itself (not a copy). */
+    public Map<K, C> asMap()
+    {
+        return map;
+    }
+
+    /** Two multimaps are equal when they have the same concrete class and equal backing maps. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+        ReplicaMultimap<?, ?> other = (ReplicaMultimap<?, ?>) o;
+        return Objects.equals(map, other.map);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return map.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return map.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Replicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replicas.java b/src/java/org/apache/cassandra/locator/Replicas.java
new file mode 100644
index 0000000..299e6ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/Replicas.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import static com.google.common.collect.Iterables.all;
+
+/**
+ * Static helpers for inspecting and validating collections of {@link Replica}s.
+ */
+public class Replicas
+{
+
+    /**
+     * Counts the full (non-transient) replicas in a collection.
+     *
+     * @param liveReplicas the replicas to inspect (liveness is not checked here)
+     * @return the number of replicas for which {@link Replica#isFull()} is true
+     */
+    public static int countFull(ReplicaCollection<?> liveReplicas)
+    {
+        int count = 0;
+        for (Replica replica : liveReplicas)
+            if (replica.isFull())
+                ++count;
+        return count;
+    }
+
+    /**
+     * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future.
+     *
+     * @throws UnsupportedOperationException if {@code replica} is not full
+     */
+    public static void temporaryAssertFull(Replica replica)
+    {
+        if (!replica.isFull())
+        {
+            throw new UnsupportedOperationException("transient replicas are currently unsupported: " + replica);
+        }
+    }
+
+    /**
+     * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future.
+     *
+     * @throws UnsupportedOperationException if any of {@code replicas} is not full
+     */
+    public static void temporaryAssertFull(Iterable<Replica> replicas)
+    {
+        requireAllFull(replicas, "transient replicas are currently unsupported: ");
+    }
+
+    /**
+     * For areas of the code that should never see a transient replica.
+     *
+     * Unlike {@link #temporaryAssertFull(Iterable)}, a failure here does not mean a feature is
+     * missing, so the message does not say "currently unsupported".
+     *
+     * @throws UnsupportedOperationException if any of {@code replicas} is not full
+     */
+    public static void assertFull(Iterable<Replica> replicas)
+    {
+        requireAllFull(replicas, "transient replicas are not permitted here: ");
+    }
+
+    /**
+     * Throws an UnsupportedOperationException if any replica is not full. The message is
+     * {@code messagePrefix} followed by all of the replicas.
+     */
+    private static void requireAllFull(Iterable<Replica> replicas, String messagePrefix)
+    {
+        if (!all(replicas, Replica::isFull))
+            throw new UnsupportedOperationException(messagePrefix + Iterables.toString(replicas));
+    }
+
+    /**
+     * Renders each replica's endpoint as a host-address string, in iteration order.
+     *
+     * @param replicas the replicas to render
+     * @param withPort passed through to {@code getHostAddress} to choose whether the port is included
+     * @return one string per replica
+     */
+    public static List<String> stringify(ReplicaCollection<?> replicas, boolean withPort)
+    {
+        List<String> stringEndpoints = new ArrayList<>(replicas.size());
+        for (Replica replica: replicas)
+        {
+            stringEndpoints.add(replica.endpoint().getHostAddress(withPort));
+        }
+        return stringEndpoints;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicationFactor.java
----------------------------------------------------------------------
diff --git
a/src/java/org/apache/cassandra/locator/ReplicationFactor.java b/src/java/org/apache/cassandra/locator/ReplicationFactor.java new file mode 100644 index 0000000..c0ed31f --- /dev/null +++ b/src/java/org/apache/cassandra/locator/ReplicationFactor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A replication factor split into total replicas and full replicas; the difference is the
+ * number of transient replicas.
+ *
+ * Instances are created only through {@link #ZERO}, {@link #fullOnly}, {@link #withTransient}
+ * and {@link #fromString}. Every construction path runs {@link #validate}.
+ */
+public class ReplicationFactor
+{
+    public static final ReplicationFactor ZERO = new ReplicationFactor(0);
+
+    /** Total number of replicas, full plus transient. */
+    public final int allReplicas;
+    /** Number of full replicas, i.e. {@code allReplicas - transientReplicas()}. */
+    public final int fullReplicas;
+
+    private ReplicationFactor(int allReplicas, int transientReplicas)
+    {
+        validate(allReplicas, transientReplicas);
+        this.allReplicas = allReplicas;
+        this.fullReplicas = allReplicas - transientReplicas;
+    }
+
+    private ReplicationFactor(int allReplicas)
+    {
+        this(allReplicas, 0);
+    }
+
+    /** Returns the number of transient replicas. */
+    public int transientReplicas()
+    {
+        return allReplicas - fullReplicas;
+    }
+
+    /** Returns true when at least one replica is transient. */
+    public boolean hasTransientReplicas()
+    {
+        return allReplicas != fullReplicas;
+    }
+
+    /**
+     * Checks that a (total, transient) replica pair is acceptable on this node.
+     *
+     * When {@code transientRF > 0}, it also requires {@code num_tokens == 1}. It further requires
+     * that no other gossip-known endpoint (live or unreachable) reports a release version with
+     * major < 4. Endpoints whose release version is unknown (null) are not flagged.
+     *
+     * @throws IllegalArgumentException if transient replicas are requested while transient replication
+     *         is disabled; if {@code totalRF} is negative; if a non-zero {@code transientRF} is not
+     *         strictly less than {@code totalRF}; or if transient replicas are requested with multiple tokens
+     * @throws AssertionError if transient replicas are requested in a mixed-version cluster, or if
+     *         {@code transientRF} is negative
+     */
+    static void validate(int totalRF, int transientRF)
+    {
+        Preconditions.checkArgument(transientRF == 0 || DatabaseDescriptor.isTransientReplicationEnabled(),
+                                    "Transient replication is not enabled on this node");
+        Preconditions.checkArgument(totalRF >= 0,
+                                    "Replication factor must be non-negative, found %s", totalRF);
+        Preconditions.checkArgument(transientRF == 0 || transientRF < totalRF,
+                                    "Transient replicas must be zero, or less than total replication factor. For %s/%s", totalRF, transientRF);
+        if (transientRF > 0)
+        {
+            Preconditions.checkArgument(DatabaseDescriptor.getNumTokens() == 1,
+                                        "Transient nodes are not allowed with multiple tokens");
+            // Every endpoint gossip knows about, reachable or not; this node itself is filtered out below.
+            Stream<InetAddressAndPort> endpoints = Stream.concat(Gossiper.instance.getLiveMembers().stream(), Gossiper.instance.getUnreachableMembers().stream());
+            List<InetAddressAndPort> badVersionEndpoints = endpoints.filter(Predicates.not(FBUtilities.getBroadcastAddressAndPort()::equals))
+                                                                    .filter(endpoint -> Gossiper.instance.getReleaseVersion(endpoint) != null && Gossiper.instance.getReleaseVersion(endpoint).major < 4)
+                                                                    .collect(Collectors.toList());
+            if (!badVersionEndpoints.isEmpty())
+                throw new AssertionError("Transient replication is not supported in mixed version clusters with nodes < 4.0. Bad nodes: " + badVersionEndpoints);
+        }
+        else if (transientRF < 0)
+        {
+            // Zero transient replicas is valid, so the requirement is "non-negative", not "strictly positive".
+            throw new AssertionError(String.format("Amount of transient nodes should be non-negative, but was: '%d'", transientRF));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ReplicationFactor that = (ReplicationFactor) o;
+        return allReplicas == that.allReplicas && fullReplicas == that.fullReplicas;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(allReplicas, fullReplicas);
+    }
+
+    /** Returns a replication factor with {@code totalReplicas} replicas, all of them full. */
+    public static ReplicationFactor fullOnly(int totalReplicas)
+    {
+        return new ReplicationFactor(totalReplicas);
+    }
+
+    /** Returns a replication factor with {@code totalReplicas} replicas, {@code transientReplicas} of which are transient. */
+    public static ReplicationFactor withTransient(int totalReplicas, int transientReplicas)
+    {
+        return new ReplicationFactor(totalReplicas, transientReplicas);
+    }
+
+    /**
+     * Parses {@code "<replicas>"} or {@code "<replicas>/<transient>"}, e.g. {@code "3"} or {@code "3/1"}.
+     *
+     * @throws NumberFormatException if a part is not an integer
+     * @throws IllegalArgumentException if a string containing '/' does not split into exactly two parts,
+     *         or if {@link #validate} rejects the values
+     */
+    public static ReplicationFactor fromString(String s)
+    {
+        if (s.contains("/"))
+        {
+            String[] parts = s.split("/");
+            Preconditions.checkArgument(parts.length == 2,
+                                        "Replication factor format is <replicas> or <replicas>/<transient>");
+            return new ReplicationFactor(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
+        }
+        else
+        {
+            return new ReplicationFactor(Integer.parseInt(s), 0);
+        }
+    }
+
+    /** Renders as {@code rf(N)}, or {@code rf(N/T)} when there are {@code T > 0} transient replicas. */
+    @Override
+    public String toString()
+    {
+        // "/" must be a String: '/' + int is char arithmetic (e.g. '/' + 1 == 48), not concatenation.
+        return "rf(" + allReplicas + (hasTransientReplicas() ? "/" + transientReplicas() : "") + ')';
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SimpleSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleSnitch.java b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
index e31fc6b..d605b6e 100644
--- a/src/java/org/apache/cassandra/locator/SimpleSnitch.java
+++ b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.List;
-
 /**
  * A simple endpoint snitch implementation that treats Strategy order as proximity,
  * allowing non-read-repaired reads to prefer a single endpoint, which improves
@@ -37,12 +35,14 @@ public class SimpleSnitch extends AbstractEndpointSnitch
     }
 
     @Override
-    public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+    public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddress)
     {
         // Optimization to avoid walking the list
+        return unsortedAddress;
     }
 
-    public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+    @Override
+    public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
     {
         // Making all endpoints equal ensures we won't change the original ordering (since
         // Collections.sort is guaranteed to be stable)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org