http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 110fed6..e8aa5d3 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -18,27 +18,40 @@ package org.apache.cassandra.dht; import java.util.*; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; +import com.google.common.base.Predicate; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.EndpointsByReplica; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.locator.EndpointsByRange; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; +import 
org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.PreviewKind; @@ -47,13 +60,25 @@ import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.FBUtilities; +import static com.google.common.base.Predicates.and; +import static com.google.common.base.Predicates.not; +import static com.google.common.collect.Iterables.all; +import static com.google.common.collect.Iterables.any; +import static org.apache.cassandra.locator.Replica.fullReplica; + /** - * Assists in streaming ranges to a node. + * Assists in streaming ranges to this node. */ public class RangeStreamer { private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class); + public static Predicate<Replica> ALIVE_PREDICATE = replica -> + (!Gossiper.instance.isEnabled() || + (Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()) == null || + Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()).isAlive())) && + FailureDetector.instance.isAlive(replica.endpoint()); + /* bootstrap tokens. can be null if replacing the node. 
*/ private final Collection<Token> tokens; /* current token ring */ @@ -62,26 +87,59 @@ public class RangeStreamer private final InetAddressAndPort address; /* streaming description */ private final String description; - private final Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = HashMultimap.create(); - private final Set<ISourceFilter> sourceFilters = new HashSet<>(); + private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create(); + private final Set<Predicate<Replica>> sourceFilters = new HashSet<>(); private final StreamPlan streamPlan; private final boolean useStrictConsistency; private final IEndpointSnitch snitch; private final StreamStateStore stateStore; - /** - * A filter applied to sources to stream from when constructing a fetch map. - */ - public static interface ISourceFilter + public static class FetchReplica { - public boolean shouldInclude(InetAddressAndPort endpoint); + public final Replica local; + public final Replica remote; + + public FetchReplica(Replica local, Replica remote) + { + Preconditions.checkNotNull(local); + Preconditions.checkNotNull(remote); + assert local.isLocal() && !remote.isLocal(); + this.local = local; + this.remote = remote; + } + + public String toString() + { + return "FetchReplica{" + + "local=" + local + + ", remote=" + remote + + '}'; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FetchReplica that = (FetchReplica) o; + + if (!local.equals(that.local)) return false; + return remote.equals(that.remote); + } + + public int hashCode() + { + int result = local.hashCode(); + result = 31 * result + remote.hashCode(); + return result; + } } /** * Source filter which excludes any endpoints that are not alive according to a * failure detector. 
*/ - public static class FailureDetectorSourceFilter implements ISourceFilter + public static class FailureDetectorSourceFilter implements Predicate<Replica> { private final IFailureDetector fd; @@ -90,16 +148,16 @@ public class RangeStreamer this.fd = fd; } - public boolean shouldInclude(InetAddressAndPort endpoint) + public boolean apply(Replica replica) { - return fd.isAlive(endpoint); + return fd.isAlive(replica.endpoint()); } } /** * Source filter which excludes any endpoints that are not in a specific data center. */ - public static class SingleDatacenterFilter implements ISourceFilter + public static class SingleDatacenterFilter implements Predicate<Replica> { private final String sourceDc; private final IEndpointSnitch snitch; @@ -110,27 +168,27 @@ public class RangeStreamer this.snitch = snitch; } - public boolean shouldInclude(InetAddressAndPort endpoint) + public boolean apply(Replica replica) { - return snitch.getDatacenter(endpoint).equals(sourceDc); + return snitch.getDatacenter(replica).equals(sourceDc); } } /** * Source filter which excludes the current node from source calculations */ - public static class ExcludeLocalNodeFilter implements ISourceFilter + public static class ExcludeLocalNodeFilter implements Predicate<Replica> { - public boolean shouldInclude(InetAddressAndPort endpoint) + public boolean apply(Replica replica) { - return !FBUtilities.getBroadcastAddressAndPort().equals(endpoint); + return !replica.isLocal(); } } /** * Source filter which only includes endpoints contained within a provided set. 
*/ - public static class WhitelistedSourcesFilter implements ISourceFilter + public static class WhitelistedSourcesFilter implements Predicate<Replica> { private final Set<InetAddressAndPort> whitelistedSources; @@ -139,9 +197,9 @@ public class RangeStreamer this.whitelistedSources = whitelistedSources; } - public boolean shouldInclude(InetAddressAndPort endpoint) + public boolean apply(Replica replica) { - return whitelistedSources.contains(endpoint); + return whitelistedSources.contains(replica.endpoint()); } } @@ -167,7 +225,7 @@ public class RangeStreamer streamPlan.listeners(this.stateStore); } - public void addSourceFilter(ISourceFilter filter) + public void addSourceFilter(Predicate<Replica> filter) { sourceFilters.add(filter); } @@ -176,80 +234,95 @@ public class RangeStreamer * Add ranges to be streamed for given keyspace. * * @param keyspaceName keyspace name - * @param ranges ranges to be streamed + * @param replicas ranges to be streamed */ - public void addRanges(String keyspaceName, Collection<Range<Token>> ranges) + public void addRanges(String keyspaceName, ReplicaCollection<?> replicas) { - if(Keyspace.open(keyspaceName).getReplicationStrategy() instanceof LocalStrategy) + Keyspace keyspace = Keyspace.open(keyspaceName); + AbstractReplicationStrategy strat = keyspace.getReplicationStrategy(); + if(strat instanceof LocalStrategy) { logger.info("Not adding ranges for Local Strategy keyspace={}", keyspaceName); return; } - boolean useStrictSource = useStrictSourcesForRanges(keyspaceName); - Multimap<Range<Token>, InetAddressAndPort> rangesForKeyspace = useStrictSource - ? 
getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges); + boolean useStrictSource = useStrictSourcesForRanges(strat); + EndpointsByReplica fetchMap = calculateRangesToFetchWithPreferredEndpoints(replicas, keyspace, useStrictSource); - for (Map.Entry<Range<Token>, InetAddressAndPort> entry : rangesForKeyspace.entries()) + for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries()) logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName); - AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy(); - Multimap<InetAddressAndPort, Range<Token>> rangeFetchMap = useStrictSource || strat == null || strat.getReplicationFactor() == 1 - ? getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency) - : getOptimizedRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName); - for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet()) + Multimap<InetAddressAndPort, FetchReplica> workMap; + //Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no + //transient replicas. 
+ if (useStrictSource || strat == null || strat.getReplicationFactor().allReplicas == 1 || strat.getReplicationFactor().hasTransientReplicas()) + { + workMap = convertPreferredEndpointsToWorkMap(fetchMap); + } + else + { + workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName); + } + + toFetch.put(keyspaceName, workMap); + for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet()) { if (logger.isTraceEnabled()) { - for (Range<Token> r : entry.getValue()) - logger.trace("{}: range {} from source {} for keyspace {}", description, r, entry.getKey(), keyspaceName); + for (FetchReplica r : entry.getValue()) + logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName); } - toFetch.put(keyspaceName, entry); } } /** - * @param keyspaceName keyspace name to check + * @param strat AbstractReplicationStrategy of keyspace to check * @return true when the node is bootstrapping, useStrictConsistency is true and # of nodes in the cluster is more than # of replica */ - private boolean useStrictSourcesForRanges(String keyspaceName) + private boolean useStrictSourcesForRanges(AbstractReplicationStrategy strat) { - AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy(); return useStrictConsistency && tokens != null - && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor(); + && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor().allReplicas; } /** - * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges - * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress. 
- * - * @throws java.lang.IllegalStateException when there is no source to get data streamed + * Wrapper method to assemble the arguments for invoking the implementation with RangeStreamer's parameters + * @param fetchRanges + * @param keyspace + * @param useStrictConsistency + * @return */ - private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges) + private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency) { - AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy(); - Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap()); + AbstractReplicationStrategy strat = keyspace.getReplicationStrategy(); - Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create(); - for (Range<Token> desiredRange : desiredRanges) - { - for (Range<Token> range : rangeAddresses.keySet()) - { - if (range.contains(desiredRange)) - { - List<InetAddressAndPort> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range)); - rangeSources.putAll(desiredRange, preferred); - break; - } - } + TokenMetadata tmd = metadata.cloneOnlyTokenMap(); + + TokenMetadata tmdAfter = null; - if (!rangeSources.keySet().contains(desiredRange)) - throw new IllegalStateException("No sources found for " + desiredRange); + if (tokens != null) + { + // Pending ranges + tmdAfter = tmd.cloneOnlyTokenMap(); + tmdAfter.updateNormalTokens(tokens, address); } + else if (useStrictConsistency) + { + throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens"); + } + + return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity, + strat, + fetchRanges, + useStrictConsistency, + tmd, + tmdAfter, + ALIVE_PREDICATE, + keyspace.getName(), + sourceFilters); - 
return rangeSources; } /** @@ -257,129 +330,234 @@ public class RangeStreamer * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating * consistency. * - * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found. - */ - private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges) + **/ + public static EndpointsByReplica + calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity, + AbstractReplicationStrategy strat, + ReplicaCollection<?> fetchRanges, + boolean useStrictConsistency, + TokenMetadata tmdBefore, + TokenMetadata tmdAfter, + Predicate<Replica> isAlive, + String keyspace, + Collection<Predicate<Replica>> sourceFilters) { - assert tokens != null; - AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy(); - - // Active ranges - TokenMetadata metadataClone = metadata.cloneOnlyTokenMap(); - Multimap<Range<Token>, InetAddressAndPort> addressRanges = strat.getRangeAddresses(metadataClone); + EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore); - // Pending ranges - metadataClone.updateNormalTokens(tokens, address); - Multimap<Range<Token>, InetAddressAndPort> pendingRangeAddresses = strat.getRangeAddresses(metadataClone); + InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); + logger.debug ("Keyspace: {}", keyspace); + logger.debug("To fetch RN: {}", fetchRanges); + logger.debug("Fetch ranges: {}", rangeAddresses); - // Collects the source that will have its range moved to the new node - Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create(); + Predicate<Replica> testSourceFilters = and(sourceFilters); + Function<EndpointsForRange, EndpointsForRange> sorted = + endpoints -> 
snitchGetSortedListByProximity.apply(localAddress, endpoints); - for (Range<Token> desiredRange : desiredRanges) + //This list of replicas is just candidates. With strict consistency it's going to be a narrow list. + EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable(); + for (Replica toFetch : fetchRanges) { - for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> preEntry : addressRanges.asMap().entrySet()) + //Replica that is sufficient to provide the data we need + //With strict consistency and transient replication we may end up with multiple types + //so this isn't used with strict consistency + Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull()); + Predicate<Replica> accept = r -> + isSufficient.test(r) // is sufficient + && !r.endpoint().equals(localAddress) // is not self + && isAlive.test(r); // is alive + + logger.debug("To fetch {}", toFetch); + for (Range<Token> range : rangeAddresses.keySet()) { - if (preEntry.getKey().contains(desiredRange)) + if (range.contains(toFetch.range())) { - Set<InetAddressAndPort> oldEndpoints = Sets.newHashSet(preEntry.getValue()); - Set<InetAddressAndPort> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange)); + EndpointsForRange oldEndpoints = rangeAddresses.get(range); - // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. 
- // So we need to be careful to only be strict when endpoints == RF - if (oldEndpoints.size() == strat.getReplicationFactor()) + //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch + //It could be multiple endpoints and we must fetch from all of them if they are there + //With transient replication and strict consistency this is to get the full data from a full replica and + //transient data from the transient replica losing data + EndpointsForRange sources; + if (useStrictConsistency) + { + //Start with two sets of who replicates the range before and who replicates it after + EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter); + logger.debug("Old endpoints {}", oldEndpoints); + logger.debug("New endpoints {}", newEndpoints); + + //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints. + //So we need to be careful to only be strict when endpoints == RF + if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas) + { + Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints(); + // Remove new endpoints from old endpoints based on address + oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint())); + + if (!all(oldEndpoints, isAlive)) + throw new IllegalStateException("A node required to move the data consistently is down: " + + oldEndpoints.filter(not(isAlive))); + + if (oldEndpoints.size() > 1) + throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints); + + //If we are transitioning from transient to full and the set of replicas for the range is not changing + //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely + //since we are already a transient replica and the existing replica remains. + //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore. 
+ //So it's an error if we don't find what we need. + if (oldEndpoints.isEmpty() && toFetch.isTransient()) + { + throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch); + } + + if (!any(oldEndpoints, isSufficient)) + { + // need an additional replica + EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range)); + // include all our filters, to ensure we include a matching node + Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil(); + if (fullReplica.isPresent()) + oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get())); + else + throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange); + } + + //We have to check the source filters here to see if they will remove any replicas + //required for strict consistency + if (!all(oldEndpoints, testSourceFilters)) + throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters))); + } + else + { + oldEndpoints = sorted.apply(oldEndpoints.filter(accept)); + } + + //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case + sources = oldEndpoints.filter(testSourceFilters); + } + else { - oldEndpoints.removeAll(newEndpoints); - assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size(); + //Without strict consistency we have given up on correctness so no point in fetching from + //a random full + transient replica since it's also likely to lose data + //Also apply testSourceFilters that were given to us so we can safely select a single source + sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters))); + //Limit it to just the first possible source, we don't need more 
than one and downstream + //will fetch from every source we supply + sources = sources.size() > 0 ? sources.subList(0, 1) : sources; } - rangeSources.put(desiredRange, oldEndpoints.iterator().next()); + // storing range and preferred endpoint set + rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE); + logger.debug("Endpoints to fetch for {} are {}", toFetch, sources); } } - // Validate - Collection<InetAddressAndPort> addressList = rangeSources.get(desiredRange); - if (addressList == null || addressList.isEmpty()) - throw new IllegalStateException("No sources found for " + desiredRange); - - if (addressList.size() > 1) - throw new IllegalStateException("Multiple endpoints found for " + desiredRange); + EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch); + if (addressList == null) + throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch); + + /* + * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses + * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica + * and the other is a transient replica. So we must fetch from two places in that case for the full range we gain. + * For a transient range we only need to fetch from one. 
+ */ + if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1)) + throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList)); + + //We must have enough stuff to fetch from + if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty()) + { + if (strat.getReplicationFactor().allReplicas == 1) + { + if (useStrictConsistency) + { + logger.warn("A node required to move the data consistently is down"); + throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " + + "Ensure this keyspace contains replicas in the source datacenter."); + } + else + logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " + + "Keyspace might be missing data.", toFetch, keyspace); - InetAddressAndPort sourceIp = addressList.iterator().next(); - EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp); - if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive())) - throw new RuntimeException("A node required to move the data consistently is down (" + sourceIp + "). 
" + - "If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false"); + } + else + { + if (useStrictConsistency) + logger.warn("A node required to move the data consistently is down"); + throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace); + } + } } - - return rangeSources; + return rangesToFetchWithPreferredEndpoints.asImmutableView(); } /** - * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value) - * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given - * here, we always exclude ourselves. - * @param keyspace keyspace name - * @return Map of source endpoint to collection of ranges + * The preferred endpoint list is the wrong format because it is keyed by Replica (this node) rather than the source + * endpoint we will fetch from which streaming wants. 
+ * @param preferredEndpoints + * @return */ - private static Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, - Collection<ISourceFilter> sourceFilters, String keyspace, - boolean useStrictConsistency) + public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints) { - Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = HashMultimap.create(); - for (Range<Token> range : rangesWithSources.keySet()) + Multimap<InetAddressAndPort, FetchReplica> workMap = HashMultimap.create(); + for (Map.Entry<Replica, EndpointsForRange> e : preferredEndpoints.entrySet()) { - boolean foundSource = false; - - outer: - for (InetAddressAndPort address : rangesWithSources.get(range)) + for (Replica source : e.getValue()) { - for (ISourceFilter filter : sourceFilters) - { - if (!filter.shouldInclude(address)) - continue outer; - } + assert (e.getKey()).isLocal(); + assert !source.isLocal(); + workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source)); + } + } + logger.debug("Work map {}", workMap); + return workMap; + } - if (address.equals(FBUtilities.getBroadcastAddressAndPort())) - { - // If localhost is a source, we have found one, but we don't add it to the map to avoid streaming locally - foundSource = true; - continue; - } + /** + * Optimized version that also outputs the final work map + */ + private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources, + Collection<Predicate<Replica>> sourceFilters, String keyspace) + { + //For now we just aren't going to use the optimized range fetch map with transient replication to shrink + //the surface area to test and introduce bugs. + //In the future it's possible we could run it twice once for full ranges with only full replicas + //and once with transient ranges and all replicas. Then merge the result. 
+ EndpointsByRange.Mutable unwrapped = new EndpointsByRange.Mutable(); + for (Map.Entry<Replica, Replica> entry : rangesWithSources.flattenEntries()) + { + Replicas.temporaryAssertFull(entry.getValue()); + unwrapped.put(entry.getKey().range(), entry.getValue()); + } - rangeFetchMapMap.put(address, range); - foundSource = true; - break; // ensure we only stream from one other node for each range - } + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrapped.asImmutableView(), sourceFilters, keyspace); + Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap(); + logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace); + validateRangeFetchMap(unwrapped.asImmutableView(), rangeFetchMapMap, keyspace); - if (!foundSource) + //Need to rewrap as Replicas + Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create(); + for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries()) + { + Replica toFetch = null; + for (Replica r : rangesWithSources.keySet()) { - AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy(); - if (strat != null && strat.getReplicationFactor() == 1) + if (r.range().equals(entry.getValue())) { - if (useStrictConsistency) - throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1. " + - "Ensure this keyspace contains replicas in the source datacenter."); - else - logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. 
" + - "Keyspace might be missing data.", range, keyspace); + if (toFetch != null) + throw new AssertionError(String.format("There shouldn't be multiple replicas for range %s, replica %s and %s here", r.range(), r, toFetch)); + toFetch = r; } - else - throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace); } + if (toFetch == null) + throw new AssertionError("Shouldn't be possible for the Replica we fetch to be null here"); + //Committing the cardinal sin of synthesizing a Replica, but it's ok because we assert earlier all of them + //are full and optimized range fetch map doesn't support transient replication yet. + wrapped.put(entry.getKey(), new FetchReplica(toFetch, fullReplica(entry.getKey(), entry.getValue()))); } - return rangeFetchMapMap; - } - - - private static Multimap<InetAddressAndPort, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, - Collection<ISourceFilter> sourceFilters, String keyspace) - { - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, sourceFilters, keyspace); - Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap(); - logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace); - validateRangeFetchMap(rangesWithSources, rangeFetchMapMap, keyspace); - return rangeFetchMapMap; + return wrapped; } /** @@ -388,7 +566,7 @@ public class RangeStreamer * @param rangeFetchMapMap * @param keyspace */ - private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace) + private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace) { for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries()) { @@ -398,7 +576,7 
@@ public class RangeStreamer + " in keyspace " + keyspace); } - if (!rangesWithSources.get(entry.getValue()).contains(entry.getKey())) + if (!rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey())) { throw new IllegalStateException("Trying to stream from wrong endpoint. Range: " + entry.getValue() + " in keyspace " + keyspace + " from endpoint: " + entry.getKey()); @@ -408,39 +586,70 @@ public class RangeStreamer } } - public static Multimap<InetAddressAndPort, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSourceTarget, String keyspace, - IFailureDetector fd, boolean useStrictConsistency) - { - return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace, useStrictConsistency); - } - // For testing purposes @VisibleForTesting - Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch() + Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch() { return toFetch; } public StreamResultFuture fetchAsync() { - for (Map.Entry<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> entry : toFetch.entries()) - { - String keyspace = entry.getKey(); - InetAddressAndPort source = entry.getValue().getKey(); - Collection<Range<Token>> ranges = entry.getValue().getValue(); + toFetch.forEach((keyspace, sources) -> { + logger.debug("Keyspace {} Sources {}", keyspace, sources); + sources.asMap().forEach((source, fetchReplicas) -> { - // filter out already streamed ranges - Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner); - if (ranges.removeAll(availableRanges)) - { - logger.info("Some ranges of {} are already available. 
Skipping streaming those ranges.", availableRanges); - } + // filter out already streamed ranges + RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner); - if (logger.isTraceEnabled()) - logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(ranges, ", ")); - /* Send messages to respective folks to stream data over to me */ - streamPlan.requestRanges(source, keyspace, ranges); - } + Predicate<FetchReplica> isAvailable = fetch -> { + Replica availableRange = available.byRange().get(fetch.local.range()); + if (availableRange == null) + //Range is unavailable + return false; + if (fetch.local.isFull()) + //For full, pick only replicas with matching transientness + return availableRange.isFull() == fetch.remote.isFull(); + + // Any transient or full will do + return true; + }; + + List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList()); + + if (remaining.size() < available.size()) + { + List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList()); + logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}", + fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges()); + } + + if (logger.isTraceEnabled()) + logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", ")); + + //At the other end the distinction between full and transient is ignored it just used the transient status + //of the Replica objects we send to determine what to send. The real reason we have this split down to + //StreamRequest is that on completion StreamRequest is used to write to the system table tracking + //what has already been streamed. 
At that point we only have the local Replica instances, so we don't + //know what we got from the remote. We preserve that here by splitting based on the remote's transient + //status. + InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); + RangesAtEndpoint full = remaining.stream() + .filter(pair -> pair.remote.isFull()) + .map(pair -> pair.local) + .collect(RangesAtEndpoint.collector(self)); + RangesAtEndpoint transientReplicas = remaining.stream() + .filter(pair -> pair.remote.isTransient()) + .map(pair -> pair.local) + .collect(RangesAtEndpoint.collector(self)); + + logger.debug("Source and our replicas {}", fetchReplicas); + logger.debug("Source {} Keyspace {} streaming full {} transient {}", source, keyspace, full, transientReplicas); + + /* Send messages to respective folks to stream data over to me */ + streamPlan.requestRanges(source, keyspace, full, transientReplicas); + }); + }); return streamPlan.execute(); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/Splitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java index c63fe91..8578448 100644 --- a/src/java/org/apache/cassandra/dht/Splitter.java +++ b/src/java/org/apache/cassandra/dht/Splitter.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import com.google.common.annotations.VisibleForTesting; @@ -117,32 +118,31 @@ public abstract class Splitter return new BigDecimal(elapsedTokens(token, range)).divide(new BigDecimal(tokensInRange(range)), 3, BigDecimal.ROUND_HALF_EVEN).doubleValue(); } - public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges) + public List<Token> splitOwnedRanges(int parts, List<WeightedRange> weightedRanges, boolean dontSplitRanges) { - if (localRanges.isEmpty() || parts == 1) + if (weightedRanges.isEmpty() || parts == 1) return Collections.singletonList(partitioner.getMaximumToken()); BigInteger totalTokens = BigInteger.ZERO; - for (Range<Token> r : localRanges) + for (WeightedRange weightedRange : weightedRanges) { - BigInteger right = valueForToken(token(r.right)); - totalTokens = totalTokens.add(right.subtract(valueForToken(r.left))); + totalTokens = totalTokens.add(weightedRange.totalTokens(this)); } + BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts)); // the range owned is so tiny we can't split it: if (perPart.equals(BigInteger.ZERO)) return Collections.singletonList(partitioner.getMaximumToken()); if (dontSplitRanges) - return splitOwnedRangesNoPartialRanges(localRanges, perPart, parts); + return splitOwnedRangesNoPartialRanges(weightedRanges, perPart, parts); List<Token> boundaries = new ArrayList<>(); BigInteger 
sum = BigInteger.ZERO; - for (Range<Token> r : localRanges) + for (WeightedRange weightedRange : weightedRanges) { - Token right = token(r.right); - BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)).abs(); - BigInteger left = valueForToken(r.left); + BigInteger currentRangeWidth = weightedRange.totalTokens(this); + BigInteger left = valueForToken(weightedRange.left()); while (sum.add(currentRangeWidth).compareTo(perPart) >= 0) { BigInteger withinRangeBoundary = perPart.subtract(sum); @@ -155,26 +155,24 @@ public abstract class Splitter } boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken()); - assert boundaries.size() == parts : boundaries.size() + "!=" + parts + " " + boundaries + ":" + localRanges; + assert boundaries.size() == parts : boundaries.size() + "!=" + parts + " " + boundaries + ":" + weightedRanges; return boundaries; } - private List<Token> splitOwnedRangesNoPartialRanges(List<Range<Token>> localRanges, BigInteger perPart, int parts) + private List<Token> splitOwnedRangesNoPartialRanges(List<WeightedRange> weightedRanges, BigInteger perPart, int parts) { List<Token> boundaries = new ArrayList<>(parts); BigInteger sum = BigInteger.ZERO; int i = 0; - final int rangesCount = localRanges.size(); + final int rangesCount = weightedRanges.size(); while (boundaries.size() < parts - 1 && i < rangesCount - 1) { - Range<Token> r = localRanges.get(i); - Range<Token> nextRange = localRanges.get(i + 1); - Token right = token(r.right); - Token nextRight = token(nextRange.right); + WeightedRange r = weightedRanges.get(i); + WeightedRange nextRange = weightedRanges.get(i + 1); - BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)); - BigInteger nextRangeWidth = valueForToken(nextRight).subtract(valueForToken(nextRange.left)); + BigInteger currentRangeWidth = r.totalTokens(this); + BigInteger nextRangeWidth = nextRange.totalTokens(this); sum = sum.add(currentRangeWidth); // does this or 
next range take us beyond the per part limit? @@ -187,7 +185,7 @@ public abstract class Splitter if (diffNext.compareTo(diffCurrent) >= 0) { sum = BigInteger.ZERO; - boundaries.add(right); + boundaries.add(token(r.right())); } } i++; @@ -256,4 +254,61 @@ public abstract class Splitter } return subranges; } + + public static class WeightedRange + { + private final double weight; + private final Range<Token> range; + + public WeightedRange(double weight, Range<Token> range) + { + this.weight = weight; + this.range = range; + } + + public BigInteger totalTokens(Splitter splitter) + { + BigInteger right = splitter.valueForToken(splitter.token(range.right)); + BigInteger left = splitter.valueForToken(range.left); + BigInteger factor = BigInteger.valueOf(Math.max(1, (long) (1 / weight))); + BigInteger size = right.subtract(left); + return size.abs().divide(factor); + } + + public Token left() + { + return range.left; + } + + public Token right() + { + return range.right; + } + + public Range<Token> range() + { + return range; + } + + public String toString() + { + return "WeightedRange{" + + "weight=" + weight + + ", range=" + range + + '}'; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof WeightedRange)) return false; + WeightedRange that = (WeightedRange) o; + return Objects.equals(range, that.range); + } + + public int hashCode() + { + return Objects.hash(weight, range); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/StreamStateStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java index e3ea838..3144e81 100644 --- a/src/java/org/apache/cassandra/dht/StreamStateStore.java +++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java @@ -19,38 +19,43 @@ package org.apache.cassandra.dht; import java.util.Set; 
+import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamRequest; import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.utils.Pair; /** * Store and update available ranges (data already received) to system keyspace. */ public class StreamStateStore implements StreamEventHandler { - public Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner) + private static final Logger logger = LoggerFactory.getLogger(StreamStateStore.class); + + public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner) { return SystemKeyspace.getAvailableRanges(keyspace, partitioner); } /** - * Check if given token's data is available in this node. + * Check if given token's data is available in this node. This doesn't handle transientness in a useful way + * so it's only used by a legacy test * * @param keyspace keyspace name * @param token token to check * @return true if given token in the keyspace is already streamed and ready to be served. 
*/ + @VisibleForTesting public boolean isDataAvailable(String keyspace, Token token) { - Set<Range<Token>> availableRanges = getAvailableRanges(keyspace, token.getPartitioner()); - for (Range<Token> range : availableRanges) - { - if (range.contains(token)) - return true; - } - return false; + RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner()); + return availableRanges.ranges().stream().anyMatch(range -> range.contains(token)); } /** @@ -73,7 +78,7 @@ public class StreamStateStore implements StreamEventHandler } for (StreamRequest request : se.requests) { - SystemKeyspace.updateAvailableRanges(request.keyspace, request.ranges); + SystemKeyspace.updateAvailableRanges(request.keyspace, request.full.ranges(), request.transientReplicas.ranges()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java index efd2766..36fc8c2 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java @@ -73,7 +73,7 @@ class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit> Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups); if (groups.size() < replicas) { - // We need at least replicas groups to do allocation correctly. If there aren't enough, + // We need at least replicas groups to do allocation correctly. If there aren't enough, // use random allocation. // This part of the code should only be reached via the RATATest. 
StrategyAdapter should disallow // token allocation in this case as the algorithm is not able to cover the behavior of NetworkTopologyStrategy. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java index 61082df..ef91fbb 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java @@ -113,7 +113,7 @@ public class TokenAllocation { double size = current.size(next); Token representative = current.getPartitioner().midpoint(current, next); - for (InetAddressAndPort n : rs.calculateNaturalEndpoints(representative, tokenMetadata)) + for (InetAddressAndPort n : rs.calculateNaturalReplicas(representative, tokenMetadata).endpoints()) { Double v = ownership.get(n); ownership.put(n, v != null ? 
v + size : size); @@ -169,7 +169,7 @@ public class TokenAllocation static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddressAndPort endpoint) { - final int replicas = rs.getReplicationFactor(); + final int replicas = rs.getReplicationFactor().allReplicas; return new StrategyAdapter() { @@ -196,7 +196,7 @@ public class TokenAllocation static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddressAndPort endpoint) { final String dc = snitch.getDatacenter(endpoint); - final int replicas = rs.getReplicationFactor(dc); + final int replicas = rs.getReplicationFactor(dc).allReplicas; if (replicas == 0 || replicas == 1) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/exceptions/UnavailableException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/UnavailableException.java b/src/java/org/apache/cassandra/exceptions/UnavailableException.java index 7b4edd8..d6e8488 100644 --- a/src/java/org/apache/cassandra/exceptions/UnavailableException.java +++ b/src/java/org/apache/cassandra/exceptions/UnavailableException.java @@ -25,14 +25,26 @@ public class UnavailableException extends RequestExecutionException public final int required; public final int alive; - public UnavailableException(ConsistencyLevel consistency, int required, int alive) + public static UnavailableException create(ConsistencyLevel consistency, int required, int alive) { - this("Cannot achieve consistency level " + consistency, consistency, required, alive); + assert alive < required; + return create(consistency, required, 0, alive, 0); } - public UnavailableException(ConsistencyLevel consistency, String dc, int required, int alive) + public static UnavailableException create(ConsistencyLevel consistency, int required, int requiredFull, int 
alive, int aliveFull) { - this("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive); + if (required > alive) + return new UnavailableException("Cannot achieve consistency level " + consistency, consistency, required, alive); + assert requiredFull < aliveFull; + return new UnavailableException("Insufficient full replicas", consistency, required, alive); + } + + public static UnavailableException create(ConsistencyLevel consistency, String dc, int required, int requiredFull, int alive, int aliveFull) + { + if (required > alive) + return new UnavailableException("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive); + assert requiredFull < aliveFull; + return new UnavailableException("Insufficient full replicas in DC " + dc, consistency, required, alive); } public UnavailableException(String msg, ConsistencyLevel consistency, int required, int alive) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 5646bf6..8546a70 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -144,6 +144,11 @@ public class EndpointState return rpcState != null && Boolean.parseBoolean(rpcState.value); } + public boolean isNormalState() + { + return getStatus().equals(VersionedValue.STATUS_NORMAL); + } + public String getStatus() { VersionedValue status = getApplicationState(ApplicationState.STATUS_WITH_PORT); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java 
b/src/java/org/apache/cassandra/hints/HintsService.java index 0cd1278..c6ad3d9 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -20,11 +20,14 @@ package org.apache.cassandra.hints; import java.io.File; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -39,6 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.IFailureDetector; +import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.metrics.StorageMetrics; @@ -46,9 +50,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; -import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; -import static com.google.common.collect.Iterables.size; /** * A singleton-ish wrapper over various hints components: @@ -151,7 +153,7 @@ public final class HintsService implements HintsServiceMBean * @param hostIds host ids of the hint's target nodes * @param hint the hint to store */ - public void write(Iterable<UUID> hostIds, Hint hint) + public void write(Collection<UUID> hostIds, Hint hint) { if (isShutDown) throw new IllegalStateException("HintsService is shut down and can't accept new hints"); @@ -161,7 +163,7 @@ public final class HintsService implements 
HintsServiceMBean bufferPool.write(hostIds, hint); - StorageMetrics.totalHints.inc(size(hostIds)); + StorageMetrics.totalHints.inc(hostIds.size()); } /** @@ -183,9 +185,14 @@ public final class HintsService implements HintsServiceMBean String keyspaceName = hint.mutation.getKeyspaceName(); Token token = hint.mutation.key().getToken(); - Iterable<UUID> hostIds = - transform(filter(StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token), StorageProxy::shouldHint), - StorageService.instance::getHostIdForEndpoint); + EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token); + + // judicious use of streams: eagerly materializing probably cheaper + // than performing filters / translations 2x extra via Iterables.filter/transform + List<UUID> hostIds = replicas.stream() + .filter(StorageProxy::shouldHint) + .map(replica -> StorageService.instance.getHostIdForEndpoint(replica.endpoint())) + .collect(Collectors.toList()); write(hostIds, hint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index 044a00b..4eaf1fe 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -69,13 +69,14 @@ abstract class AbstractSSTableSimpleWriter implements Closeable SerializationHeader header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS); if (makeRangeAware) - return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, formatType, 0, header); + return 
SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, formatType, 0, header); return SSTableTxnWriter.create(metadata, createDescriptor(directory, metadata.keyspace, metadata.name, formatType), 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, + false, 0, header, Collections.emptySet()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index f2605fb..055bf24 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -23,6 +23,7 @@ import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; +import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Collections2; import com.google.common.collect.Sets; @@ -46,6 +47,9 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.memory.HeapAllocator; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; + /** * This class is built on top of the SequenceFile. It stores * data on disk in sorted fashion. 
However the sorting is upto @@ -350,4 +354,13 @@ public abstract class SSTable { return AbstractBounds.bounds(first.getToken(), true, last.getToken(), true); } + + public static void validateRepairedMetadata(long repairedAt, UUID pendingRepair, boolean isTransient) + { + Preconditions.checkArgument((pendingRepair == NO_PENDING_REPAIR) || (repairedAt == UNREPAIRED_SSTABLE), + "pendingRepair cannot be set on a repaired sstable"); + Preconditions.checkArgument(!isTransient || (pendingRepair != NO_PENDING_REPAIR), + "isTransient can only be true for sstables pending repair"); + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 4ba0533..ec2a700 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -126,7 +126,7 @@ public class SSTableLoader implements StreamEventHandler for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : ranges.entrySet()) { InetAddressAndPort endpoint = entry.getKey(); - Collection<Range<Token>> tokenRanges = entry.getValue(); + List<Range<Token>> tokenRanges = Range.normalize(entry.getValue()); List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges); long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index 60b8962..cfb1365 100644 --- 
a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -99,10 +99,10 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem } @SuppressWarnings("resource") // log and writer closed during doPostCleanup - public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header) { LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); - SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, txn); + SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn); return new SSTableTxnWriter(txn, writer); } @@ -112,6 +112,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, SSTableFormat.Type type, int sstableLevel, SerializationHeader header) @@ -122,7 +123,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem SSTableMultiWriter writer; try { - writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, type, sstableLevel, 0, txn, header); + writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, isTransient, type, sstableLevel, 0, txn, header); } catch (IOException e) { @@ -140,6 +141,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, int sstableLevel, SerializationHeader header, 
Collection<Index> indexes) @@ -147,12 +149,12 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem // if the column family store does not exist, we create a new default SSTableMultiWriter to use: LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel); - SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn); + SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn); return new SSTableTxnWriter(txn, writer); } - public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, SerializationHeader header) { - return create(cfs, desc, keyCount, repairedAt, pendingRepair, 0, header); + return create(cfs, desc, keyCount, repairedAt, pendingRepair, isTransient, 0, header); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index a40ec18..eb5c5fe 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -111,13 +111,14 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter long keyCount, long repairedAt, UUID pendingRepair, + boolean 
isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { - SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, indexes, txn); + SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, indexes, txn); return new SimpleSSTableMultiWriter(writer, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java index f289fe3..29fa573 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@ -44,6 +44,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter private final long estimatedKeys; private final long repairedAt; private final UUID pendingRepair; + private final boolean isTransient; private final SSTableFormat.Type format; private final SerializationHeader header; private final LifecycleTransaction txn; @@ -53,7 +54,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter private final List<SSTableReader> finishedReaders = new ArrayList<>(); private SSTableMultiWriter currentWriter = null; - public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException + public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID 
pendingRepair, boolean isTransient, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException { DiskBoundaries db = cfs.getDiskBoundaries(); directories = db.directories; @@ -62,6 +63,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter this.estimatedKeys = estimatedKeys / directories.size(); this.repairedAt = repairedAt; this.pendingRepair = pendingRepair; + this.isTransient = isTransient; this.format = format; this.txn = txn; this.header = header; @@ -73,7 +75,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn); } } @@ -95,7 +97,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter finishedWriters.add(currentWriter); Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 2fade21..edb3afa 
100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1852,6 +1852,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return sstableMetadata.repairedAt; } + public boolean isTransient() + { + return sstableMetadata.isTransient; + } + public boolean intersects(Collection<Range<Token>> ranges) { Bounds<Token> range = new Bounds<>(first.getToken(), last.getToken()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 1e183e2..cca59cf 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -55,6 +55,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional { protected long repairedAt; protected UUID pendingRepair; + protected boolean isTransient; protected long maxDataAge = -1; protected final long keyCount; protected final MetadataCollector metadataCollector; @@ -77,6 +78,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, @@ -86,6 +88,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional this.keyCount = keyCount; this.repairedAt = repairedAt; this.pendingRepair = pendingRepair; + this.isTransient = isTransient; this.metadataCollector = metadataCollector; this.header = header; this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), 
descriptor.version, header); @@ -96,6 +99,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional Long keyCount, Long repairedAt, UUID pendingRepair, + boolean isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, @@ -103,20 +107,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional LifecycleTransaction txn) { Factory writerFactory = descriptor.getFormat().getWriterFactory(); - return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn); + return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn); } public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, int sstableLevel, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, indexes, txn); + return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, txn); } public static SSTableWriter create(TableMetadataRef metadata, @@ -124,13 +129,14 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, int sstableLevel, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn); + return create(descriptor, keyCount, repairedAt, pendingRepair, 
isTransient, metadata, collector, header, indexes, txn); } @VisibleForTesting @@ -138,11 +144,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { - return create(descriptor, keyCount, repairedAt, pendingRepair, 0, header, indexes, txn); + return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, txn); } private static Set<Component> components(TableMetadata metadata) @@ -309,6 +316,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional metadata().params.bloomFilterFpChance, repairedAt, pendingRepair, + isTransient, header); } @@ -338,6 +346,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/Version.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java index 1d965ce..9b82c14 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@ -55,6 +55,8 @@ public abstract class Version public abstract boolean hasPendingRepair(); + public abstract boolean hasIsTransient(); + public abstract boolean hasMetadataChecksum(); /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org