[GitHub] cassandra pull request #239: Optimize Streaming

2018-07-10 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r201463505
  
--- Diff: 
src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -249,4 +253,50 @@ public ByteBufAllocator getAllocator()
 {
 return channelConfig.getAllocator();
 }
+
+/**
+ * Consumes bytes in the stream until the given length
+ *
+ * @param writer
+ * @param len
+ * @return
+ * @throws IOException
+ */
+public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException
+{
+long copied = 0; // number of bytes copied
+while (copied < len)
+{
+int position = buffer.position();
+int remaining = buffer.remaining();
+if (remaining == 0)
+{
+try
+{
+reBuffer();
+} catch (EOFException e)
+{
+throw new EOFException("EOF after " + copied + " bytes out of " + len);
+}
+position = buffer.position();
+remaining = buffer.remaining();
+if (remaining == 0)
--- End diff --

Even you've done it!  :)


https://github.com/aweisberg/cassandra/commit/1491a40b7b4ea2723bcf22d870ee514b47ea901b#diff-baae9b53db48ff1ed346e469f7506545R192


---




[GitHub] cassandra pull request #239: Optimize Streaming

2018-07-10 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r201461568
  
--- Diff: 
src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -249,4 +253,50 @@ public ByteBufAllocator getAllocator()
 {
 return channelConfig.getAllocator();
 }
+
+/**
+ * Consumes bytes in the stream until the given length
+ *
+ * @param writer
+ * @param len
+ * @return
+ * @throws IOException
+ */
+public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException
+{
+long copied = 0; // number of bytes copied
+while (copied < len)
+{
+int position = buffer.position();
+int remaining = buffer.remaining();
+if (remaining == 0)
+{
+try
+{
+reBuffer();
+} catch (EOFException e)
+{
+throw new EOFException("EOF after " + copied + " bytes out of " + len);
+}
+position = buffer.position();
+remaining = buffer.remaining();
+if (remaining == 0)
--- End diff --

I think we need to raise this on the mailing list to possibly modify the 
code style, because for a long time it has been standard practice for many 
contributors (myself included) to elide the braces for single-line statements, 
as otherwise the code becomes illegible.
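
For illustration (a made-up snippet, not from the patch), the two styles being 
weighed are:

    // braces elided for a single-line conditional
    if (remaining == 0)
        reBuffer();

    // braces retained
    if (remaining == 0)
    {
        reBuffer();
    }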


---




[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216913492
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
@@ -249,12 +229,32 @@ public int requiredParticipants()
  * we progressively modify via various forms of speculation (initial 
speculation, rr-read and rr-write)
  * @param 
  */
-public static class Shared>
+public interface Shared, P extends 
ReplicaPlan>
 {
-private P replicaPlan;
-public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
-public void set(P newReplicaPlan) { this.replicaPlan = 
newReplicaPlan; }
-public P get() { return replicaPlan; }
+public void addToContact(Replica replica);
+public P get();
+public abstract P getWithContact(E endpoints);
 }
 
+public static class SharedForTokenRead implements 
Shared
+{
+private ForTokenRead replicaPlan;
+public SharedForTokenRead(ForTokenRead replicaPlan) { 
this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = 
replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForTokenRead get() { return replicaPlan; }
+public ForTokenRead getWithContact(EndpointsForToken newContact) { 
return replicaPlan.withContact(newContact); }
+}
+
+public static class SharedForRangeRead implements 
Shared
+{
+private ForRangeRead replicaPlan;
+public SharedForRangeRead(ForRangeRead replicaPlan) { 
this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = 
replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
--- End diff --

It does; in general the non-pluralised form of contact doesn't work well 
with the method names.  Perhaps I should just pluralise and be done with it?  
Not sure why I was averse to this.
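
If pluralised, the interface would read roughly as follows (a sketch only; the 
generics are elided here, as they were mangled in the quoted diff, and the final 
names may differ):

    void addToContacts(Replica replica);    // was addToContact
    P get();
    P getWithContacts(E endpoints);         // was getWithContact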


---




[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216913748
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
@@ -249,12 +229,32 @@ public int requiredParticipants()
  * we progressively modify via various forms of speculation (initial 
speculation, rr-read and rr-write)
  * @param 
  */
-public static class Shared>
+public interface Shared, P extends 
ReplicaPlan>
 {
-private P replicaPlan;
-public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
-public void set(P newReplicaPlan) { this.replicaPlan = 
newReplicaPlan; }
-public P get() { return replicaPlan; }
+public void addToContact(Replica replica);
+public P get();
+public abstract P getWithContact(E endpoints);
 }
 
+public static class SharedForTokenRead implements 
Shared
+{
+private ForTokenRead replicaPlan;
+public SharedForTokenRead(ForTokenRead replicaPlan) { 
this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = 
replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForTokenRead get() { return replicaPlan; }
+public ForTokenRead getWithContact(EndpointsForToken newContact) { 
return replicaPlan.withContact(newContact); }
+}
+
+public static class SharedForRangeRead implements 
Shared
+{
+private ForRangeRead replicaPlan;
+public SharedForRangeRead(ForRangeRead replicaPlan) { 
this.replicaPlan = replicaPlan; }
+public void addToContact(Replica replica) { replicaPlan = 
replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
+public ForRangeRead get() { return replicaPlan; }
+public ForRangeRead getWithContact(EndpointsForRange newContact) { 
return replicaPlan.withContact(newContact); }
--- End diff --

This is a getter for the case where we don't want to update the reference.  
I will remove it to avoid any ambiguity.


---




[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216916456
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -194,14 +198,97 @@ public Token token()
 
 public static ReplicaLayout.ForTokenWrite 
forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
 {
-if (Endpoints.haveConflicts(natural, pending))
+if (haveWriteConflicts(natural, pending))
 {
-natural = Endpoints.resolveConflictsInNatural(natural, 
pending);
-pending = Endpoints.resolveConflictsInPending(natural, 
pending);
+natural = resolveWriteConflictsInNatural(natural, pending);
+pending = resolveWriteConflictsInPending(natural, pending);
 }
 return new ReplicaLayout.ForTokenWrite(natural, pending);
 }
 
+/**
+ * Detect if we have any endpoint in both pending and full; this can 
occur either due to races (there is no isolation)
+ * or because an endpoint is transitioning between full and transient 
replication status.
+ *
+ * We essentially always prefer the full version for writes, because 
this is stricter.
+ *
+ * For transient->full transitions:
+ *
+ *   Since we always write to any pending transient replica, 
effectively upgrading it to full for the transition duration,
+ *   it might at first seem to be OK to continue treating the conflict 
replica as its 'natural' transient form,
+ *   as there is always a quorum of nodes receiving the write.  
However, ring ownership changes are not atomic or
+ *   consistent across the cluster, and it is possible for writers to 
see different ring states.
+ *
+ *   Furthermore, an operator would expect that the full node has 
received all writes, with no extra need for repair
+ *   (as the normal contract dictates) when it completes its 
transition.
+ *
+ *   While we cannot completely eliminate risks due to ring 
inconsistencies, this approach is the most conservative
+ *   available to us today to mitigate, and (we think) the easiest to 
reason about.
+ *
+ * For full->transient transitions:
+ *
+ *   In this case, things are dicier, because in theory we can trigger 
this change instantly.  All we need to do is
+ *   drop some data, surely?
+ *
+ *   Ring movements can put is in a pickle; any other node could 
believe us to be full when we have become transient,
+ *   and perform a full data request to us that we believe ourselves 
capable of answering, but that we are not.
+ *   If the ring is inconsistent, it's even feasible that a transient 
request would be made to the node that is losing
+ *   its transient status, that also does not know it has yet done so, 
resulting in all involved nodes being unaware
+ *   of the data inconsistency.
+ *
+ *   This happens because ring ownership changes are implied by a 
single node; not all owning nodes get a say in when
+ *   the transition takes effect.  As such, a node can hold an 
incorrect belief about its own ownership ranges.
+ *
+ *   This race condition is somewhat inherent in present day 
Cassandra, and there's actually a limit to what we can do about it.
+ *   It is a little more dangerous with transient replication, 
however, because we can completely answer a request without
+ *   ever touching a digest, meaning we are less likely to attempt to 
repair any inconsistency.
+ *
+ *   We aren't guaranteed to contact any different nodes for the data 
requests, of course, though we at least have a chance.
+ *
+ * Note: If we have any pending transient->full movement, we need to 
move the full replica to our 'natural' bucket
+ * to avoid corrupting our count.  This is fine for writes, all we're 
doing is ensuring we always write to the node,
+ * instead of selectively.
+ *
+ * @param natural
+ * @param pending
+ * @param 
+ * @return
+ */
+public static > boolean haveWriteConflicts(E 
natural, E pending)
+{
+Set naturalEndpoints = natural.endpoints();
+for (InetAddressAndPort pendingEndpoint : pending.endpoints())
+{
+if (naturalEndpoints.contains(pendingEndpoint))
+return true;
+}
+return false;
+}
+
+/**
+ * MUST APPLY FIRST
+ * See {@link ReplicaLayout#haveWriteConflicts}
+ * @return a 'natural' replica collection, that has had its conflicts 
with pending repaired
+ */
+public static > E 
resolveWriteConflictsInNa

[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r216912914
  
--- Diff: 
src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
@@ -184,30 +185,28 @@ public static AbstractReadExecutor 
getReadExecutor(SinglePartitionReadCommand co
 ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().id);
 SpeculativeRetryPolicy retry = 
cfs.metadata().params.speculativeRetry;
 
-// Endpoints for Token
-ReplicaLayout.ForToken replicaLayout = 
ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), 
consistencyLevel, retry);
+ReplicaPlan.ForTokenRead replicaPlan = 
ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), 
consistencyLevel, retry);
 
 // Speculative retry is disabled *OR*
 // 11980: Disable speculative retry if using EACH_QUORUM in order 
to prevent miscounting DC responses
 if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || 
consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-// TODO Looks like we might want to move speculation into the 
replica layout, but that might be a story for post-4.0
-return new NeverSpeculatingReadExecutor(cfs, command, 
replicaLayout, queryStartNanoTime, false);
+return new NeverSpeculatingReadExecutor(cfs, command, 
replicaPlan, queryStartNanoTime, false);
 
 // There are simply no extra replicas to speculate.
 // Handle this separately so it can record failed attempts to 
speculate due to lack of replicas
-if (replicaLayout.selected().size() == replicaLayout.all().size())
+if (replicaPlan.contact().size() == 
replicaPlan.candidates().size())
 {
 boolean recordFailedSpeculation = consistencyLevel != 
ConsistencyLevel.ALL;
-return new NeverSpeculatingReadExecutor(cfs, command, 
replicaLayout, queryStartNanoTime, recordFailedSpeculation);
+return new NeverSpeculatingReadExecutor(cfs, command, 
replicaPlan, queryStartNanoTime, recordFailedSpeculation);
 }
 
 // If CL.ALL, upgrade to AlwaysSpeculating;
--- End diff --

No idea.  I don't understand either, but as you say, it looks like the 
comment is stale anyway.  I'll remove it.


---




[GitHub] cassandra pull request #288: In BufferPool, make allocating thread receive a...

2018-10-26 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/288#discussion_r228448169
  
--- Diff: src/java/org/apache/cassandra/utils/memory/BufferPool.java ---
@@ -237,23 +237,25 @@ void check()
 /** Return a chunk, the caller will take owership of the parent 
chunk. */
 public Chunk get()
 {
-while (true)
-{
-Chunk chunk = chunks.poll();
-if (chunk != null)
-return chunk;
+Chunk chunk = chunks.poll();
+if (chunk != null)
+return chunk;
 
-if (!allocateMoreChunks())
-// give it one last attempt, in case someone else 
allocated before us
-return chunks.poll();
-}
+chunk = allocateMoreChunks();
+if (chunk != null)
+return chunk;
+
+/*  another thread may have just allocated last macro chunk, so
+**  make one final attempt before returning null
--- End diff --

Super minor nit, with way more verbiage than necessary on my part, but this is 
the first time I've seen this style of code comment in the codebase.

The norm is usually to use one of the two basic forms: short info comments 
using // on each line, and larger explanatory comments using the standard

/**  (this first line is typically left blank, for balance with the terminating line, but that's not essential)
 *
 *
 */

I'm sure we don't stick to this everywhere, and it's not super important 
which style of code comment you use, but it would be preferable to stick to one 
of the two most standard formats.
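
For the comment in question, either standard form would look something like this 
(illustrative only):

    // another thread may have just allocated the last macro chunk,
    // so make one final attempt before returning null

or

    /**
     * another thread may have just allocated the last macro chunk,
     * so make one final attempt before returning null
     */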


---




[GitHub] cassandra pull request #271: 14726

2018-10-01 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r221526704
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaMultimap.java ---
@@ -39,21 +39,28 @@
 public abstract C get(K key);
 public C getIfPresent(K key) { return map.get(key); }
 
-public static abstract class Mutable
--- End diff --

It's necessary to be visible for testing, so I've simply added the 
annotation.
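
i.e. keep it visible but document why, roughly like this (a generic illustration 
rather than the actual ReplicaMultimap code):

    import com.google.common.annotations.VisibleForTesting;

    public abstract class SomeMultimap
    {
        // kept public purely so unit tests can construct and inspect it
        @VisibleForTesting
        public static abstract class Mutable
        {
        }
    }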


---




[GitHub] cassandra pull request #271: 14726

2018-10-01 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r221530400
  
--- Diff: src/java/org/apache/cassandra/locator/EndpointsByRange.java ---
@@ -54,9 +55,13 @@ public void putAll(Range range, EndpointsForRange 
replicas, Conflict igno
 get(range).addAll(replicas, ignoreConflicts);
 }
 
-public EndpointsByRange asImmutableView()
+public EndpointsByRange build()
 {
-return new 
EndpointsByRange(Collections.unmodifiableMap(Maps.transformValues(map, 
EndpointsForRange.Mutable::asImmutableView)));
+Map, EndpointsForRange> map =
+Collections.unmodifiableMap(
+new HashMap<>(
--- End diff --

No, it is not.  Good point - thanks!


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219131794
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1021,18 +1021,18 @@ private static void 
syncWriteToBatchlog(Collection mutations, Collecti
 handler.get();
 }
 
-private static void 
asyncRemoveFromBatchlog(Collection endpoints, UUID uuid)
+private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite 
replicaPlan, UUID uuid)
 {
 MessageOut message = new 
MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, 
UUIDSerializer.serializer);
-for (InetAddressAndPort target : endpoints)
+for (Replica target : replicaPlan.contacts())
 {
 if (logger.isTraceEnabled())
 logger.trace("Sending batchlog remove request {} to {}", 
uuid, target);
 
-if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
-performLocally(Stage.MUTATION, 
SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+if (target.isLocal())
--- End diff --

I thought we had ended up calling this `isSelf` because `isLocal` is 
overloaded in this code (meaning, variably, is this instance (has our broadcast 
address); is the same DC as this instance; has the 'local' address).  Not 
strictly related to this patch, but perhaps we should rename to `isSelf` while 
we're here and remember?  This is consistent with the nomenclature amongst the 
TR patch, and in ReplicaCollection (`withoutSelf` and `selfIfPresent`)
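
Roughly the shape I have in mind, for illustration (Replica's actual internals 
may differ):

    // in Replica: "self" meaning this node's broadcast address, avoiding the overloaded "local"
    public boolean isSelf()
    {
        return endpoint().equals(FBUtilities.getBroadcastAddressAndPort());
    }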


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219133467
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
 {
 // A single case we write not for range or token, but multiple 
mutations to many tokens
 Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+Multimap localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+// Replicas are picked manually:
+//  - replicas should be alive according to the failure detector
+//  - replicas should be in the local datacenter
+//  - choose min(2, number of qualifying candiates above)
+//  - allow the local node to be the only replica only if it's a 
single-node DC
+Collection chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+if (chosenEndpoints.isEmpty())
+{
+if (consistencyLevel == ConsistencyLevel.ANY)
+chosenEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+else
+throw UnavailableException.create(ConsistencyLevel.ONE, 1, 
0);
+}
+
 ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
-
SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+
SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
 EndpointsForToken.empty(token)
 );
-ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 
? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+// Batchlog is hosted by either one node or two nodes from 
different racks.
+consistencyLevel = liveAndDown.all().size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO;
--- End diff --

Are we sure it's OK (although consistent with prior behaviour), to only 
require CL.ONE if all nodes besides us are down?  The logic above suggests we 
only require CL.ONE if we're a single-node DC, but we could have multiple 
failing nodes in our DC, and be the only one left.  It seems like we should 
probably claim insufficient consistency in this case, though this question 
probably deserves a separate ticket for discussion.
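
To make the concern concrete, a stricter variant might look something like this 
(a sketch only - localDcNodeCount is a hypothetical value for the number of nodes 
in the local DC, and this is not the committed behaviour):

    // only accept a lone batchlog replica when the local DC genuinely has a single node;
    // otherwise report insufficient replicas instead of silently dropping to CL.ONE
    if (chosenEndpoints.size() < 2 && localDcNodeCount > 1)
        throw UnavailableException.create(ConsistencyLevel.TWO, 2, chosenEndpoints.size());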


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219129563
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
 {
 // A single case we write not for range or token, but multiple 
mutations to many tokens
 Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+Multimap localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+// Replicas are picked manually:
+//  - replicas should be alive according to the failure detector
+//  - replicas should be in the local datacenter
+//  - choose min(2, number of qualifying candiates above)
+//  - allow the local node to be the only replica only if it's a 
single-node DC
+Collection chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

Should EndpointFilter be brought into ReplicaPlans?  It's not used anywhere 
else, and it seems that the two pieces of logic should be proximal.


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219128835
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
--- End diff --

indentation?


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219138039
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
 {
 // A single case we write not for range or token, but multiple 
mutations to many tokens
 Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+Multimap localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+// Replicas are picked manually:
+//  - replicas should be alive according to the failure detector
+//  - replicas should be in the local datacenter
+//  - choose min(2, number of qualifying candiates above)
+//  - allow the local node to be the only replica only if it's a 
single-node DC
+Collection chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

I don't love it either, but they should live together.  Perhaps we can clean 
it up, and we can certainly at least make it a static method instead of a class.
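
As a first step, the static entry point might look something like this (names 
and generics illustrative; the filter's logic could later be folded in directly):

    // in ReplicaPlans: keep batchlog endpoint selection next to the plan construction
    private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
                                                                          Multimap<String, InetAddressAndPort> endpointsByRack)
    {
        return new BatchlogManager.EndpointFilter(localRack, endpointsByRack).filter();
    }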


---




[GitHub] cassandra pull request #273: Fix overflow of 32-bit integer during compactio...

2018-09-21 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/273#discussion_r219557162
  
--- Diff: 
src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
 ---
@@ -595,12 +600,22 @@ private boolean tryCell(int cell, int point, int 
delta)
 }
 }
 
-private static int roundKey(int p, int roundSeconds)
+private static int roundKey(int point, int roundSeconds)
 {
-int d = p % roundSeconds;
+int d = point % roundSeconds;
 if (d > 0)
-return p + (roundSeconds - d);
+{
+point += roundSeconds - d;
+if (point == Cell.MAX_DELETION_TIME)
--- End diff --

This should be > rather than ==.


---




[GitHub] cassandra pull request #273: Fix overflow of 32-bit integer during compactio...

2018-09-21 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/273#discussion_r219557250
  
--- Diff: 
src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
 ---
@@ -595,12 +600,22 @@ private boolean tryCell(int cell, int point, int 
delta)
 }
 }
 
-private static int roundKey(int p, int roundSeconds)
+private static int roundKey(int point, int roundSeconds)
 {
-int d = p % roundSeconds;
+int d = point % roundSeconds;
 if (d > 0)
-return p + (roundSeconds - d);
+{
+point += roundSeconds - d;
+if (point == Cell.MAX_DELETION_TIME)
+// the rounding should not be a reason to mark point as unexpirable
+point =  Cell.MAX_DELETION_TIME;
--- End diff --

why assign `point =` here, but `return` below?


---




[GitHub] cassandra pull request #271: 14726

2018-09-28 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r221306464
  
--- Diff: 
src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---
@@ -58,45 +63,332 @@
 };
 }
 
-protected final List list;
-protected final boolean isSnapshot;
-protected AbstractReplicaCollection(List list, boolean 
isSnapshot)
+/**
+ * A simple list with no comodification checks and immutability by 
default (only append permitted, and only one initial copy)
+ * this permits us to reduce the amount of garbage generated, by not 
wrapping iterators or unnecessarily copying
+ * and reduces the amount of indirection necessary, as well as 
ensuring monomorphic callsites
+ */
+protected static class ReplicaList implements Iterable
 {
-this.list = list;
-this.isSnapshot = isSnapshot;
+private static final Replica[] EMPTY = new Replica[0];
+Replica[] contents;
+int begin, size;
+
+public ReplicaList() { this(0); }
+public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; }
+public ReplicaList(Replica[] contents, int begin, int size) { 
this.contents = contents; this.begin = begin; this.size = size; }
+
+public boolean isSubList(ReplicaList subList)
+{
+return subList.contents == contents;
+}
+
+public Replica get(int index)
+{
+if (index > size)
+throw new IndexOutOfBoundsException();
+return contents[begin + index];
+}
+
+public void add(Replica replica)
+{
+// can only add to full array - if we have sliced it, we must 
be a snapshot
+assert begin == 0;
+if (size == contents.length)
+{
+int newSize;
+if (size < 3) newSize = 3;
+else if (size < 9) newSize = 9;
+else newSize = size * 2;
+contents = Arrays.copyOf(contents, newSize);
+}
+contents[size++] = replica;
+}
+
+public int size()
+{
+return size;
+}
+
+public boolean isEmpty()
+{
+return size == 0;
+}
+
+public ReplicaList subList(int begin, int end)
+{
+if (end > size || begin > end) throw new 
IndexOutOfBoundsException();
+return new ReplicaList(contents, this.begin + begin, end - 
begin);
+}
+
+public ReplicaList sorted(Comparator comparator)
+{
+Replica[] copy = Arrays.copyOfRange(contents, begin, begin + 
size);
+Arrays.sort(copy, comparator);
+return new ReplicaList(copy, 0, copy.length);
+}
+
+public Stream stream()
+{
+return Arrays.stream(contents, begin, begin + size);
+}
+
+@Override
+public Iterator iterator()
+{
+return new Iterator()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public Replica next()
+{
+return contents[i++];
+}
+};
+}
+
+public  Iterator transformIterator(Function 
function)
+{
+return new Iterator()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public K next()
+{
+return function.apply(contents[i++]);
+}
+};
+}
+
+private Iterator filterIterator(Predicate 
predicate, int limit)
+{
+return new Iterator()
+{
+final int end = begin + size;
+int next = begin;
+int count = 0;
+{ updateNext(); }
+void updateNext()
+{
+if (count == limit) next = end;
+

[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher

2018-11-22 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/293#discussion_r235729649
  
--- Diff: conf/cassandra.yaml ---
@@ -1003,3 +1003,9 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former 
Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Define use of immediate flusher for replies to TCP connections. This is 
an alternate simplified flusher that does not
+# depend on any event loop scheduling. Details around why this has been 
backported from trunk: CASSANDRA-14855.
+# Default is false.
+# native_transport_use_immediate_flusher: false
--- End diff --

perhaps native_transport_flush_messages_immediately?


---




[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r217112723
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -18,364 +18,296 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
 
 /**
- * Encapsulates knowledge about the ring necessary for performing a 
specific operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
  *
- * Constitutes:
- *  - the 'natural' replicas replicating the range or token relevant for 
the operation
- *  - if for performing a write, any 'pending' replicas that are taking 
ownership of the range, and must receive updates
- *  - the 'selected' replicas, those that should be targeted for any 
operation
- *  - 'all' replicas represents natural+pending
- *
- * @param  the type of Endpoints this ReplayLayout holds (either 
EndpointsForToken or EndpointsForRange)
- * @param  the type of itself, including its type parameters, for 
return type of modifying methods
+ * @param 
  */
-public abstract class ReplicaLayout, L extends 
ReplicaLayout>
+public abstract class ReplicaLayout>
 {
-private volatile E all;
-protected final E natural;
-protected final E pending;
-protected final E selected;
-
-protected final Keyspace keyspace;
-protected final ConsistencyLevel consistencyLevel;
-
-private ReplicaLayout(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, E natural, E pending, E selected)
-{
-this(keyspace, consistencyLevel, natural, pending, selected, null);
-}
+private final E natural;
 
-private ReplicaLayout(Keyspace keyspace, ConsistencyLevel 
consistencyLevel, E natural, E pending, E selected, E all)
+ReplicaLayout(E natural)
 {
-assert selected != null;
-assert pending == null || !Endpoints.haveConflicts(natural, 
pending);
-this.keyspace = keyspace;
-this.consistencyLevel = consistencyLevel;
 this.natural = natural;
-this.pending = pending;
-this.selected = selected;
-// if we logically have no pending endpoints (they are null), then 
'all' our endpoints are natural
-if (all == null && pending == null)
-all = natural;
-this.all = all;
 }
 
-public Replica getReplicaFor(InetAddressAndPort endpoint)
-{
-return natural.byEndpoint().get(endpoint);
-}
-
-public E natural()
+/**
+ * The 'natural' owners of the ring position(s), as implied by the 
current ring layout.
+ * This excludes any pending owners, i.e. those that are in the 
process of taking ownership of a range, but
+ * have not yet finished obtaining their view of the range.
+ */
+public final E natural()
 {
 return natural;
 }
 
-public E all()
-{
-E result = all;
-if (result == null)
-all = result = Endpoints.concat(natural, pending);
-return result;
-}
-
-public E selected()
-{
-return selected;
-}
-
 /**
- * @return the pending replicas - will be null for read layouts
- * TODO: ideally we would enforce at compile time that read layouts 
have no pending to access
+ * All relevant owners of the r

[GitHub] cassandra pull request #265: 14705

2018-09-12 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/265#discussion_r217175374
  
--- Diff: 
src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
@@ -53,15 +53,16 @@
 protected final Queue repairs = new 
ConcurrentLinkedQueue<>();
 private final int blockFor;
 
-BlockingReadRepair(ReadCommand command, P replicaPlan, long 
queryStartNanoTime)
+BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared 
replicaPlan, long queryStartNanoTime)
 {
 super(command, replicaPlan, queryStartNanoTime);
-this.blockFor = 
replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
+this.blockFor = 
replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
 }
 
 public UnfilteredPartitionIterators.MergeListener getMergeListener(P 
replicaPlan)
 {
-return new PartitionIteratorMergeListener<>(replicaPlan, command, 
this.replicaPlan.consistencyLevel(), this);
+// TODO: why are we referencing a different replicaPlan here?
--- End diff --

I think that was a stale comment.  The merging never modified the 
replicaPlan, and it should never be modified during the SRP.  Logically, it 
should be a snapshot of only the relevant replicas for the SRP.


---




[GitHub] cassandra pull request #270: 14759

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/270#discussion_r218488201
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -275,7 +275,24 @@ public ForTokenWrite(EndpointsForToken natural, 
EndpointsForToken pending, Endpo
  */
 private static > E 
resolveWriteConflictsInNatural(E natural, E pending)
 {
-return natural.filter(r -> !r.isTransient() || 
!pending.contains(r.endpoint(), true));
+ReplicaCollection.Mutable resolved = 
natural.newMutable(natural.size());
+for (Replica replica : natural)
+{
+// always prefer the full natural replica, if there is a 
conflict
+if (!replica.isFull())
+{
+Replica conflict = 
pending.byEndpoint().get(replica.endpoint());
+if (conflict != null)
+{
+// If we have any pending transient->full movement, we 
need to move the full replica to our 'natural' bucket
+// to avoid corrupting our count
+if (conflict.isFull()) resolved.add(new 
Replica(replica.endpoint(), replica.range(), true));
--- End diff --

No, you're right, good catch.  Even if they are a different range, either 
should be acceptable for this query (by definition).  I'll push an update.


---




[GitHub] cassandra pull request #270: 14759

2018-09-18 Thread belliottsmith
GitHub user belliottsmith opened a pull request:

https://github.com/apache/cassandra/pull/270

14759



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/belliottsmith/cassandra 14759

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/270.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #270


commit 72791132da319188e709e1177817f08c41d9d8c6
Author: Benedict Elliott Smith 
Date:   2018-09-09T22:53:07Z

Transient->Full movements mishandle consistency level upgrade

patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for 
CASSANDRA-14759

commit 4011548a6bf5fd0722f6036c0c8d88b626ad25fc
Author: Benedict Elliott Smith 
Date:   2018-09-18T15:18:37Z

circleci




---




[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218387701
  
--- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
@@ -1288,24 +1288,25 @@ public static synchronized void 
updateAvailableRanges(String keyspace, Collectio
 /**
  * List of the streamed ranges, where transientness is encoded based 
on the source, where range was streamed from.
  */
-public static synchronized RangesAtEndpoint getAvailableRanges(String 
keyspace, IPartitioner partitioner)
+public static synchronized Pair>, Set>> 
getAvailableRanges(String keyspace, IPartitioner partitioner)
--- End diff --

Could we introduce a concrete class for this concept of full/transient 
ranges?
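
Something along these lines, perhaps (a sketch only; the name and exact shape 
are illustrative):

    import java.util.Set;

    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;

    // replaces the anonymous Pair of (full ranges, transient ranges)
    public final class AvailableRanges
    {
        public final Set<Range<Token>> full;
        public final Set<Range<Token>> trans;

        public AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans)
        {
            this.full = full;
            this.trans = trans;
        }
    }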


---




[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218383055
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -337,165 +364,167 @@ else if (useStrictConsistency)
   boolean 
useStrictConsistency,
   TokenMetadata tmdBefore,
   TokenMetadata tmdAfter,
-  Predicate 
isAlive,
   String keyspace,
-  
Collection> sourceFilters)
-{
-EndpointsByRange rangeAddresses = 
strat.getRangeAddresses(tmdBefore);
-
-InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
-logger.debug ("Keyspace: {}", keyspace);
-logger.debug("To fetch RN: {}", fetchRanges);
-logger.debug("Fetch ranges: {}", rangeAddresses);
-
-Predicate testSourceFilters = and(sourceFilters);
-Function sorted =
-endpoints -> 
snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
-//This list of replicas is just candidates. With strict 
consistency it's going to be a narrow list.
-EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = 
new EndpointsByReplica.Mutable();
-for (Replica toFetch : fetchRanges)
-{
-//Replica that is sufficient to provide the data we need
-//With strict consistency and transient replication we may end 
up with multiple types
-//so this isn't used with strict consistency
-Predicate isSufficient = r -> (toFetch.isTransient() 
|| r.isFull());
-Predicate accept = r ->
-   isSufficient.test(r) // is 
sufficient
-&& !r.endpoint().equals(localAddress)   // is not self
-&& isAlive.test(r); // is alive
-
-logger.debug("To fetch {}", toFetch);
-for (Range range : rangeAddresses.keySet())
-{
-if (range.contains(toFetch.range()))
-{
-EndpointsForRange oldEndpoints = 
rangeAddresses.get(range);
-
-//Ultimately we populate this with whatever is going 
to be fetched from to satisfy toFetch
-//It could be multiple endpoints and we must fetch 
from all of them if they are there
-//With transient replication and strict consistency 
this is to get the full data from a full replica and
-//transient data from the transient replica losing data
-EndpointsForRange sources;
-if (useStrictConsistency)
-{
-//Start with two sets of who replicates the range 
before and who replicates it after
-EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
-logger.debug("Old endpoints {}", oldEndpoints);
-logger.debug("New endpoints {}", newEndpoints);
-
-//Due to CASSANDRA-5953 we can have a higher RF 
then we have endpoints.
-//So we need to be careful to only be strict when 
endpoints == RF
-if (oldEndpoints.size() == 
strat.getReplicationFactor().allReplicas)
-{
-Set 
endpointsStillReplicated = newEndpoints.endpoints();
-// Remove new endpoints from old endpoints 
based on address
-oldEndpoints = oldEndpoints.filter(r -> 
!endpointsStillReplicated.contains(r.endpoint()));
-
-if (!all(oldEndpoints, isAlive))
-throw new IllegalStateException("A node 
required to move the data consistently is down: "
-+ 
oldEndpoints.filter(not(isAlive)));
-
-if (oldEndpoints.size() > 1)
-throw new AssertionError("Expected <= 1 
endpoint but found " + oldEndpoints);
-
-//If we are transitioning from transient to 
full and and the set of replicas for the range is not changing
-//we might end up with no endpoints to fetch 
from by address.

[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218382891
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -87,8 +85,8 @@
 private final InetAddressAndPort address;
 /* streaming description */
 private final String description;
-private final Multimap> toFetch = HashMultimap.create();
-private final Set> sourceFilters = new HashSet<>();
+private final Map> 
toFetch = new HashMap<>();
+private final Set sourceFilters = new HashSet<>();
--- End diff --

Does this need to be a Set?  (I realise this is old, but it doesn't make 
much sense AFAICT)


---




[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218388108
  
--- Diff: src/java/org/apache/cassandra/dht/StreamStateStore.java ---
@@ -54,8 +56,10 @@ public RangesAtEndpoint getAvailableRanges(String 
keyspace, IPartitioner partiti
 @VisibleForTesting
 public boolean isDataAvailable(String keyspace, Token token)
 {
-RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, 
token.getPartitioner());
-return availableRanges.ranges().stream().anyMatch(range -> 
range.contains(token));
+Pair>, Set>> availableRanges = 
getAvailableRanges(keyspace, token.getPartitioner());
+
+return Streams.concat(availableRanges.left.stream(),
+  
availableRanges.right.stream()).anyMatch(range -> range.contains(token));
--- End diff --

It would be clearer IMO to have .anyMatch indented on a new line, preferably 
in line with .concat() - I initially read this as only performing the anyMatch 
on the right-hand stream, and was trying to figure out why.
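
i.e. roughly (a formatting suggestion only):

    return Streams.concat(availableRanges.left.stream(),
                          availableRanges.right.stream())
                  .anyMatch(range -> range.contains(token));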


---




[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218383166
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -337,165 +364,167 @@ else if (useStrictConsistency)
   boolean 
useStrictConsistency,
   TokenMetadata tmdBefore,
   TokenMetadata tmdAfter,
-  Predicate 
isAlive,
   String keyspace,
-  
Collection> sourceFilters)
-{
-EndpointsByRange rangeAddresses = 
strat.getRangeAddresses(tmdBefore);
-
-InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
-logger.debug ("Keyspace: {}", keyspace);
-logger.debug("To fetch RN: {}", fetchRanges);
-logger.debug("Fetch ranges: {}", rangeAddresses);
-
-Predicate testSourceFilters = and(sourceFilters);
-Function sorted =
-endpoints -> 
snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
-//This list of replicas is just candidates. With strict 
consistency it's going to be a narrow list.
-EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = 
new EndpointsByReplica.Mutable();
-for (Replica toFetch : fetchRanges)
-{
-//Replica that is sufficient to provide the data we need
-//With strict consistency and transient replication we may end 
up with multiple types
-//so this isn't used with strict consistency
-Predicate isSufficient = r -> (toFetch.isTransient() 
|| r.isFull());
-Predicate accept = r ->
-   isSufficient.test(r) // is 
sufficient
-&& !r.endpoint().equals(localAddress)   // is not self
-&& isAlive.test(r); // is alive
-
-logger.debug("To fetch {}", toFetch);
-for (Range range : rangeAddresses.keySet())
-{
-if (range.contains(toFetch.range()))
-{
-EndpointsForRange oldEndpoints = 
rangeAddresses.get(range);
-
-//Ultimately we populate this with whatever is going 
to be fetched from to satisfy toFetch
-//It could be multiple endpoints and we must fetch 
from all of them if they are there
-//With transient replication and strict consistency 
this is to get the full data from a full replica and
-//transient data from the transient replica losing data
-EndpointsForRange sources;
-if (useStrictConsistency)
-{
-//Start with two sets of who replicates the range 
before and who replicates it after
-EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
-logger.debug("Old endpoints {}", oldEndpoints);
-logger.debug("New endpoints {}", newEndpoints);
-
-//Due to CASSANDRA-5953 we can have a higher RF 
then we have endpoints.
-//So we need to be careful to only be strict when 
endpoints == RF
-if (oldEndpoints.size() == 
strat.getReplicationFactor().allReplicas)
-{
-Set 
endpointsStillReplicated = newEndpoints.endpoints();
-// Remove new endpoints from old endpoints 
based on address
-oldEndpoints = oldEndpoints.filter(r -> 
!endpointsStillReplicated.contains(r.endpoint()));
-
-if (!all(oldEndpoints, isAlive))
-throw new IllegalStateException("A node 
required to move the data consistently is down: "
-+ 
oldEndpoints.filter(not(isAlive)));
-
-if (oldEndpoints.size() > 1)
-throw new AssertionError("Expected <= 1 
endpoint but found " + oldEndpoints);
-
-//If we are transitioning from transient to 
full and and the set of replicas for the range is not changing
-//we might end up with no endpoints to fetch 
from by address.

[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218387370
  
--- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
@@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int 
connectionsPerHost,
 /**
  * Request data in {@code keyspace} and {@code ranges} from specific 
node.
  *
+ * Here, we have to encode both _local_ range transientness (encoded 
in Replica itself, in RangesAtEndpoint)
+ * and _remote_ (source) range transientmess, which is encoded by 
splitting ranges into full and transient.
+ *
+ * At the other end the distinction between full and transient is 
ignored it just used the transient status
+ * of the Replica objects we send to determine what to send. The real 
reason we have this split down to
--- End diff --

"At the other end the distinction between full and transient is ignored it 
just used the transient status of the Replica objects we send to determine what 
to send"

This isn't terribly clear - perhaps

"At the other end the distinction between full and transient is ignored; it 
is used only to create the Replica objects that are used to determine what to 
send"

This does raise the question of whether we should be sending Replica 
objects at all.  Presumably we *do* look at the transient/full status in some 
way?  Or do we only care about the Range?  In which case, perhaps we 
should just send that, and avoid the confusion.


---




[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218371266
  
--- Diff: test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---
@@ -125,21 +125,19 @@ public boolean isAlive(InetAddressAndPort ep)
 s.addRanges(keyspaceName, 
Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd,
 myToken, myEndpoint));
 
 
-Collection> toFetch = 
s.toFetch().get(keyspaceName);
+Multimap toFetch = 
s.toFetch().get(keyspaceName);
 
 // Check we get get RF new ranges in total
-long rangesCount = toFetch.stream()
-   .map(Multimap::values)
-   .flatMap(Collection::stream)
-   .map(f -> f.remote)
-   .map(Replica::range)
-   .count();
+long rangesCount = toFetch.values().stream()
--- End diff --

surely we can just take toFetch.size()?
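
i.e. (assuming, as the stream above implies, that each multimap entry 
corresponds to exactly one fetched range):

    long rangesCount = toFetch.size();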


---




[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218382719
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -328,7 +356,6 @@ else if (useStrictConsistency)
  * Get a map of all ranges and the source that will be cleaned up once 
this bootstrapped node is added for the given ranges.
  * For each range, the list should only contain a single source. This 
allows us to consistently migrate data without violating
  * consistency.
- *
  **/
  public static EndpointsByReplica
  
calculateRangesToFetchWithPreferredEndpoints(BiFunction snitchGetSortedListByProximity,
--- End diff --

A comment specifying the requirement to now provide filters for liveness 
and not-self checks would help prevent future misuse, and make it easier to 
read the logic.
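
Something along these lines, say (wording purely illustrative):

    /**
     * ...
     * Note: callers are now responsible for supplying source filters that cover liveness and the
     * not-self check; this method no longer applies either of those checks itself.
     **/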


---




[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218470693
  
--- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
@@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int 
connectionsPerHost,
 /**
  * Request data in {@code keyspace} and {@code ranges} from specific 
node.
  *
+ * Here, we have to encode both _local_ range transientness (encoded 
in Replica itself, in RangesAtEndpoint)
+ * and _remote_ (source) range transientmess, which is encoded by 
splitting ranges into full and transient.
+ *
+ * At the other end the distinction between full and transient is 
ignored it just used the transient status
+ * of the Replica objects we send to determine what to send. The real 
reason we have this split down to
--- End diff --

Thanks, I assumed as much.  We should clarify the comment to explain why we 
care then, as it currently reads as though the information may be redundant.


---
