[GitHub] cassandra pull request #239: Optimize Streaming
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r201463505 --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java --- @@ -249,4 +253,50 @@ public ByteBufAllocator getAllocator() { return channelConfig.getAllocator(); } + +/** + * Consumes bytes in the stream until the given length + * + * @param writer + * @param len + * @return + * @throws IOException + */ +public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException +{ +long copied = 0; // number of bytes copied +while (copied < len) +{ +int position = buffer.position(); +int remaining = buffer.remaining(); +if (remaining == 0) +{ +try +{ +reBuffer(); +} catch (EOFException e) +{ +throw new EOFException("EOF after " + copied + " bytes out of " + len); +} +position = buffer.position(); +remaining = buffer.remaining(); +if (remaining == 0) --- End diff -- Even you've done it! :) https://github.com/aweisberg/cassandra/commit/1491a40b7b4ea2723bcf22d870ee514b47ea901b#diff-baae9b53db48ff1ed346e469f7506545R192 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: Optimize Streaming
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r201461568 --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java --- @@ -249,4 +253,50 @@ public ByteBufAllocator getAllocator() { return channelConfig.getAllocator(); } + +/** + * Consumes bytes in the stream until the given length + * + * @param writer + * @param len + * @return + * @throws IOException + */ +public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException +{ +long copied = 0; // number of bytes copied +while (copied < len) +{ +int position = buffer.position(); +int remaining = buffer.remaining(); +if (remaining == 0) +{ +try +{ +reBuffer(); +} catch (EOFException e) +{ +throw new EOFException("EOF after " + copied + " bytes out of " + len); +} +position = buffer.position(); +remaining = buffer.remaining(); +if (remaining == 0) --- End diff -- I think we need to raise this on the mailing list, possibly to modify the code style: for a long time it has been standard practice for many contributors (myself included) to elide the braces for single-line statements, because otherwise the code becomes illegible.
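The loop in `consumeUntil` follows a common pattern: copy whatever the current buffer holds, refill when it is empty, and rethrow EOF with a count of the bytes already copied. Below is a minimal sketch of that pattern, not the PR's implementation. `ChunkedSource` and its queue of heap `ByteBuffer`s are hypothetical stand-ins for the Netty-backed input, and output goes to a plain `OutputStream` rather than `BufferedDataOutputStreamPlus`. Single-statement bodies omit braces, in the style discussed in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical input: a queue of heap ByteBuffers consumed one at a time.
final class ChunkedSource
{
    private final Deque<ByteBuffer> chunks;
    private ByteBuffer buffer = ByteBuffer.allocate(0);

    ChunkedSource(Deque<ByteBuffer> chunks)
    {
        this.chunks = chunks;
    }

    // Moves to the next chunk, or throws EOFException when none remain.
    private void reBuffer() throws EOFException
    {
        if (chunks.isEmpty())
            throw new EOFException();
        buffer = chunks.poll();
    }

    // Copies exactly len bytes into out, refilling the buffer as needed.
    long consumeUntil(OutputStream out, long len) throws IOException
    {
        long copied = 0; // number of bytes copied
        while (copied < len)
        {
            if (!buffer.hasRemaining())
            {
                try
                {
                    reBuffer();
                }
                catch (EOFException e)
                {
                    throw new EOFException("EOF after " + copied + " bytes out of " + len);
                }
                // an empty chunk is legal: go round again and refill
                if (!buffer.hasRemaining())
                    continue;
            }
            int toCopy = (int) Math.min(buffer.remaining(), len - copied);
            // assumes array-backed (heap) buffers
            out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), toCopy);
            buffer.position(buffer.position() + toCopy);
            copied += toCopy;
        }
        return copied;
    }
}
```

Re-checking `hasRemaining()` after a refill is what lets the loop tolerate empty chunks without copying zero bytes or spinning on a stale position.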
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216913492 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java --- @@ -249,12 +229,32 @@ public int requiredParticipants() * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) * @param */ -public static class Shared> +public interface Shared, P extends ReplicaPlan> { -private P replicaPlan; -public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; } -public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; } -public P get() { return replicaPlan; } +public void addToContact(Replica replica); +public P get(); +public abstract P getWithContact(E endpoints); } +public static class SharedForTokenRead implements Shared +{ +private ForTokenRead replicaPlan; +public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForTokenRead get() { return replicaPlan; } +public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } +} + +public static class SharedForRangeRead implements Shared +{ +private ForRangeRead replicaPlan; +public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } --- End diff -- It does; in general the non-pluralised form of contact doesn't work well with the method names. Perhaps I should just pluralise and be done with it? Not sure why I was averse to this.
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216913748 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java --- @@ -249,12 +229,32 @@ public int requiredParticipants() * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write) * @param */ -public static class Shared> +public interface Shared, P extends ReplicaPlan> { -private P replicaPlan; -public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; } -public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; } -public P get() { return replicaPlan; } +public void addToContact(Replica replica); +public P get(); +public abstract P getWithContact(E endpoints); } +public static class SharedForTokenRead implements Shared +{ +private ForTokenRead replicaPlan; +public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForTokenRead get() { return replicaPlan; } +public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); } +} + +public static class SharedForRangeRead implements Shared +{ +private ForRangeRead replicaPlan; +public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; } +public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); } +public ForRangeRead get() { return replicaPlan; } +public ForRangeRead getWithContact(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); } --- End diff -- This is a getter for the case where we don't want to update the reference. I will remove it to avoid any ambiguity. 
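The `Shared` holder in this diff wraps an immutable plan that each speculation stage replaces with an expanded copy. A minimal sketch of that holder pattern follows, using the pluralised `contacts` naming floated in the reply. `ContactPlan` and `SharedContactPlan` are hypothetical, endpoints are plain strings, and, like the plain field in the diff, the holder does no synchronisation (single-threaded use is assumed).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable plan: "modifying" it returns a new instance.
final class ContactPlan
{
    private final List<String> contacts;

    ContactPlan(List<String> contacts)
    {
        this.contacts = Collections.unmodifiableList(new ArrayList<>(contacts));
    }

    List<String> contacts() { return contacts; }

    ContactPlan withContacts(List<String> newContacts) { return new ContactPlan(newContacts); }
}

// Holder shared across speculation stages; no synchronisation.
final class SharedContactPlan
{
    private ContactPlan plan;

    SharedContactPlan(ContactPlan plan) { this.plan = plan; }

    // Swaps in a plan that also contacts the given endpoint.
    void addToContacts(String endpoint)
    {
        List<String> expanded = new ArrayList<>(plan.contacts());
        expanded.add(endpoint);
        plan = plan.withContacts(expanded);
    }

    ContactPlan get() { return plan; }
}
```

Because plans are immutable, any caller that captured an earlier `get()` result keeps a consistent view while the holder moves on.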
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216916456 --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java --- @@ -194,14 +198,97 @@ public Token token() public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending) { -if (Endpoints.haveConflicts(natural, pending)) +if (haveWriteConflicts(natural, pending)) { -natural = Endpoints.resolveConflictsInNatural(natural, pending); -pending = Endpoints.resolveConflictsInPending(natural, pending); +natural = resolveWriteConflictsInNatural(natural, pending); +pending = resolveWriteConflictsInPending(natural, pending); } return new ReplicaLayout.ForTokenWrite(natural, pending); } +/** + * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation) + * or because an endpoint is transitioning between full and transient replication status. + * + * We essentially always prefer the full version for writes, because this is stricter. + * + * For transient->full transitions: + * + * Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration, + * it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form, + * as there is always a quorum of nodes receiving the write. However, ring ownership changes are not atomic or + * consistent across the cluster, and it is possible for writers to see different ring states. + * + * Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair + * (as the normal contract dictates) when it completes its transition. + * + * While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative + * available to us today to mitigate, and (we think) the easiest to reason about. 
+ * + * For full->transient transitions: + * + * In this case, things are dicier, because in theory we can trigger this change instantly. All we need to do is + * drop some data, surely? + * + * Ring movements can put is in a pickle; any other node could believe us to be full when we have become transient, + * and perform a full data request to us that we believe ourselves capable of answering, but that we are not. + * If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing + * its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware + * of the data inconsistency. + * + * This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when + * the transition takes effect. As such, a node can hold an incorrect belief about its own ownership ranges. + * + * This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it. + * It is a little more dangerous with transient replication, however, because we can completely answer a request without + * ever touching a digest, meaning we are less likely to attempt to repair any inconsistency. + * + * We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance. + * + * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket + * to avoid corrupting our count. This is fine for writes, all we're doing is ensuring we always write to the node, + * instead of selectively. 
+ * + * @param natural + * @param pending + * @param + * @return + */ +public static > boolean haveWriteConflicts(E natural, E pending) +{ +Set naturalEndpoints = natural.endpoints(); +for (InetAddressAndPort pendingEndpoint : pending.endpoints()) +{ +if (naturalEndpoints.contains(pendingEndpoint)) +return true; +} +return false; +} + +/** + * MUST APPLY FIRST + * See {@link ReplicaLayout#haveWriteConflicts} + * @return a 'natural' replica collection, that has had its conflicts with pending repaired + */ +public static > E resolveWriteConflictsInNa
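The Javadoc above describes detecting endpoints present in both `natural` and `pending`, preferring the full version of a conflicting replica, and resolving `natural` before `pending`. A minimal sketch of that resolution order follows, under assumptions: `WriteConflicts.Rep` is a hypothetical endpoint-plus-full-flag pair standing in for `Replica`, and `resolveInNatural` / `resolveInPending` are illustrative helpers, since the diff is cut off before the real bodies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class WriteConflicts
{
    // Hypothetical stand-in for a replica: an endpoint plus full/transient status.
    static final class Rep
    {
        final String endpoint;
        final boolean full;

        Rep(String endpoint, boolean full) { this.endpoint = endpoint; this.full = full; }
    }

    static Set<String> endpoints(List<Rep> replicas)
    {
        Set<String> result = new HashSet<>();
        for (Rep r : replicas)
            result.add(r.endpoint);
        return result;
    }

    // True if any endpoint appears in both natural and pending.
    static boolean haveWriteConflicts(List<Rep> natural, List<Rep> pending)
    {
        Set<String> naturalEndpoints = endpoints(natural);
        for (Rep p : pending)
            if (naturalEndpoints.contains(p.endpoint))
                return true;
        return false;
    }

    // Apply first: a transient natural replica with a full pending counterpart becomes full.
    static List<Rep> resolveInNatural(List<Rep> natural, List<Rep> pending)
    {
        Map<String, Rep> pendingByEndpoint = new HashMap<>();
        for (Rep p : pending)
            pendingByEndpoint.put(p.endpoint, p);
        List<Rep> resolved = new ArrayList<>(natural.size());
        for (Rep n : natural)
        {
            Rep conflict = pendingByEndpoint.get(n.endpoint);
            resolved.add(!n.full && conflict != null && conflict.full ? conflict : n);
        }
        return resolved;
    }

    // Apply second, against the resolved natural list: drop endpoints already written to as natural.
    static List<Rep> resolveInPending(List<Rep> resolvedNatural, List<Rep> pending)
    {
        Set<String> naturalEndpoints = endpoints(resolvedNatural);
        List<Rep> resolved = new ArrayList<>();
        for (Rep p : pending)
            if (!naturalEndpoints.contains(p.endpoint))
                resolved.add(p);
        return resolved;
    }
}
```

Moving the full replica into the natural bucket keeps each endpoint counted exactly once, which is the "avoid corrupting our count" concern in the Javadoc.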
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r216912914 --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java --- @@ -184,30 +185,28 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry; -// Endpoints for Token -ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); +ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry); // Speculative retry is disabled *OR* // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM) -// TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0 -return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false); +return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false); // There are simply no extra replicas to speculate. // Handle this separately so it can record failed attempts to speculate due to lack of replicas -if (replicaLayout.selected().size() == replicaLayout.all().size()) +if (replicaPlan.contact().size() == replicaPlan.candidates().size()) { boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL; -return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation); +return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation); } // If CL.ALL, upgrade to AlwaysSpeculating; --- End diff -- No idea. 
I don't understand either, but as you say, the comment looks stale anyway. I'll remove it.
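The branch order in `getReadExecutor` can be read as a small decision function. The sketch below is illustrative only: `ReadCl` and `ReadExecutorKind` are hypothetical simplifications, and the final branch stands in for whichever speculating executor the retry policy would select, since the diff stops at the stale CL.ALL comment.

```java
// Hypothetical, simplified consistency levels: only the ones the branches inspect.
enum ReadCl { ONE, QUORUM, EACH_QUORUM, ALL }

// Hypothetical outcome of the executor choice.
enum ReadExecutorKind { NEVER_SPECULATE, NEVER_SPECULATE_RECORD_FAILURE, MAY_SPECULATE }

final class ReadExecutorChoice
{
    static ReadExecutorKind choose(boolean speculationDisabled, ReadCl cl, int contacts, int candidates)
    {
        // speculative retry disabled, or EACH_QUORUM (11980: avoid miscounting DC responses)
        if (speculationDisabled || cl == ReadCl.EACH_QUORUM)
            return ReadExecutorKind.NEVER_SPECULATE;

        // no extra replicas to speculate on; record the failed attempt unless CL is ALL
        if (contacts == candidates)
            return cl == ReadCl.ALL ? ReadExecutorKind.NEVER_SPECULATE
                                    : ReadExecutorKind.NEVER_SPECULATE_RECORD_FAILURE;

        // the retry policy picks the speculating executor (not shown in the diff)
        return ReadExecutorKind.MAY_SPECULATE;
    }
}
```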
[GitHub] cassandra pull request #288: In BufferPool, make allocating thread receive a...
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/288#discussion_r228448169 --- Diff: src/java/org/apache/cassandra/utils/memory/BufferPool.java --- @@ -237,23 +237,25 @@ void check() /** Return a chunk, the caller will take owership of the parent chunk. */ public Chunk get() { -while (true) -{ -Chunk chunk = chunks.poll(); -if (chunk != null) -return chunk; +Chunk chunk = chunks.poll(); +if (chunk != null) +return chunk; -if (!allocateMoreChunks()) -// give it one last attempt, in case someone else allocated before us -return chunks.poll(); -} +chunk = allocateMoreChunks(); +if (chunk != null) +return chunk; + +/* another thread may have just allocated last macro chunk, so +** make one final attempt before returning null --- End diff -- Super minor nit, with way more verbiage than necessary to follow (on my part), but this is the first time I've seen this style of code comment in the codebase. The norm is usually to use one of the two basic forms: short info comments just using // for each line, and larger explanatory comments using the standard: /** (this line typically left blank, for balance with terminating line, but not essential) * * */ I'm sure we don't stick to this everywhere, and it's not super important which style of code comment you use, but it would be preferable to stick to one of the two most standard formats.
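For comparison, here is a sketch of the same poll / allocate / final-poll sequence written with the two conventional comment forms (`//` for short notes, `/** ... */` for Javadoc). `ChunkPool` is hypothetical: chunks are plain `byte[]`, a counter caps how many macro chunks may be allocated, and `allocateMoreChunks` hands one chunk to the caller and queues the rest, which is an assumption about the new return-a-chunk contract.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical pool: at most macroChunks allocations of chunksPerMacro (>= 1) chunks each.
final class ChunkPool
{
    private final ConcurrentLinkedQueue<byte[]> chunks = new ConcurrentLinkedQueue<>();
    private final AtomicInteger macroChunksLeft;
    private final int chunksPerMacro;
    private final int chunkSize;

    ChunkPool(int macroChunks, int chunksPerMacro, int chunkSize)
    {
        this.macroChunksLeft = new AtomicInteger(macroChunks);
        this.chunksPerMacro = chunksPerMacro;
        this.chunkSize = chunkSize;
    }

    /**
     * Claims one macro chunk from the budget, queues all but one of its chunks
     * and returns the remaining one; returns null once the budget is exhausted.
     */
    private byte[] allocateMoreChunks()
    {
        while (true)
        {
            int left = macroChunksLeft.get();
            if (left == 0)
                return null;
            if (macroChunksLeft.compareAndSet(left, left - 1))
                break;
        }
        for (int i = 1; i < chunksPerMacro; i++)
            chunks.add(new byte[chunkSize]);
        return new byte[chunkSize];
    }

    /** Returns a chunk, or null if none can be provided. */
    byte[] get()
    {
        byte[] chunk = chunks.poll();
        if (chunk != null)
            return chunk;

        chunk = allocateMoreChunks();
        if (chunk != null)
            return chunk;

        // another thread may have just allocated the last macro chunk,
        // so make one final attempt before returning null
        return chunks.poll();
    }
}
```

Returning a chunk directly from the allocation means the allocating thread cannot lose the chunks it just created to other threads polling the queue.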
[GitHub] cassandra pull request #271: 14726
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r221526704 --- Diff: src/java/org/apache/cassandra/locator/ReplicaMultimap.java --- @@ -39,21 +39,28 @@ public abstract C get(K key); public C getIfPresent(K key) { return map.get(key); } -public static abstract class Mutable --- End diff -- It needs to be visible for testing, so I've simply added the annotation.
[GitHub] cassandra pull request #271: 14726
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r221530400 --- Diff: src/java/org/apache/cassandra/locator/EndpointsByRange.java --- @@ -54,9 +55,13 @@ public void putAll(Range range, EndpointsForRange replicas, Conflict igno get(range).addAll(replicas, ignoreConflicts); } -public EndpointsByRange asImmutableView() +public EndpointsByRange build() { -return new EndpointsByRange(Collections.unmodifiableMap(Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView))); +Map, EndpointsForRange> map = +Collections.unmodifiableMap( +new HashMap<>( --- End diff -- No, it is not. Good point - thanks!
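The diff swaps `asImmutableView()`, which wrapped a lazy `Maps.transformValues` view of the mutable map, for `build()`, which copies into a `new HashMap<>` before wrapping it. A small sketch of why that matters, with a hypothetical `RangeEndpointsBuilder` keyed by range strings: an unmodifiable view keeps reflecting later writes to the builder, while a copy is detached from it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical builder: range -> endpoints.
final class RangeEndpointsBuilder
{
    private final Map<String, List<String>> map = new HashMap<>();

    void put(String range, String endpoint)
    {
        map.computeIfAbsent(range, k -> new ArrayList<>()).add(endpoint);
    }

    // Unmodifiable view: later put() calls on this builder remain visible through it.
    Map<String, List<String>> asView()
    {
        return Collections.unmodifiableMap(map);
    }

    // Unmodifiable copy, including the per-range lists: detached from the builder.
    Map<String, List<String>> build()
    {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> e : map.entrySet())
            copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
        return Collections.unmodifiableMap(copy);
    }
}
```

Note that copying only the outer map would still share the per-range lists, so the sketch copies both levels.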
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219131794 --- Diff: src/java/org/apache/cassandra/service/StorageProxy.java --- @@ -1021,18 +1021,18 @@ private static void syncWriteToBatchlog(Collection mutations, Collecti handler.get(); } -private static void asyncRemoveFromBatchlog(Collection endpoints, UUID uuid) +private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid) { MessageOut message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer); -for (InetAddressAndPort target : endpoints) +for (Replica target : replicaPlan.contacts()) { if (logger.isTraceEnabled()) logger.trace("Sending batchlog remove request {} to {}", uuid, target); -if (target.equals(FBUtilities.getBroadcastAddressAndPort())) -performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid)); +if (target.isLocal()) --- End diff -- I thought we had ended up calling this `isSelf` because `isLocal` is overloaded in this code (meaning, variously: is this instance, i.e. has our broadcast address; is in the same DC as this instance; has the 'local' address). Not strictly related to this patch, but perhaps we should rename it to `isSelf` while we're here and remember? This is consistent with the nomenclature used in the TR patch, and in ReplicaCollection (`withoutSelf` and `selfIfPresent`).
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219133467 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), +EndpointsForToken.empty(token) +); + +return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); +} + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
*/ -public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection endpoints) throws UnavailableException +public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); +Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); +String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + +// Replicas are picked manually: +// - replicas should be alive according to the failure detector +// - replicas should be in the local datacenter +// - choose min(2, number of qualifying candiates above) +// - allow the local node to be the only replica only if it's a single-node DC +Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); + +if (chosenEndpoints.isEmpty()) +{ +if (consistencyLevel == ConsistencyLevel.ANY) +chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); +else +throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0); +} + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( - SystemReplicas.getSystemReplicas(endpoints).forToken(token), + SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token), EndpointsForToken.empty(token) ); -ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO; + +// Batchlog is hosted by either one node or two nodes from different racks. +consistencyLevel = liveAndDown.all().size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO; --- End diff -- Are we sure it's OK (although consistent with prior behaviour) to require only CL.ONE if all nodes besides us are down? The logic above suggests we only require CL.ONE if we're a single-node DC, but we could have multiple failed nodes in our DC and be the only one left. It seems we should probably signal insufficient consistency in this case, though this question probably deserves a separate ticket for discussion.
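To make the question concrete, the CL selection in the diff can be reduced to the sketch below. `BatchCl`, `BatchlogConsistency` and `Unavailable` are hypothetical stand-ins for `ConsistencyLevel` and `UnavailableException`. Note that `batchlogCl` sees only the size of the chosen set, so it cannot distinguish a single-node DC from a DC in which every other node is down, which is the ambiguity raised in the comment.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical subset of consistency levels for illustration.
enum BatchCl { ANY, ONE, TWO, QUORUM }

final class BatchlogConsistency
{
    // Hypothetical stand-in for UnavailableException.
    static final class Unavailable extends RuntimeException
    {
        Unavailable(String message) { super(message); }
    }

    // With no qualifying endpoints, fall back to ourselves only for CL.ANY.
    static Collection<String> endpointsOrFallback(Collection<String> chosen, BatchCl requested, String self)
    {
        if (!chosen.isEmpty())
            return chosen;
        if (requested == BatchCl.ANY)
            return Collections.singleton(self);
        throw new Unavailable("required 1, alive 0");
    }

    // One chosen endpoint => ONE; otherwise (two endpoints) => TWO.
    static BatchCl batchlogCl(Collection<String> chosen)
    {
        return chosen.size() == 1 ? BatchCl.ONE : BatchCl.TWO;
    }
}
```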
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219129563 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), +EndpointsForToken.empty(token) +); + +return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); +} + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
*/ -public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection endpoints) throws UnavailableException +public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); +Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); +String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + +// Replicas are picked manually: +// - replicas should be alive according to the failure detector +// - replicas should be in the local datacenter +// - choose min(2, number of qualifying candiates above) +// - allow the local node to be the only replica only if it's a single-node DC +Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); --- End diff -- Should EndpointFilter be brought into ReplicaPlans? It's not used anywhere else, and it seems the two pieces of logic should sit together.
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219128835 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), --- End diff -- indentation?
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219138039 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), +EndpointsForToken.empty(token) +); + +return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); +} + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
*/ -public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection endpoints) throws UnavailableException +public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); +Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); +String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + +// Replicas are picked manually: +// - replicas should be alive according to the failure detector +// - replicas should be in the local datacenter +// - choose min(2, number of qualifying candiates above) +// - allow the local node to be the only replica only if it's a single-node DC +Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); --- End diff -- I don't love it either, but the two should live together. Perhaps we can clean it up; at the very least we can make it a static method instead of a class.
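One way `BatchlogManager.EndpointFilter` might look as a static method beside `forBatchlogWrite`, following the rules listed in the diff's comment plus a preference for racks other than our own. This is a simplified, deterministic sketch under assumptions, not the existing filter (which may randomise its picks): `filterBatchlogEndpoints` is a hypothetical name, endpoints are strings grouped by rack, and `isAlive` stands in for the failure detector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class BatchlogEndpoints
{
    /**
     * Picks up to two batchlog endpoints from the local DC.
     *
     * @param racks     local-DC endpoints grouped by rack, in a stable iteration order
     * @param localRack this node's rack
     * @param self      this node's address
     * @param isAlive   stand-in for the failure detector
     */
    static List<String> filterBatchlogEndpoints(Map<String, List<String>> racks, String localRack,
                                                String self, Predicate<String> isAlive)
    {
        List<String> all = new ArrayList<>();
        for (List<String> members : racks.values())
            all.addAll(members);

        // single-node DC: the local node is the only possible batchlog replica
        if (all.size() == 1)
            return all;

        // keep only live endpoints other than ourselves, still grouped by rack
        Map<String, List<String>> live = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : racks.entrySet())
            for (String endpoint : e.getValue())
                if (!endpoint.equals(self) && isAlive.test(endpoint))
                    live.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(endpoint);

        // prefer one endpoint from each of up to two racks other than our own
        List<String> chosen = new ArrayList<>(2);
        for (Map.Entry<String, List<String>> e : live.entrySet())
            if (chosen.size() < 2 && !e.getKey().equals(localRack))
                chosen.add(e.getValue().get(0));

        // top up from any remaining live endpoint, including our own rack
        for (List<String> members : live.values())
            for (String endpoint : members)
                if (chosen.size() < 2 && !chosen.contains(endpoint))
                    chosen.add(endpoint);

        // may be empty: the caller decides between falling back to self (CL.ANY) and Unavailable
        return chosen;
    }
}
```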
[GitHub] cassandra pull request #273: Fix overflow of 32-bit integer during compactio...
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/273#discussion_r219557162 --- Diff: src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java --- @@ -595,12 +600,22 @@ private boolean tryCell(int cell, int point, int delta) } } -private static int roundKey(int p, int roundSeconds) +private static int roundKey(int point, int roundSeconds) { -int d = p % roundSeconds; +int d = point % roundSeconds; if (d > 0) -return p + (roundSeconds - d); +{ +point += roundSeconds - d; +if (point == Cell.MAX_DELETION_TIME) --- End diff -- This should be `>`.
[GitHub] cassandra pull request #273: Fix overflow of 32-bit integer during compactio...
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/273#discussion_r219557250 --- Diff: src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java --- @@ -595,12 +600,22 @@ private boolean tryCell(int cell, int point, int delta) } } -private static int roundKey(int p, int roundSeconds) +private static int roundKey(int point, int roundSeconds) { -int d = p % roundSeconds; +int d = point % roundSeconds; if (d > 0) -return p + (roundSeconds - d); +{ +point += roundSeconds - d; +if (point == Cell.MAX_DELETION_TIME) +// the rounding should not be a reason to mark point as unexpirable +point = Cell.MAX_DELETION_TIME; --- End diff -- Why assign `point =` here, but `return` below?
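A sketch of `roundKey` that addresses both comments, assuming (verify against `Cell`) that `Cell.MAX_DELETION_TIME` is `Integer.MAX_VALUE - 1`, with `Integer.MAX_VALUE` reserved for "never expires". Under that assumption, `point == Cell.MAX_DELETION_TIME` misses roundings that land beyond it, and with `int` arithmetic the addition can also wrap negative near `Integer.MAX_VALUE`, which even a `>` check on an `int` would miss. Computing in `long` makes `>` reliable, and both branches simply `return`. `RoundKey` and its constant are hypothetical.

```java
final class RoundKey
{
    // Assumed to mirror Cell.MAX_DELETION_TIME: the largest real deletion time,
    // one below the Integer.MAX_VALUE "never expires" sentinel.
    static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1;

    // Rounds point up to the next multiple of roundSeconds without overflowing,
    // and without letting rounding alone turn a point into the sentinel.
    static int roundKey(int point, int roundSeconds)
    {
        int d = point % roundSeconds;
        if (d <= 0)
            return point;
        long rounded = (long) point + (roundSeconds - d); // long arithmetic cannot wrap here
        return rounded > MAX_DELETION_TIME ? MAX_DELETION_TIME : (int) rounded;
    }
}
```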
[GitHub] cassandra pull request #271: 14726
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r221306464 --- Diff: src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java --- @@ -58,45 +63,332 @@ }; } -protected final List list; -protected final boolean isSnapshot; -protected AbstractReplicaCollection(List list, boolean isSnapshot) +/** + * A simple list with no comodification checks and immutability by default (only append permitted, and only one initial copy) + * this permits us to reduce the amount of garbage generated, by not wrapping iterators or unnecessarily copying + * and reduces the amount of indirection necessary, as well as ensuring monomorphic callsites + */ +protected static class ReplicaList implements Iterable { -this.list = list; -this.isSnapshot = isSnapshot; +private static final Replica[] EMPTY = new Replica[0]; +Replica[] contents; +int begin, size; + +public ReplicaList() { this(0); } +public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; }
+    public ReplicaList(Replica[] contents, int begin, int size) { this.contents = contents; this.begin = begin; this.size = size; }
+
+    public boolean isSubList(ReplicaList subList)
+    {
+        return subList.contents == contents;
+    }
+
+    public Replica get(int index)
+    {
+        if (index > size)
+            throw new IndexOutOfBoundsException();
+        return contents[begin + index];
+    }
+
+    public void add(Replica replica)
+    {
+        // can only add to full array - if we have sliced it, we must be a snapshot
+        assert begin == 0;
+        if (size == contents.length)
+        {
+            int newSize;
+            if (size < 3) newSize = 3;
+            else if (size < 9) newSize = 9;
+            else newSize = size * 2;
+            contents = Arrays.copyOf(contents, newSize);
+        }
+        contents[size++] = replica;
+    }
+
+    public int size()
+    {
+        return size;
+    }
+
+    public boolean isEmpty()
+    {
+        return size == 0;
+    }
+
+    public ReplicaList subList(int begin, int end)
+    {
+        if (end > size || begin > end) throw new IndexOutOfBoundsException();
+        return new ReplicaList(contents, this.begin + begin, end - begin);
+    }
+
+    public ReplicaList sorted(Comparator comparator)
+    {
+        Replica[] copy = Arrays.copyOfRange(contents, begin, begin + size);
+        Arrays.sort(copy, comparator);
+        return new ReplicaList(copy, 0, copy.length);
+    }
+
+    public Stream stream()
+    {
+        return Arrays.stream(contents, begin, begin + size);
+    }
+
+    @Override
+    public Iterator iterator()
+    {
+        return new Iterator()
+        {
+            final int end = begin + size;
+            int i = begin;
+            @Override
+            public boolean hasNext()
+            {
+                return i < end;
+            }
+
+            @Override
+            public Replica next()
+            {
+                return contents[i++];
+            }
+        };
+    }
+
+    public Iterator transformIterator(Function function)
+    {
+        return new Iterator()
+        {
+            final int end = begin + size;
+            int i = begin;
+            @Override
+            public boolean hasNext()
+            {
+                return i < end;
+            }
+
+            @Override
+            public K next()
+            {
+                return function.apply(contents[i++]);
+            }
+        };
+    }
+
+    private Iterator filterIterator(Predicate predicate, int limit)
+    {
+        return new Iterator()
+        {
+            final int end = begin + size;
+            int next = begin;
+            int count = 0;
+            { updateNext(); }
+            void updateNext()
+            {
+                if (count == limit) next = end;
+
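The ReplicaList diff above pairs stepped capacity growth (3, then 9, then doubling) with subList views that share the backing array rather than copying it. Below is a minimal sketch of that pattern, assuming plain String elements in place of Replica; SlicedList and its isView flag are illustrative names, not Cassandra code. The sketch deliberately differs from the diff in two places: get also rejects index == size (the diff checks only index > size), and add refuses every view, since a begin == 0 check alone does not catch a view taken from index 0.

```java
import java.util.Arrays;

// Hypothetical sketch of an array-backed list with shared-array sub-list views.
// Strings stand in for Cassandra's Replica; SlicedList is not a Cassandra class.
final class SlicedList
{
    private String[] contents;
    private final int begin;
    private int size;
    private final boolean isView; // true for lists created by subList()

    SlicedList(int capacity)
    {
        this(new String[capacity], 0, 0, false);
    }

    private SlicedList(String[] contents, int begin, int size, boolean isView)
    {
        this.contents = contents;
        this.begin = begin;
        this.size = size;
        this.isView = isView;
    }

    String get(int index)
    {
        // valid positions are [0, size); index == size must be rejected too
        if (index < 0 || index >= size)
            throw new IndexOutOfBoundsException("index " + index + ", size " + size);
        return contents[begin + index];
    }

    void add(String value)
    {
        // a view shares its parent's array, so appending could overwrite the parent's elements
        if (isView)
            throw new IllegalStateException("cannot add to a sub-list view");
        if (size == contents.length)
        {
            // stepped growth as in the diff: 3, then 9, then doubling
            int newSize = size < 3 ? 3 : size < 9 ? 9 : size * 2;
            contents = Arrays.copyOf(contents, newSize);
        }
        contents[size++] = value;
    }

    int size()
    {
        return size;
    }

    int capacity()
    {
        return contents.length;
    }

    SlicedList subList(int from, int to)
    {
        if (from < 0 || to > size || from > to)
            throw new IndexOutOfBoundsException();
        // no copy: the view points into the same backing array
        return new SlicedList(contents, begin + from, to - from, true);
    }

    boolean sharesArrayWith(SlicedList other)
    {
        return contents == other.contents;
    }
}
```

Because a root list only ever writes past its current size, or into a freshly copied array once it grows, a view keeps seeing the same elements, which is what lets it serve as a snapshot.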
[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/293#discussion_r235729649
--- Diff: conf/cassandra.yaml ---
@@ -1003,3 +1003,9 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Define use of immediate flusher for replies to TCP connections. This is an alternate simplified flusher that does not
+# depend on any event loop scheduling. Details around why this has been backported from trunk: CASSANDRA-14855.
+# Default is false.
+# native_transport_use_immediate_flusher: false
--- End diff --

Perhaps native_transport_flush_messages_immediately?
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r217112723
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -18,364 +18,296 @@ package org.apache.cassandra.locator;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.utils.FBUtilities;
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
 /**
- * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
  *
- * Constitutes:
- * - the 'natural' replicas replicating the range or token relevant for the operation
- * - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
- * - the 'selected' replicas, those that should be targeted for any operation
- * - 'all' replicas represents natural+pending
- *
- * @param the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
- * @param the type of itself, including its type parameters, for return type of modifying methods
+ * @param
  */
-public abstract class ReplicaLayout, L extends ReplicaLayout>
+public abstract class ReplicaLayout>
 {
-    private volatile E all;
-    protected final E natural;
-    protected final E pending;
-    protected final E selected;
-
-    protected final Keyspace keyspace;
-    protected final ConsistencyLevel consistencyLevel;
-
-    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
-    {
-        this(keyspace, consistencyLevel, natural, pending, selected, null);
-    }
+    private final E natural;
-    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+    ReplicaLayout(E natural)
     {
-        assert selected != null;
-        assert pending == null || !Endpoints.haveConflicts(natural, pending);
-        this.keyspace = keyspace;
-        this.consistencyLevel = consistencyLevel;
         this.natural = natural;
-        this.pending = pending;
-        this.selected = selected;
-        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
-        if (all == null && pending == null)
-            all = natural;
-        this.all = all;
     }
-    public Replica getReplicaFor(InetAddressAndPort endpoint)
-    {
-        return natural.byEndpoint().get(endpoint);
-    }
-
-    public E natural()
+    /**
+     * The 'natural' owners of the ring position(s), as implied by the current ring layout.
+     * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
+     * have not yet finished obtaining their view of the range.
+     */
+    public final E natural()
     {
         return natural;
     }
-    public E all()
-    {
-        E result = all;
-        if (result == null)
-            all = result = Endpoints.concat(natural, pending);
-        return result;
-    }
-
-    public E selected()
-    {
-        return selected;
-    }
-
     /**
-     * @return the pending replicas - will be null for read layouts
-     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+     * All relevant owners of the r
[GitHub] cassandra pull request #265: 14705
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/265#discussion_r217175374
--- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
@@ -53,15 +53,16 @@
     protected final Queue repairs = new ConcurrentLinkedQueue<>();
     private final int blockFor;
-    BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
+    BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime)
     {
         super(command, replicaPlan, queryStartNanoTime);
-        this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
+        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
     }
     public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
     {
-        return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
+        // TODO: why are we referencing a different replicaPlan here?
--- End diff --

I think that was a stale comment. The merging never modified the replicaPlan, and it should never be modified during short read protection (SRP). Logically, it should be a snapshot of only the relevant replicas for the SRP.
[GitHub] cassandra pull request #270: 14759
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/270#discussion_r218488201
--- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
@@ -275,7 +275,24 @@ public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending, Endpo
      */
     private static > E resolveWriteConflictsInNatural(E natural, E pending)
     {
-        return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
+        ReplicaCollection.Mutable resolved = natural.newMutable(natural.size());
+        for (Replica replica : natural)
+        {
+            // always prefer the full natural replica, if there is a conflict
+            if (!replica.isFull())
+            {
+                Replica conflict = pending.byEndpoint().get(replica.endpoint());
+                if (conflict != null)
+                {
+                    // If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
+                    // to avoid corrupting our count
+                    if (conflict.isFull()) resolved.add(new Replica(replica.endpoint(), replica.range(), true));
--- End diff --

No, you're right, good catch. Even if they are a different range, either should be acceptable for this query (by definition). I'll push an update.
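A simplified sketch of the conflict resolution in this diff follows. SimpleReplica and WriteConflicts are hypothetical stand-ins (String endpoints and ranges instead of InetAddressAndPort and Range<Token>), and only the natural side is modelled. A transient natural replica whose endpoint is also a full pending replica is promoted to full, keeping the natural replica's range as the diff does; per the reply, either range would be acceptable for the query. The diff is cut off before its remaining branches, so keeping every other natural replica unchanged is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Cassandra's Replica: endpoint, range and full/transient flag.
final class SimpleReplica
{
    final String endpoint;
    final String range;
    final boolean full;

    SimpleReplica(String endpoint, String range, boolean full)
    {
        this.endpoint = endpoint;
        this.range = range;
        this.full = full;
    }
}

final class WriteConflicts
{
    // Sketch: resolve endpoints that appear both as natural and as pending replicas.
    static List<SimpleReplica> resolveInNatural(List<SimpleReplica> natural,
                                                Map<String, SimpleReplica> pendingByEndpoint)
    {
        List<SimpleReplica> resolved = new ArrayList<>(natural.size());
        for (SimpleReplica replica : natural)
        {
            SimpleReplica conflict = pendingByEndpoint.get(replica.endpoint);
            if (!replica.full && conflict != null && conflict.full)
                // pending transient->full movement: count this endpoint as a full natural replica
                resolved.add(new SimpleReplica(replica.endpoint, replica.range, true));
            else
                // assumption: every other case keeps the natural replica unchanged
                resolved.add(replica);
        }
        return resolved;
    }
}
```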
[GitHub] cassandra pull request #270: 14759
GitHub user belliottsmith opened a pull request: https://github.com/apache/cassandra/pull/270

14759

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/belliottsmith/cassandra 14759

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/270.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #270

commit 72791132da319188e709e1177817f08c41d9d8c6
Author: Benedict Elliott Smith
Date: 2018-09-09T22:53:07Z

    Transient->Full movements mishandle consistency level upgrade

    patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for CASSANDRA-14759

commit 4011548a6bf5fd0722f6036c0c8d88b626ad25fc
Author: Benedict Elliott Smith
Date: 2018-09-18T15:18:37Z

    circleci
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218387701
--- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
@@ -1288,24 +1288,25 @@ public static synchronized void updateAvailableRanges(String keyspace, Collectio
     /**
      * List of the streamed ranges, where transientness is encoded based on the source, where range was streamed from.
      */
-    public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+    public static synchronized Pair>, Set>> getAvailableRanges(String keyspace, IPartitioner partitioner)
--- End diff --

Could we introduce a concrete class for this concept of full/transient ranges?
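One shape the suggested class could take, sketched under assumptions: AvailableRanges is a hypothetical name, R stands in for Range<Token>, and the accessor is called transientRanges() because transient is a reserved word in Java. Named accessors let call sites read full() and transientRanges() instead of left and right.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical value class for "full and transient ranges"; R stands in for Range<Token>.
final class AvailableRanges<R>
{
    private final Set<R> full;
    private final Set<R> transientRanges; // 'transient' is a reserved word in Java

    AvailableRanges(Set<R> full, Set<R> transientRanges)
    {
        // defensive, read-only copies so callers cannot mutate the result
        this.full = Collections.unmodifiableSet(new LinkedHashSet<>(full));
        this.transientRanges = Collections.unmodifiableSet(new LinkedHashSet<>(transientRanges));
    }

    Set<R> full()
    {
        return full;
    }

    Set<R> transientRanges()
    {
        return transientRanges;
    }

    // every available range, full ones first, regardless of transientness
    Stream<R> all()
    {
        return Stream.concat(full.stream(), transientRanges.stream());
    }
}
```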
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218383055
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -337,165 +364,167 @@ else if (useStrictConsistency)
         boolean useStrictConsistency,
         TokenMetadata tmdBefore,
         TokenMetadata tmdAfter,
-        Predicate isAlive,
         String keyspace,
-        Collection> sourceFilters)
-    {
-        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
-
-        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
-        logger.debug ("Keyspace: {}", keyspace);
-        logger.debug("To fetch RN: {}", fetchRanges);
-        logger.debug("Fetch ranges: {}", rangeAddresses);
-
-        Predicate testSourceFilters = and(sourceFilters);
-        Function sorted =
-                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
-        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
-        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
-        for (Replica toFetch : fetchRanges)
-        {
-            //Replica that is sufficient to provide the data we need
-            //With strict consistency and transient replication we may end up with multiple types
-            //so this isn't used with strict consistency
-            Predicate isSufficient = r -> (toFetch.isTransient() || r.isFull());
-            Predicate accept = r ->
-                    isSufficient.test(r) // is sufficient
-                    && !r.endpoint().equals(localAddress) // is not self
-                    && isAlive.test(r); // is alive
-
-            logger.debug("To fetch {}", toFetch);
-            for (Range range : rangeAddresses.keySet())
-            {
-                if (range.contains(toFetch.range()))
-                {
-                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
-
-                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
-                    //It could be multiple endpoints and we must fetch from all of them if they are there
-                    //With transient replication and strict consistency this is to get the full data from a full replica and
-                    //transient data from the transient replica losing data
-                    EndpointsForRange sources;
-                    if (useStrictConsistency)
-                    {
-                        //Start with two sets of who replicates the range before and who replicates it after
-                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
-                        logger.debug("Old endpoints {}", oldEndpoints);
-                        logger.debug("New endpoints {}", newEndpoints);
-
-                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
-                        //So we need to be careful to only be strict when endpoints == RF
-                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
-                        {
-                            Set endpointsStillReplicated = newEndpoints.endpoints();
-                            // Remove new endpoints from old endpoints based on address
-                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
-
-                            if (!all(oldEndpoints, isAlive))
-                                throw new IllegalStateException("A node required to move the data consistently is down: "
-                                                                + oldEndpoints.filter(not(isAlive)));
-
-                            if (oldEndpoints.size() > 1)
-                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
-
-                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
-                            //we might end up with no endpoints to fetch from by address.
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218382891
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -87,8 +85,8 @@
     private final InetAddressAndPort address;
     /* streaming description */
     private final String description;
-    private final Multimap> toFetch = HashMultimap.create();
-    private final Set> sourceFilters = new HashSet<>();
+    private final Map> toFetch = new HashMap<>();
+    private final Set sourceFilters = new HashSet<>();
--- End diff --

Does this need to be a Set? (I realise this is old, but it doesn't make much sense AFAICT)
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218388108
--- Diff: src/java/org/apache/cassandra/dht/StreamStateStore.java ---
@@ -54,8 +56,10 @@ public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partiti
     @VisibleForTesting
     public boolean isDataAvailable(String keyspace, Token token)
     {
-        RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
-        return availableRanges.ranges().stream().anyMatch(range -> range.contains(token));
+        Pair>, Set>> availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
+
+        return Streams.concat(availableRanges.left.stream(),
+                              availableRanges.right.stream()).anyMatch(range -> range.contains(token));
--- End diff --

It would be clearer IMO to put .anyMatch on its own line, preferably aligned with .concat(). I initially read this as performing the anyMatch only on right, and was trying to figure out why.
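A sketch of the layout the reviewer suggests, with the chained .anyMatch on its own line and aligned with .concat(). It uses the JDK's Stream.concat in place of Guava's Streams.concat, and a hypothetical TokenRange covering (left, right] without ring wrap-around, so it illustrates the formatting rather than reproducing the StreamStateStore code.

```java
import java.util.Set;
import java.util.stream.Stream;

final class DataAvailability
{
    // Hypothetical simplified token range (left, right], ignoring ring wrap-around.
    static final class TokenRange
    {
        final long left;
        final long right;

        TokenRange(long left, long right)
        {
            this.left = left;
            this.right = right;
        }

        boolean contains(long token)
        {
            return token > left && token <= right;
        }
    }

    static boolean isDataAvailable(Set<TokenRange> full, Set<TokenRange> transientRanges, long token)
    {
        // .anyMatch sits under .concat, so it visibly applies to the concatenated stream
        return Stream.concat(full.stream(), transientRanges.stream())
                     .anyMatch(range -> range.contains(token));
    }
}
```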
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218383166
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -337,165 +364,167 @@ else if (useStrictConsistency)
         boolean useStrictConsistency,
         TokenMetadata tmdBefore,
         TokenMetadata tmdAfter,
-        Predicate isAlive,
         String keyspace,
-        Collection> sourceFilters)
-    {
-        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
-
-        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
-        logger.debug ("Keyspace: {}", keyspace);
-        logger.debug("To fetch RN: {}", fetchRanges);
-        logger.debug("Fetch ranges: {}", rangeAddresses);
-
-        Predicate testSourceFilters = and(sourceFilters);
-        Function sorted =
-                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
-        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
-        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
-        for (Replica toFetch : fetchRanges)
-        {
-            //Replica that is sufficient to provide the data we need
-            //With strict consistency and transient replication we may end up with multiple types
-            //so this isn't used with strict consistency
-            Predicate isSufficient = r -> (toFetch.isTransient() || r.isFull());
-            Predicate accept = r ->
-                    isSufficient.test(r) // is sufficient
-                    && !r.endpoint().equals(localAddress) // is not self
-                    && isAlive.test(r); // is alive
-
-            logger.debug("To fetch {}", toFetch);
-            for (Range range : rangeAddresses.keySet())
-            {
-                if (range.contains(toFetch.range()))
-                {
-                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
-
-                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
-                    //It could be multiple endpoints and we must fetch from all of them if they are there
-                    //With transient replication and strict consistency this is to get the full data from a full replica and
-                    //transient data from the transient replica losing data
-                    EndpointsForRange sources;
-                    if (useStrictConsistency)
-                    {
-                        //Start with two sets of who replicates the range before and who replicates it after
-                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
-                        logger.debug("Old endpoints {}", oldEndpoints);
-                        logger.debug("New endpoints {}", newEndpoints);
-
-                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
-                        //So we need to be careful to only be strict when endpoints == RF
-                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
-                        {
-                            Set endpointsStillReplicated = newEndpoints.endpoints();
-                            // Remove new endpoints from old endpoints based on address
-                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
-
-                            if (!all(oldEndpoints, isAlive))
-                                throw new IllegalStateException("A node required to move the data consistently is down: "
-                                                                + oldEndpoints.filter(not(isAlive)));
-
-                            if (oldEndpoints.size() > 1)
-                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
-
-                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
-                            //we might end up with no endpoints to fetch from by address.
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218387370
--- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
@@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
     /**
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
+     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+     * and _remote_ (source) range transientmess, which is encoded by splitting ranges into full and transient.
+     *
+     * At the other end the distinction between full and transient is ignored it just used the transient status
+     * of the Replica objects we send to determine what to send. The real reason we have this split down to
--- End diff --

"At the other end the distinction between full and transient is ignored it just used the transient status of the Replica objects we send to determine what to send"

This isn't terribly clear - perhaps "At the other end the distinction between full and transient is ignored; it is used only to create the Replica objects that are used to determine what to send"

This does raise the question of whether we should be sending Replica objects at all. Presumably we *do* look at the transient/full status, in some way? Or do we only care about the Range? In which case, perhaps we should just send that, and avoid the confusion?
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218371266
--- Diff: test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---
@@ -125,21 +125,19 @@ public boolean isAlive(InetAddressAndPort ep)
         s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
-        Collection> toFetch = s.toFetch().get(keyspaceName);
+        Multimap toFetch = s.toFetch().get(keyspaceName);
         // Check we get get RF new ranges in total
-        long rangesCount = toFetch.stream()
-                                  .map(Multimap::values)
-                                  .flatMap(Collection::stream)
-                                  .map(f -> f.remote)
-                                  .map(Replica::range)
-                                  .count();
+        long rangesCount = toFetch.values().stream()
--- End diff --

Surely we can just take toFetch.size()?
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218382719
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -328,7 +356,6 @@ else if (useStrictConsistency)
      * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
      * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
      * consistency.
-     *
      **/
     public static EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(BiFunction snitchGetSortedListByProximity,
--- End diff --

A comment stating that callers must now provide the filters for the liveness and not-self checks would help prevent future misuse, and make the logic easier to read.
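A sketch of the kind of comment the reviewer asks for, attached to a hypothetical selectSources method that ANDs caller-supplied filters over String endpoints; neither the method nor its signature comes from Cassandra. Because the filters are simply combined, the sketch accepts any Collection, so nothing in it depends on the filters being held in a Set.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class SourceSelection
{
    /**
     * Hypothetical sketch: returns the candidate source endpoints accepted by every filter.
     *
     * No implicit checks are applied here. Callers are responsible for passing filters that
     * exclude dead endpoints and the local node; without them, this method may return a
     * down node or the local node as a streaming source.
     */
    static List<String> selectSources(Collection<String> candidates,
                                      Collection<Predicate<String>> sourceFilters)
    {
        // AND all filters together; an empty collection accepts every candidate
        Predicate<String> accept = sourceFilters.stream().reduce(endpoint -> true, Predicate::and);
        return candidates.stream().filter(accept).collect(Collectors.toList());
    }
}
```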
[GitHub] cassandra pull request #269: Review tr range movements
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218470693
--- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
@@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
     /**
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
+     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+     * and _remote_ (source) range transientmess, which is encoded by splitting ranges into full and transient.
+     *
+     * At the other end the distinction between full and transient is ignored it just used the transient status
+     * of the Replica objects we send to determine what to send. The real reason we have this split down to
--- End diff --

Thanks, I assumed as much. We should clarify the comment to explain why we care, then, as it currently reads as though the information may be redundant.