[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r217186512
  
--- Diff: src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---
@@ -459,36 +458,34 @@ private void writeHintsForUndeliveredEndpoints(int startFrom, Set

[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r217185707
  
--- Diff: src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---
@@ -189,28 +189,30 @@ public final void expired()
 /**
  * @return the minimum number of endpoints that must reply.
  */
-protected int totalBlockFor()
+protected int blockFor()
 {
 // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
 // guarantees (see #833)
-return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending());
+return replicaPlan.blockFor();
 }
 
 /**
+ * TODO: this method is brittle for its purpose of deciding when we should fail a query;
--- End diff --

Should this be a JIRA ticket rather than a TODO, or should it just get done?


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r217185167
  
--- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
@@ -53,15 +53,16 @@
 protected final Queue repairs = new ConcurrentLinkedQueue<>();
 private final int blockFor;
 
-BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
+BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime)
 {
 super(command, replicaPlan, queryStartNanoTime);
-this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
+this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
 }
 
 public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
 {
-return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
+// TODO: why are we referencing a different replicaPlan here?
--- End diff --

OK, I think I understand: the original replica plan contacted nodes that never responded and weren't part of the read repair, whereas this is a new replica plan that only includes the nodes that responded. The consistency level happens to be the same in both. It would be less misleading if it fetched the consistency level from the plan it is given.
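A minimal sketch of the suggestion above, using hypothetical stand-in types: `Cl`, `Plan`, `MergeListener` and `ReadRepairSketch` are simplified placeholders invented for illustration, not Cassandra's `ConsistencyLevel`, `ReplicaPlan` or `PartitionIteratorMergeListener`. The point is only that the listener takes its consistency level from the plan it is handed, so there is a single source of truth even when that plan is a narrowed copy.

```java
import java.util.List;

// Hypothetical simplified consistency level; not Cassandra's ConsistencyLevel.
enum Cl { ONE, QUORUM, ALL }

// Hypothetical immutable plan: a consistency level plus the endpoints it covers.
final class Plan {
    private final Cl consistencyLevel;
    private final List<String> contacts;

    Plan(Cl consistencyLevel, List<String> contacts) {
        this.consistencyLevel = consistencyLevel;
        this.contacts = List.copyOf(contacts); // immutable snapshot
    }

    Cl consistencyLevel() { return consistencyLevel; }
    List<String> contacts() { return contacts; }

    // Derive a plan restricted to the replicas that actually responded; CL is carried over.
    Plan withContacts(List<String> responded) { return new Plan(consistencyLevel, responded); }
}

final class MergeListener {
    final Plan plan;
    final Cl consistencyLevel;

    MergeListener(Plan plan, Cl consistencyLevel) {
        this.plan = plan;
        this.consistencyLevel = consistencyLevel;
    }
}

final class ReadRepairSketch {
    private final Plan original;

    ReadRepairSketch(Plan original) { this.original = original; }

    // Mirrors the quoted code: listener built from 'given', CL read from the original plan.
    MergeListener listenerMixed(Plan given) {
        return new MergeListener(given, original.consistencyLevel());
    }

    // Suggested shape: both the replicas and the CL come from the plan passed in.
    MergeListener listenerFromGiven(Plan given) {
        return new MergeListener(given, given.consistencyLevel());
    }
}
```

Both methods yield the same consistency level today only because narrowing a plan preserves it; the second form keeps working if that ever stops being true.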





[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r217175374
  
--- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
@@ -53,15 +53,16 @@
 protected final Queue repairs = new ConcurrentLinkedQueue<>();
 private final int blockFor;
 
-BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
+BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime)
 {
 super(command, replicaPlan, queryStartNanoTime);
-this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
+this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
 }
 
 public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
 {
-return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
+// TODO: why are we referencing a different replicaPlan here?
--- End diff --

I think that was a stale comment. The merging never modified the replicaPlan, and it should never be modified during short read protection (SRP). Logically, it should be a snapshot of only the replicas relevant to the SRP.





[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r217112723
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -18,364 +18,296 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
 
 /**
- * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
  *
- * Constitutes:
- *  - the 'natural' replicas replicating the range or token relevant for the operation
- *  - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
- *  - the 'selected' replicas, those that should be targeted for any operation
- *  - 'all' replicas represents natural+pending
- *
- * @param  the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
- * @param  the type of itself, including its type parameters, for return type of modifying methods
+ * @param 
  */
-public abstract class ReplicaLayout, L extends ReplicaLayout>
+public abstract class ReplicaLayout>
+public abstract class ReplicaLayout>
 {
-private volatile E all;
-protected final E natural;
-protected final E pending;
-protected final E selected;
-
-protected final Keyspace keyspace;
-protected final ConsistencyLevel consistencyLevel;
-
-private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
-{
-this(keyspace, consistencyLevel, natural, pending, selected, null);
-}
+private final E natural;
 
-private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+ReplicaLayout(E natural)
 {
-assert selected != null;
-assert pending == null || !Endpoints.haveConflicts(natural, pending);
-this.keyspace = keyspace;
-this.consistencyLevel = consistencyLevel;
 this.natural = natural;
-this.pending = pending;
-this.selected = selected;
-// if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
-if (all == null && pending == null)
-all = natural;
-this.all = all;
 }
 
-public Replica getReplicaFor(InetAddressAndPort endpoint)
-{
-return natural.byEndpoint().get(endpoint);
-}
-
-public E natural()
+/**
+ * The 'natural' owners of the ring position(s), as implied by the current ring layout.
+ * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
+ * have not yet finished obtaining their view of the range.
+ */
+public final E natural()
 {
 return natural;
 }
 
-public E all()
-{
-E result = all;
-if (result == null)
-all = result = Endpoints.concat(natural, pending);
-return result;
-}
-
-public E selected()
-{
-return selected;
-}
-
 /**
- * @return the pending replicas - will be null for read layouts
- * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+ * All relevant owners of the ring position(s) 

[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216916456
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -194,14 +198,97 @@ public Token token()
 
 public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
 {
-if (Endpoints.haveConflicts(natural, pending))
+if (haveWriteConflicts(natural, pending))
 {
-natural = Endpoints.resolveConflictsInNatural(natural, pending);
-pending = Endpoints.resolveConflictsInPending(natural, pending);
+natural = resolveWriteConflictsInNatural(natural, pending);
+pending = resolveWriteConflictsInPending(natural, pending);
 }
 return new ReplicaLayout.ForTokenWrite(natural, pending);
 }
 
+/**
+ * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation)
+ * or because an endpoint is transitioning between full and transient replication status.
+ *
+ * We essentially always prefer the full version for writes, because this is stricter.
+ *
+ * For transient->full transitions:
+ *
+ *   Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration,
+ *   it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form,
+ *   as there is always a quorum of nodes receiving the write.  However, ring ownership changes are not atomic or
+ *   consistent across the cluster, and it is possible for writers to see different ring states.
+ *
+ *   Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair
+ *   (as the normal contract dictates) when it completes its transition.
+ *
+ *   While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative
+ *   available to us today to mitigate, and (we think) the easiest to reason about.
+ *
+ * For full->transient transitions:
+ *
+ *   In this case, things are dicier, because in theory we can trigger this change instantly.  All we need to do is
+ *   drop some data, surely?
+ *
+ *   Ring movements can put is in a pickle; any other node could believe us to be full when we have become transient,
+ *   and perform a full data request to us that we believe ourselves capable of answering, but that we are not.
+ *   If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing
+ *   its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware
+ *   of the data inconsistency.
+ *
+ *   This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when
+ *   the transition takes effect.  As such, a node can hold an incorrect belief about its own ownership ranges.
+ *
+ *   This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it.
+ *   It is a little more dangerous with transient replication, however, because we can completely answer a request without
+ *   ever touching a digest, meaning we are less likely to attempt to repair any inconsistency.
+ *
+ *   We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance.
+ *
+ * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
+ * to avoid corrupting our count.  This is fine for writes, all we're doing is ensuring we always write to the node,
+ * instead of selectively.
+ *
+ * @param natural
+ * @param pending
+ * @param 
+ * @return
+ */
+public static > boolean haveWriteConflicts(E natural, E pending)
+{
+Set naturalEndpoints = natural.endpoints();
+for (InetAddressAndPort pendingEndpoint : pending.endpoints())
+{
+if (naturalEndpoints.contains(pendingEndpoint))
+return true;
+}
+return false;
+}
+
+/**
+ * MUST APPLY FIRST
+ * See {@link ReplicaLayout#haveWriteConflicts}
+ * @return a 'natural' replica collection, that has had its conflicts with pending repaired
+ */
+public static > E resolveWriteConflictsInNatural(E
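
The conflict handling that the javadoc above describes can be sketched on a simplified model. This is an illustration under assumptions, not the PR's code: `SimpleReplica` and `WriteConflicts` are hypothetical names, endpoints are plain strings, and because the diff is cut off before the resolution bodies, the two `resolve*` methods encode our reading of the javadoc (prefer the full version of a conflicting replica, move a pending full replica into 'natural', and then drop from 'pending' anything now counted in 'natural').

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified replica: an endpoint name plus full/transient status.
final class SimpleReplica {
    final String endpoint;
    final boolean full;

    SimpleReplica(String endpoint, boolean full) {
        this.endpoint = endpoint;
        this.full = full;
    }
}

final class WriteConflicts {
    // True if any pending endpoint also appears among the natural endpoints.
    static boolean haveWriteConflicts(List<SimpleReplica> natural, List<SimpleReplica> pending) {
        Set<String> naturalEndpoints = new HashSet<>();
        for (SimpleReplica r : natural) naturalEndpoints.add(r.endpoint);
        for (SimpleReplica r : pending)
            if (naturalEndpoints.contains(r.endpoint)) return true;
        return false;
    }

    // Apply first: a transient natural replica whose pending counterpart is full
    // (a transient->full movement) is replaced by the full version; full natural replicas are kept.
    static List<SimpleReplica> resolveInNatural(List<SimpleReplica> natural, List<SimpleReplica> pending) {
        Map<String, SimpleReplica> pendingByEndpoint = new HashMap<>();
        for (SimpleReplica r : pending) pendingByEndpoint.put(r.endpoint, r);
        List<SimpleReplica> resolved = new ArrayList<>();
        for (SimpleReplica r : natural) {
            SimpleReplica conflict = pendingByEndpoint.get(r.endpoint);
            resolved.add(!r.full && conflict != null && conflict.full ? conflict : r);
        }
        return resolved;
    }

    // Then: drop from pending every endpoint present in the resolved natural set,
    // so that no endpoint is counted twice towards the write's blockFor.
    static List<SimpleReplica> resolveInPending(List<SimpleReplica> resolvedNatural, List<SimpleReplica> pending) {
        Set<String> naturalEndpoints = new HashSet<>();
        for (SimpleReplica r : resolvedNatural) naturalEndpoints.add(r.endpoint);
        List<SimpleReplica> resolved = new ArrayList<>();
        for (SimpleReplica r : pending)
            if (!naturalEndpoints.contains(r.endpoint)) resolved.add(r);
        return resolved;
    }
}
```

The ordering constraint ("MUST APPLY FIRST") shows up here as `resolveInPending` taking the already-resolved natural collection, mirroring how `forTokenWrite` reassigns `natural` before computing `pending`.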

[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216913748
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
@@ -249,12 +229,32 @@ public int requiredParticipants()
  * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
  * @param 
  */
-public static class Shared>
+public interface Shared, P extends ReplicaPlan>
 {
-private P replicaPlan;
-public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
-public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
-public P get() { return replicaPlan; }
+public void addToContact(Replica replica);
+public P get();
+public abstract P getWithContact(E endpoints);
 }
 
+public static class SharedForTokenRead implements Shared
+{
+private ForTokenRead replicaPlan;
+public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForTokenRead get() { return replicaPlan; }
+public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
+}
+
+public static class SharedForRangeRead implements Shared
+{
+private ForRangeRead replicaPlan;
+public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForRangeRead get() { return replicaPlan; }
+public ForRangeRead getWithContact(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
--- End diff --

This is a getter for the case where we don't want to update the shared reference. I will remove it to avoid any ambiguity.
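The ambiguity discussed here is the difference between updating the shared reference and deriving a one-off plan from it. A minimal sketch, assuming immutable plans held in a mutable holder: `ReadPlan`, `SharedReadPlan` and `withContacts` are hypothetical names, not the PR's `ReplicaPlan.Shared` API, and like the diff's holder this sketch does no synchronization.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable plan: every "modification" returns a new instance.
final class ReadPlan {
    private final List<String> contacts;

    ReadPlan(List<String> contacts) {
        this.contacts = Collections.unmodifiableList(new ArrayList<>(contacts));
    }

    List<String> contacts() { return contacts; }

    ReadPlan withContacts(List<String> newContacts) { return new ReadPlan(newContacts); }
}

// Hypothetical holder shared by the executor, speculation and read repair.
// Not thread-safe: no volatile or locking, as in the quoted holder.
final class SharedReadPlan {
    private ReadPlan plan;

    SharedReadPlan(ReadPlan plan) { this.plan = plan; }

    // Replaces the shared reference, so every holder of this object sees the extra contact.
    void addToContact(String endpoint) {
        List<String> next = new ArrayList<>(plan.contacts());
        next.add(endpoint);
        plan = plan.withContacts(next);
    }

    ReadPlan get() { return plan; }
}
```

Without a `getWithContact` method on the holder, a caller that wants a one-off variant has to write `shared.get().withContacts(...)`, which makes it visible at the call site that the shared reference is not being updated.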





[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216913492
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
@@ -249,12 +229,32 @@ public int requiredParticipants()
  * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
  * @param 
  */
-public static class Shared>
+public interface Shared, P extends ReplicaPlan>
 {
-private P replicaPlan;
-public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
-public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
-public P get() { return replicaPlan; }
+public void addToContact(Replica replica);
+public P get();
+public abstract P getWithContact(E endpoints);
 }
 
+public static class SharedForTokenRead implements Shared
+{
+private ForTokenRead replicaPlan;
+public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForTokenRead get() { return replicaPlan; }
+public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
+}
+
+public static class SharedForRangeRead implements Shared
+{
+private ForRangeRead replicaPlan;
+public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
--- End diff --

It does; in general, the singular form of "contact" doesn't work well with the method names. Perhaps I should just pluralise it and be done with it? Not sure why I was averse to this.





[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216912914
  
--- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
@@ -184,30 +185,28 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
 ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
 SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
 
-// Endpoints for Token
-ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
+ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
 
 // Speculative retry is disabled *OR*
 // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
 if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-// TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
-return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
+return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
 
 // There are simply no extra replicas to speculate.
 // Handle this separately so it can record failed attempts to speculate due to lack of replicas
-if (replicaLayout.selected().size() == replicaLayout.all().size())
+if (replicaPlan.contact().size() == replicaPlan.candidates().size())
 {
 boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
-return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
+return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
 }
 
 // If CL.ALL, upgrade to AlwaysSpeculating;
--- End diff --

No idea. I don't understand it either, but as you say, the comment looks stale anyway. I'll remove it.





[GitHub] cassandra pull request #265: 14705

2018-09-11 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216846177
  
--- Diff: src/java/org/apache/cassandra/service/reads/DigestResolver.java ---
@@ -93,16 +93,14 @@ public PartitionIterator getData()
 {
 // This path can be triggered only if we've got responses from full replicas and they match, but
 // transient replica response still contains data, which needs to be reconciled.
-DataResolver dataResolver = new DataResolver<>(command,
-    replicaLayout,
-    (ReadRepair) NoopReadRepair.instance,
-    queryStartNanoTime);
+DataResolver dataResolver
+    = new DataResolver<>(command, replicaPlan, (ReadRepair) NoopReadRepair.instance, queryStartNanoTime);
 
 dataResolver.preprocess(dataResponse);
 // Forward differences to all full nodes
--- End diff --

This comment is wrong now, right? We don't forward; we just merge.





[GitHub] cassandra pull request #265: 14705

2018-09-11 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216839700
  
--- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
@@ -184,30 +185,28 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
 ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
 SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
 
-// Endpoints for Token
-ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
+ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
 
 // Speculative retry is disabled *OR*
 // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
 if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-// TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
-return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
+return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
 
 // There are simply no extra replicas to speculate.
 // Handle this separately so it can record failed attempts to speculate due to lack of replicas
-if (replicaLayout.selected().size() == replicaLayout.all().size())
+if (replicaPlan.contact().size() == replicaPlan.candidates().size())
 {
 boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
-return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
+return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
 }
 
 // If CL.ALL, upgrade to AlwaysSpeculating;
--- End diff --

This doesn't seem to have anything to do with CL.ALL anymore. And I don't see why CL.ALL benefits from always speculating, since it fails anyway if it doesn't get the data response?
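The branching in the quoted hunk can be modelled as a small pure function, which makes the question concrete: within this hunk, CL.ALL only affects whether a failed speculation is recorded. A minimal sketch under assumptions: `Level`, `ExecutorKind`, `ExecutorChoice` and `ReadExecutorSelection` are hypothetical names, and the branch after `// If CL.ALL, upgrade to AlwaysSpeculating;` is cut off in the diff, so it is lumped into `OTHER` rather than guessed at.

```java
// Hypothetical simplified consistency levels; not Cassandra's ConsistencyLevel.
enum Level { ONE, QUORUM, EACH_QUORUM, ALL }

enum ExecutorKind { NEVER_SPECULATING, OTHER }

final class ExecutorChoice {
    final ExecutorKind kind;
    final boolean recordFailedSpeculation;

    ExecutorChoice(ExecutorKind kind, boolean recordFailedSpeculation) {
        this.kind = kind;
        this.recordFailedSpeculation = recordFailedSpeculation;
    }
}

final class ReadExecutorSelection {
    static ExecutorChoice choose(boolean speculationDisabled, Level cl, int contacts, int candidates) {
        // Speculation disabled, or EACH_QUORUM (11980: avoid miscounting DC responses).
        if (speculationDisabled || cl == Level.EACH_QUORUM)
            return new ExecutorChoice(ExecutorKind.NEVER_SPECULATING, false);

        // Every candidate is already contacted, so there is nothing extra to speculate on.
        // Record the failed speculation unless CL is ALL (presumably because ALL contacts every replica anyway).
        if (contacts == candidates)
            return new ExecutorChoice(ExecutorKind.NEVER_SPECULATING, cl != Level.ALL);

        // The remaining (speculating) cases are not shown in the quoted diff.
        return new ExecutorChoice(ExecutorKind.OTHER, false);
    }
}
```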





[GitHub] cassandra pull request #265: 14705

2018-09-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216486027
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -194,14 +198,97 @@ public Token token()
 
 public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
 {
-if (Endpoints.haveConflicts(natural, pending))
+if (haveWriteConflicts(natural, pending))
 {
-natural = Endpoints.resolveConflictsInNatural(natural, pending);
-pending = Endpoints.resolveConflictsInPending(natural, pending);
+natural = resolveWriteConflictsInNatural(natural, pending);
+pending = resolveWriteConflictsInPending(natural, pending);
 }
 return new ReplicaLayout.ForTokenWrite(natural, pending);
 }
 
+/**
+ * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation)
+ * or because an endpoint is transitioning between full and transient replication status.
+ *
+ * We essentially always prefer the full version for writes, because this is stricter.
+ *
+ * For transient->full transitions:
+ *
+ *   Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration,
+ *   it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form,
+ *   as there is always a quorum of nodes receiving the write.  However, ring ownership changes are not atomic or
+ *   consistent across the cluster, and it is possible for writers to see different ring states.
+ *
+ *   Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair
+ *   (as the normal contract dictates) when it completes its transition.
+ *
+ *   While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative
+ *   available to us today to mitigate, and (we think) the easiest to reason about.
+ *
+ * For full->transient transitions:
+ *
+ *   In this case, things are dicier, because in theory we can trigger this change instantly.  All we need to do is
+ *   drop some data, surely?
+ *
+ *   Ring movements can put is in a pickle; any other node could believe us to be full when we have become transient,
+ *   and perform a full data request to us that we believe ourselves capable of answering, but that we are not.
+ *   If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing
+ *   its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware
+ *   of the data inconsistency.
+ *
+ *   This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when
+ *   the transition takes effect.  As such, a node can hold an incorrect belief about its own ownership ranges.
+ *
+ *   This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it.
+ *   It is a little more dangerous with transient replication, however, because we can completely answer a request without
+ *   ever touching a digest, meaning we are less likely to attempt to repair any inconsistency.
+ *
+ *   We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance.
+ *
+ * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
+ * to avoid corrupting our count.  This is fine for writes, all we're doing is ensuring we always write to the node,
+ * instead of selectively.
+ *
+ * @param natural
+ * @param pending
+ * @param 
+ * @return
+ */
+public static > boolean haveWriteConflicts(E natural, E pending)
+{
+Set naturalEndpoints = natural.endpoints();
+for (InetAddressAndPort pendingEndpoint : pending.endpoints())
+{
+if (naturalEndpoints.contains(pendingEndpoint))
+return true;
+}
+return false;
+}
+
+/**
+ * MUST APPLY FIRST
+ * See {@link ReplicaLayout#haveWriteConflicts}
+ * @return a 'natural' replica collection, that has had its conflicts with pending repaired
+ */
+public static > E resolveWriteConflictsInNatural(E natural, E

[GitHub] cassandra pull request #265: 14705

2018-09-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216473482
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
@@ -249,12 +229,32 @@ public int requiredParticipants()
  * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
  * @param 
  */
-public static class Shared>
+public interface Shared, P extends ReplicaPlan>
 {
-private P replicaPlan;
-public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
-public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
-public P get() { return replicaPlan; }
+public void addToContact(Replica replica);
+public P get();
+public abstract P getWithContact(E endpoints);
 }
 
+public static class SharedForTokenRead implements Shared
+{
+private ForTokenRead replicaPlan;
+public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForTokenRead get() { return replicaPlan; }
+public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
+}
+
+public static class SharedForRangeRead implements Shared
+{
+private ForRangeRead replicaPlan;
+public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
--- End diff --

addToContact or addContact? The grammar seems odd.





[GitHub] cassandra pull request #265: 14705

2018-09-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216473356
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
@@ -249,12 +229,32 @@ public int requiredParticipants()
  * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
  * @param 
  */
-public static class Shared>
+public interface Shared, P extends ReplicaPlan>
 {
-private P replicaPlan;
-public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
-public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
-public P get() { return replicaPlan; }
+public void addToContact(Replica replica);
+public P get();
+public abstract P getWithContact(E endpoints);
 }
 
+public static class SharedForTokenRead implements Shared
+{
+private ForTokenRead replicaPlan;
+public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForTokenRead get() { return replicaPlan; }
+public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
+}
+
+public static class SharedForRangeRead implements Shared
+{
+private ForRangeRead replicaPlan;
+public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForRangeRead get() { return replicaPlan; }
+public ForRangeRead getWithContact(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
--- End diff --

Wait, so it's not shared now? We made a new one without updating the reference?





[GitHub] cassandra pull request #265: 14705

2018-09-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216451203
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -18,364 +18,296 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
 
 /**
- * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
  *
- * Constitutes:
- *  - the 'natural' replicas replicating the range or token relevant for the operation
- *  - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
- *  - the 'selected' replicas, those that should be targeted for any operation
- *  - 'all' replicas represents natural+pending
- *
- * @param  the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
- * @param  the type of itself, including its type parameters, for return type of modifying methods
+ * @param 
  */
-public abstract class ReplicaLayout, L extends ReplicaLayout>
+public abstract class ReplicaLayout>
 {
-private volatile E all;
-protected final E natural;
-protected final E pending;
-protected final E selected;
-
-protected final Keyspace keyspace;
-protected final ConsistencyLevel consistencyLevel;
-
-private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
-{
-this(keyspace, consistencyLevel, natural, pending, selected, null);
-}
+private final E natural;
 
-private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+ReplicaLayout(E natural)
 {
-assert selected != null;
-assert pending == null || !Endpoints.haveConflicts(natural, pending);
-this.keyspace = keyspace;
-this.consistencyLevel = consistencyLevel;
 this.natural = natural;
-this.pending = pending;
-this.selected = selected;
-// if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
-if (all == null && pending == null)
-all = natural;
-this.all = all;
 }
 
-public Replica getReplicaFor(InetAddressAndPort endpoint)
-{
-return natural.byEndpoint().get(endpoint);
-}
-
-public E natural()
+/**
+ * The 'natural' owners of the ring position(s), as implied by the current ring layout.
+ * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
+ * have not yet finished obtaining their view of the range.
+ */
+public final E natural()
 {
 return natural;
 }
 
-public E all()
-{
-E result = all;
-if (result == null)
-all = result = Endpoints.concat(natural, pending);
-return result;
-}
-
-public E selected()
-{
-return selected;
-}
-
 /**
- * @return the pending replicas - will be null for read layouts
- * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+ * All relevant owners of the ring position(s) for 
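For reference, the removed fields and Javadoc amount to roughly the following model. It is sketched with strings in place of Endpoints/Replica and uses a hypothetical name (LayoutSketch), so it is not Cassandra's class. It keeps the removed invariants: natural and pending must not overlap, a null pending means all is just natural, and otherwise all is the lazily computed concatenation. The removed code expressed the overlap check as an assert, and this sketch throws instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the removed layout fields.
final class LayoutSketch
{
    private final List<String> natural;   // current owners per the ring
    private final List<String> pending;   // owners still taking over the range; null for reads
    private volatile List<String> all;    // natural + pending, computed lazily

    LayoutSketch(List<String> natural, List<String> pending)
    {
        if (pending != null && !Collections.disjoint(natural, pending))
            throw new IllegalArgumentException("natural and pending replicas overlap");
        this.natural = natural;
        this.pending = pending;
        // with no pending endpoints, 'all' is just the natural owners
        this.all = pending == null ? natural : null;
    }

    List<String> natural() { return natural; }

    List<String> all()
    {
        // Racy but benign: concurrent callers may each build an equal list.
        List<String> result = all;
        if (result == null)
        {
            List<String> concat = new ArrayList<>(natural);
            concat.addAll(pending);
            all = result = Collections.unmodifiableList(concat);
        }
        return result;
    }
}
```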

[GitHub] cassandra pull request #265: 14705

2018-09-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216442898
  
--- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
@@ -53,15 +53,16 @@
 protected final Queue repairs = new ConcurrentLinkedQueue<>();
 private final int blockFor;
 
-BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
+BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime)
 {
 super(command, replicaPlan, queryStartNanoTime);
-this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
+this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
 }
 
 public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
 {
-return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
+// TODO: why are we referencing a different replicaPlan here?
--- End diff --

TODO indeed: looking at DataResolver, it's modifying the replica plan without updating the shared one
https://github.com/apache/cassandra/pull/265/files#diff-7e5dd130632299911e49b12afe86c85aR121
So they would end up different?
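One possible way to keep the resolver and the read repair looking at the same plan assumes that both hold one holder object and route every change through it. ReadPlan, SharedReadPlan and withExtraContact below are hypothetical names, not Cassandra APIs. AtomicReference.updateAndGet is the standard java.util.concurrent.atomic method (Java 8+).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable plan; only the contacted replicas are modelled.
final class ReadPlan
{
    final List<String> contact;

    ReadPlan(List<String> contact)
    {
        this.contact = Collections.unmodifiableList(new ArrayList<>(contact));
    }

    ReadPlan withExtraContact(String replica)
    {
        List<String> next = new ArrayList<>(contact);
        next.add(replica);
        return new ReadPlan(next);
    }
}

// Hypothetical shared holder: every component reads and writes through one reference.
final class SharedReadPlan
{
    private final AtomicReference<ReadPlan> ref;

    SharedReadPlan(ReadPlan initial) { ref = new AtomicReference<>(initial); }

    ReadPlan get() { return ref.get(); }

    // Atomically publishes the extended plan, so every component holding this
    // SharedReadPlan sees the extension instead of one of them keeping a private copy.
    ReadPlan addToContact(String replica)
    {
        return ref.updateAndGet(p -> p.withExtraContact(replica));
    }
}
```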


---




[GitHub] cassandra pull request #265: 14705

2018-09-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216434289
  
--- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
@@ -196,7 +196,7 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
 
 // There are simply no extra replicas to speculate.
 // Handle this separately so it can record failed attempts to speculate due to lack of replicas
-if (replicaPlan.contact().size() >= replicaPlan.liveOnly().all().size())
+if (replicaPlan.contact().size() == replicaPlan.candidates().size())
--- End diff --

So >= is not necessary because contact should be a subset of candidates (otherwise we should already have thrown an unavailable)? Would it be more robust to keep >= in case that doesn't hold due to a mistake, or to add an assertion for that?
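Here is a sketch of the assertion option. Replicas are modelled as strings, and SpeculationCheck/noExtraReplicas are hypothetical names. The subset invariant is checked explicitly, and once it holds, == is equivalent to the old >=. The check is explicit rather than a plain Java assert because an assert only fires when the JVM runs with -ea.

```java
import java.util.Set;

// Hypothetical helper; not part of Cassandra.
final class SpeculationCheck
{
    /**
     * True when every candidate replica is already being contacted,
     * i.e. there is no extra replica left to speculate on.
     */
    static boolean noExtraReplicas(Set<String> contact, Set<String> candidates)
    {
        // Invariant the == comparison relies on: contact is a subset of candidates.
        // Checked eagerly so a violation fails loudly instead of being masked by >=.
        if (!candidates.containsAll(contact))
            throw new IllegalStateException("contact " + contact + " is not a subset of candidates " + candidates);
        return contact.size() == candidates.size();
    }
}
```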


---




[GitHub] cassandra pull request #265: 14705

2018-09-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216432759
  
--- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---
@@ -182,7 +160,17 @@ private void assertSubList(C subCollection, int from, int to)
 }
 }
 
-void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+private void assertSubSequence(Iterable subSequence, int from, int to)
--- End diff --

Unused and also kind of weird (elementsEqual twice?)


---
