[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r217186512 --- Diff: src/java/org/apache/cassandra/batchlog/BatchlogManager.java --- @@ -459,36 +458,34 @@ private void writeHintsForUndeliveredEndpoints(int startFrom, Set
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r217185707 --- Diff: src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java --- @@ -189,28 +189,30 @@ public final void expired() /** * @return the minimum number of endpoints that must reply. */ -protected int totalBlockFor() +protected int blockFor() { // During bootstrap, we have to include the pending endpoints or we may fail the consistency level // guarantees (see #833) -return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending()); +return replicaPlan.blockFor(); } /** + * TODO: this method is brittle for its purpose of deciding when we should fail a query; --- End diff -- Should this be a JIRA rather than a TODO or just get done? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
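For readers following the `blockFor()` change, here is a minimal sketch of why pending endpoints have to be counted during bootstrap. It assumes a plain majority quorum and that each pending endpoint raises the required acknowledgements by one; `BlockForSketch`, `quorumFor` and `blockForWrite` are hypothetical names for illustration, not the Cassandra API.

```java
// Hypothetical sketch; not the Cassandra implementation.
final class BlockForSketch
{
    /** Quorum for a given replication factor: a strict majority of the replicas. */
    static int quorumFor(int replicationFactor)
    {
        return (replicationFactor / 2) + 1;
    }

    /**
     * Assumption: every pending endpoint adds one required acknowledgement, so that
     * a quorum of the owners after the ring change has also seen the write.
     */
    static int blockForWrite(int replicationFactor, int pendingEndpoints)
    {
        return quorumFor(replicationFactor) + pendingEndpoints;
    }
}
```

Under these assumptions, a QUORUM write at RF=3 waits for two replies normally and for three while one node is bootstrapping into the range.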
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r217185167 --- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java --- @@ -53,15 +53,16 @@ protected final Queue repairs = new ConcurrentLinkedQueue<>(); private final int blockFor; -BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime) +BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime) { super(command, replicaPlan, queryStartNanoTime); -this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace); +this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace); } public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan) { -return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this); +// TODO: why are we referencing a different replicaPlan here? --- End diff -- OK, I think I understand: the original replica plan contacted nodes that never responded and so weren't part of the read repair, whereas this is a new replica plan that includes only the nodes that responded. The consistency level happens to be the same in both. It would be less misleading if it fetched the consistency level from the plan it is given.
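The suggestion above can be illustrated with a small sketch: derive a plan restricted to the replicas that responded, and read the consistency level from the plan that is passed in rather than from an enclosing field. `Level`, `PlanSketch` and `MergeListenerSketch` are hypothetical stand-ins, not the Cassandra types.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-ins for illustration only.
enum Level { ONE, QUORUM, ALL }

final class PlanSketch
{
    final Level consistencyLevel;
    final List<String> contacts;

    PlanSketch(Level consistencyLevel, List<String> contacts)
    {
        this.consistencyLevel = consistencyLevel;
        this.contacts = List.copyOf(contacts);
    }

    /** New plan limited to the replicas that responded; the consistency level is carried over. */
    PlanSketch onlyResponders(Set<String> responded)
    {
        List<String> kept = contacts.stream()
                                    .filter(responded::contains)
                                    .collect(Collectors.toList());
        return new PlanSketch(consistencyLevel, kept);
    }
}

final class MergeListenerSketch
{
    /** Reads the level from the plan it is given, so there is no second plan to wonder about. */
    static String describe(PlanSketch given)
    {
        return given.consistencyLevel + " over " + given.contacts;
    }
}
```

Because the derived plan carries the same level, reading it from the argument gives the same result as reading it from the original plan, without the reader having to verify that the two agree.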
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r217175374 --- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java --- @@ -53,15 +53,16 @@ protected final Queue repairs = new ConcurrentLinkedQueue<>(); private final int blockFor; -BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime) +BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime) { super(command, replicaPlan, queryStartNanoTime); -this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace); +this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace); } public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan) { -return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this); +// TODO: why are we referencing a different replicaPlan here? --- End diff -- I think that was a stale comment. The merging never modified the replicaPlan, and it should never be modified during the SRP. Logically, it should be a snapshot of only the relevant replicas for the SRP. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r217112723 --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java --- @@ -18,364 +18,296 @@ package org.apache.cassandra.locator; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.function.Predicate; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; - import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; -import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; import org.apache.cassandra.utils.FBUtilities; -import static com.google.common.collect.Iterables.any; +import java.util.Set; +import java.util.function.Predicate; /** - * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors - * for building the relevant layout. + * The relevant replicas for an operation over a given range or token. 
* - * Constitutes: - * - the 'natural' replicas replicating the range or token relevant for the operation - * - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates - * - the 'selected' replicas, those that should be targeted for any operation - * - 'all' replicas represents natural+pending - * - * @param the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange) - * @param the type of itself, including its type parameters, for return type of modifying methods + * @param */ -public abstract class ReplicaLayout, L extends ReplicaLayout> +public abstract class ReplicaLayout> { -private volatile E all; -protected final E natural; -protected final E pending; -protected final E selected; - -protected final Keyspace keyspace; -protected final ConsistencyLevel consistencyLevel; - -private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected) -{ -this(keyspace, consistencyLevel, natural, pending, selected, null); -} +private final E natural; -private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all) +ReplicaLayout(E natural) { -assert selected != null; -assert pending == null || !Endpoints.haveConflicts(natural, pending); -this.keyspace = keyspace; -this.consistencyLevel = consistencyLevel; this.natural = natural; -this.pending = pending; -this.selected = selected; -// if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural -if (all == null && pending == null) -all = natural; -this.all = all; } -public Replica getReplicaFor(InetAddressAndPort endpoint) -{ -return natural.byEndpoint().get(endpoint); -} - -public E natural() +/** + * The 'natural' owners of the ring position(s), as implied by the current ring layout. + * This excludes any pending owners, i.e. 
those that are in the process of taking ownership of a range, but + * have not yet finished obtaining their view of the range. + */ +public final E natural() { return natural; } -public E all() -{ -E result = all; -if (result == null) -all = result = Endpoints.concat(natural, pending); -return result; -} - -public E selected() -{ -return selected; -} - /** - * @return the pending replicas - will be null for read layouts - * TODO: ideally we would enforce at compile time that read layouts have no pending to access + * All relevant owners of the ring position(s)
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216916456 --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java --- @@ -194,14 +198,97 @@ public Token token() public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending) { -if (Endpoints.haveConflicts(natural, pending)) +if (haveWriteConflicts(natural, pending)) { -natural = Endpoints.resolveConflictsInNatural(natural, pending); -pending = Endpoints.resolveConflictsInPending(natural, pending); +natural = resolveWriteConflictsInNatural(natural, pending); +pending = resolveWriteConflictsInPending(natural, pending); } return new ReplicaLayout.ForTokenWrite(natural, pending); } +/** + * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation) + * or because an endpoint is transitioning between full and transient replication status. + * + * We essentially always prefer the full version for writes, because this is stricter. + * + * For transient->full transitions: + * + * Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration, + * it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form, + * as there is always a quorum of nodes receiving the write. However, ring ownership changes are not atomic or + * consistent across the cluster, and it is possible for writers to see different ring states. + * + * Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair + * (as the normal contract dictates) when it completes its transition. + * + * While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative + * available to us today to mitigate, and (we think) the easiest to reason about. 
+ * + * For full->transient transitions: + * + * In this case, things are dicier, because in theory we can trigger this change instantly. All we need to do is + * drop some data, surely? + * + * Ring movements can put is in a pickle; any other node could believe us to be full when we have become transient, + * and perform a full data request to us that we believe ourselves capable of answering, but that we are not. + * If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing + * its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware + * of the data inconsistency. + * + * This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when + * the transition takes effect. As such, a node can hold an incorrect belief about its own ownership ranges. + * + * This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it. + * It is a little more dangerous with transient replication, however, because we can completely answer a request without + * ever touching a digest, meaning we are less likely to attempt to repair any inconsistency. + * + * We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance. + * + * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket + * to avoid corrupting our count. This is fine for writes, all we're doing is ensuring we always write to the node, + * instead of selectively. 
+ * + * @param natural + * @param pending + * @param + * @return + */ +public static > boolean haveWriteConflicts(E natural, E pending) +{ +Set naturalEndpoints = natural.endpoints(); +for (InetAddressAndPort pendingEndpoint : pending.endpoints()) +{ +if (naturalEndpoints.contains(pendingEndpoint)) +return true; +} +return false; +} + +/** + * MUST APPLY FIRST + * See {@link ReplicaLayout#haveWriteConflicts} + * @return a 'natural' replica collection, that has had its conflicts with pending repaired + */ +public static > E resolveWriteConflictsInNatural(E
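A minimal sketch of the conflict handling described in the Javadoc above. The detection mirrors the loop in the diff; the two resolution steps are an assumption based on the stated rule that writes prefer the full version, since the quoted diff is cut off before their bodies. `ReplicaSketch`, `WriteConflictSketch`, `resolveInNatural` and `resolveInPending` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a replica: an endpoint name plus full/transient status. */
final class ReplicaSketch
{
    final String endpoint;
    final boolean full;

    ReplicaSketch(String endpoint, boolean full)
    {
        this.endpoint = endpoint;
        this.full = full;
    }
}

final class WriteConflictSketch
{
    static Set<String> endpoints(List<ReplicaSketch> replicas)
    {
        Set<String> result = new HashSet<>();
        for (ReplicaSketch r : replicas)
            result.add(r.endpoint);
        return result;
    }

    /** True if any endpoint appears in both collections (same check as in the diff). */
    static boolean haveWriteConflicts(List<ReplicaSketch> natural, List<ReplicaSketch> pending)
    {
        Set<String> naturalEndpoints = endpoints(natural);
        for (ReplicaSketch p : pending)
            if (naturalEndpoints.contains(p.endpoint))
                return true;
        return false;
    }

    /** Assumption: apply first; a conflicting natural replica becomes full if either side is full. */
    static List<ReplicaSketch> resolveInNatural(List<ReplicaSketch> natural, List<ReplicaSketch> pending)
    {
        List<ReplicaSketch> result = new ArrayList<>();
        for (ReplicaSketch n : natural)
        {
            boolean full = n.full;
            for (ReplicaSketch p : pending)
                if (p.endpoint.equals(n.endpoint))
                    full = full || p.full;
            result.add(new ReplicaSketch(n.endpoint, full));
        }
        return result;
    }

    /** Assumption: apply second; pending drops every endpoint that natural already covers. */
    static List<ReplicaSketch> resolveInPending(List<ReplicaSketch> natural, List<ReplicaSketch> pending)
    {
        Set<String> naturalEndpoints = endpoints(natural);
        List<ReplicaSketch> result = new ArrayList<>();
        for (ReplicaSketch p : pending)
            if (!naturalEndpoints.contains(p.endpoint))
                result.add(p);
        return result;
    }
}
```

With natural = {a: full, b: transient} and pending = {b: full, c: full}, the sketch upgrades b to full in natural and leaves only c pending, so each endpoint is counted once and always written to.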
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216913748 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java --- @@ -249,12 +229,32 @@ public int requiredParticipants() * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) * @param */ -public static class Shared> +public interface Shared, P extends ReplicaPlan> { -private P replicaPlan; -public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; } -public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; } -public P get() { return replicaPlan; } +public void addToContact(Replica replica); +public P get(); +public abstract P getWithContact(E endpoints); } +public static class SharedForTokenRead implements Shared +{ +private ForTokenRead replicaPlan; +public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForTokenRead get() { return replicaPlan; } +public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } +} + +public static class SharedForRangeRead implements Shared +{ +private ForRangeRead replicaPlan; +public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForRangeRead get() { return replicaPlan; } +public ForRangeRead getWithContact(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); } --- End diff -- This is a getter for the case where we don't want to update the reference. I will remove it to avoid any ambiguity. 
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216913492 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java --- @@ -249,12 +229,32 @@ public int requiredParticipants() * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) * @param */ -public static class Shared> +public interface Shared, P extends ReplicaPlan> { -private P replicaPlan; -public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; } -public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; } -public P get() { return replicaPlan; } +public void addToContact(Replica replica); +public P get(); +public abstract P getWithContact(E endpoints); } +public static class SharedForTokenRead implements Shared +{ +private ForTokenRead replicaPlan; +public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForTokenRead get() { return replicaPlan; } +public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } +} + +public static class SharedForRangeRead implements Shared +{ +private ForRangeRead replicaPlan; +public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } --- End diff -- It does; in general the non-pluralised form of contact doesn't work well with the method names. Perhaps I should just pluralise and be done with it? Not sure why I was averse to this. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216912914 --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java --- @@ -184,30 +185,28 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry; -// Endpoints for Token -ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); +ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); // Speculative retry is disabled *OR* // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM) -// TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0 -return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false); +return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false); // There are simply no extra replicas to speculate. // Handle this separately so it can record failed attempts to speculate due to lack of replicas -if (replicaLayout.selected().size() == replicaLayout.all().size()) +if (replicaPlan.contact().size() == replicaPlan.candidates().size()) { boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL; -return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation); +return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation); } // If CL.ALL, upgrade to AlwaysSpeculating; --- End diff -- No idea. 
I don't understand it either but, as you say, the comment looks stale anyway. I'll remove it.
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216846177 --- Diff: src/java/org/apache/cassandra/service/reads/DigestResolver.java --- @@ -93,16 +93,14 @@ public PartitionIterator getData() { // This path can be triggered only if we've got responses from full replicas and they match, but // transient replica response still contains data, which needs to be reconciled. -DataResolver dataResolver = new DataResolver<>(command, - replicaLayout, - (ReadRepair) NoopReadRepair.instance, - queryStartNanoTime); +DataResolver dataResolver += new DataResolver<>(command, replicaPlan, (ReadRepair) NoopReadRepair.instance, queryStartNanoTime); dataResolver.preprocess(dataResponse); // Forward differences to all full nodes --- End diff -- This comment is wrong now, right? We don't forward; we just merge.
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216839700 --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java --- @@ -184,30 +185,28 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry; -// Endpoints for Token -ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); +ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); // Speculative retry is disabled *OR* // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM) -// TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0 -return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false); +return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false); // There are simply no extra replicas to speculate. 
// Handle this separately so it can record failed attempts to speculate due to lack of replicas -if (replicaLayout.selected().size() == replicaLayout.all().size()) +if (replicaPlan.contact().size() == replicaPlan.candidates().size()) { boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL; -return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation); +return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation); } // If CL.ALL, upgrade to AlwaysSpeculating; --- End diff -- This doesn't seem to have anything to do with CL.ALL anymore. And I don't see why CL.ALL would benefit from always speculating, since it fails anyway if it doesn't get the data response.
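The two early returns in the diff above can be summarised in a small decision sketch. Only those two branches come from the quoted code; everything past them is collapsed into a single hypothetical `MAY_SPECULATE` outcome because the diff is cut off at that point. `ExecutorKind`, `ExecutorChoice` and `ExecutorChoiceSketch` are illustrative names, not Cassandra classes.

```java
// Hypothetical sketch of the branch structure; not the Cassandra implementation.
enum ExecutorKind { NEVER_SPECULATING, MAY_SPECULATE }

final class ExecutorChoice
{
    final ExecutorKind kind;
    final boolean recordFailedSpeculation;

    ExecutorChoice(ExecutorKind kind, boolean recordFailedSpeculation)
    {
        this.kind = kind;
        this.recordFailedSpeculation = recordFailedSpeculation;
    }
}

final class ExecutorChoiceSketch
{
    static ExecutorChoice choose(boolean retryDisabled, boolean eachQuorum, boolean consistencyAll,
                                 int contactSize, int candidateSize)
    {
        // Speculation is disabled by policy, or the level is EACH_QUORUM (see 11980 in the diff).
        if (retryDisabled || eachQuorum)
            return new ExecutorChoice(ExecutorKind.NEVER_SPECULATING, false);

        // Every candidate is already being contacted, so there is nothing left to speculate against.
        // The failed attempt is recorded unless the level is ALL.
        if (contactSize == candidateSize)
            return new ExecutorChoice(ExecutorKind.NEVER_SPECULATING, !consistencyAll);

        // Assumption: the remaining (elided) branches pick some speculating executor.
        return new ExecutorChoice(ExecutorKind.MAY_SPECULATE, false);
    }
}
```

Seen this way, CL.ALL only affects whether a failed speculation is recorded, which is consistent with the observation that the "upgrade to AlwaysSpeculating" comment no longer matches the code.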
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216486027 --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java --- @@ -194,14 +198,97 @@ public Token token() public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending) { -if (Endpoints.haveConflicts(natural, pending)) +if (haveWriteConflicts(natural, pending)) { -natural = Endpoints.resolveConflictsInNatural(natural, pending); -pending = Endpoints.resolveConflictsInPending(natural, pending); +natural = resolveWriteConflictsInNatural(natural, pending); +pending = resolveWriteConflictsInPending(natural, pending); } return new ReplicaLayout.ForTokenWrite(natural, pending); } +/** + * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation) + * or because an endpoint is transitioning between full and transient replication status. + * + * We essentially always prefer the full version for writes, because this is stricter. + * + * For transient->full transitions: + * + * Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration, + * it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form, + * as there is always a quorum of nodes receiving the write. However, ring ownership changes are not atomic or + * consistent across the cluster, and it is possible for writers to see different ring states. + * + * Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair + * (as the normal contract dictates) when it completes its transition. + * + * While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative + * available to us today to mitigate, and (we think) the easiest to reason about. 
+ * + * For full->transient transitions: + * + * In this case, things are dicier, because in theory we can trigger this change instantly. All we need to do is + * drop some data, surely? + * + * Ring movements can put is in a pickle; any other node could believe us to be full when we have become transient, + * and perform a full data request to us that we believe ourselves capable of answering, but that we are not. + * If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing + * its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware + * of the data inconsistency. + * + * This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when + * the transition takes effect. As such, a node can hold an incorrect belief about its own ownership ranges. + * + * This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it. + * It is a little more dangerous with transient replication, however, because we can completely answer a request without + * ever touching a digest, meaning we are less likely to attempt to repair any inconsistency. + * + * We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance. + * + * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket + * to avoid corrupting our count. This is fine for writes, all we're doing is ensuring we always write to the node, + * instead of selectively. 
+ * + * @param natural + * @param pending + * @param + * @return + */ +public static > boolean haveWriteConflicts(E natural, E pending) +{ +Set naturalEndpoints = natural.endpoints(); +for (InetAddressAndPort pendingEndpoint : pending.endpoints()) +{ +if (naturalEndpoints.contains(pendingEndpoint)) +return true; +} +return false; +} + +/** + * MUST APPLY FIRST + * See {@link ReplicaLayout#haveWriteConflicts} + * @return a 'natural' replica collection, that has had its conflicts with pending repaired + */ +public static > E resolveWriteConflictsInNatural(E natural, E
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216473482 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java --- @@ -249,12 +229,32 @@ public int requiredParticipants() * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) * @param */ -public static class Shared> +public interface Shared, P extends ReplicaPlan> { -private P replicaPlan; -public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; } -public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; } -public P get() { return replicaPlan; } +public void addToContact(Replica replica); +public P get(); +public abstract P getWithContact(E endpoints); } +public static class SharedForTokenRead implements Shared +{ +private ForTokenRead replicaPlan; +public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForTokenRead get() { return replicaPlan; } +public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } +} + +public static class SharedForRangeRead implements Shared +{ +private ForRangeRead replicaPlan; +public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } --- End diff -- addToContact or addContact? The grammar seems odd.
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216473356 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java --- @@ -249,12 +229,32 @@ public int requiredParticipants() * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) * @param */ -public static class Shared> +public interface Shared, P extends ReplicaPlan> { -private P replicaPlan; -public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; } -public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; } -public P get() { return replicaPlan; } +public void addToContact(Replica replica); +public P get(); +public abstract P getWithContact(E endpoints); } +public static class SharedForTokenRead implements Shared +{ +private ForTokenRead replicaPlan; +public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForTokenRead get() { return replicaPlan; } +public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } +} + +public static class SharedForRangeRead implements Shared +{ +private ForRangeRead replicaPlan; +public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForRangeRead get() { return replicaPlan; } +public ForRangeRead getWithContact(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); } --- End diff -- Wait, so it's not shared now? We made a new one without updating the reference? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
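The distinction being questioned here can be shown with a small sketch: one method swaps the shared reference, the other only returns a derived copy. `ImmutablePlan` and `SharedPlan` are hypothetical stand-ins for the `ForTokenRead`/`SharedForTokenRead` pair in the diff, with replicas reduced to strings and the method names pluralised for readability.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical immutable plan holding only the replicas to contact. */
final class ImmutablePlan
{
    final List<String> contacts;

    ImmutablePlan(List<String> contacts)
    {
        this.contacts = List.copyOf(contacts);
    }

    ImmutablePlan withContacts(List<String> newContacts)
    {
        return new ImmutablePlan(newContacts);
    }
}

/** Hypothetical shared holder: a single mutable reference to an immutable plan. */
final class SharedPlan
{
    private ImmutablePlan plan;

    SharedPlan(ImmutablePlan plan)
    {
        this.plan = plan;
    }

    ImmutablePlan get()
    {
        return plan;
    }

    /** Replaces the shared reference, so every holder of this object sees the extra contact. */
    void addToContacts(String replica)
    {
        List<String> extended = new ArrayList<>(plan.contacts);
        extended.add(replica);
        plan = plan.withContacts(extended);
    }

    /** Returns a derived plan and leaves the shared reference untouched. */
    ImmutablePlan getWithContacts(List<String> newContacts)
    {
        return plan.withContacts(newContacts);
    }
}
```

In this sketch a caller of `getWithContacts` gets a private view, while `addToContacts` is visible to everyone sharing the holder. That asymmetry is the ambiguity raised in the comment above.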
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216451203

    --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
    @@ -18,364 +18,296 @@
     package org.apache.cassandra.locator;
    -import java.util.Collection;
    -import java.util.HashSet;
    -import java.util.List;
    -import java.util.Set;
    -import java.util.function.Predicate;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Predicates;
    -import com.google.common.collect.Iterables;
    -
     import org.apache.cassandra.config.DatabaseDescriptor;
    -import org.apache.cassandra.db.ConsistencyLevel;
    -import org.apache.cassandra.db.DecoratedKey;
     import org.apache.cassandra.db.Keyspace;
     import org.apache.cassandra.db.PartitionPosition;
     import org.apache.cassandra.dht.AbstractBounds;
     import org.apache.cassandra.dht.Token;
    -import org.apache.cassandra.exceptions.UnavailableException;
     import org.apache.cassandra.gms.FailureDetector;
    -import org.apache.cassandra.net.IAsyncCallback;
    -import org.apache.cassandra.service.StorageProxy;
     import org.apache.cassandra.service.StorageService;
    -import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
    -import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
     import org.apache.cassandra.utils.FBUtilities;
    -import static com.google.common.collect.Iterables.any;
    +import java.util.Set;
    +import java.util.function.Predicate;
     /**
    - * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
    - * for building the relevant layout.
    + * The relevant replicas for an operation over a given range or token.
      *
    - * Constitutes:
    - *  - the 'natural' replicas replicating the range or token relevant for the operation
    - *  - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
    - *  - the 'selected' replicas, those that should be targeted for any operation
    - *  - 'all' replicas represents natural+pending
    - *
    - * @param the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
    - * @param the type of itself, including its type parameters, for return type of modifying methods
    + * @param
      */
    -public abstract class ReplicaLayout, L extends ReplicaLayout>
    +public abstract class ReplicaLayout>
     {
    -    private volatile E all;
    -    protected final E natural;
    -    protected final E pending;
    -    protected final E selected;
    -
    -    protected final Keyspace keyspace;
    -    protected final ConsistencyLevel consistencyLevel;
    -
    -    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
    -    {
    -        this(keyspace, consistencyLevel, natural, pending, selected, null);
    -    }
    +    private final E natural;
    -    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
    +    ReplicaLayout(E natural)
         {
    -        assert selected != null;
    -        assert pending == null || !Endpoints.haveConflicts(natural, pending);
    -        this.keyspace = keyspace;
    -        this.consistencyLevel = consistencyLevel;
             this.natural = natural;
    -        this.pending = pending;
    -        this.selected = selected;
    -        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
    -        if (all == null && pending == null)
    -            all = natural;
    -        this.all = all;
         }
    -    public Replica getReplicaFor(InetAddressAndPort endpoint)
    -    {
    -        return natural.byEndpoint().get(endpoint);
    -    }
    -
    -    public E natural()
    +    /**
    +     * The 'natural' owners of the ring position(s), as implied by the current ring layout.
    +     * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
    +     * have not yet finished obtaining their view of the range.
    +     */
    +    public final E natural()
         {
             return natural;
         }
    -    public E all()
    -    {
    -        E result = all;
    -        if (result == null)
    -            all = result = Endpoints.concat(natural, pending);
    -        return result;
    -    }
    -
    -    public E selected()
    -    {
    -        return selected;
    -    }
    -
         /**
    -     * @return the pending replicas - will be null for read layouts
    -     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
    +     * All relevant owners of the ring position(s) for
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216442898

    --- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
    @@ -53,15 +53,16 @@
         protected final Queue repairs = new ConcurrentLinkedQueue<>();
         private final int blockFor;
    -    BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
    +    BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime)
         {
             super(command, replicaPlan, queryStartNanoTime);
    -        this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
    +        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
         }
         public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
         {
    -        return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
    +        // TODO: why are we referencing a different replicaPlan here?
    --- End diff --

    TODO indeed. Looking at DataResolver, it modifies the replica plan without updating the shared one:
    https://github.com/apache/cassandra/pull/265/files#diff-7e5dd130632299911e49b12afe86c85aR121
    So the two plans would be different?
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216434289

    --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
    @@ -196,7 +196,7 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
             // There are simply no extra replicas to speculate.
             // Handle this separately so it can record failed attempts to speculate due to lack of replicas
    -        if (replicaPlan.contact().size() >= replicaPlan.liveOnly().all().size())
    +        if (replicaPlan.contact().size() == replicaPlan.candidates().size())
    --- End diff --

    So >= is not necessary because contact should be a subset of candidates (or it should be an unavailable)? Would it be more robust to keep >= in case that doesn't hold due to a mistake, or to add an assertion for it?
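    A rough sketch of the two options side by side, assuming endpoints can be modelled as plain strings. `noExtraReplicas` is a hypothetical helper, not a method in the PR, and the explicit `throw` stands in for an `assert` so the sketch behaves the same whether or not `-ea` is set.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class SpeculationCheckSketch
{
    /** Hypothetical helper: true when no uncontacted candidate is left to speculate to. */
    static boolean noExtraReplicas(List<String> contact, List<String> candidates)
    {
        // Option 1: state the subset assumption explicitly so a violation fails loudly.
        if (!new HashSet<>(candidates).containsAll(contact))
            throw new IllegalStateException("contact " + contact + " is not a subset of candidates " + candidates);

        // Option 2: compare with >= so the check still triggers if the sizes ever diverge.
        return contact.size() >= candidates.size();
    }

    public static void main(String[] args)
    {
        List<String> candidates = Arrays.asList("a", "b", "c");
        System.out.println(noExtraReplicas(Arrays.asList("a", "b"), candidates));      // false
        System.out.println(noExtraReplicas(Arrays.asList("a", "b", "c"), candidates)); // true
    }
}
```

    With the subset check in place and no duplicate endpoints, `>=` and `==` give the same answer, so the choice is mostly about how a broken invariant should surface.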
[GitHub] cassandra pull request #265: 14705
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216432759

    --- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---
    @@ -182,7 +160,17 @@ private void assertSubList(C subCollection, int from, int to)
                 }
             }
    -        void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
    +        private void assertSubSequence(Iterable subSequence, int from, int to)
    --- End diff --

    Unused and also kind of weird (elementsEqual twice?)