[GitHub] cassandra pull request #267: Consolidate batch write code

2018-10-04 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/267


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219518994
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeyspace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case where we write not for a range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candidates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

Moved it


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219518928
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1021,18 +1021,18 @@ private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collecti
         handler.get();
     }
 
-    private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid)
    {
         MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica target : replicaPlan.contacts())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Sending batchlog remove request {} to {}", uuid, target);
 
-            if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
-                performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+            if (target.isLocal())
--- End diff --

Renamed


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219508092
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -780,8 +780,9 @@ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
                 ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
 
                 //Since the base -> view replication is 1:1 we only need to store the BL locally
-                final Collection<InetAddressAndPort> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
-                BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+                ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite();
+                BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
--- End diff --


Fixed


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219508033
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeyspace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
--- End diff --

> Either way, while we're here we should document in the caller the fact that the provided consistencyLevel is not used, and that the batchlogConsistencyLevel is used to clear the entries from the remote batch logs once the real write has reached that consistency level.

Not sure I understand what you mean, as we're using it both for cleanup and for writing to the batchlog.
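
To make the reply concrete, here is a minimal standalone sketch (hypothetical names and deliberately simplified types, not the Cassandra API): the same batchlog plan backs both the write to the batchlog and the later cleanup, so whatever consistency decision goes into building the plan affects both paths.

    import java.util.List;

    public class BatchlogPlanReuseSketch
    {
        // Hypothetical, stripped-down stand-in for ReplicaPlan.ForTokenWrite.
        static class BatchlogPlan
        {
            final List<String> contacts;
            BatchlogPlan(List<String> contacts) { this.contacts = contacts; }
        }

        // Build the batchlog plan once; in the real code this is where the
        // consistency-level decision is made.
        static BatchlogPlan forBatchlogWrite(List<String> liveLocalDcEndpoints)
        {
            return new BatchlogPlan(liveLocalDcEndpoints.subList(0, Math.min(2, liveLocalDcEndpoints.size())));
        }

        // The plan is then used twice: once to store the batch, and once more to
        // remove it after the actual mutations have been applied.
        static void writeToBatchlog(BatchlogPlan plan)    { System.out.println("store batch on " + plan.contacts); }
        static void removeFromBatchlog(BatchlogPlan plan) { System.out.println("remove batch from " + plan.contacts); }

        public static void main(String[] args)
        {
            BatchlogPlan plan = forBatchlogWrite(List.of("10.0.0.1", "10.0.0.2", "10.0.0.3"));
            writeToBatchlog(plan);
            removeFromBatchlog(plan);
        }
    }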


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219140446
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeyspace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case where we write not for a range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candidates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+        if (chosenEndpoints.isEmpty())
+        {
+            if (consistencyLevel == ConsistencyLevel.ANY)
+                chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+            else
+                throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
+        }
+
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
-                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
                 EndpointsForToken.empty(token)
         );
-        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        // Batchlog is hosted by either one node or two nodes from different racks.
+        consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
--- End diff --

Right, I had similar thoughts when writing that. I've also been wondering whether it's enough to have only two nodes in the local DC blocking for batchlog writes. I wanted to make a change to the original CL, but that might reduce availability (for instance when it's a quorum). But from looking at batch replay, we should be able to do it.
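
To make the availability tradeoff concrete, here is a small standalone sketch (hypothetical names and simplified types, not Cassandra code) contrasting the patch's fixed min(2) batchlog blocking with a policy derived from the user's consistency level, which can demand more live nodes and therefore be less available:

    public class BatchlogBlockingSketch
    {
        enum CL { ONE, TWO, QUORUM }

        // Policy in the patch: block on min(2, live local-DC candidates).
        static int acksForFixedPolicy(int liveLocalCandidates)
        {
            return Math.min(2, liveLocalCandidates);
        }

        // Hypothetical alternative: derive the batchlog blocking count from the
        // user's CL, e.g. a QUORUM of the keyspace replication factor.
        static int acksForUserClPolicy(CL userCl, int replicationFactor)
        {
            switch (userCl)
            {
                case ONE:    return 1;
                case TWO:    return 2;
                case QUORUM: return replicationFactor / 2 + 1;
                default:     throw new IllegalArgumentException("unexpected CL " + userCl);
            }
        }

        public static void main(String[] args)
        {
            int liveLocalCandidates = 4;
            int rf = 5;
            System.out.println("fixed policy blocks for " + acksForFixedPolicy(liveLocalCandidates) + " ack(s)");
            System.out.println("QUORUM-derived policy (rf=" + rf + ") would block for "
                               + acksForUserClPolicy(CL.QUORUM, rf) + " ack(s)");
        }
    }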


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219138039
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeyspace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case where we write not for a range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candidates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

I don't love it either, but the two pieces of logic should live together.  Perhaps we can clean it up, and we can certainly at least make it a static method instead of a class.


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219137918
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeyspace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case where we write not for a range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candidates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+        if (chosenEndpoints.isEmpty())
+        {
+            if (consistencyLevel == ConsistencyLevel.ANY)
+                chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+            else
+                throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
--- End diff --

We don't strictly need it: I included it only to short-circuit. We can skip it.
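
For readers without the surrounding code, a self-contained sketch (hypothetical, simplified types, not the real API) of the short-circuit being discussed: with no qualifying candidates, CL.ANY falls back to storing the batchlog on the coordinator itself, while anything stricter fails fast with an unavailable error instead of failing later.

    import java.util.Collection;
    import java.util.Collections;

    public class BatchlogShortCircuitSketch
    {
        enum CL { ANY, ONE }

        static class UnavailableException extends RuntimeException
        {
            UnavailableException(String message) { super(message); }
        }

        // If the endpoint filter found nothing, either fall back to ourselves
        // (CL.ANY) or short-circuit with an unavailable error.
        static Collection<String> chooseBatchlogEndpoints(Collection<String> filtered, CL cl, String self)
        {
            if (!filtered.isEmpty())
                return filtered;
            if (cl == CL.ANY)
                return Collections.singleton(self);
            throw new UnavailableException("cannot achieve CL.ONE for the batchlog: 0 of 1 replicas live");
        }

        public static void main(String[] args)
        {
            System.out.println(chooseBatchlogEndpoints(Collections.emptyList(), CL.ANY, "127.0.0.1"));
        }
    }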


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219131794
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1021,18 +1021,18 @@ private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collecti
         handler.get();
     }
 
-    private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid)
     {
         MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica target : replicaPlan.contacts())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Sending batchlog remove request {} to {}", uuid, target);
 
-            if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
-                performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+            if (target.isLocal())
--- End diff --

I thought we had ended up calling this `isSelf`, because `isLocal` is overloaded in this code (meaning, variably: is this instance, i.e. has our broadcast address; is in the same DC as this instance; has the 'local' address).  Not strictly related to this patch, but perhaps we should rename it to `isSelf` while we're here and remember?  That would be consistent with the nomenclature in the TR patch and in ReplicaCollection (`withoutSelf` and `selfIfPresent`).
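
A tiny standalone sketch of the naming distinction (hypothetical class, not the actual Replica API): `isSelf` would mean "carries this node's own broadcast address", whereas "local" is used elsewhere in the code to mean things like "same datacenter" or "the 'local' address", which is why the name is overloaded.

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class ReplicaNamingSketch
    {
        // Hypothetical, stripped-down replica used only to illustrate the naming point.
        static class Replica
        {
            final InetAddress endpoint;
            final String datacenter;

            Replica(InetAddress endpoint, String datacenter)
            {
                this.endpoint = endpoint;
                this.datacenter = datacenter;
            }

            // "self": this very instance, i.e. it carries our broadcast address.
            boolean isSelf(InetAddress broadcastAddress)
            {
                return endpoint.equals(broadcastAddress);
            }

            // "local" in the DC sense: a different, weaker property than isSelf.
            boolean isLocalDc(String localDatacenter)
            {
                return datacenter.equals(localDatacenter);
            }
        }

        public static void main(String[] args) throws UnknownHostException
        {
            InetAddress self = InetAddress.getByName("10.0.0.1");
            Replica neighbour = new Replica(InetAddress.getByName("10.0.0.2"), "dc1");
            System.out.println(neighbour.isSelf(self));     // false: not this node
            System.out.println(neighbour.isLocalDc("dc1")); // true: same datacenter
        }
    }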


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219133467
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
 {
 // A single case we write not for range or token, but multiple 
mutations to many tokens
 Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+Multimap localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+// Replicas are picked manually:
+//  - replicas should be alive according to the failure detector
+//  - replicas should be in the local datacenter
+//  - choose min(2, number of qualifying candiates above)
+//  - allow the local node to be the only replica only if it's a 
single-node DC
+Collection chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+if (chosenEndpoints.isEmpty())
+{
+if (consistencyLevel == ConsistencyLevel.ANY)
+chosenEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+else
+throw UnavailableException.create(ConsistencyLevel.ONE, 1, 
0);
+}
+
 ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
-
SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+
SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
 EndpointsForToken.empty(token)
 );
-ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 
? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+// Batchlog is hosted by either one node or two nodes from 
different racks.
+consistencyLevel = liveAndDown.all().size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO;
--- End diff --

Are we sure it's OK (although it is consistent with prior behaviour) to only require CL.ONE if all nodes besides us are down?  The logic above suggests we only require CL.ONE if we're in a single-node DC, but we could have multiple failing nodes in our DC and be the only one left.  It seems like we should probably claim insufficient consistency in this case, though this question probably deserves a separate ticket for discussion.
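
A minimal sketch of the case split being questioned (standalone, with a hypothetical method name; not a proposal for the actual code): accepting CL.ONE is uncontroversial when the DC genuinely contains a single node, and the open question is whether being merely the last live node of a larger DC should instead be reported as insufficient consistency.

    public class BatchlogAvailabilitySketch
    {
        // Hypothetical helper distinguishing "single-node DC" from "sole survivor of a larger DC".
        static boolean shouldClaimInsufficientConsistency(int nodesInLocalDc, int liveNodesInLocalDc)
        {
            boolean singleNodeDc = nodesInLocalDc == 1;
            boolean soleSurvivorOfLargerDc = !singleNodeDc && liveNodesInLocalDc == 1;
            return soleSurvivorOfLargerDc;
        }

        public static void main(String[] args)
        {
            System.out.println(shouldClaimInsufficientConsistency(1, 1)); // false: genuine single-node DC
            System.out.println(shouldClaimInsufficientConsistency(5, 1)); // true: just the last node standing
        }
    }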


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219128835
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
--- End diff --

indentation?


---




[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219129563
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeyspace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case where we write not for a range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candidates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

Should EndpointFilter be brought into ReplicaPlans?  It's not used anywhere else, and it seems that the two pieces of logic should be proximal.
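
As a rough illustration of what folding the filter into ReplicaPlans as a static method might look like, here is a self-contained sketch (hypothetical names and simplified types; it deliberately ignores details of the real EndpointFilter such as randomised selection within a rack and the special-casing of the coordinator). It follows the rules stated in the diff comment: keep live endpoints in the local DC, prefer racks other than the coordinator's so the chosen endpoints are rack-diverse, and choose at most two.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;

    public class BatchlogEndpointFilterSketch
    {
        // Hypothetical static replacement for BatchlogManager.EndpointFilter.
        static Collection<String> filterBatchlogEndpoints(String localRack,
                                                          Map<String, List<String>> localDcEndpointsByRack,
                                                          Predicate<String> isAlive)
        {
            List<String> candidates = new ArrayList<>();

            // Prefer other racks first, so the chosen endpoints end up rack-diverse.
            localDcEndpointsByRack.forEach((rack, endpoints) -> {
                if (!rack.equals(localRack))
                    endpoints.stream().filter(isAlive).forEach(candidates::add);
            });

            // Fall back to the local rack if other racks cannot supply two candidates.
            if (candidates.size() < 2)
                localDcEndpointsByRack.getOrDefault(localRack, List.of()).stream().filter(isAlive).forEach(candidates::add);

            return candidates.subList(0, Math.min(2, candidates.size()));
        }

        public static void main(String[] args)
        {
            Map<String, List<String>> racks = Map.of(
                    "rack1", List.of("10.0.0.1"),             // the coordinator's rack
                    "rack2", List.of("10.0.0.2", "10.0.0.3"),
                    "rack3", List.of("10.0.0.4"));
            System.out.println(filterBatchlogEndpoints("rack1", racks, endpoint -> true));
        }
    }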


---
