Github user belliottsmith commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/269#discussion_r218383055
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -337,165 +364,167 @@ else if (useStrictConsistency)
boolean
useStrictConsistency,
TokenMetadata tmdBefore,
TokenMetadata tmdAfter,
- Predicate<Replica>
isAlive,
String keyspace,
-
Collection<Predicate<Replica>> sourceFilters)
- {
- EndpointsByRange rangeAddresses =
strat.getRangeAddresses(tmdBefore);
-
- InetAddressAndPort localAddress =
FBUtilities.getBroadcastAddressAndPort();
- logger.debug ("Keyspace: {}", keyspace);
- logger.debug("To fetch RN: {}", fetchRanges);
- logger.debug("Fetch ranges: {}", rangeAddresses);
-
- Predicate<Replica> testSourceFilters = and(sourceFilters);
- Function<EndpointsForRange, EndpointsForRange> sorted =
- endpoints ->
snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
- //This list of replicas is just candidates. With strict
consistency it's going to be a narrow list.
- EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints =
new EndpointsByReplica.Mutable();
- for (Replica toFetch : fetchRanges)
- {
- //Replica that is sufficient to provide the data we need
- //With strict consistency and transient replication we may end
up with multiple types
- //so this isn't used with strict consistency
- Predicate<Replica> isSufficient = r -> (toFetch.isTransient()
|| r.isFull());
- Predicate<Replica> accept = r ->
- isSufficient.test(r) // is
sufficient
- && !r.endpoint().equals(localAddress) // is not self
- && isAlive.test(r); // is alive
-
- logger.debug("To fetch {}", toFetch);
- for (Range<Token> range : rangeAddresses.keySet())
- {
- if (range.contains(toFetch.range()))
- {
- EndpointsForRange oldEndpoints =
rangeAddresses.get(range);
-
- //Ultimately we populate this with whatever is going
to be fetched from to satisfy toFetch
- //It could be multiple endpoints and we must fetch
from all of them if they are there
- //With transient replication and strict consistency
this is to get the full data from a full replica and
- //transient data from the transient replica losing data
- EndpointsForRange sources;
- if (useStrictConsistency)
- {
- //Start with two sets of who replicates the range
before and who replicates it after
- EndpointsForRange newEndpoints =
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
- logger.debug("Old endpoints {}", oldEndpoints);
- logger.debug("New endpoints {}", newEndpoints);
-
- //Due to CASSANDRA-5953 we can have a higher RF
then we have endpoints.
- //So we need to be careful to only be strict when
endpoints == RF
- if (oldEndpoints.size() ==
strat.getReplicationFactor().allReplicas)
- {
- Set<InetAddressAndPort>
endpointsStillReplicated = newEndpoints.endpoints();
- // Remove new endpoints from old endpoints
based on address
- oldEndpoints = oldEndpoints.filter(r ->
!endpointsStillReplicated.contains(r.endpoint()));
-
- if (!all(oldEndpoints, isAlive))
- throw new IllegalStateException("A node
required to move the data consistently is down: "
- +
oldEndpoints.filter(not(isAlive)));
-
- if (oldEndpoints.size() > 1)
- throw new AssertionError("Expected <= 1
endpoint but found " + oldEndpoints);
-
- //If we are transitioning from transient to
full and and the set of replicas for the range is not changing
- //we might end up with no endpoints to fetch
from by address. In that case we can pick any full replica safely
- //since we are already a transient replica and
the existing replica remains.
- //The old behavior where we might be asked to
fetch ranges we don't need shouldn't occur anymore.
- //So it's an error if we don't find what we
need.
- if (oldEndpoints.isEmpty() &&
toFetch.isTransient())
- {
- throw new AssertionError("If there are no
endpoints to fetch from then we must be transitioning from transient to full
for range " + toFetch);
- }
-
- if (!any(oldEndpoints, isSufficient))
- {
- // need an additional replica
- EndpointsForRange endpointsForRange =
sorted.apply(rangeAddresses.get(range));
- // include all our filters, to ensure we
include a matching node
- Optional<Replica> fullReplica =
Iterables.<Replica>tryFind(endpointsForRange, and(accept,
testSourceFilters)).toJavaUtil();
- if (fullReplica.isPresent())
- oldEndpoints =
Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
- else
- throw new
IllegalStateException("Couldn't find any matching sufficient replica out of " +
endpointsForRange);
- }
-
- //We have to check the source filters here to
see if they will remove any replicas
- //required for strict consistency
- if (!all(oldEndpoints, testSourceFilters))
- throw new IllegalStateException("Necessary
replicas for strict consistency were removed by source filters: " +
oldEndpoints.filter(not(testSourceFilters)));
- }
- else
- {
- oldEndpoints =
sorted.apply(oldEndpoints.filter(accept));
- }
-
- //Apply testSourceFilters that were given to us,
and establish everything remaining is alive for the strict case
- sources = oldEndpoints.filter(testSourceFilters);
- }
- else
- {
- //Without strict consistency we have given up on
correctness so no point in fetching from
- //a random full + transient replica since it's
also likely to lose data
- //Also apply testSourceFilters that were given to
us so we can safely select a single source
- sources =
sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
- //Limit it to just the first possible source, we
don't need more than one and downstream
- //will fetch from every source we supply
- sources = sources.size() > 0 ? sources.subList(0,
1) : sources;
- }
-
- // storing range and preferred endpoint set
- rangesToFetchWithPreferredEndpoints.putAll(toFetch,
sources, Conflict.NONE);
- logger.debug("Endpoints to fetch for {} are {}",
toFetch, sources);
- }
- }
-
- EndpointsForRange addressList =
rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
- if (addressList == null)
- throw new IllegalStateException("Failed to find endpoints
to fetch " + toFetch);
-
- /*
- * When we move forwards (shrink our bucket) we are the one
losing a range and no one else loses
- * from that action (we also don't gain). When we move
backwards there are two people losing a range. One is a full replica
- * and the other is a transient replica. So we must need fetch
from two places in that case for the full range we gain.
- * For a transient range we only need to fetch from one.
- */
- if (useStrictConsistency && addressList.size() > 1 &&
(addressList.filter(Replica::isFull).size() > 1 ||
addressList.filter(Replica::isTransient).size() > 1))
- throw new IllegalStateException(String.format("Multiple
strict sources found for %s, sources: %s", toFetch, addressList));
-
- //We must have enough stuff to fetch from
- if ((toFetch.isFull() && !any(addressList, Replica::isFull))
|| addressList.isEmpty())
- {
- if (strat.getReplicationFactor().allReplicas == 1)
- {
- if (useStrictConsistency)
- {
- logger.warn("A node required to move the data
consistently is down");
- throw new IllegalStateException("Unable to find
sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace
+ " with RF=1. " +
- "Ensure this
keyspace contains replicas in the source datacenter.");
- }
- else
- logger.warn("Unable to find sufficient sources for
streaming range {} in keyspace {} with RF=1. " +
- "Keyspace might be missing data.",
toFetch, keyspace);
-
- }
- else
- {
- if (useStrictConsistency)
- logger.warn("A node required to move the data
consistently is down");
- throw new IllegalStateException("Unable to find
sufficient sources for streaming range " + toFetch + " in keyspace " +
keyspace);
- }
- }
- }
- return rangesToFetchWithPreferredEndpoints.asImmutableView();
- }
+ Collection<SourceFilter>
sourceFilters)
+ {
+ EndpointsByRange rangeAddresses =
strat.getRangeAddresses(tmdBefore);
+
+ InetAddressAndPort localAddress =
FBUtilities.getBroadcastAddressAndPort();
+ logger.debug ("Keyspace: {}", keyspace);
+ logger.debug("To fetch RN: {}", fetchRanges);
+ logger.debug("Fetch ranges: {}", rangeAddresses);
+
+ Predicate<Replica> testSourceFilters = and(sourceFilters);
+ Function<EndpointsForRange, EndpointsForRange> sorted =
+ endpoints -> snitchGetSortedListByProximity.apply(localAddress,
endpoints);
+
+ //This list of replicas is just candidates. With strict
consistency it's going to be a narrow list.
+ EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints =
new EndpointsByReplica.Mutable();
+ for (Replica toFetch : fetchRanges)
+ {
+ //Replica that is sufficient to provide the data we need
+ //With strict consistency and transient replication we may
end up with multiple types
+ //so this isn't used with strict consistency
+ Predicate<Replica> isSufficient = r -> (toFetch.isTransient()
|| r.isFull());
+
+ logger.debug("To fetch {}", toFetch);
+ for (Range<Token> range : rangeAddresses.keySet())
+ {
+ if (!range.contains(toFetch.range()))
+ continue;
+
+ EndpointsForRange oldEndpoints =
rangeAddresses.get(range);
+
+ //Ultimately we populate this with whatever is going to
be fetched from to satisfy toFetch
+ //It could be multiple endpoints and we must fetch from
all of them if they are there
+ //With transient replication and strict consistency this
is to get the full data from a full replica and
+ //transient data from the transient replica losing data
+ EndpointsForRange sources;
+ if (useStrictConsistency)
+ {
+ //Start with two sets of who replicates the range
before and who replicates it after
+ EndpointsForRange newEndpoints =
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+ logger.debug("Old endpoints {}", oldEndpoints);
+ logger.debug("New endpoints {}", newEndpoints);
+
+ //Due to CASSANDRA-5953 we can have a higher RF then
we have endpoints.
+ //So we need to be careful to only be strict when
endpoints == RF
+ if (oldEndpoints.size() ==
strat.getReplicationFactor().allReplicas)
+ {
+ Set<InetAddressAndPort> endpointsStillReplicated
= newEndpoints.endpoints();
+ // Remove new endpoints from old endpoints based
on address
+ oldEndpoints = oldEndpoints.filter(r ->
!endpointsStillReplicated.contains(r.endpoint()));
+
+ if (oldEndpoints.size() > 1)
+ throw new AssertionError("Expected <= 1
endpoint but found " + oldEndpoints);
+
+ //If we are transitioning from transient to full
and and the set of replicas for the range is not changing
+ //we might end up with no endpoints to fetch from
by address. In that case we can pick any full replica safely
+ //since we are already a transient replica and
the existing replica remains.
+ //The old behavior where we might be asked to
fetch ranges we don't need shouldn't occur anymore.
+ //So it's an error if we don't find what we need.
+ if (oldEndpoints.isEmpty() &&
toFetch.isTransient())
+ {
+ throw new AssertionError("If there are no
endpoints to fetch from then we must be transitioning from transient to full
for range " + toFetch);
+ }
+
+ if (!any(oldEndpoints, isSufficient))
+ {
+ // need an additional replica
+ EndpointsForRange endpointsForRange =
sorted.apply(rangeAddresses.get(range));
+ // include all our filters, to ensure we
include a matching node
+ Optional<Replica> fullReplica =
Iterables.<Replica>tryFind(endpointsForRange, and(isSufficient,
testSourceFilters)).toJavaUtil();
+ if (fullReplica.isPresent())
+ oldEndpoints =
Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
+ else
+ throw new IllegalStateException("Couldn't
find any matching sufficient replica out of " + endpointsForRange);
+ }
+
+ //We have to check the source filters here to see
if they will remove any replicas
+ //required for strict consistency
+ if (!all(oldEndpoints, testSourceFilters))
+ {
+ StringBuilder failureMessage = new
StringBuilder();
+ for (Replica r : oldEndpoints)
+ {
+ for (SourceFilter filter : sourceFilters)
+ {
+ if (!filter.apply(r))
+ {
+
failureMessage.append(filter.message(r));
+ break;
+ }
+ }
+ }
+ throw new IllegalStateException("Necessary
replicas for strict consistency were removed by source filters: " +
failureMessage);
+ }
+ }
+ else
+ {
+ oldEndpoints =
sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
--- End diff --
We are now applying testSourceFilters twice: once here, and again on L468.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]