[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen closed the pull request at:

    https://github.com/apache/cassandra/pull/267

---
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219518994

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

Moved it.
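The replica-selection rules quoted in the review above (alive per the failure detector, local DC only, min(2, candidates), local node only in a single-node DC) can be sketched in isolation. This is a hypothetical toy model, not Cassandra's actual `BatchlogManager.EndpointFilter` API: endpoints are plain strings, racks come from a map, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy sketch of the batchlog endpoint-selection rules (names are illustrative).
public class BatchlogEndpointSketch
{
    /**
     * Picks up to two live endpoints from the local DC, preferring distinct racks.
     * The local node may be chosen only when it is the DC's sole member.
     *
     * @param rackByEndpoint every local-DC endpoint mapped to its rack
     * @param isAlive        stand-in for the failure detector
     * @param self           this node's address
     */
    public static List<String> choose(Map<String, String> rackByEndpoint,
                                      Predicate<String> isAlive,
                                      String self)
    {
        // 1. keep only live endpoints, grouped by rack
        Map<String, List<String>> byRack = new HashMap<>();
        for (Map.Entry<String, String> e : rackByEndpoint.entrySet())
            if (isAlive.test(e.getKey()))
                byRack.computeIfAbsent(e.getValue(), r -> new ArrayList<>()).add(e.getKey());

        // 2. the local node qualifies only in a single-node DC
        if (rackByEndpoint.size() > 1 && byRack.containsKey(rackByEndpoint.get(self)))
            byRack.get(rackByEndpoint.get(self)).remove(self);

        // 3. take at most one endpoint per rack, up to two endpoints total
        List<String> chosen = new ArrayList<>();
        for (List<String> rackMembers : byRack.values())
        {
            if (rackMembers.isEmpty())
                continue;
            chosen.add(rackMembers.get(0));
            if (chosen.size() == 2)
                break;
        }

        // 3b. if distinct racks could not supply two, top up from any remaining candidates
        if (chosen.size() < 2)
            for (List<String> rackMembers : byRack.values())
                for (String e : rackMembers)
                    if (chosen.size() < 2 && !chosen.contains(e))
                        chosen.add(e);

        return chosen;
    }
}
```

With a single-node DC the local node is returned; in a multi-node DC the local node is excluded and two candidates are picked, spread across racks where possible.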
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219518928

--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1021,18 +1021,18 @@ private static void syncWriteToBatchlog(Collection mutations, Collecti
         handler.get();
     }

-    private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid)
     {
         MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica target : replicaPlan.contacts())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Sending batchlog remove request {} to {}", uuid, target);

-            if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
-                performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+            if (target.isLocal())
--- End diff --

Renamed.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219508092

--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -780,8 +780,9 @@ public static void mutateMV(ByteBuffer dataKey, Collection mutations,
         ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;

         // Since the base -> view replication is 1:1 we only need to store the BL locally
-        final Collection<InetAddressAndPort> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
-        BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite();
+        BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
--- End diff --

Fixed.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219508033

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
--- End diff --

> Either way, while we're here we should document in the caller the fact that the provided consistencyLevel is not used, and that the batchlogConsistencyLevel is used to clear the entries from the remote batch logs once the real write has reached that consistency level.

Not sure I understand what you mean, as we're using it both for cleanup and for writing to the batchlog.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219140446

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+        if (chosenEndpoints.isEmpty())
+        {
+            if (consistencyLevel == ConsistencyLevel.ANY)
+                chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+            else
+                throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
+        }
+
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
-                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
                 EndpointsForToken.empty(token)
         );
-        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        // Batchlog is hosted by either one node or two nodes from different racks.
+        consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
--- End diff --

Right, I had similar thoughts when writing that.

I've also been wondering whether it's enough to block on only two nodes in the local DC for batchlog writes. I wanted to change it to the original CL, but that might reduce availability (for instance, when it's a quorum). From looking at batch replay, though, we should be able to do it.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219138039

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

I don't love it either, but it should live together. Perhaps we can clean it up, and we can certainly at least make it a static method instead of a class.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219137918

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+        if (chosenEndpoints.isEmpty())
+        {
+            if (consistencyLevel == ConsistencyLevel.ANY)
+                chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+            else
+                throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
--- End diff --

We don't strictly need it; I included it only to short-circuit. We can skip it.
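The fallback being discussed here (empty candidate set: CL.ANY degrades to the local node, anything else is unavailable) can be restated as a small toy model. The class name, the simplified CL enum, and the use of IllegalStateException in place of Cassandra's UnavailableException are all stand-ins for illustration.

```java
import java.util.Collection;
import java.util.Collections;

// Toy restatement of the empty-candidates fallback quoted above (names are stand-ins).
public class BatchlogFallbackSketch
{
    public enum CL { ANY, ONE, TWO, QUORUM }

    public static Collection<String> withFallback(Collection<String> chosen, CL cl, String self)
    {
        if (!chosen.isEmpty())
            return chosen;                        // filter produced candidates: use them

        if (cl == CL.ANY)
            return Collections.singleton(self);   // CL.ANY degrades to a local-only batchlog

        // stand-in for: throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0)
        throw new IllegalStateException("Cannot achieve consistency level ONE (required 1, alive 0)");
    }
}
```

As the comment notes, the explicit throw is a short-circuit: without it the write would fail later anyway, just with a less direct error path.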
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219131794

--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1021,18 +1021,18 @@ private static void syncWriteToBatchlog(Collection mutations, Collecti
         handler.get();
     }

-    private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid)
     {
         MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica target : replicaPlan.contacts())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Sending batchlog remove request {} to {}", uuid, target);

-            if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
-                performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+            if (target.isLocal())
--- End diff --

I thought we had ended up calling this `isSelf`, because `isLocal` is overloaded in this code (meaning, variably: is this instance (has our broadcast address); is in the same DC as this instance; has the 'local' address). Not strictly related to this patch, but perhaps we should rename it to `isSelf` while we're here? This is consistent with the nomenclature in the TR patch and in ReplicaCollection (`withoutSelf` and `selfIfPresent`).
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219133467

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+        if (chosenEndpoints.isEmpty())
+        {
+            if (consistencyLevel == ConsistencyLevel.ANY)
+                chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+            else
+                throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
+        }
+
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
-                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
                 EndpointsForToken.empty(token)
        );
-        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        // Batchlog is hosted by either one node or two nodes from different racks.
+        consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
--- End diff --

Are we sure it's OK (although consistent with prior behaviour) to require only CL.ONE when all nodes besides us are down? The logic above suggests we only require CL.ONE if we're a single-node DC, but we could have multiple failing nodes in our DC and be the only one left. It seems like we should probably claim insufficient consistency in this case, though this question probably deserves a separate ticket for discussion.
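The concern raised here is easiest to see as a toy restatement of the rule: the batchlog CL is derived solely from how many endpoints were chosen, so "one endpoint" covers both a genuine single-node DC and a larger DC where every other node happens to be down. The class and method names below are invented for illustration.

```java
// Toy restatement of the CL derivation being questioned above (names are illustrative).
public class BatchlogClRuleSketch
{
    public enum CL { ONE, TWO }

    // mirrors: liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO
    // Note: a five-node DC with four nodes down also reaches the ONE branch,
    // which is the case the review suggests should instead be unavailable.
    public static CL batchlogCl(int chosenEndpoints)
    {
        return chosenEndpoints == 1 ? CL.ONE : CL.TWO;
    }
}
```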
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219128835

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
--- End diff --

indentation?
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/267#discussion_r219129563

--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
         return forSingleReplicaWrite(keyspace, token, replica);
     }

+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
-     *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

Should EndpointFilter be brought into ReplicaPlans? It's not used anywhere else, and it seems that the two pieces of logic should live close together.