[GitHub] cassandra-dtest pull request #9: 10857 trunk
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra-dtest/pull/9#discussion_r149322322

    --- Diff: upgrade_tests/upgrade_compact_storage.py ---
    @@ -0,0 +1,177 @@
    +# coding: utf-8
    +
    +import time
    +
    +from cassandra.query import dict_factory
    +from nose.tools import assert_equal, assert_true
    +from ccmlib.node import NodeError
    +
    +from dtest import Tester, debug
    +from cassandra.protocol import ConfigurationException
    +from tools.decorators import since
    +
    +VERSION_311 = 'git:cassandra-3.11'
    +VERSION_TRUNK = 'git:trunk'
    +
    +
    +@since('4.0')
    +class UpgradeSuperColumnsThrough(Tester):
    +    def upgrade_to_version(self, tag, start_rpc=True, wait=True, nodes=None):
    +        debug('Upgrading to ' + tag)
    +        if nodes is None:
    +            nodes = self.cluster.nodelist()
    +
    +        for node in nodes:
    +            debug('Shutting down node: ' + node.name)
    +            node.drain()
    +            node.watch_log_for("DRAINED")
    +            node.stop(wait_other_notice=False)
    +
    +        # Update Cassandra Directory
    +        for node in nodes:
    +            node.set_install_dir(version=tag)
    +            debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
    +        self.cluster.set_install_dir(version=tag)
    +
    +        # Restart nodes on new version
    +        for node in nodes:
    +            debug('Starting %s on new version (%s)' % (node.name, tag))
    +            node.start(wait_other_notice=wait, wait_for_binary_proto=wait)
    +
    +    def prepare(self, num_nodes=1, cassandra_version="github:apache/cassandra-2.2"):
    +        cluster = self.cluster
    +
    +        # Forcing cluster version on purpose
    +        cluster.set_install_dir(version=cassandra_version)
    +
    +        cluster.populate(num_nodes)
    +
    +        cluster.start()
    +        return cluster
    +
    +    def upgrade_compact_storage_test(self):
    +        cluster = self.prepare(cassandra_version='github:apache/cassandra-3.0')
    +        node = self.cluster.nodelist()[0]
    +        cursor = self.patient_cql_connection(node, row_factory=dict_factory)
    +
    +        cursor.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
    +        cursor.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
    +
    +        for i in xrange(1, 5):
    +            cursor.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i))
    +
    +        self.upgrade_to_version(VERSION_TRUNK, wait=False)
    +        self.allow_log_errors = True
    +
    +        time.sleep(5)
    +        # After restart, it won't start
    +        errors = len(node.grep_log("Compact Tables are not allowed in Cassandra starting with 4.0 version"))
    +        assert_true(errors > 0)
    +
    +    def mixed_cluster_test(self):
    +        cluster = self.prepare(num_nodes=2, cassandra_version=VERSION_311)
    +        node1 = self.cluster.nodelist()[0]
    +        node2 = self.cluster.nodelist()[1]
    +
    +        node1.drain()
    +        node1.watch_log_for("DRAINED")
    +        node1.stop(wait_other_notice=False)
    +        node1.set_install_dir(version=VERSION_TRUNK)
    +        node1.start(wait_other_notice=True, wait_for_binary_proto=True)
    +
    +        cursor = self.patient_cql_connection(node2, row_factory=dict_factory)
    --- End diff --

Right, this was copy-pasted from `upgrade_tests/thrift_upgrade_test.py`; you might want to change it there, too.

---
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra-dtest/pull/9#discussion_r149320134

    --- Diff: requirements.txt ---
    @@ -3,7 +3,7 @@
     # http://datastax.github.io/python-driver/installation.html#cython-based-extensions
     futures
     six
    --e git+https://github.com/datastax/python-driver.git@cassandra-test#egg=cassandra-driver
    +-e git+https://github.com/datastax/python-driver.git@master#egg=cassandra-driver
    --- End diff --

It has already been merged by now; I needed this change to run the tests.
GitHub user ifesdjeen opened a pull request:
https://github.com/apache/cassandra-dtest/pull/9

10857 trunk

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ifesdjeen/cassandra-dtest 10857-trunk

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra-dtest/pull/9.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #9

commit 1a03ee1ff8f3a24aec2a6faba0b54611549bbb2d
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-10-24T11:41:35Z

    Fix 4.0 tests

commit be4da63d8cba2c970acbbfcb7322ea5abc22aed5
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-10-24T15:28:47Z

    Add upgrade compact storage tests

commit ef5de97e40f2d1cf5c9bb0ff33f9544169ae2555
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-10-25T11:25:42Z

    Fix paths

commit 556255f15cb12bae17756ed20b00c92b80029f31
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-10-30T21:38:21Z

    more tests

commit 2a686cab61786e34c763bd03d18b26ef556562b5
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-10-31T10:40:16Z

    Looks like dtest branch is not up to date

commit 5b9daaee894640c298d0246ee3eca3b059103c98
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-11-06T08:23:24Z

    Fix tests

commit 502fa6d0f33863f45af2d0ba4a2ab98c7eddd285
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-10-23T11:29:41Z

    Add dtest for compact storage flag drop

commit 3b9ea6448048de076306ba6c38d4a5a498a9ecfd
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date: 2017-11-06T15:46:18Z

    Fix tests
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197126256

    --- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
    @@ -242,11 +249,11 @@ public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable= blockFor(keyspace);
    +            return Iterables.size(liveReplicas) >= blockFor(keyspace);
             }
         }
    -    public void assureSufficientLiveNodes(Keyspace keyspace, Iterable liveEndpoints) throws UnavailableException
    +    public void assureSufficientLiveNodes(Keyspace keyspace, Iterable liveReplicas) throws UnavailableException
    --- End diff --

We can use `Replicas` here.

Another question: should we distinguish between sufficient live nodes for the read path and for the write path? Do we want to make sure there is a sufficient number of live full nodes here, or later?
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197156136

    --- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
    @@ -4231,53 +4211,53 @@ private void calculateToFromStreams(Collection newTokens, List ke
             InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
    -        // clone to avoid concurrent modification in calculateNaturalEndpoints
    +        // clone to avoid concurrent modification in calculateNaturalReplicas
             TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
             for (String keyspace : keyspaceNames)
             {
                 // replication strategy of the current keyspace
                 AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
    -            Multimap> endpointToRanges = strategy.getAddressRanges();
    +            ReplicaMultimap endpointToRanges = strategy.getAddressReplicas();
                 logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace);
                 for (Token newToken : newTokens)
                 {
                     // getting collection of the currently used ranges by this keyspace
    -                Collection> currentRanges = endpointToRanges.get(localAddress);
    +                ReplicaSet currentReplicas = endpointToRanges.get(localAddress);
    --- End diff --

`currentReplicas` is only used inside the loop.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197156892

    --- Diff: src/java/org/apache/cassandra/locator/TokenMetadata.java ---
    @@ -733,19 +733,19 @@ public InetAddressAndPort getEndpoint(Token token)
             return sortedTokens;
         }
    -    public Multimap, InetAddressAndPort> getPendingRangesMM(String keyspaceName)
    +    public ReplicaMultimap, ReplicaSet> getPendingRangesMM(String keyspaceName)
         {
    -        Multimap, InetAddressAndPort> map = HashMultimap.create();
    +        ReplicaMultimap, ReplicaSet> map = ReplicaMultimap.set();
             PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
             if (pendingRangeMaps != null)
             {
    -            for (Map.Entry, List> entry : pendingRangeMaps)
    +            for (Map.Entry, ReplicaList> entry : pendingRangeMaps)
                 {
                     Range range = entry.getKey();
    -                for (InetAddressAndPort address : entry.getValue())
    +                for (Replica replica : entry.getValue())
                     {
    -                    map.put(range, address);
    +                    map.put(range, replica);
    --- End diff --

Since every value goes under the same `range` key while we iterate over `entry.getValue()`, it seems we could skip this inner loop and add the range's replicas in one go.
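The suggestion amounts to adding all of a range's values with one bulk call instead of one `put` per replica. A minimal sketch, assuming plain `java.util` collections as a stand-in for `ReplicaMultimap` (whose real API is not shown in this diff), with `String` keys and values in place of ranges and replicas; `SetMultimapSketch` and its `putAll` are hypothetical names:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a set-valued multimap keyed by range.
final class SetMultimapSketch<K, V> {
    private final Map<K, Set<V>> map = new HashMap<>();

    // One value at a time: what the inner loop in the diff does per replica.
    void put(K key, V value) {
        map.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(value);
    }

    // Bulk insert for one key: replaces the inner for-loop with a single call.
    void putAll(K key, Collection<? extends V> values) {
        map.computeIfAbsent(key, k -> new LinkedHashSet<>()).addAll(values);
    }

    Set<V> get(K key) {
        return map.getOrDefault(key, Collections.emptySet());
    }
}
```

Whether `ReplicaMultimap` should expose such a bulk method is a design choice for the patch; the sketch only shows that the inner loop collapses to a single `addAll`.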
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197131214

    --- Diff: src/java/org/apache/cassandra/locator/ReplicationFactor.java ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.util.Objects;
    +
    +import com.google.common.base.Preconditions;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +
    +public class ReplicationFactor
    +{
    +    public static final ReplicationFactor ZERO = new ReplicationFactor(0);
    +
    +    public final int trans;
    +    public final int replicas;
    +    public transient final int full;
    +
    +    private ReplicationFactor(int replicas, int trans)
    +    {
    +        validate(replicas, trans);
    +        this.replicas = replicas;
    +        this.trans = trans;
    +        this.full = replicas - trans;
    +    }
    +
    +    private ReplicationFactor(int replicas)
    +    {
    +        this(replicas, 0);
    +    }
    +
    +    static void validate(int replicas, int trans)
    +    {
    +        Preconditions.checkArgument(trans == 0 || DatabaseDescriptor.isTransientReplicationEnabled(),
    +                                    "Transient replication is not enabled on this node");
    +        Preconditions.checkArgument(replicas >= 0,
    --- End diff --

To my understanding, the replication factor has to be strictly positive (i.e. at least 1).
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197130790

    --- Diff: src/java/org/apache/cassandra/locator/ReplicaList.java ---
    @@ -0,0 +1,270 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.function.Predicate;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Iterables;
    +
    +public class ReplicaList extends Replicas
    +{
    +    static final ReplicaList EMPTY = new ReplicaList(ImmutableList.of());
    +
    +    private final List replicaList;
    +
    +    public ReplicaList()
    +    {
    +        this(new ArrayList<>());
    +    }
    +
    +    public ReplicaList(int capacity)
    +    {
    +        this(new ArrayList<>(capacity));
    +    }
    +
    +    public ReplicaList(ReplicaList from)
    +    {
    +        this(new ArrayList<>(from.replicaList));
    +    }
    +
    +    public ReplicaList(Replicas from)
    +    {
    +        this(new ArrayList<>(from.size()));
    +        addAll(from);
    +    }
    +
    +    public ReplicaList(Collection from)
    +    {
    +        this(new ArrayList<>(from));
    +    }
    +
    +    private ReplicaList(List replicaList)
    +    {
    +        this.replicaList = replicaList;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +        return replicaList.toString();
    +    }
    +
    +    @Override
    +    public boolean add(Replica replica)
    +    {
    +        Preconditions.checkNotNull(replica);
    +        return replicaList.add(replica);
    +    }
    +
    +    @Override
    +    public void addAll(Iterable replicas)
    +    {
    +        Iterables.addAll(replicaList, replicas);
    +    }
    +
    +    @Override
    +    public int size()
    +    {
    +        return replicaList.size();
    +    }
    +
    +    @Override
    +    protected Collection getUnmodifiableCollection()
    +    {
    +        return Collections.unmodifiableCollection(replicaList);
    +    }
    +
    +    @Override
    +    public Iterator iterator()
    +    {
    +        return replicaList.iterator();
    +    }
    +
    +    public Replica get(int idx)
    +    {
    +        return replicaList.get(idx);
    +    }
    +
    +    @Override
    +    public void removeEndpoint(InetAddressAndPort endpoint)
    +    {
    +        for (int i=replicaList.size()-1; i>=0; i--)
    +        {
    +            if (replicaList.get(i).getEndpoint().equals(endpoint))
    +            {
    +                replicaList.remove(i);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void removeReplica(Replica replica)
    +    {
    +        replicaList.remove(replica);
    +    }
    +
    +    @Override
    +    public boolean containsEndpoint(InetAddressAndPort endpoint)
    +    {
    +        for (int i=0; i predicate)
    +    {
    +        ArrayList newReplicaList = size() < 10 ? new ArrayList<>(size()) : new ArrayList<>();
    +        for (int i=0; i comparator)
    +    {
    +        replicaList.sort(comparator);
    +    }
    +
    +    public static ReplicaList intersectEndpoints(ReplicaList l1, ReplicaList l2)
    +    {
    +        Replicas.checkFull(l1);
    +        Replicas.checkFull(l2);
    +        // Note: we don't use Guava Sets.intersection() for 3 reasons:
    +        // 1) retainAll would be inefficient if l1 and l2 are large but in practice both are the replicas for a range and
    +        // so will be very small (< RF). In that case, retainAll is in fact more efficient.
    +        // 2) we do ultimately need a list so converting everything to sets
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197133241

    --- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
    @@ -344,47 +343,43 @@ private static void recordCasContention(int contentions)
             casWriteMetrics.contention.update(contentions);
         }
    -    private static Predicate sameDCPredicateFor(final String dc)
    +    private static Predicate sameDCPredicateFor(final String dc)
         {
             final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
    -        return new Predicate()
    -        {
    -            public boolean apply(InetAddressAndPort host)
    -            {
    -                return dc.equals(snitch.getDatacenter(host));
    -            }
    -        };
    +        return replica -> dc.equals(snitch.getDatacenter(replica));
         }
         private static PaxosParticipants getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
         {
             Token tk = key.getToken();
    -        List naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
    -        Collection pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace);
    +        ReplicaList naturalReplicas = StorageService.instance.getNaturalReplicas(metadata.keyspace, tk);
    +        ReplicaList pendingReplicas = new ReplicaList(StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace));
             if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
             {
    -            // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only
    +            // Restrict naturalReplicas and pendingReplicas to node in the local DC only
                 String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
    -            Predicate isLocalDc = sameDCPredicateFor(localDc);
    -            naturalEndpoints = ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc));
    -            pendingEndpoints = ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc));
    +            Predicate isLocalDc = sameDCPredicateFor(localDc);
    +            naturalReplicas = ReplicaList.immutableCopyOf(naturalReplicas.filter(isLocalDc));
    +            pendingReplicas = ReplicaList.immutableCopyOf(pendingReplicas.filter(isLocalDc));
             }
    -        int participants = pendingEndpoints.size() + naturalEndpoints.size();
    +        int participants = pendingReplicas.size() + naturalReplicas.size();
             int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
    -        List liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive));
    -        if (liveEndpoints.size() < requiredParticipants)
    -            throw new UnavailableException(consistencyForPaxos, requiredParticipants, liveEndpoints.size());
    +
    +        Replicas concatenated = Replicas.concatNaturalAndPending(naturalReplicas, pendingReplicas);
    +        ReplicaList liveReplicas = ReplicaList.immutableCopyOf(Replicas.filter(concatenated, IAsyncCallback.isReplicaAlive));
    --- End diff --

Same here (with Immutable).
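For reference, the participant arithmetic in this hunk is a simple majority over natural plus pending replicas: with 3 natural and 1 pending replica there are 4 participants, so `4 / 2 + 1 = 3` of them must be alive. A self-contained sketch of that check, using `String` endpoints and a caller-supplied liveness predicate in place of `Replica` and `IAsyncCallback.isReplicaAlive`. The class name is hypothetical, and it throws `IllegalStateException` where the real code throws `UnavailableException`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PaxosParticipantsSketch {
    // Simple majority over natural + pending replicas: participants / 2 + 1.
    static int requiredParticipants(int natural, int pending) {
        int participants = natural + pending;
        return participants / 2 + 1;
    }

    // Concatenates natural and pending endpoints, keeps the live ones,
    // and fails if fewer than the required majority are alive.
    static List<String> liveParticipants(List<String> natural,
                                         List<String> pending,
                                         Predicate<String> isAlive) {
        int required = requiredParticipants(natural.size(), pending.size());
        List<String> live = new ArrayList<>();
        for (String endpoint : natural)
            if (isAlive.test(endpoint)) live.add(endpoint);
        for (String endpoint : pending)
            if (isAlive.test(endpoint)) live.add(endpoint);
        if (live.size() < required)
            // The real code throws UnavailableException here.
            throw new IllegalStateException(
                "required " + required + " live participants, found " + live.size());
        return live;
    }
}
```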
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197149200

    --- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
    @@ -1275,36 +1272,38 @@ private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation
          *
          * @throws OverloadedException if the hints cannot be written/enqueued
          */
    -    public static void sendToHintedEndpoints(final Mutation mutation,
    -                                             Iterable targets,
    -                                             AbstractWriteResponseHandler responseHandler,
    -                                             String localDataCenter,
    -                                             Stage stage)
    +    public static void sendToHintedReplicas(final Mutation mutation,
    +                                            Iterable targets,
    +                                            AbstractWriteResponseHandler responseHandler,
    +                                            String localDataCenter,
    +                                            Stage stage)
         throws OverloadedException
         {
             int targetsSize = Iterables.size(targets);
             // this dc replicas:
    -        Collection localDc = null;
    +        Collection localDc = null;
    --- End diff --

We can use `Replicas` here.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197156422

    --- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
    @@ -4231,53 +4211,53 @@ private void calculateToFromStreams(Collection newTokens, List ke
             InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
    -        // clone to avoid concurrent modification in calculateNaturalEndpoints
    +        // clone to avoid concurrent modification in calculateNaturalReplicas
             TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
             for (String keyspace : keyspaceNames)
             {
                 // replication strategy of the current keyspace
                 AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
    -            Multimap> endpointToRanges = strategy.getAddressRanges();
    +            ReplicaMultimap endpointToRanges = strategy.getAddressReplicas();
    --- End diff --

We could add a `getAddressReplicas` variant that takes an `InetAddressAndPort`, to avoid materialising the whole map.
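One way to read the suggestion: rather than building the endpoint-to-ranges map for every endpoint and then calling `get(localAddress)`, compute the ranges for the one endpoint that is actually needed. A sketch over a plain range-to-endpoints map, where ranges and endpoints are `String`s and both method names are hypothetical (this is not the `AbstractReplicationStrategy` API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AddressRangesSketch {
    // Materialises endpoint -> ranges for every endpoint; the caller then needs just one key.
    static Map<String, List<String>> allAddressRanges(Map<String, List<String>> rangeToEndpoints) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : rangeToEndpoints.entrySet())
            for (String endpoint : e.getValue())
                result.computeIfAbsent(endpoint, k -> new ArrayList<>()).add(e.getKey());
        return result;
    }

    // Hypothetical per-endpoint variant: collects only the ranges replicated to one endpoint.
    static List<String> addressRanges(Map<String, List<String>> rangeToEndpoints, String endpoint) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : rangeToEndpoints.entrySet())
            if (e.getValue().contains(endpoint))
                result.add(e.getKey());
        return result;
    }
}
```

The per-endpoint variant still scans every range, but it no longer allocates lists for endpoints the caller never looks up.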
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197124802

    --- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
    @@ -148,40 +150,45 @@ public boolean isLocal(InetAddressAndPort endpoint)
             return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
         }
    -    public int countLocalEndpoints(Iterable liveEndpoints)
    +    public boolean isLocal(Replica replica)
    +    {
    +        return isLocal(replica.getEndpoint());
    +    }
    +
    +    public int countLocalEndpoints(Iterable liveReplicas)
    --- End diff --

This can just be `Replicas`.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197136141

    --- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
    @@ -4231,53 +4211,53 @@ private void calculateToFromStreams(Collection newTokens, List ke
             InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
    -        // clone to avoid concurrent modification in calculateNaturalEndpoints
    +        // clone to avoid concurrent modification in calculateNaturalReplicas
             TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
             for (String keyspace : keyspaceNames)
             {
                 // replication strategy of the current keyspace
                 AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
    -            Multimap> endpointToRanges = strategy.getAddressRanges();
    +            ReplicaMultimap endpointToRanges = strategy.getAddressReplicas();
                 logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace);
                 for (Token newToken : newTokens)
                 {
                     // getting collection of the currently used ranges by this keyspace
    -                Collection> currentRanges = endpointToRanges.get(localAddress);
    +                ReplicaSet currentReplicas = endpointToRanges.get(localAddress);
                     // collection of ranges which this node will serve after move to the new token
    -                Collection> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
    +                ReplicaSet updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
                     // ring ranges and endpoints associated with them
                     // this used to determine what nodes should we ping about range data
    -                Multimap, InetAddressAndPort> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
    +                ReplicaMultimap, ReplicaSet> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
                     // calculated parts of the ranges to request/stream from/to nodes in the ring
    -                Pair>, Set>> rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
    +                Pair>, Set>> rangesPerKeyspace = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
                     /**
                      * In this loop we are going through all ranges "to fetch" and determining
                      * nodes in the ring responsible for data we are interested in
                      */
    -                Multimap, InetAddressAndPort> rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create();
    +                ReplicaMultimap, ReplicaList> rangesToFetchWithPreferredEndpoints = ReplicaMultimap.list();
    --- End diff --

We could iterate over `entrySet` rather than calling `get` below.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197131513

    --- Diff: src/java/org/apache/cassandra/locator/ReplicationFactor.java ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.util.Objects;
    +
    +import com.google.common.base.Preconditions;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +
    +public class ReplicationFactor
    +{
    +    public static final ReplicationFactor ZERO = new ReplicationFactor(0);
    +
    +    public final int trans;
    +    public final int replicas;
    +    public transient final int full;
    +
    +    private ReplicationFactor(int replicas, int trans)
    +    {
    +        validate(replicas, trans);
    +        this.replicas = replicas;
    +        this.trans = trans;
    +        this.full = replicas - trans;
    +    }
    +
    +    private ReplicationFactor(int replicas)
    +    {
    +        this(replicas, 0);
    +    }
    +
    +    static void validate(int replicas, int trans)
    +    {
    +        Preconditions.checkArgument(trans == 0 || DatabaseDescriptor.isTransientReplicationEnabled(),
    +                                    "Transient replication is not enabled on this node");
    +        Preconditions.checkArgument(replicas >= 0,
    +                                    "Replication factor must be non-negative, found %s", replicas);
    +        Preconditions.checkArgument(trans == 0 || trans < replicas,
    --- End diff --

We should also check that `trans` is non-negative here, since `full` is calculated as `replicas - trans`.
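A sketch of the validation with the extra check, assuming `trans` must be non-negative and, when non-zero, strictly less than `replicas`; together these keep `full = replicas - trans` from ever being negative and guarantee at least one full replica whenever transient replicas are requested. It keeps `replicas >= 0` because the diff constructs `ZERO` with 0, omits the `isTransientReplicationEnabled()` check, and uses plain `IllegalArgumentException` in place of Guava `Preconditions`; the class name is hypothetical:

```java
final class ReplicationFactorSketch {
    final int replicas;
    final int trans;
    final int full;

    ReplicationFactorSketch(int replicas, int trans) {
        validate(replicas, trans);
        this.replicas = replicas;
        this.trans = trans;
        this.full = replicas - trans;
    }

    static void validate(int replicas, int trans) {
        if (replicas < 0)
            throw new IllegalArgumentException("Replication factor must be non-negative, found " + replicas);
        // The extra check: a negative transient count would make full exceed replicas.
        if (trans < 0)
            throw new IllegalArgumentException("Transient replica count must be non-negative, found " + trans);
        // Same condition as the diff (trans == 0 || trans < replicas): at least one full replica remains.
        if (trans > 0 && trans >= replicas)
            throw new IllegalArgumentException(
                "Transient replicas (" + trans + ") must be fewer than replicas (" + replicas + ")");
    }
}
```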
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197129915

    --- Diff: src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---
    @@ -90,41 +98,53 @@ public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata,
             /** Number of replicas left to fill from this DC. */
             int rfLeft;
             int acceptableRackRepeats;
    +        int transients;
    -        DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set endpoints, Set> racks)
    +        DatacenterEndpoints(ReplicationFactor rf, int rackCount, int nodeCount, ReplicaSet replicas, Set> racks)
             {
    -            this.endpoints = endpoints;
    +            this.replicas = replicas;
                 this.racks = racks;
                 // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
    -            this.rfLeft = Math.min(rf, nodeCount);
    +            this.rfLeft = Math.min(rf.replicas, nodeCount);
                 // If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
                 // and the difference is to be filled by the first encountered nodes.
    -            acceptableRackRepeats = rf - rackCount;
    +            acceptableRackRepeats = rf.replicas - rackCount;
    +
    +            // if we have fewer replicas than rf calls for, reduce transients accordingly
    +            int reduceTransients = rf.replicas - this.rfLeft;
    +            transients = Math.max(rf.trans - reduceTransients, 0);
    +            ReplicationFactor.validate(rfLeft, transients);
             }
             /**
    -         * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
    +         * Attempts to add an endpoint to the replicas for this datacenter, adding to the replicas set if successful.
              * Returns true if the endpoint was added, and this datacenter does not require further replicas.
              */
    -        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair location)
    +        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair location, Range replicatedRange)
             {
                 if (done())
                     return false;
    +            if (replicas.containsEndpoint(ep))
    --- End diff --

Previously this was handled by `endpoints.add`, which also used a `Set`, and a duplicate failed an `assert` rather than being skipped. Did we change some subtle semantics here?
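The constructor arithmetic in this hunk can be checked by hand. For example, with RF 3 (1 transient) in a DC that has only 2 nodes on 2 racks: `rfLeft = min(3, 2) = 2`, `acceptableRackRepeats = 3 - 2 = 1`, `reduceTransients = 3 - 2 = 1`, so `transients = max(1 - 1, 0) = 0`; the replica that cannot be placed comes out of the transient count first, leaving 2 full replicas. A standalone sketch mirroring those lines (field names follow the diff; the class itself is hypothetical and omits the `validate` call):

```java
final class DcFillSketch {
    final int rfLeft;
    final int acceptableRackRepeats;
    final int transients;

    DcFillSketch(int rfReplicas, int rfTrans, int rackCount, int nodeCount) {
        // Fewer nodes than RF: the node count becomes the effective RF.
        rfLeft = Math.min(rfReplicas, nodeCount);
        // Fewer racks than RF: this many nodes may share a rack (negative when racks outnumber replicas).
        acceptableRackRepeats = rfReplicas - rackCount;
        // Replicas that cannot be placed are taken out of the transient count first.
        int reduceTransients = rfReplicas - rfLeft;
        transients = Math.max(rfTrans - reduceTransients, 0);
    }
}
```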
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197126709

    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -259,36 +266,36 @@ private boolean useStrictSourcesForRanges(String keyspaceName)
          *
          * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found.
          */
    -    private Multimap, InetAddressAndPort> getAllRangesWithStrictSourcesFor(String keyspace, Collection> desiredRanges)
    +    private ReplicaMultimap, ReplicaList> getAllRangesWithStrictSourcesFor(String keyspace, Iterable> desiredRanges)
         {
             assert tokens != null;
             AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
             // Active ranges
             TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
    -        Multimap, InetAddressAndPort> addressRanges = strat.getRangeAddresses(metadataClone);
    +        ReplicaMultimap, ReplicaSet> addressRanges = strat.getRangeAddresses(metadataClone);
             // Pending ranges
             metadataClone.updateNormalTokens(tokens, address);
    -        Multimap, InetAddressAndPort> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
    +        ReplicaMultimap, ReplicaSet> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
             // Collects the source that will have its range moved to the new node
    -        Multimap, InetAddressAndPort> rangeSources = ArrayListMultimap.create();
    +        ReplicaMultimap, ReplicaList> rangeSources = ReplicaMultimap.list();
             for (Range desiredRange : desiredRanges)
             {
    -            for (Map.Entry, Collection> preEntry : addressRanges.asMap().entrySet())
    +            for (Map.Entry, ReplicaSet> preEntry : addressRanges.asMap().entrySet())
                 {
                     if (preEntry.getKey().contains(desiredRange))
                     {
    -                    Set oldEndpoints = Sets.newHashSet(preEntry.getValue());
    --- End diff --

Here, we create a new `Set` and then remove elements from it. We could get the same result (and avoid copying `newEndpoints`) by simply running `filter` on `preEntry.getValue()`.
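A sketch contrasting the two approaches, with `String` endpoints standing in for replicas (class and method names are hypothetical). The filter version makes a single pass over the old endpoints and relies on the new endpoints already being a `Set`, which the pending values appear to be in this diff (`ReplicaSet`), so no defensive copy is needed:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class SourceFilterSketch {
    // Copy-then-remove: allocates a mutable copy of the old endpoints, then mutates it.
    static Set<String> copyThenRemove(Collection<String> oldEndpoints, Collection<String> newEndpoints) {
        Set<String> result = new HashSet<>(oldEndpoints);
        result.removeAll(newEndpoints);
        return result;
    }

    // Filter: one pass over the old endpoints, keeping those not in newEndpoints.
    // Taking a Set keeps each contains() check cheap without copying newEndpoints.
    static List<String> filterOut(Collection<String> oldEndpoints, Set<String> newEndpoints) {
        return oldEndpoints.stream()
                           .filter(endpoint -> !newEndpoints.contains(endpoint))
                           .collect(Collectors.toList());
    }
}
```

Both return the same elements as long as the old endpoints contain no duplicates, which holds when they come from a set.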
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r197125691

    --- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
    @@ -190,50 +197,50 @@ public int countLocalEndpoints(Iterable liveEndpoints)
              * the blockFor first ones).
              */
             if (isDCLocal)
    -            liveEndpoints.sort(DatabaseDescriptor.getLocalComparator());
    +            liveReplicas.sort(DatabaseDescriptor.getLocalComparator());
    -        return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(keyspace)));
    +        return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace)));
         }
    -    private List filterForEachQuorum(Keyspace keyspace, List liveEndpoints)
    +    private ReplicaList filterForEachQuorum(Keyspace keyspace, ReplicaList liveReplicas)
    --- End diff --

Here, the sort order is not preserved (even though we return a sorted collection from `filterForQuery`). We could retain the sorted invariant if we used `filter`, maybe something like:

```
private ReplicaList filterForEachQuorum(Keyspace keyspace, ReplicaList liveReplicas)
{
    NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
    Map<String, Integer> dcsReplicas = new HashMap<>();
    for (String dc : strategy.getDatacenters())
    {
        // we put _up to_ dc replicas only
        dcsReplicas.put(dc, localQuorumFor(keyspace, dc));
    }

    return liveReplicas.filter((replica) -> {
        String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
        int replicas = dcsReplicas.get(dc);
        if (replicas > 0)
        {
            dcsReplicas.put(dc, --replicas);
            return true;
        }
        return false;
    });
}
```

(This would also take just two iterations instead of three.)
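A self-contained sketch of the same order-preserving idea, with `String` endpoints, a `Function` standing in for the snitch's `getDatacenter`, and a precomputed per-DC quota standing in for `localQuorumFor`; all names are hypothetical. It copies the quota map so the caller's map is not consumed, and uses `getOrDefault` so an endpoint whose DC is missing from the map is skipped rather than triggering a `NullPointerException` on unboxing:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class EachQuorumFilterSketch {
    // Keeps at most quota.get(dc) endpoints per datacenter, in the order they appear in sortedLive.
    static List<String> filterForEachQuorum(List<String> sortedLive,
                                            Function<String, String> dcOf,
                                            Map<String, Integer> quota) {
        Map<String, Integer> remaining = new HashMap<>(quota); // copy, so the caller's quota is untouched
        List<String> result = new ArrayList<>();
        for (String endpoint : sortedLive) {
            String dc = dcOf.apply(endpoint);
            int left = remaining.getOrDefault(dc, 0); // unknown DC: treated as no quota
            if (left > 0) {
                remaining.put(dc, left - 1);
                result.add(endpoint);
            }
        }
        return result;
    }
}
```

Like the snippet in the comment, this relies on elements being visited once each, in list order; a plain loop guarantees that, a parallel stream would not.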
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197161771

--- Diff: test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java ---
@@ -97,13 +103,13 @@ public void searchTokenForOldPendingRanges(final Blackhole bh)
{
int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
-Set endpoints = new HashSet<>();
-for (Map.Entry, Collection> entry : oldPendingRanges.asMap().entrySet())
+Set replicas = new HashSet<>();
+for (Map.Entry, Collection> entry : oldPendingRanges.asMap().entrySet())
--- End diff --

Since every usage of `asMap` is followed by `entrySet`, should we just expose `entrySet` instead and keep the map private?
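One way to act on this is sketched below with a hypothetical `EntryOnlyMultimap` rather than the real `ReplicaMultimap`. The backing map stays private, and the class exposes only a read-only entry set, which is all the iteration sites need:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical multimap wrapper: the backing map is private; callers only see entries.
final class EntryOnlyMultimap<K, V>
{
    private final Map<K, List<V>> map = new HashMap<>();

    void put(K key, V value)
    {
        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Read-only view: add/remove/clear and Entry.setValue throw UnsupportedOperationException.
    // Note: in this sketch the value lists themselves are not wrapped.
    Set<Map.Entry<K, List<V>>> entrySet()
    {
        return Collections.unmodifiableMap(map).entrySet();
    }
}
```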
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197153506

--- Diff: src/java/org/apache/cassandra/db/view/ViewUtils.java ---
@@ -58,46 +57,55 @@ private ViewUtils()
 *
 * @return Optional.empty() if this method is called using a base token which does not belong to this replica
 */
-public static Optional getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+public static Optional getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
{
AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-List baseEndpoints = new ArrayList<>();
-List viewEndpoints = new ArrayList<>();
-for (InetAddressAndPort baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
+ReplicaList baseReplicas = new ReplicaList();
+ReplicaList viewReplicas = new ReplicaList();
+for (Replica baseEndpoint : replicationStrategy.getNaturalReplicas(baseToken))
{
// An endpoint is local if we're not using Net
if (!(replicationStrategy instanceof NetworkTopologyStrategy) || DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
-baseEndpoints.add(baseEndpoint);
+baseReplicas.add(baseEndpoint);
}
-for (InetAddressAndPort viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
+for (Replica viewEndpoint : replicationStrategy.getNaturalReplicas(viewToken))
{
// If we are a base endpoint which is also a view replica, we use ourselves as our view replica
-if (viewEndpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+if (viewEndpoint.isLocal())
return Optional.of(viewEndpoint);
// We have to remove any endpoint which is shared between the base and the view, as it will select itself
// and throw off the counts otherwise.
-if (baseEndpoints.contains(viewEndpoint))
-baseEndpoints.remove(viewEndpoint);
+if (baseReplicas.containsEndpoint(viewEndpoint.getEndpoint()))
+baseReplicas.removeEndpoint(viewEndpoint.getEndpoint());
else if (!(replicationStrategy instanceof NetworkTopologyStrategy) || DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
-viewEndpoints.add(viewEndpoint);
+viewReplicas.add(viewEndpoint);
}
// The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
// Since the same replication strategy is used, the same placement should be used and we should get the same
// number of replicas for all of the tokens in the ring.
-assert baseEndpoints.size() == viewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
-int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddressAndPort());
+assert baseReplicas.size() == viewReplicas.size() : "Replication strategy should have the same number of endpoints for the base and the view";
--- End diff --

We could simplify this slightly by using a set plus an iterator instead of `get` here:

```diff
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-ReplicaList baseReplicas = new ReplicaList();
-ReplicaList viewReplicas = new ReplicaList();
+ReplicaSet baseReplicas = new ReplicaSet();
+ReplicaSet viewReplicas = new ReplicaSet();
+// We might add a method that filters natural endpoints by dc
for (Replica baseEndpoint : replicationStrategy.getNaturalReplicas(baseToken))
{
// An endpoint is local if we're not using Net
@@ -92,20 +95,18 @@ public final class ViewUtils
// number of replicas for all of the tokens in the ring.
assert baseReplicas.size() == viewReplicas.size() : "Replication strategy should have the same number of endpoints for the base and the view";
-int baseIdx = -1;
-for (int i=0; i baseReplicaIteraror = baseReplicas.iterator();
+Iterator viewReplicaIteraror = v
```
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197132925

--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -344,47 +343,43 @@ private static void recordCasContention(int contentions)
casWriteMetrics.contention.update(contentions);
}
-private static Predicate sameDCPredicateFor(final String dc)
+private static Predicate sameDCPredicateFor(final String dc)
{
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-return new Predicate()
-{
-public boolean apply(InetAddressAndPort host)
-{
-return dc.equals(snitch.getDatacenter(host));
-}
-};
+return replica -> dc.equals(snitch.getDatacenter(replica));
}
private static PaxosParticipants getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
{
Token tk = key.getToken();
-List naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
-Collection pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace);
+ReplicaList naturalReplicas = StorageService.instance.getNaturalReplicas(metadata.keyspace, tk);
--- End diff --

It looks like we can simply concatenate the natural replicas (as a list) and the pending replicas (as a set), without any additional copying in between.
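The following minimal sketch concatenates the two collections without an intermediate copy. It applies `Stream.concat` to plain `String` endpoints, and the names are hypothetical. Guava's `Iterables.concat` would give a similar lazy view:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

final class PaxosParticipantsSketch
{
    // Lazily chains natural replicas (a list) and pending replicas (a set)
    // without materialising a merged collection; a new stream is built per iterator() call.
    static Iterable<String> allReplicas(List<String> natural, Set<String> pending)
    {
        return () -> Stream.concat(natural.stream(), pending.stream()).iterator();
    }

    static int count(Iterable<String> replicas)
    {
        int n = 0;
        for (String ignored : replicas)
            n++;
        return n;
    }
}
```

The view does not deduplicate, so this sketch assumes that natural and pending replicas are disjoint.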
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197159962 --- Diff: src/java/org/apache/cassandra/locator/ReplicaList.java --- @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.locator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.function.Predicate; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +public class ReplicaList extends Replicas +{ +static final ReplicaList EMPTY = new ReplicaList(ImmutableList.of()); + +private final List replicaList; + +public ReplicaList() +{ +this(new ArrayList<>()); +} + +public ReplicaList(int capacity) +{ +this(new ArrayList<>(capacity)); +} + +public ReplicaList(ReplicaList from) +{ +this(new ArrayList<>(from.replicaList)); +} + +public ReplicaList(Replicas from) +{ +this(new ArrayList<>(from.size())); +addAll(from); +} + +public ReplicaList(Collection from) +{ +this(new ArrayList<>(from)); +} + +private ReplicaList(List replicaList) +{ +this.replicaList = replicaList; +} + +@Override +public String toString() +{ +return replicaList.toString(); +} + +@Override +public boolean add(Replica replica) +{ +Preconditions.checkNotNull(replica); +return replicaList.add(replica); +} + +@Override +public void addAll(Iterable replicas) +{ +Iterables.addAll(replicaList, replicas); +} + +@Override +public int size() +{ +return replicaList.size(); +} + +@Override +protected Collection getUnmodifiableCollection() +{ +return Collections.unmodifiableCollection(replicaList); +} + +@Override +public Iterator iterator() +{ +return replicaList.iterator(); +} + +public Replica get(int idx) +{ +return replicaList.get(idx); +} + +@Override +public void removeEndpoint(InetAddressAndPort endpoint) +{ +for (int i=replicaList.size()-1; i>=0; i--) +{ +if (replicaList.get(i).getEndpoint().equals(endpoint)) +{ +replicaList.remove(i); +} +} +} + +@Override +public void removeReplica(Replica replica) +{ +replicaList.remove(replica); +} 
+ +@Override +public boolean containsEndpoint(InetAddressAndPort endpoint) +{ +for (int i=0; i predicate) +{ +ArrayList newReplicaList = size() < 10 ? new ArrayList<>(size()) : new ArrayList<>(); +for (int i=0; i comparator) +{ +replicaList.sort(comparator); +} + +public static ReplicaList intersectEndpoints(ReplicaList l1, ReplicaList l2) +{ +Replicas.checkFull(l1); +Replicas.checkFull(l2); +// Note: we don't use Guava Sets.intersection() for 3 reasons: +// 1) retainAll would be inefficient if l1 and l2 are large but in practice both are the replicas for a range and +// so will be very small (< RF). In that case, retainAll is in fact more efficient. +// 2) we do ultimately need a list so converting everything to sets
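The code comment quoted in this diff argues for intersecting with `retainAll`. Replica lists are smaller than RF, so a linear scan is cheaper than building hash sets, and the caller needs a list in the end anyway. A minimal illustration with plain strings follows (the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

final class SmallIntersection
{
    // Copies l1 and keeps only elements also present in l2.
    // Cost is O(|l1| * |l2|), which is fine when both hold a handful of replicas.
    static List<String> intersect(List<String> l1, List<String> l2)
    {
        List<String> result = new ArrayList<>(l1);
        result.retainAll(l2);
        return result;
    }
}
```

The result keeps the order of `l1`, which a `Set`-based intersection would not guarantee in general.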
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197136522

--- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
@@ -4231,53 +4211,53 @@ private void calculateToFromStreams(Collection newTokens, List ke
InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
-// clone to avoid concurrent modification in calculateNaturalEndpoints
+// clone to avoid concurrent modification in calculateNaturalReplicas
TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
for (String keyspace : keyspaceNames)
{
// replication strategy of the current keyspace
AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
-Multimap> endpointToRanges = strategy.getAddressRanges();
+ReplicaMultimap endpointToRanges = strategy.getAddressReplicas();
logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace);
for (Token newToken : newTokens)
{
// getting collection of the currently used ranges by this keyspace
-Collection> currentRanges = endpointToRanges.get(localAddress);
+ReplicaSet currentReplicas = endpointToRanges.get(localAddress);
// collection of ranges which this node will serve after move to the new token
-Collection> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+ReplicaSet updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
// ring ranges and endpoints associated with them
// this used to determine what nodes should we ping about range data
-Multimap, InetAddressAndPort> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
+ReplicaMultimap, ReplicaSet> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
// calculated parts of the ranges to request/stream from/to nodes in the ring
-Pair>, Set>> rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
+Pair>, Set>> rangesPerKeyspace = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
/**
 * In this loop we are going through all ranges "to fetch" and determining
 * nodes in the ring responsible for data we are interested in
 */
-Multimap, InetAddressAndPort> rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create();
+ReplicaMultimap, ReplicaList> rangesToFetchWithPreferredEndpoints = ReplicaMultimap.list();
for (Range toFetch : rangesPerKeyspace.right)
{
for (Range range : rangeAddresses.keySet())
{
if (range.contains(toFetch))
{
-List endpoints = null;
+ReplicaList endpoints = null;
if (useStrictConsistency)
{
-Set oldEndpoints = Sets.newHashSet(rangeAddresses.get(range));
-Set newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled));
+ReplicaSet oldEndpoints = new ReplicaSet(rangeAddresses.get(range));
--- End diff --

It might be better to filter `oldEndpoints` rather than remove from it (especially since we already copy). An additional benefit: if we ever decide to switch to an immutable interface, we'll have to do this anyway.
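The immutability point can be checked directly with JDK collections (the class name is hypothetical). Calling `removeAll` on an immutable set such as `Set.of(...)` throws `UnsupportedOperationException`. A filter that builds a new set works for both mutable and immutable inputs:

```java
import java.util.Set;
import java.util.stream.Collectors;

final class ImmutableFiltering
{
    // Returns the old endpoints that are not new, without mutating either input.
    static Set<String> withoutNew(Set<String> oldEndpoints, Set<String> newEndpoints)
    {
        return oldEndpoints.stream()
                           .filter(e -> !newEndpoints.contains(e))
                           .collect(Collectors.toSet());
    }

    // Remove-based variant: only works on a mutable set.
    static boolean removeInPlace(Set<String> oldEndpoints, Set<String> newEndpoints)
    {
        return oldEndpoints.removeAll(newEndpoints);
    }
}
```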
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197128495

--- Diff: src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---
@@ -329,6 +335,10 @@ public static void validateReplicationStrategy(String keyspaceName,
AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions);
strategy.validateExpectedOptions();
strategy.validateOptions();
+if (strategy.getReplicationFactor().trans > 0 && !DatabaseDescriptor.isTransientReplicationEnabled())
--- End diff --

We should also check the version here. Previously we had allNodesAtLeast22/30 for some internal handling, so we might have to implement a similar mechanism for this.
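The following minimal sketch shows the kind of check being suggested. It assumes a hypothetical `NodeVersion` type and an `allNodesAtLeast` helper modelled loosely on the older allNodesAtLeast22/30 checks. Cassandra's real version and gossip APIs are not modelled, and `IllegalArgumentException` stands in for `ConfigurationException`:

```java
import java.util.Collection;

final class TransientReplicationGate
{
    // Hypothetical (major, minor) version, ordered by major then minor.
    record NodeVersion(int major, int minor) implements Comparable<NodeVersion>
    {
        public int compareTo(NodeVersion o)
        {
            return major != o.major ? Integer.compare(major, o.major) : Integer.compare(minor, o.minor);
        }
    }

    // Hypothetical helper: true only if every known node is at or above the minimum.
    static boolean allNodesAtLeast(Collection<NodeVersion> versions, NodeVersion minimum)
    {
        for (NodeVersion v : versions)
            if (v.compareTo(minimum) < 0)
                return false;
        return true;
    }

    // Rejects transient replicas unless the feature is enabled AND every node is new enough.
    static void validate(int transientReplicas, boolean featureEnabled,
                         Collection<NodeVersion> clusterVersions, NodeVersion minimum)
    {
        if (transientReplicas == 0)
            return;
        if (!featureEnabled)
            throw new IllegalArgumentException("Transient replication is disabled");
        if (!allNodesAtLeast(clusterVersions, minimum))
            throw new IllegalArgumentException("Transient replication requires all nodes to be at least " + minimum);
    }
}
```

The check runs across the whole cluster because, in a mixed-version cluster, the local node may support the feature while older peers do not.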
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197129440

--- Diff: src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---
@@ -90,41 +98,53 @@ public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata,
/** Number of replicas left to fill from this DC. */
int rfLeft;
int acceptableRackRepeats;
+int transients;
-DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set endpoints, Set> racks)
+DatacenterEndpoints(ReplicationFactor rf, int rackCount, int nodeCount, ReplicaSet replicas, Set> racks)
--- End diff --

Should we call it `DatacenterReplicas` now?
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197154559

--- Diff: src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---
@@ -202,61 +204,65 @@ private Keyspace getKeyspace()
 *
 * @return the replication factor
 */
-public abstract int getReplicationFactor();
+public abstract ReplicationFactor getReplicationFactor();
/*
 * NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below.
 * this is fine as long as we don't use this on any critical path.
 * (fixing this would probably require merging tokenmetadata into replicationstrategy,
 * so we could cache/invalidate cleanly.)
 */
-public Multimap> getAddressRanges(TokenMetadata metadata)
+public ReplicaMultimap getAddressReplicas(TokenMetadata metadata)
--- End diff --

Given how it's used, we could add another, non-map variant: in most usages we just call `get` on a freshly materialised map.
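The sketch below illustrates the non-map variant over a hypothetical ring, in which each range name maps to its replica endpoints and everything is a plain string. The first method inverts the whole ring. The second computes only the answer the caller asks for:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AddressRangesSketch
{
    // Map-building variant: inverts every range, even if the caller then calls get() once.
    static Map<String, List<String>> addressRanges(Map<String, List<String>> rangeToEndpoints)
    {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : rangeToEndpoints.entrySet())
            for (String endpoint : e.getValue())
                result.computeIfAbsent(endpoint, k -> new ArrayList<>()).add(e.getKey());
        return result;
    }

    // Non-map variant: collects only the ranges replicated by the requested endpoint.
    static List<String> addressRangesFor(Map<String, List<String>> rangeToEndpoints, String endpoint)
    {
        List<String> ranges = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : rangeToEndpoints.entrySet())
            if (e.getValue().contains(endpoint))
                ranges.add(e.getKey());
        return ranges;
    }
}
```

Both variants walk the whole ring, but the second allocates only the result list instead of an entry for every endpoint.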
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197125767

--- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
@@ -190,50 +197,50 @@ public int countLocalEndpoints(Iterable liveEndpoints)
 * the blockFor first ones).
 */
if (isDCLocal)
-liveEndpoints.sort(DatabaseDescriptor.getLocalComparator());
+liveReplicas.sort(DatabaseDescriptor.getLocalComparator());
-return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(keyspace)));
+return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace)));
}
-private List filterForEachQuorum(Keyspace keyspace, List liveEndpoints)
+private ReplicaList filterForEachQuorum(Keyspace keyspace, ReplicaList liveReplicas)
{
NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
-Map> dcsEndpoints = new HashMap<>();
+Map dcsReplicas = new HashMap<>();
for (String dc: strategy.getDatacenters())
-dcsEndpoints.put(dc, new ArrayList<>());
+dcsReplicas.put(dc, ReplicaList.withMaxSize(liveReplicas.size()));
-for (InetAddressAndPort add : liveEndpoints)
+for (Replica replica : liveReplicas)
{
-String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(add);
-dcsEndpoints.get(dc).add(add);
+String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+dcsReplicas.get(dc).add(replica);
}
-List waitSet = new ArrayList<>();
-for (Map.Entry> dcEndpoints : dcsEndpoints.entrySet())
+ReplicaList waitSet = new ReplicaList(ReplicaList.withMaxSize(liveReplicas.size()));
+for (Map.Entry dcEndpoints : dcsReplicas.entrySet())
{
-List dcEndpoint = dcEndpoints.getValue();
+ReplicaList dcEndpoint = dcEndpoints.getValue();
waitSet.addAll(dcEndpoint.subList(0, Math.min(localQuorumFor(keyspace, dcEndpoints.getKey()), dcEndpoint.size())));
}
return waitSet;
}
-public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable liveEndpoints)
+public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable liveReplicas)
--- End diff --

We can use `Replicas` here.
[GitHub] cassandra pull request #224: 14405 replicas
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/224#discussion_r197130296 --- Diff: src/java/org/apache/cassandra/locator/ReplicaList.java --- @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/257

Allow transient range owner to serve as repair coordinator

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra tr-followup-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/257.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #257

commit aadc3c6dcd65ef915c1743cb133f1065ee2618c3
Author: Alex Petrov
Date: 2018-09-04T17:38:27Z
    Imrpve DiskBoundaryManager, bring it into consistency with the rest of classes

commit c1a73a5eb799ee4d12a580af848d72eda21fd7b2
Author: Alex Petrov
Date: 2018-09-04T17:39:19Z
    Improve repair scheduling

commit e47019f838f69694d943220e3fa2f42839dd8019
Author: Alex Petrov
Date: 2018-09-04T17:40:08Z
    Avoid filtering already

commit 1d19e26fbbb97d76590d8b164feb61d185d74ba2
Author: Alex Petrov
Date: 2018-09-05T08:16:51Z
    enable dtests

commit 3e61a44a6e4bc4ba0dd32b588905b01bb64aff7f
Author: Alex Petrov
Date: 2018-09-05T11:11:51Z
    Allow transient range owner to be repair coordinator

commit c9d4e02aeb9f527e4584721881de2794085c594f
Author: Alex Petrov
Date: 2018-09-05T13:14:24Z
    Switch to branch that has TR tests

commit 07acea8e98ca762d06cc61affad371049a49943b
Author: Alex Petrov
Date: 2018-09-05T15:14:01Z
    Use Sets#difference instead of per-element equality
[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/257#discussion_r215442476

--- Diff: src/java/org/apache/cassandra/repair/RepairRunnable.java ---
@@ -651,7 +651,7 @@ private static void addRangeToNeighbors(List neighborRangeList, Ran
for (CommonRange cr : neighborRangeList)
{
// Use strict equality here, as worst thing that can happen is we generate one more stream
-if (Iterables.elementsEqual(cr.endpoints, endpoints) && Iterables.elementsEqual(cr.transEndpoints, transEndpoints))
+if (Sets.difference(cr.endpoints, endpoints).isEmpty() && Sets.difference(cr.transEndpoints, transEndpoints).isEmpty())
--- End diff --

Right, `return s.size() == o.size() && s.containsAll(o);`, but with a quick `hashCode` path for non-equality.
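Below is a sketch of the equality check described in the reply, on plain JDK sets (the class and method names are hypothetical). It also shows why a one-sided difference is not enough. `Sets.difference(a, b).isEmpty()` holds whenever `a` is a subset of `b`, so the two sets can still differ:

```java
import java.util.Set;

final class SetEquality
{
    // What a one-sided difference check tests: every element of a is in b.
    static boolean isSubset(Set<?> a, Set<?> b)
    {
        return b.containsAll(a);
    }

    // Equality: same size and one contains the other; hash codes are compared first
    // because equal sets must have equal hashCode() under the Set contract.
    static boolean setsEqual(Set<?> s, Set<?> o)
    {
        if (s == o)
            return true;
        if (s.size() != o.size())
            return false;
        if (s.hashCode() != o.hashCode())
            return false;
        return s.containsAll(o);
    }
}
```

The `hashCode` comparison can only reject, never accept. It is cheap only when the set implementation caches its hash code, and otherwise it costs a full pass over the elements.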
[GitHub] cassandra pull request #261: Tr followup 4
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/261

Tr followup 4

cc @belliottsmith

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra tr-followup-4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/261.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #261

commit 9e5f71b934b5e6ffb30361e4c375d2312cd097f4
Author: Alex Petrov
Date: 2018-09-05T08:16:51Z
    enable dtests

commit 362b6e70ca096525def59012c3a27fa9a00abf3e
Author: Alex Petrov
Date: 2018-09-07T09:02:55Z
    Validate transient status on query

commit 04e3857811e49c39f1c41d0a74c9e7c6a0463f86
Author: Alex Petrov
Date: 2018-09-07T11:00:26Z
    Comment to check tests
[GitHub] cassandra pull request #259: Cleanup repair path after Transient Replication
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/259

Cleanup repair path after Transient Replication

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra tr-followup-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/259.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #259

commit 3562933028ba33c2975f6c96a683d2d71eabbb51
Author: Alex Petrov
Date: 2018-09-06T10:37:13Z
    Cleanup repair path after TR

commit 8fef57a2c5931891bfe848e3788b3360a2bd4778
Author: Alex Petrov
Date: 2018-09-05T08:16:51Z
    enable dtests
[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/257#discussion_r215776606

--- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java ---
@@ -39,43 +40,60 @@ import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
/**
- * SymmetricLocalSyncTask performs streaming between local(coordinator) node and remote replica.
+ * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
 */
-public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler
+public class LocalSyncTask extends SymmetricSyncTask implements StreamEventHandler
{
private final TraceState state = Tracing.instance.get();
-private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class);
+private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
-private final boolean remoteIsTransient;
private final UUID pendingRepair;
-private final boolean pullRepair;
+private final boolean requestRanges;
+private final boolean transferRanges;
-public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
+public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
+ boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
{
-super(desc, r1, r2, previewKind);
-this.remoteIsTransient = remoteIsTransient;
+this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
+ pendingRepair, requestRanges, transferRanges, previewKind);
+}
+
+public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
+ List> diff, UUID pendingRepair,
+ boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
+{
+super(desc, local, remote, diff, previewKind);
+Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job");
+ Preconditions.checkArgument(local.equals(FBUtilities.getBroadcastAddressAndPort()));
+
this.pendingRepair = pendingRepair;
-this.pullRepair = pullRepair;
+this.requestRanges = requestRanges;
+this.transferRanges = transferRanges;
}
@VisibleForTesting
-StreamPlan createStreamPlan(InetAddressAndPort dst, List> differences)
+StreamPlan createStreamPlan(InetAddressAndPort remote, List> differences)
{
StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
.listeners(this)
- .flushBeforeTransfer(pendingRepair == null)
- // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
- .requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
- RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node
+ .flushBeforeTransfer(pendingRepair == null);
+
+if (requestRanges)
+{
+// see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
+ RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
+}
-if (!pullRepair && !remoteIsTransient)
+if (transferRanges)
{
// send ranges to the remote node if we are not performing a pull repair
// see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
-plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
+plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
--- End diff --

I felt that this is an imperative method called for its side effects, so ignoring its return value is fine: it returns a value only to facilitate chaining.
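The following sketch shows the pattern the reply describes. It uses a hypothetical `PlanSketch` builder rather than the real `StreamPlan`, and it assumes, as the reply implies, that the builder methods mutate the plan and return the same instance. Under that assumption, discarding the return value loses nothing:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a StreamPlan-style builder: each method mutates the plan
// and returns this purely so that calls can be chained.
final class PlanSketch
{
    private final List<String> actions = new ArrayList<>();

    PlanSketch requestRanges(String from) { actions.add("request from " + from); return this; }
    PlanSketch transferRanges(String to) { actions.add("transfer to " + to); return this; }
    List<String> actions() { return List.copyOf(actions); }

    static PlanSketch build(String remote, boolean requestRanges, boolean transferRanges)
    {
        if (!requestRanges && !transferRanges)
            throw new IllegalArgumentException("Nothing to do in a sync job");
        PlanSketch plan = new PlanSketch();
        if (requestRanges)
            plan.requestRanges(remote);   // return value ignored: called for its side effect
        if (transferRanges)
            plan.transferRanges(remote);  // the same instance is returned, so nothing is lost
        return plan;
    }
}
```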
[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/257#discussion_r215776645

--- Diff: src/java/org/apache/cassandra/repair/RepairJob.java ---
@@ -165,107 +166,116 @@ private boolean isTransient(InetAddressAndPort ep)
return session.commonRange.transEndpoints.contains(ep);
}
-private AsyncFunction, List> standardSyncing()
+private ListenableFuture> standardSyncing(List trees)
{
-return trees ->
-{
-InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
+InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
-List syncTasks = new ArrayList<>();
-// We need to difference all trees one against another
-for (int i = 0; i < trees.size() - 1; ++i)
+List syncTasks = new ArrayList<>();
+// We need to difference all trees one against another
+for (int i = 0; i < trees.size() - 1; ++i)
+{
+TreeResponse r1 = trees.get(i);
+for (int j = i + 1; j < trees.size(); ++j)
{
-TreeResponse r1 = trees.get(i);
-for (int j = i + 1; j < trees.size(); ++j)
+TreeResponse r2 = trees.get(j);
+
+// Avoid streming between two tansient replicas
+if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+continue;
+
+AbstractSyncTask task;
+if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
{
-TreeResponse r2 = trees.get(j);
+TreeResponse self = r1.endpoint.equals(local) ? r1 : r2;
+TreeResponse remote = r2.endpoint.equals(local) ? r1 : r2;
-if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+// pull only if local is full
+boolean requestRanges = !isTransient(self.endpoint);
+// push only if remote is full; additionally check for pull repair
+boolean transfterRanges = !isTransient(remote.endpoint) && !session.pullRepair;
--- End diff --

Thanks for noticing!
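The flag logic in this hunk can be summarised as a small pure function, sketched here with hypothetical names. The sketch skips the task entirely when neither flag is set. That choice is an assumption, made because the `LocalSyncTask` constructor in PR #257 rejects that case with `Preconditions.checkArgument`:

```java
import java.util.Optional;

final class SyncDirection
{
    // Hypothetical pair of flags for a sync between the local node and one remote.
    record Flags(boolean requestRanges, boolean transferRanges) {}

    static Optional<Flags> localSyncFlags(boolean localTransient, boolean remoteTransient, boolean pullRepair)
    {
        // Avoid streaming between two transient replicas.
        if (localTransient && remoteTransient)
            return Optional.empty();
        // Pull only if local is full.
        boolean requestRanges = !localTransient;
        // Push only if remote is full, and never for a pull repair.
        boolean transferRanges = !remoteTransient && !pullRepair;
        // Assumption: a task with nothing to do is skipped rather than created.
        if (!requestRanges && !transferRanges)
            return Optional.empty();
        return Optional.of(new Flags(requestRanges, transferRanges));
    }
}
```

The pull-repair flag only ever suppresses pushing. Pulling depends solely on whether the local replica is full.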
[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/257#discussion_r215776814 --- Diff: src/java/org/apache/cassandra/service/ActiveRepairService.java --- @@ -20,10 +20,25 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; --- End diff -- Yes, I'll remove it on commit when it's squashed.
[GitHub] cassandra issue #257: Allow transient range owner to serve as repair coordin...
Github user ifesdjeen commented on the issue: https://github.com/apache/cassandra/pull/257 @aweisberg fixed imports and spelling. Thank you for spotting those.
[GitHub] cassandra pull request #262: Replicalayout followup
GitHub user ifesdjeen opened a pull request: https://github.com/apache/cassandra/pull/262

Replicalayout followup

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/belliottsmith/cassandra replicalayout-followup

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/cassandra/pull/262.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #262

commit 949ec9e9965c84c3c80daf677422acb23719212f
Author: Benedict Elliott Smith
Date: 2018-09-07T10:41:28Z
[rR]eplicaLayout->[rR]eplicaPlan, selected() -> contact()

commit 8190c79d6c8bc3e1cfcc41ea0ee222fb3fc21231
Author: Benedict Elliott Smith
Date: 2018-09-07T10:45:28Z
assureSufficientLiveNodes -> assureSufficientReplicas

commit 70d615f8c1831831d97582b6b526621fc3d0bbf0
Author: Benedict Elliott Smith
Date: 2018-09-07T11:28:01Z
main refactor

commit 45230008ce9f12639d28b304531bd315b5637fd0
Author: Benedict Elliott Smith
Date: 2018-09-07T11:38:31Z
remove get(Live)?(Sorted)?Natural(AndPending)?Replicas

commit e384d189c98877bfbf02cd8f168694bb3c012403
Author: Benedict Elliott Smith
Date: 2018-09-07T11:41:10Z
fix speculation->rr_extra_read->rr_extra_write

commit f4eb2d245cf52f86fd9ef71ca2de0df3037fb3bf
Author: Benedict Elliott Smith
Date: 2018-09-07T11:43:00Z
cleanup BatchLog.sendSingleReplayMutation

commit d3eb71cf3265055be739089bf34c0fed64dac24b
Author: Benedict Elliott Smith
Date: 2018-09-07T11:44:13Z
fix: could speculate to nodes that would not increase consistencyLevel, resulting in timeout instead of overload

commit f59dfa83ad6de829f79406c46acc82acb84fb645
Author: Benedict Elliott Smith
Date: 2018-09-07T11:45:36Z
initialise Mutable HashMap with capacity

commit 14fc69fe50178fffe6d8c0d79e60cc671aa29a88
Author: Benedict Elliott Smith
Date: 2018-09-07T11:47:55Z
circleci

commit d45a68a319e488ed7ee016a747df1dfd734753bd
Author: Benedict Elliott Smith
Date: 2018-09-07T13:00:18Z
comment nit

commit b49a088578292c807bc568e8a6481a7ac75fe489
Author: Benedict Elliott Smith
Date: 2018-09-07T13:04:17Z
remove unused ForRangeWrite
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r220951620 --- Diff: src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java --- @@ -58,45 +63,332 @@ }; } -protected final List list; -protected final boolean isSnapshot; -protected AbstractReplicaCollection(List list, boolean isSnapshot) +/** + * A simple list with no comodification checks and immutability by default (only append permitted, and only one initial copy) + * this permits us to reduce the amount of garbage generated, by not wrapping iterators or unnecessarily copying + * and reduces the amount of indirection necessary, as well as ensuring monomorphic callsites + */ +protected static class ReplicaList implements Iterable { -this.list = list; -this.isSnapshot = isSnapshot; +private static final Replica[] EMPTY = new Replica[0]; +Replica[] contents; +int begin, size; + +public ReplicaList() { this(0); } +public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; } +public ReplicaList(Replica[] contents, int begin, int size) { this.contents = contents; this.begin = begin; this.size = size; } + +public boolean isSubList(ReplicaList subList) +{ +return subList.contents == contents; +} + +public Replica get(int index) +{ +if (index > size) +throw new IndexOutOfBoundsException(); +return contents[begin + index]; +} + +public void add(Replica replica) +{ +// can only add to full array - if we have sliced it, we must be a snapshot +assert begin == 0; +if (size == contents.length) +{ +int newSize; +if (size < 3) newSize = 3; +else if (size < 9) newSize = 9; +else newSize = size * 2; +contents = Arrays.copyOf(contents, newSize); +} +contents[size++] = replica; +} + +public int size() +{ +return size; +} + +public boolean isEmpty() +{ +return size == 0; +} + +public ReplicaList subList(int begin, int end) +{ +if (end > size || begin > end) throw new IndexOutOfBoundsException(); +return new ReplicaList(contents, this.begin + begin, end - begin); +} + +public ReplicaList sorted(Comparator comparator) +{ +Replica[] copy = Arrays.copyOfRange(contents, begin, begin + size); +Arrays.sort(copy, comparator); +return new ReplicaList(copy, 0, copy.length); +} + +public Stream stream() +{ +return Arrays.stream(contents, begin, begin + size); +} + +@Override +public Iterator iterator() +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public Replica next() +{ +return contents[i++]; +} +}; +} + +public Iterator transformIterator(Function function) +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public K next() +{ +return function.apply(contents[i++]); +} +}; +} + +private Iterator filterIterator(Predicate predicate, int limit) +{ +return new Iterator() --- End diff -- I know these iterators are quite simple to 
implement, but we could wrap the iterator with Guava's `Iterators.filter` here, too.
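The `ReplicaList` Javadoc describes an append-only, array-backed list whose sub-lists share the backing array instead of copying it. A minimal generic sketch of that idea follows. `SliceList` is a hypothetical name, and the growth steps (3, then 9, then doubling) are copied from the diff. Two details deliberately differ from the diff. First, slices are flagged read-only instead of relying on `begin == 0`, because `subList(0, k)` also has `begin == 0`. Second, `get` rejects `index == size` and negative indices, whereas the diff's `index > size` check lets `index == size` through.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch: append-only list whose sub-lists are zero-copy views.
final class SliceList<T> implements Iterable<T>
{
    private Object[] contents;
    private final int begin;
    private int size;
    private final boolean isView; // views share `contents` and must never append

    SliceList() { this(new Object[0], 0, 0, false); }

    private SliceList(Object[] contents, int begin, int size, boolean isView)
    {
        this.contents = contents;
        this.begin = begin;
        this.size = size;
        this.isView = isView;
    }

    void add(T item)
    {
        if (isView)
            throw new UnsupportedOperationException("cannot append to a slice");
        if (size == contents.length)
        {
            // Growth policy from the diff: 3, then 9, then doubling.
            int newSize = size < 3 ? 3 : size < 9 ? 9 : size * 2;
            contents = Arrays.copyOf(contents, newSize);
        }
        contents[size++] = item;
    }

    @SuppressWarnings("unchecked")
    T get(int index)
    {
        if (index < 0 || index >= size)
            throw new IndexOutOfBoundsException(String.valueOf(index));
        return (T) contents[begin + index];
    }

    int size() { return size; }

    int capacity() { return contents.length; }

    // O(1): the slice shares the backing array instead of copying it.
    SliceList<T> subList(int from, int to)
    {
        if (from < 0 || to > size || from > to)
            throw new IndexOutOfBoundsException();
        return new SliceList<>(contents, begin + from, to - from, true);
    }

    boolean sharesStorageWith(SliceList<?> other) { return other.contents == contents; }

    @Override
    public Iterator<T> iterator()
    {
        return new Iterator<T>()
        {
            private final int end = begin + size;
            private int i = begin;

            @Override public boolean hasNext() { return i < end; }

            @SuppressWarnings("unchecked")
            @Override public T next()
            {
                if (i >= end) throw new NoSuchElementException();
                return (T) contents[i++];
            }
        };
    }
}
```

A slice costs one small object and no array copy, which is the garbage-reduction trade-off the Javadoc describes. The cost is that a slice must never be appended to, because it shares storage with its parent.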
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r220966312 --- Diff: src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java --- @@ -58,45 +63,332 @@ }; } -protected final List list; -protected final boolean isSnapshot; -protected AbstractReplicaCollection(List list, boolean isSnapshot) +/** + * A simple list with no comodification checks and immutability by default (only append permitted, and only one initial copy) + * this permits us to reduce the amount of garbage generated, by not wrapping iterators or unnecessarily copying + * and reduces the amount of indirection necessary, as well as ensuring monomorphic callsites + */ +protected static class ReplicaList implements Iterable { -this.list = list; -this.isSnapshot = isSnapshot; +private static final Replica[] EMPTY = new Replica[0]; +Replica[] contents; +int begin, size; + +public ReplicaList() { this(0); } +public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; } +public ReplicaList(Replica[] contents, int begin, int size) { this.contents = contents; this.begin = begin; this.size = size; } + +public boolean isSubList(ReplicaList subList) +{ +return subList.contents == contents; +} + +public Replica get(int index) +{ +if (index > size) +throw new IndexOutOfBoundsException(); +return contents[begin + index]; +} + +public void add(Replica replica) +{ +// can only add to full array - if we have sliced it, we must be a snapshot +assert begin == 0; +if (size == contents.length) +{ +int newSize; +if (size < 3) newSize = 3; +else if (size < 9) newSize = 9; +else newSize = size * 2; +contents = Arrays.copyOf(contents, newSize); +} +contents[size++] = replica; +} + +public int size() +{ +return size; +} + +public boolean isEmpty() +{ +return size == 0; +} + +public ReplicaList subList(int begin, int end) +{ +if (end > size || begin > end) throw new IndexOutOfBoundsException(); +return new ReplicaList(contents, this.begin + begin, end - begin); +} + +public ReplicaList sorted(Comparator comparator) +{ +Replica[] copy = Arrays.copyOfRange(contents, begin, begin + size); +Arrays.sort(copy, comparator); +return new ReplicaList(copy, 0, copy.length); +} + +public Stream stream() +{ +return Arrays.stream(contents, begin, begin + size); +} + +@Override +public Iterator iterator() +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public Replica next() +{ +return contents[i++]; +} +}; +} + +public Iterator transformIterator(Function function) +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public K next() +{ +return function.apply(contents[i++]); +} +}; +} + +private Iterator filterIterator(Predicate predicate, int limit) +{ +return new Iterator() +{ +final int end = begin + size; +int next = begin; +int count = 
0; +{ updateNext(); } +void updateNext() +{ +if (count == limit) next = end; +
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r220986153 --- Diff: src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java --- @@ -58,45 +63,332 @@ }; } -protected final List list; -protected final boolean isSnapshot; -protected AbstractReplicaCollection(List list, boolean isSnapshot) +/** + * A simple list with no comodification checks and immutability by default (only append permitted, and only one initial copy) + * this permits us to reduce the amount of garbage generated, by not wrapping iterators or unnecessarily copying + * and reduces the amount of indirection necessary, as well as ensuring monomorphic callsites + */ +protected static class ReplicaList implements Iterable { -this.list = list; -this.isSnapshot = isSnapshot; +private static final Replica[] EMPTY = new Replica[0]; +Replica[] contents; +int begin, size; + +public ReplicaList() { this(0); } +public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; } +public ReplicaList(Replica[] contents, int begin, int size) { this.contents = contents; this.begin = begin; this.size = size; } + +public boolean isSubList(ReplicaList subList) +{ +return subList.contents == contents; +} + +public Replica get(int index) +{ +if (index > size) +throw new IndexOutOfBoundsException(); +return contents[begin + index]; +} + +public void add(Replica replica) +{ +// can only add to full array - if we have sliced it, we must be a snapshot +assert begin == 0; +if (size == contents.length) +{ +int newSize; +if (size < 3) newSize = 3; +else if (size < 9) newSize = 9; +else newSize = size * 2; +contents = Arrays.copyOf(contents, newSize); +} +contents[size++] = replica; +} + +public int size() +{ +return size; +} + +public boolean isEmpty() +{ +return size == 0; +} + +public ReplicaList subList(int begin, int end) +{ +if (end > size || begin > end) throw new IndexOutOfBoundsException(); +return new ReplicaList(contents, this.begin + begin, end - begin); +} + +public ReplicaList sorted(Comparator comparator) +{ +Replica[] copy = Arrays.copyOfRange(contents, begin, begin + size); +Arrays.sort(copy, comparator); +return new ReplicaList(copy, 0, copy.length); +} + +public Stream stream() +{ +return Arrays.stream(contents, begin, begin + size); +} + +@Override +public Iterator iterator() +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public Replica next() +{ +return contents[i++]; +} +}; +} + +public Iterator transformIterator(Function function) +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public K next() +{ +return function.apply(contents[i++]); +} +}; +} + +private Iterator filterIterator(Predicate predicate, int limit) +{ +return new Iterator() --- End diff -- Sure, if this was a deliberate decision, I have 
nothing against it.
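The hand-written `filterIterator(predicate, limit)` discussed in this thread walks the backing array directly and stops after `limit` matches. A minimal sketch of that pattern over an array slice follows. `LimitedFilter` is a hypothetical name, and the body only loosely follows the fragment quoted in the diff (`updateNext`, `count == limit`).

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical sketch: filter an array slice, returning at most `limit` matches.
final class LimitedFilter
{
    static <T> Iterator<T> filter(T[] contents, int begin, int size, Predicate<? super T> predicate, int limit)
    {
        return new Iterator<T>()
        {
            private final int end = begin + size;
            private int cursor = begin - 1; // index of the next element to return
            private int count = 0;          // matches returned so far
            { advance(); }

            // Move `cursor` to the next matching element, or to `end` once the
            // slice is exhausted or `limit` matches have already been returned.
            private void advance()
            {
                if (count == limit) { cursor = end; return; }
                do { ++cursor; } while (cursor < end && !predicate.test(contents[cursor]));
            }

            @Override public boolean hasNext() { return cursor < end; }

            @Override public T next()
            {
                if (!hasNext()) throw new NoSuchElementException();
                T result = contents[cursor];
                ++count;
                advance();
                return result;
            }
        };
    }
}
```

With Guava, roughly the same behaviour could be composed as `Iterators.limit(Iterators.filter(iterator, predicate::test), limit)`. That version allocates a wrapper iterator for each stage, which is the kind of indirection the `ReplicaList` Javadoc says it avoids.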
[GitHub] cassandra-dtest pull request #39: Add no-read assert to read-repair test
GitHub user ifesdjeen opened a pull request: https://github.com/apache/cassandra-dtest/pull/39

Add no-read assert to read-repair test

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ifesdjeen/cassandra-dtest avoid-querying-self-through-ms

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/cassandra-dtest/pull/39.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #39

commit ed1055ec550a2dcd92424980d5ab8b05d9f3dfb5
Author: Alex Petrov
Date: 2018-10-05T14:49:51Z
Add no-read assert to read-repair test
[GitHub] cassandra issue #276: Repair job tests
Github user ifesdjeen commented on the issue: https://github.com/apache/cassandra/pull/276 I've addressed your suggestions, @krummas. Should I commit?
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r223013279 --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java --- @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Predicate; + +import com.google.common.collect.Sets; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class RepairJobTest +{ +private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance; + +static InetAddressAndPort addr1; +static InetAddressAndPort addr2; +static InetAddressAndPort addr3; +static InetAddressAndPort addr4; +static InetAddressAndPort addr5; + +static Range range1 = range(0, 1); +static Range range2 = range(2, 3); +static Range range3 = range(4, 5); +static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList()); + +@AfterClass +public static void reset() +{ +FBUtilities.reset(); +} + +static +{ +try +{ +addr1 = InetAddressAndPort.getByName("127.0.0.1"); +addr2 = InetAddressAndPort.getByName("127.0.0.2"); +addr3 = InetAddressAndPort.getByName("127.0.0.3"); +addr4 = InetAddressAndPort.getByName("127.0.0.4"); +addr5 = 
InetAddressAndPort.getByName("127.0.0.5"); +DatabaseDescriptor.setBroadcastAddress(addr1.address); +} +catch (UnknownHostException e) +{ +e.printStackTrace(); +} +} + +@Test +public void testCreateStandardSyncTasks() +{ +testCreateStandardSyncTasks(false); +} + +@Test +public void testCreateStandardSyncTasksPullRepair() +{ +testCreateStandardSyncTasks(true); +} + +public static void testCreateStandardSyncTasks(boolean pullRepair) +{ +List treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"), + treeResponse(addr2, range1, "different", range2, "same", range3, "different"), + treeResponse(addr3, range1, "same", range2, "same", range3, "same")); + +Map tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, +
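`testCreateStandardSyncTasks` gives three endpoints identical hashes except that `addr2` reports "different" for `range1` and `range3`. The sketch below shows which pairs that setup should flag, using a hypothetical, simplified model in which each endpoint reports one hash string per range instead of a real `MerkleTree`. It compares every pair once (`i < j`), like the loop in `standardSyncing`, and records the differing ranges for each pair, including pairs with no differences.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the test fixture: endpoint -> (range -> hash).
final class TreeDiffSketch
{
    // Ranges whose hashes differ between two endpoints, in the iteration order of `a`.
    static List<String> differences(Map<String, String> a, Map<String, String> b)
    {
        List<String> diff = new ArrayList<>();
        for (Map.Entry<String, String> e : a.entrySet())
            if (!e.getValue().equals(b.get(e.getKey())))
                diff.add(e.getKey());
        return diff;
    }

    // Compare every pair of endpoints exactly once (i < j); empty lists mean "in sync".
    static Map<String, List<String>> pairwise(Map<String, Map<String, String>> trees)
    {
        List<String> endpoints = new ArrayList<>(trees.keySet());
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (int i = 0; i < endpoints.size() - 1; ++i)
            for (int j = i + 1; j < endpoints.size(); ++j)
                result.put(endpoints.get(i) + "<->" + endpoints.get(j),
                           differences(trees.get(endpoints.get(i)), trees.get(endpoints.get(j))));
        return result;
    }

    // Builds an insertion-ordered range -> hash map from alternating arguments.
    static Map<String, String> tree(String... rangeHashPairs)
    {
        Map<String, String> t = new LinkedHashMap<>();
        for (int k = 0; k < rangeHashPairs.length; k += 2)
            t.put(rangeHashPairs[k], rangeHashPairs[k + 1]);
        return t;
    }
}
```

Under this model `addr1` and `addr3` agree everywhere, and both differ from `addr2` on `range1` and `range3`. Those are the pairs for which the test should expect streaming work.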
[GitHub] cassandra pull request #281: Sep worker shutdown
GitHub user ifesdjeen opened a pull request: https://github.com/apache/cassandra/pull/281

Sep worker shutdown

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ifesdjeen/cassandra sep-worker-shutdown

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/cassandra/pull/281.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #281

commit 4948c8116e0568eeae5d5c4197d4b12e2f6072e6
Author: Alex Petrov
Date: 2018-10-11T16:23:20Z
SEP Worker shutdown

commit daaaeb40adf23647318f2253a26ea010de2c
Author: Alex Petrov
Date: 2018-10-11T16:27:34Z
Get rid of useless latch, cleanup the test

commit e2f9392823fe325f20a89c63e49fbeda617c0342
Author: Alex Petrov
Date: 2018-10-11T16:48:14Z
Get rid of exceptions

commit 41f56a0df846cf22bce7291fd8a3258782a31b91
Author: Alex Petrov
Date: 2018-10-11T16:50:32Z
Cleanup imports, elaborate on “kill” call

commit 92d6a8f80c5133ff4709c961c2c7659c3de2f638
Author: Alex Petrov
Date: 2018-10-11T17:26:53Z
Cleanup
[GitHub] cassandra pull request #282: Implement in-jvm distributed tests prototype / ...
GitHub user ifesdjeen opened a pull request: https://github.com/apache/cassandra/pull/282

Implement in-jvm distributed tests prototype / MVP

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ifesdjeen/cassandra in-jvm-distributed-tests

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/cassandra/pull/282.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #282

commit d245a777e93a4ec76c88ac6caa6b3051d571474f
Author: Alex Petrov
Date: 2018-10-01T14:25:28Z
Implement in-jvm distributed tests

commit 1f5b93bdb9f32108b22c61d1319c5010bfd59b04
Author: Alex Petrov
Date: 2018-10-12T14:14:23Z
Fixes required to properly shutdown (most of the) threads

commit 147520e9c9c0b8055bd165ab455907730c77f597
Author: Alex Petrov
Date: 2018-10-12T14:27:40Z
Changes required for no-op MBean registration

commit 4133ece48aea4d9665bcfa1432cf4a941d68a9cc
Author: Alex Petrov
Date: 2018-10-12T16:40:01Z
Separate loggers per instance
[GitHub] cassandra issue #278: Avoid running query to self through messaging service
Github user ifesdjeen commented on the issue: https://github.com/apache/cassandra/pull/278 Thank you for the review!
[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/278
[GitHub] cassandra-dtest issue #39: Add no-read assert to read-repair test
Github user ifesdjeen commented on the issue: https://github.com/apache/cassandra-dtest/pull/39 Thank you for the review; I committed the latest version manually.
[GitHub] cassandra-dtest pull request #39: Add no-read assert to read-repair test
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra-dtest/pull/39
[GitHub] cassandra pull request #282: Implement in-jvm distributed tests prototype / ...
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/282
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r222952584 --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r222954234 --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r222953530 --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java --- @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Predicate; + +import com.google.common.collect.Sets; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class RepairJobTest +{ +private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance; + +static InetAddressAndPort addr1; +static InetAddressAndPort addr2; +static InetAddressAndPort addr3; +static InetAddressAndPort addr4; +static InetAddressAndPort addr5; + +static Range range1 = range(0, 1); +static Range range2 = range(2, 3); +static Range range3 = range(4, 5); +static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList()); + +@AfterClass +public static void reset() +{ +FBUtilities.reset(); +} + +static +{ +try +{ +addr1 = InetAddressAndPort.getByName("127.0.0.1"); +addr2 = InetAddressAndPort.getByName("127.0.0.2"); +addr3 = InetAddressAndPort.getByName("127.0.0.3"); +addr4 = InetAddressAndPort.getByName("127.0.0.4"); +addr5 = 
InetAddressAndPort.getByName("127.0.0.5"); +DatabaseDescriptor.setBroadcastAddress(addr1.address); +} +catch (UnknownHostException e) +{ +e.printStackTrace(); +} +} + +@Test +public void testCreateStandardSyncTasks() +{ +testCreateStandardSyncTasks(false); +} + +@Test +public void testCreateStandardSyncTasksPullRepair() +{ +testCreateStandardSyncTasks(true); +} + +public static void testCreateStandardSyncTasks(boolean pullRepair) +{ +List treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"), + treeResponse(addr2, range1, "different", range2, "same", range3, "different"), + treeResponse(addr3, range1, "same", range2, "same", range3, "same")); + +Map tasks = toMap(RepairJob.createStandardSyncTasks(desc, + treeResponses, +
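The test feeds three tree responses whose per-range trees are tagged "same" or "different", so only endpoints that disagree on a range should need syncing for it. The hypothetical `SyncPairs` class below illustrates that expectation. It is a simplified model, not the real `RepairJob.createStandardSyncTasks`. It compares per-range hashes for every pair of endpoints and keeps the ranges on which they disagree:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: endpoint -> (range name -> Merkle tree hash, e.g. "same"/"different").
final class SyncPairs
{
    // Returns "a<->b" -> ranges whose hashes differ, for every unordered pair of endpoints.
    static Map<String, List<String>> differingRanges(Map<String, Map<String, String>> trees)
    {
        List<String> endpoints = new ArrayList<>(trees.keySet());
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (int i = 0; i < endpoints.size(); i++)
        {
            for (int j = i + 1; j < endpoints.size(); j++)
            {
                String a = endpoints.get(i);
                String b = endpoints.get(j);
                List<String> diff = new ArrayList<>();
                for (Map.Entry<String, String> range : trees.get(a).entrySet())
                {
                    // A range needs syncing between a and b when their hashes disagree.
                    if (!range.getValue().equals(trees.get(b).get(range.getKey())))
                        diff.add(range.getKey());
                }
                // Pairs that agree on every range need no sync task at all.
                if (!diff.isEmpty())
                    result.put(a + "<->" + b, diff);
            }
        }
        return result;
    }

    static Map<String, String> tree(String r1, String r2, String r3)
    {
        Map<String, String> ranges = new LinkedHashMap<>();
        ranges.put("range1", r1);
        ranges.put("range2", r2);
        ranges.put("range3", r3);
        return ranges;
    }

    public static void main(String[] args)
    {
        Map<String, Map<String, String>> trees = new LinkedHashMap<>();
        trees.put("127.0.0.1", tree("same", "same", "same"));
        trees.put("127.0.0.2", tree("different", "same", "different"));
        trees.put("127.0.0.3", tree("same", "same", "same"));
        // {127.0.0.1<->127.0.0.2=[range1, range3], 127.0.0.2<->127.0.0.3=[range1, range3]}
        System.out.println(differingRanges(trees));
    }
}
```

With the test's inputs, 127.0.0.1 and 127.0.0.3 agree everywhere. Only the pairs involving 127.0.0.2 differ, and only on range1 and range3.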
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r222953487 --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java --- @@ -0,0 +1,569 @@
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r222952614 --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java --- @@ -0,0 +1,569 @@
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/276#discussion_r222952423 --- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java --- @@ -52,15 +52,9 @@ private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); private final UUID pendingRepair; -private final boolean requestRanges; -private final boolean transferRanges; -public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair, - boolean requestRanges, boolean transferRanges, PreviewKind previewKind) -{ -this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees), - pendingRepair, requestRanges, transferRanges, previewKind); -} +protected final boolean requestRanges; --- End diff -- +1, made a change --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra-dtest pull request #39: Add no-read assert to read-repair test
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/39#discussion_r223734988 --- Diff: read_repair_test.py --- @@ -495,6 +495,9 @@ def test_normal_read_repair(self): assert storage_proxy.speculated_rr_read == 0 assert storage_proxy.speculated_rr_write == 0 +warn = node2.grep_log("Message-to-self TYPE:READ") --- End diff -- Ultimately, yes. Unfortunately, there are still a couple of places where we do that. Maybe we should widen the scope of the ticket and check the other places where this happens, or open a separate issue.
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r221529863 --- Diff: src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java --- @@ -58,45 +63,332 @@ }; } -protected final List list; -protected final boolean isSnapshot; -protected AbstractReplicaCollection(List list, boolean isSnapshot) +/** + * A simple list with no comodification checks and immutability by default (only append permitted, and only one initial copy) + * this permits us to reduce the amount of garbage generated, by not wrapping iterators or unnecessarily copying + * and reduces the amount of indirection necessary, as well as ensuring monomorphic callsites + */ +protected static class ReplicaList implements Iterable { -this.list = list; -this.isSnapshot = isSnapshot; +private static final Replica[] EMPTY = new Replica[0]; +Replica[] contents; +int begin, size; + +public ReplicaList() { this(0); } +public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; } +public ReplicaList(Replica[] contents, int begin, int size) { this.contents = contents; this.begin = begin; this.size = size; } + +public boolean isSubList(ReplicaList subList) +{ +return subList.contents == contents; +} + +public Replica get(int index) +{ +if (index > size) +throw new IndexOutOfBoundsException(); +return contents[begin + index]; +} + +public void add(Replica replica) +{ +// can only add to full array - if we have sliced it, we must be a snapshot +assert begin == 0; +if (size == contents.length) +{ +int newSize; +if (size < 3) newSize = 3; +else if (size < 9) newSize = 9; +else newSize = size * 2; +contents = Arrays.copyOf(contents, newSize); +} +contents[size++] = replica; +} + +public int size() +{ +return size; +} + +public boolean isEmpty() +{ +return size == 0; +} + +public ReplicaList subList(int begin, int end) +{ +if (end > size || begin > end) throw new IndexOutOfBoundsException(); +return new ReplicaList(contents, this.begin + begin, end - begin); +} + +public ReplicaList sorted(Comparator comparator) +{ +Replica[] copy = Arrays.copyOfRange(contents, begin, begin + size); +Arrays.sort(copy, comparator); +return new ReplicaList(copy, 0, copy.length); +} + +public Stream stream() +{ +return Arrays.stream(contents, begin, begin + size); +} + +@Override +public Iterator iterator() +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public Replica next() +{ +return contents[i++]; +} +}; +} + +public Iterator transformIterator(Function function) +{ +return new Iterator() +{ +final int end = begin + size; +int i = begin; +@Override +public boolean hasNext() +{ +return i < end; +} + +@Override +public K next() +{ +return function.apply(contents[i++]); +} +}; +} + +private Iterator filterIterator(Predicate predicate, int limit) +{ +return new Iterator() +{ +final int end = begin + size; +int next = begin; +int count = 
0; +{ updateNext(); } +void updateNext() +{ +if (count == limit) next = end; +
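The javadoc describes an append-only list with three properties: no comodification checks, sublists that share the backing array instead of copying it, and an `add` that grows the array to 3, then 9, then doubles. A minimal generic sketch of that design follows, written as a hypothetical `AppendOnlyList` rather than the `ReplicaList` in the patch:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical append-only list in the spirit of the ReplicaList javadoc: no
// comodification checks, and sublists are views over the same backing array.
final class AppendOnlyList<T> implements Iterable<T>
{
    private Object[] contents;
    private final int begin;
    private int size;
    private final boolean isView;

    AppendOnlyList()
    {
        this(new Object[0], 0, 0, false);
    }

    private AppendOnlyList(Object[] contents, int begin, int size, boolean isView)
    {
        this.contents = contents;
        this.begin = begin;
        this.size = size;
        this.isView = isView;
    }

    @SuppressWarnings("unchecked")
    T get(int index)
    {
        // Valid indices are 0 .. size - 1, so index == size must be rejected too.
        if (index < 0 || index >= size)
            throw new IndexOutOfBoundsException("index " + index + ", size " + size);
        return (T) contents[begin + index];
    }

    void add(T item)
    {
        // A view shares its array with the parent, so appending could overwrite the parent's elements.
        if (isView)
            throw new IllegalStateException("cannot append to a sublist view");
        if (size == contents.length)
        {
            // Grow to 3, then 9, then double.
            int newCapacity = size < 3 ? 3 : size < 9 ? 9 : size * 2;
            contents = Arrays.copyOf(contents, newCapacity);
        }
        contents[size++] = item;
    }

    int size() { return size; }

    int capacity() { return contents.length; }

    AppendOnlyList<T> subList(int from, int to)
    {
        if (from < 0 || to > size || from > to)
            throw new IndexOutOfBoundsException();
        // No copy: the view points into the same array.
        return new AppendOnlyList<>(contents, begin + from, to - from, true);
    }

    @Override
    public Iterator<T> iterator()
    {
        final int end = begin + size;
        return new Iterator<T>()
        {
            private int i = begin;

            @Override
            public boolean hasNext() { return i < end; }

            @SuppressWarnings("unchecked")
            @Override
            public T next()
            {
                if (i >= end)
                    throw new NoSuchElementException();
                return (T) contents[i++];
            }
        };
    }
}
```

One detail worth comparing: the sketch's `get` rejects both negative indices and `index == size`. The quoted `get` only checks `index > size`, so `get(size)` is not rejected by that check. Refusing `add` on any view, rather than only checking `begin == 0`, also covers a sublist that starts at offset 0.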
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r221529971 --- Diff: src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java --- @@ -58,45 +63,332 @@
[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/278#discussion_r224060251 --- Diff: src/java/org/apache/cassandra/net/MessagingService.java --- @@ -1078,7 +1078,7 @@ public void sendOneWay(MessageOut message, int id, InetAddressAndPort to) logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to); if (to.equals(FBUtilities.getBroadcastAddressAndPort())) -logger.trace("Message-to-self {} going over MessagingService", message); +logger.debug("Message-to-self {} going over MessagingService", message); --- End diff -- To be honest, I think relying on the logging might have been a bad idea. I've left this statement at `trace` and rewritten the tests instead.
[GitHub] cassandra pull request #276: Repair job tests
GitHub user ifesdjeen opened a pull request:

    https://github.com/apache/cassandra/pull/276

    Repair job tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ifesdjeen/cassandra repair-job-tests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/276.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #276

commit 31df761c82855522bf56f371da7b2f475fe413f2
Author: Alex Petrov
Date: 2018-10-01T13:30:58Z

    Add tests for repair job

commit 79c7574d68c22faec67673fa425b710c9cb594e9
Author: Alex Petrov
Date: 2018-10-01T14:25:28Z

    Enable dtests

commit 992baf72ad140aa10a9433e51457b7be5f383639
Author: Alex Petrov
Date: 2018-10-01T14:31:45Z

    Remove debug statement

commit dcbf317273b09cf8a3fc810351d046addb4379ff
Author: Alex Petrov
Date: 2018-10-01T15:22:44Z

    Fix tests
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/267
[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/278#discussion_r223421365 --- Diff: src/java/org/apache/cassandra/net/MessagingService.java --- @@ -1078,7 +1078,7 @@ public void sendOneWay(MessageOut message, int id, InetAddressAndPort to) logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to); if (to.equals(FBUtilities.getBroadcastAddressAndPort())) -logger.trace("Message-to-self {} going over MessagingService", message); +logger.warn("Message-to-self {} going over MessagingService", message); --- End diff -- Right, we can use `debug` here. At first I wanted to throw in this case, but then I thought it's more useful to find all the places where we still do this and eliminate them, since failing here gains us more or less nothing.
[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/278#discussion_r223868092 --- Diff: src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java --- @@ -102,12 +104,24 @@ void sendReadCommand(Replica to, ReadCallback readCallback, boolean speculative) else type = to.isFull() ? "full" : "transient"; Tracing.trace("Enqueuing {} data read to {}", type, to); } -MessageOut message = command.createMessage(); -// if enabled, request additional info about repaired data from any full replicas -if (command.isTrackingRepairedStatus() && to.isFull()) -message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); -MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback); +if (to.isSelf()) +{ +try (ReadExecutionController executionController = command.executionController(); --- End diff -- Yes, you're absolutely right: it is wrong to block this thread on I/O. We do in fact have a pattern for dealing with this on the local execution path: https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java#L157 https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java#L186 Thanks for pointing that out!
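The point about not blocking the calling thread on I/O can be illustrated with plain `java.util.concurrent`. The local read is handed to a dedicated executor, and the callback runs when it completes. This is a generic sketch under assumed names (`LocalReads`, and a `Supplier` standing in for the local read). It is not Cassandra's stage or `ReadCallback` API; the linked `AbstractReadExecutor` and `ShortReadPartitionsProtection` code show the project's actual pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch: run a local read on a dedicated executor instead of the caller's thread.
final class LocalReads
{
    private final ExecutorService readStage;

    LocalReads(ExecutorService readStage)
    {
        this.readStage = readStage;
    }

    // Returns immediately; the read runs on readStage and the callback fires once it completes.
    CompletableFuture<Void> readLocally(Supplier<String> read, Consumer<String> callback)
    {
        return CompletableFuture.supplyAsync(read, readStage).thenAccept(callback);
    }

    public static void main(String[] args)
    {
        ExecutorService stage = Executors.newFixedThreadPool(2);
        LocalReads reads = new LocalReads(stage);
        CompletableFuture<Void> done = reads.readLocally(
            () -> "row for pk=1",                              // stands in for the blocking local read
            result -> System.out.println("got: " + result));
        done.join();                                           // only the demo waits; real callers would not
        stage.shutdown();
    }
}
```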
[GitHub] cassandra pull request #276: Repair job tests
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/276
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r219410357 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -446,16 +447,14 @@ else if (useStrictConsistency) //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore. //So it's an error if we don't find what we need. if (oldEndpoints.isEmpty() && toFetch.isTransient()) - { throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch); - } - if (!any(oldEndpoints, isSufficient)) + if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient())) { // need an additional replica EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range)); // include all our filters, to ensure we include a matching node - Optional fullReplica = Iterables.tryFind(endpointsForRange, and(isSufficient, testSourceFilters)).toJavaUtil(); + Optional fullReplica = Iterables.tryFind(endpointsForRange, and(Replica::isFull, testSourceFilters)).toJavaUtil(); --- End diff -- Switched back to `isSufficient` here
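`isSufficient` is defined in the original code as `r -> (toFetch.isTransient() || r.isFull())`. A transient range can be streamed from any replica, while a full range needs a full one. Below is a self-contained sketch of that predicate composed with a source filter and applied to proximity-sorted candidates. It uses a hypothetical `Replica` model, and `Stream.findFirst` stands in for Guava's `Iterables.tryFind`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class SourcePicker
{
    // Hypothetical replica model: an endpoint name plus whether it holds full data.
    static final class Replica
    {
        final String endpoint;
        final boolean full;

        Replica(String endpoint, boolean full)
        {
            this.endpoint = endpoint;
            this.full = full;
        }

        boolean isFull() { return full; }

        boolean isTransient() { return !full; }
    }

    // A transient range can be fetched from any replica; a full range needs a full replica.
    static Predicate<Replica> isSufficient(Replica toFetch)
    {
        return r -> toFetch.isTransient() || r.isFull();
    }

    // First candidate (already sorted by proximity) that is sufficient and passes the source filters.
    static Optional<Replica> pickSource(Replica toFetch, List<Replica> sortedCandidates, Predicate<Replica> sourceFilters)
    {
        return sortedCandidates.stream()
                               .filter(isSufficient(toFetch).and(sourceFilters))
                               .findFirst();
    }

    public static void main(String[] args)
    {
        List<Replica> candidates = Arrays.asList(new Replica("127.0.0.2", false),
                                                 new Replica("127.0.0.3", true));
        Replica fullRange = new Replica("127.0.0.1", true);
        // Skips the transient replica and prints 127.0.0.3
        System.out.println(pickSource(fullRange, candidates, r -> true).get().endpoint);
    }
}
```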
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r219488342 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -436,6 +436,7 @@ else if (useStrictConsistency) Set endpointsStillReplicated = newEndpoints.endpoints(); // Remove new endpoints from old endpoints based on address oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint())); + oldEndpoints.filter(testSourceFilters); --- End diff -- Fixed
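The added line `oldEndpoints.filter(testSourceFilters);` discards its result. Judging by the line above it (`oldEndpoints = oldEndpoints.filter(...)`), `filter` returns a new collection rather than mutating the receiver, so the bare call has no effect. A minimal sketch of the pitfall, using a hypothetical immutable `Endpoints` class:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical immutable collection whose filter() returns a new instance.
final class Endpoints
{
    private final List<String> items;

    Endpoints(List<String> items)
    {
        this.items = Collections.unmodifiableList(items);
    }

    Endpoints filter(Predicate<String> predicate)
    {
        return new Endpoints(items.stream().filter(predicate).collect(Collectors.toList()));
    }

    List<String> items() { return items; }

    public static void main(String[] args)
    {
        Endpoints old = new Endpoints(Arrays.asList("127.0.0.2", "127.0.0.3"));
        Predicate<String> sourceFilter = e -> !e.equals("127.0.0.3");

        old.filter(sourceFilter);         // result discarded: old is unchanged
        System.out.println(old.items());  // [127.0.0.2, 127.0.0.3]

        old = old.filter(sourceFilter);   // reassign to keep the filtered collection
        System.out.println(old.items());  // [127.0.0.2]
    }
}
```

Static analysis can catch this class of bug. For example, Error Prone's `@CheckReturnValue` annotation flags calls whose return value is ignored.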
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r219488550 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -446,16 +447,14 @@ else if (useStrictConsistency) //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore. //So it's an error if we don't find what we need. if (oldEndpoints.isEmpty() && toFetch.isTransient()) - { throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch); - } - if (!any(oldEndpoints, isSufficient)) + if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient())) --- End diff -- OK, reverted to the old condition.
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r219488307 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -265,10 +318,11 @@ public void addRanges(String keyspaceName, ReplicaCollection replicas) workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName); } -toFetch.put(keyspaceName, workMap); -for (Map.Entry> entry : workMap.asMap().entrySet()) +assert toFetch.put(keyspaceName, workMap) == null : "Keyspace is already added to fetch map"; --- End diff -- Fixed
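The reply only says "Fixed", so the final code is not shown here. One general hazard in the quoted line is that the `put` itself sits inside the `assert`. Java evaluates assertion expressions only when assertions are enabled (for example with `-ea`), so with assertions disabled the keyspace would never be added to `toFetch`. A sketch of the hazard and the usual rewrite, with a hypothetical `FetchMap` holding strings in place of the real multimap:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: keep side effects out of assert statements, since assertions
// are skipped entirely unless the JVM runs with -ea.
final class FetchMap
{
    private final Map<String, String> toFetch = new HashMap<>();

    // Risky: with assertions disabled, the put never happens.
    void addUnsafe(String keyspace, String workMap)
    {
        assert toFetch.put(keyspace, workMap) == null : "Keyspace is already added to fetch map";
    }

    // Safe: the put always runs; only the duplicate check depends on -ea.
    void addSafe(String keyspace, String workMap)
    {
        String previous = toFetch.put(keyspace, workMap);
        assert previous == null : "Keyspace is already added to fetch map";
    }

    int size() { return toFetch.size(); }

    public static void main(String[] args)
    {
        FetchMap unsafe = new FetchMap();
        unsafe.addUnsafe("ks", "work");
        FetchMap safe = new FetchMap();
        safe.addSafe("ks", "work");
        // Without -ea this prints "unsafe=0 safe=1"; with -ea it prints "unsafe=1 safe=1".
        System.out.println("unsafe=" + unsafe.size() + " safe=" + safe.size());
    }
}
```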
[GitHub] cassandra pull request #266: Add a check for receiving digest response from ...
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/266
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r219488654 --- Diff: src/java/org/apache/cassandra/service/RangeRelocator.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Future; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Multimap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.RangeStreamer; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.EndpointsByReplica; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +@VisibleForTesting +public class RangeRelocator +{ +private static final Logger logger = LoggerFactory.getLogger(StorageService.class); + +private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION); +private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +private final TokenMetadata tokenMetaCloneAllSettled; +// clone to avoid concurrent modification in calculateNaturalReplicas +private final TokenMetadata tokenMetaClone; +private final Collection tokens; +private final List keyspaceNames; + + +RangeRelocator(Collection tokens, List keyspaceNames, TokenMetadata tmd) +{ 
+this.tokens = tokens; +this.keyspaceNames = keyspaceNames; +this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled(); +// clone to avoid concurrent modification in calculateNaturalReplicas +this.tokenMetaClone = tmd.cloneOnlyTokenMap(); +} + +@VisibleForTesting +public RangeRelocator() +{ +this.tokens = null; +this.keyspaceNames = null; +this.tokenMetaCloneAllSettled = null; +this.tokenMetaClone = null; +} + +/** + * Wrapper that supplies accessors to the real implementations of the various dependencies for this method + */ +private static Multimap calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges, + AbstractReplicationStrategy strategy, + String keyspace, + TokenMetadata tmdBefore, + TokenMetadata tmdAfter) +{ +EndpointsByReplica preferredEndpoints = + RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch(
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r219488682 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -338,165 +386,152 @@ else if (useStrictConsistency) boolean useStrictConsistency, TokenMetadata tmdBefore, TokenMetadata tmdAfter, - Predicate isAlive, String keyspace, - Collection> sourceFilters) -{ -EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore); - -InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); -logger.debug ("Keyspace: {}", keyspace); -logger.debug("To fetch RN: {}", fetchRanges); -logger.debug("Fetch ranges: {}", rangeAddresses); - -Predicate testSourceFilters = and(sourceFilters); -Function sorted = -endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints); - -//This list of replicas is just candidates. With strict consistency it's going to be a narrow list. -EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable(); -for (Replica toFetch : fetchRanges) -{ -//Replica that is sufficient to provide the data we need -//With strict consistency and transient replication we may end up with multiple types -//so this isn't used with strict consistency -Predicate isSufficient = r -> (toFetch.isTransient() || r.isFull()); -Predicate accept = r -> - isSufficient.test(r) // is sufficient -&& !r.endpoint().equals(localAddress) // is not self -&& isAlive.test(r); // is alive - -logger.debug("To fetch {}", toFetch); -for (Range range : rangeAddresses.keySet()) -{ -if (range.contains(toFetch.range())) -{ -EndpointsForRange oldEndpoints = rangeAddresses.get(range); - -//Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch -//It could be multiple endpoints and we must fetch from all of them if they are there -//With transient replication and strict consistency this is to get the full data from a full replica and -//transient data from 
the transient replica losing data -EndpointsForRange sources; -if (useStrictConsistency) -{ -//Start with two sets of who replicates the range before and who replicates it after -EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter); -logger.debug("Old endpoints {}", oldEndpoints); -logger.debug("New endpoints {}", newEndpoints); - -//Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. -//So we need to be careful to only be strict when endpoints == RF -if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas) -{ -Set endpointsStillReplicated = newEndpoints.endpoints(); -// Remove new endpoints from old endpoints based on address -oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint())); - -if (!all(oldEndpoints, isAlive)) -throw new IllegalStateException("A node required to move the data consistently is down: " -+ oldEndpoints.filter(not(isAlive))); - -if (oldEndpoints.size() > 1) -throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints); - -//If we are transitioning from transient to full and and the set of replicas for the range is not changing -//we might end up with no endpoints to fetch from by address. In that
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219508033 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), +EndpointsForToken.empty(token) +); + +return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); +} + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. */ -public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection endpoints) throws UnavailableException +public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException --- End diff -- > Either way, while we're here we should document in the caller the fact that the provided consistencyLevel is not used, and that the batchlogConsistencyLevel is used to clear the entries from the remote batch logs once the real write has reached that consistency level. not sure I understand what you mean, as we're using it both for cleanup and for writing to batchlog. 
--- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219508092 --- Diff: src/java/org/apache/cassandra/service/StorageProxy.java --- @@ -780,8 +780,9 @@ public static void mutateMV(ByteBuffer dataKey, Collection mutations, ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; //Since the base -> view replication is 1:1 we only need to store the BL locally -final Collection batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); -BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); +ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite(); +BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), --- End diff -- Fixed
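A minimal sketch of the cleanup contract implied by `new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), ...)` above, assuming the handler counts down once per mutation and fires the batchlog-removal callback when the count reaches zero. `BatchlogCleanupSketch` and `ackMutation` are hypothetical names, not Cassandra's API.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch, not Cassandra's BatchlogResponseHandler.BatchlogCleanup:
 * counts down one tick per mutation and runs the cleanup callback exactly once,
 * when the last mutation has been acknowledged.
 */
final class BatchlogCleanupSketch
{
    private final AtomicInteger remaining;
    private final Runnable callback;

    BatchlogCleanupSketch(int mutationCount, Runnable callback)
    {
        this.remaining = new AtomicInteger(mutationCount);
        this.callback = callback;
    }

    /** Called once per mutation when that mutation reaches its consistency level. */
    void ackMutation()
    {
        // Only the call that moves the counter from 1 to 0 fires the callback;
        // any extra acks push it below zero and are ignored.
        if (remaining.decrementAndGet() == 0)
            callback.run();
    }
}
```

Using an atomic decrement means concurrent acknowledgements from different write handlers cannot trigger the removal twice.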
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219518994 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), +EndpointsForToken.empty(token) +); + +return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); +} + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
 */ -public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection endpoints) throws UnavailableException +public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); +Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); +String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + +// Replicas are picked manually: +// - replicas should be alive according to the failure detector +// - replicas should be in the local datacenter +// - choose min(2, number of qualifying candiates above) +// - allow the local node to be the only replica only if it's a single-node DC +Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); --- End diff -- Moved it
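The four selection rules in the diff comment can be sketched as below. This is a simplified, hypothetical stand-in for `BatchlogManager.EndpointFilter` (whose real rack-selection logic may differ), with endpoints and racks as plain strings and a `Predicate` standing in for the failure detector.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical, simplified sketch of the selection rules listed in the diff comment.
 * It is NOT BatchlogManager.EndpointFilter; endpoints and racks are plain strings.
 */
final class BatchlogEndpointChooser
{
    /**
     * @param localEndpoint this node's address
     * @param localDcRacks  rack -> endpoints, for the local datacenter only (rule 2)
     * @param isAlive       stand-in for the failure detector (rule 1)
     */
    static List<String> choose(String localEndpoint,
                               Map<String, List<String>> localDcRacks,
                               Predicate<String> isAlive)
    {
        List<String> all = new ArrayList<>();
        localDcRacks.values().forEach(all::addAll);

        // Rule 4: the local node may be the only replica only in a single-node DC
        if (all.size() == 1 && all.get(0).equals(localEndpoint))
            return isAlive.test(localEndpoint) ? List.of(localEndpoint) : List.of();

        // Rule 3: pick min(2, candidates), first trying one live remote endpoint per rack
        Set<String> chosen = new LinkedHashSet<>();
        for (List<String> rack : localDcRacks.values())
        {
            for (String endpoint : rack)
            {
                if (chosen.size() < 2 && !endpoint.equals(localEndpoint) && isAlive.test(endpoint))
                {
                    chosen.add(endpoint);
                    break; // at most one endpoint per rack in this pass
                }
            }
        }
        // Top up from any rack if distinct racks could not supply two endpoints
        for (String endpoint : all)
            if (chosen.size() < 2 && !endpoint.equals(localEndpoint) && isAlive.test(endpoint))
                chosen.add(endpoint);
        return new ArrayList<>(chosen);
    }
}
```

An empty result means no qualifying candidate exists; how the caller reacts to that depends on the requested consistency level.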
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219518928 --- Diff: src/java/org/apache/cassandra/service/StorageProxy.java --- @@ -1021,18 +1021,18 @@ private static void syncWriteToBatchlog(Collection mutations, Collecti handler.get(); } -private static void asyncRemoveFromBatchlog(Collection endpoints, UUID uuid) +private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid) { MessageOut message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer); -for (InetAddressAndPort target : endpoints) +for (Replica target : replicaPlan.contacts()) { if (logger.isTraceEnabled()) logger.trace("Sending batchlog remove request {} to {}", uuid, target); -if (target.equals(FBUtilities.getBroadcastAddressAndPort())) -performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid)); +if (target.isLocal()) --- End diff -- Renamed
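A hedged sketch of the local/remote dispatch in the renamed `asyncRemoveFromBatchlog` loop, assuming only that each contacted replica can report `isLocal()`. `Replica`, `removeLocally` and `sendRemove` below are hypothetical stand-ins for Cassandra's `Replica`, `performLocally(...)` and `BATCH_REMOVE` messaging.

```java
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: the local replica's batchlog entry is removed in-process,
 * every other contacted replica is sent a remove request.
 */
final class BatchlogRemoveSketch
{
    static final class Replica
    {
        final String endpoint;
        final boolean local;

        Replica(String endpoint, boolean local)
        {
            this.endpoint = endpoint;
            this.local = local;
        }

        boolean isLocal() { return local; }
    }

    static void removeFromBatchlog(List<Replica> contacts, Runnable removeLocally, Consumer<String> sendRemove)
    {
        for (Replica target : contacts)
        {
            if (target.isLocal())
                removeLocally.run();                 // stand-in for running on the local mutation stage
            else
                sendRemove.accept(target.endpoint);  // stand-in for sending a BATCH_REMOVE message
        }
    }
}
```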
[GitHub] cassandra issue #267: Consolidate batch write code
Github user ifesdjeen commented on the issue: https://github.com/apache/cassandra/pull/267 @belliottsmith I've addressed your comments, rebased and pushed for one more round of CI.
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/269
[GitHub] cassandra pull request #275: 14727
GitHub user ifesdjeen opened a pull request: https://github.com/apache/cassandra/pull/275 14727 You can merge this pull request into a Git repository by running: $ git pull https://github.com/belliottsmith/cassandra 14727 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/275.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #275 commit 150f4236f43593ed6a832ada5490c5b6ea64b88c Author: Benedict Elliott Smith Date: 2018-09-19T11:52:27Z Transient Replication support for EACH_QUORUM, and correction of behaviour for LOCAL_QUORUM commit 3bd3f7b8ede99c69a810e75eff762deffdc8cf72 Author: Benedict Elliott Smith Date: 2018-09-26T10:08:37Z circleci
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r220455656 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -424,62 +440,58 @@ else if (useStrictConsistency) EndpointsForRange sources; if (useStrictConsistency) { - //Start with two sets of who replicates the range before and who replicates it after - EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter); - logger.debug("Old endpoints {}", oldEndpoints); - logger.debug("New endpoints {}", newEndpoints); - + EndpointsForRange strictEndpoints; //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. //So we need to be careful to only be strict when endpoints == RF if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas) { - Set endpointsStillReplicated = newEndpoints.endpoints(); + //Start with two sets of who replicates the range before and who replicates it after + EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter); + logger.debug("Old endpoints {}", oldEndpoints); + logger.debug("New endpoints {}", newEndpoints); + // Remove new endpoints from old endpoints based on address - oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint())); - oldEndpoints.filter(testSourceFilters); + strictEndpoints = oldEndpoints.without(newEndpoints.endpoints()); - if (oldEndpoints.size() > 1) - throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints); + //We have to check the source filters here to see if they will remove any replicas + //required for strict consistency + if (!all(strictEndpoints, testSourceFilters)) + throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints)); + + if (strictEndpoints.size() > 1) --- End diff -- We're not really filtering anymore at all. 
We can only get here if `testSourceFilters` didn't filter out any nodes (otherwise we'd fail with the same assertion later). We've asserted that the resulting endpoint is not going to be yanked from under us.
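The strict-consistency path discussed here can be sketched with plain strings: subtract the new owners from the old ones, fail rather than filter if a source filter would reject a required replica, and expect at most one remaining source. `StrictSourceSketch` is hypothetical and assumes the caller has already checked that the old endpoint count equals the replication factor (the CASSANDRA-5953 caveat in the diff).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of strict-consistency source selection; endpoints are plain strings.
 */
final class StrictSourceSketch
{
    static Optional<String> strictSource(List<String> oldEndpoints,
                                         Set<String> newEndpoints,
                                         Predicate<String> sourceFilter)
    {
        // Equivalent of oldEndpoints.without(newEndpoints.endpoints()): who is losing the range
        List<String> strict = new ArrayList<>();
        for (String endpoint : oldEndpoints)
            if (!newEndpoints.contains(endpoint))
                strict.add(endpoint);

        // No filtering here: if a filter would reject a required source, fail instead
        List<String> rejected = new ArrayList<>();
        for (String endpoint : strict)
            if (!sourceFilter.test(endpoint))
                rejected.add(endpoint);
        if (!rejected.isEmpty())
            throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + rejected);

        if (strict.size() > 1)
            throw new AssertionError("Expected <= 1 endpoint but found " + strict);
        return strict.isEmpty() ? Optional.empty() : Optional.of(strict.get(0));
    }
}
```

Because the filters are only checked, never applied, the single source returned is exactly the replica that is giving up the range.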
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219137918 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), +EndpointsForToken.empty(token) +); + +return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); +} + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
 */ -public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection endpoints) throws UnavailableException +public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); +Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); +String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + +// Replicas are picked manually: +// - replicas should be alive according to the failure detector +// - replicas should be in the local datacenter +// - choose min(2, number of qualifying candiates above) +// - allow the local node to be the only replica only if it's a single-node DC +Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); + +if (chosenEndpoints.isEmpty()) +{ +if (consistencyLevel == ConsistencyLevel.ANY) +chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); +else +throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0); --- End diff -- We don't strictly need it; it's included only to short-circuit. We can skip it.
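A hypothetical sketch of the empty-candidate branch quoted above: with `ANY` the coordinator writes the batchlog to itself, otherwise the method short-circuits with an unavailability error before any write is attempted. `ConsistencyLevel` and `UnavailableException` here are local stand-ins, not Cassandra's classes.

```java
import java.util.Collection;
import java.util.Collections;

/**
 * Hypothetical sketch of the empty-candidate handling; not Cassandra code.
 */
final class BatchlogFallbackSketch
{
    enum ConsistencyLevel { ANY, ONE, TWO, QUORUM }

    static final class UnavailableException extends RuntimeException
    {
        UnavailableException(String message) { super(message); }
    }

    static Collection<String> resolve(Collection<String> chosen, ConsistencyLevel cl, String localEndpoint)
    {
        if (!chosen.isEmpty())
            return chosen;
        // CL.ANY tolerates writing the batchlog to the coordinator alone
        if (cl == ConsistencyLevel.ANY)
            return Collections.singleton(localEndpoint);
        // Short-circuit: no live batchlog candidate, so fail before attempting the write
        throw new UnavailableException("Cannot achieve consistency level ONE (required 1, alive 0)");
    }
}
```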
[GitHub] cassandra pull request #267: Consolidate batch write code
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/267#discussion_r219140446 --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java --- @@ -61,26 +73,62 @@ return forSingleReplicaWrite(keyspace, token, replica); } +public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() +{ +Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); +Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + +ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( +EndpointsForToken.of(token, localSystemReplica), +EndpointsForToken.empty(token) +); + +return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); +} + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
*/ -public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection endpoints) throws UnavailableException +public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); +TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); +Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); +String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + +// Replicas are picked manually: +// - replicas should be alive according to the failure detector +// - replicas should be in the local datacenter +// - choose min(2, number of qualifying candiates above) +// - allow the local node to be the only replica only if it's a single-node DC +Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); + +if (chosenEndpoints.isEmpty()) +{ +if (consistencyLevel == ConsistencyLevel.ANY) +chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); +else +throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0); +} + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( - SystemReplicas.getSystemReplicas(endpoints).forToken(token), + SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token), EndpointsForToken.empty(token) ); -ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO; + +// Batchlog is hosted by either one node or two nodes from different racks. +consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO; --- End diff -- Right, I had similar thoughts when writing that. 
I've also been wondering whether it's enough to have only two nodes in the local DC blocking for batchlog writes. I wanted to make a change to the original CL, but that might reduce availability (for instance, when it's a quorum). But from looking at batch replay, we should be able to do it.
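For reference, the ternary in the diff makes the batchlog write block for one acknowledgement when a single replica was chosen and for two otherwise. A minimal, hypothetical sketch of that mapping and of a response counter that completes once `blockFor` replicas have acknowledged (none of these names are Cassandra's):

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of how many acknowledgements the batchlog write blocks for.
 */
final class BatchlogBlockForSketch
{
    enum ConsistencyLevel { ONE, TWO }

    // Mirrors the diff: one chosen replica -> ONE, otherwise TWO
    static ConsistencyLevel batchlogConsistency(int chosenReplicas)
    {
        return chosenReplicas == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
    }

    static int blockFor(ConsistencyLevel cl)
    {
        return cl == ConsistencyLevel.ONE ? 1 : 2;
    }

    /** Minimal ack counter: the batchlog write is complete once blockFor replicas respond. */
    static final class AckCounter
    {
        private final int blockFor;
        private final AtomicInteger acks = new AtomicInteger();

        AckCounter(int blockFor) { this.blockFor = blockFor; }

        void onAck() { acks.incrementAndGet(); }

        boolean isComplete() { return acks.get() >= blockFor; }
    }
}
```

Blocking for the caller's original CL instead would change only the `blockFor` input, which is where the availability trade-off mentioned above comes in.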
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r220869142 --- Diff: src/java/org/apache/cassandra/locator/EndpointsForRange.java --- @@ -40,13 +37,13 @@ public class EndpointsForRange extends Endpoints { private final Range range; -private EndpointsForRange(Range range, List list, boolean isSnapshot) +private EndpointsForRange(Range range, ReplicaList list) --- End diff -- Looks like the intention was to compute `byEndpoint` lazily.
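One way to compute `byEndpoint` lazily is to build the index on first access and cache it in a `volatile` field. The sketch below uses the racy single-check idiom; `LazyEndpointIndex` and its nested `Replica` are hypothetical stand-ins, not the actual `EndpointsForRange` code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a byEndpoint index computed on first access rather than in the constructor.
 */
final class LazyEndpointIndex
{
    static final class Replica
    {
        final String endpoint;
        final boolean full;

        Replica(String endpoint, boolean full)
        {
            this.endpoint = endpoint;
            this.full = full;
        }
    }

    private final List<Replica> list;
    private volatile Map<String, Replica> cachedByEndpoint; // null until first requested
    int builds; // single-threaded test hook: how many times the map was built

    LazyEndpointIndex(List<Replica> list) { this.list = List.copyOf(list); }

    Map<String, Replica> byEndpoint()
    {
        Map<String, Replica> map = cachedByEndpoint;
        if (map == null)
        {
            // Racy single-check: concurrent callers may each build an equal map;
            // the volatile write publishes whichever one wins safely.
            Map<String, Replica> built = new LinkedHashMap<>();
            for (Replica replica : list)
                built.put(replica.endpoint, replica);
            map = Collections.unmodifiableMap(built);
            cachedByEndpoint = map;
            builds++;
        }
        return map;
    }
}
```

The idiom avoids locking because every build produces an equal, immutable map, so a duplicate build is wasted work rather than a correctness problem.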
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r220874274 --- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java --- @@ -141,28 +147,37 @@ public void testOrderOfIteration() { Assert.assertEquals(canonicalList, ImmutableList.copyOf(test)); Assert.assertEquals(canonicalList, test.stream().collect(Collectors.toList())); -Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints()); +Assert.assertTrue(Iterables.elementsEqual(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints())); } private void assertSubList(C subCollection, int from, int to) { -Assert.assertTrue(subCollection.isSnapshot); if (from == to) { Assert.assertTrue(subCollection.isEmpty()); } else { -List subList = this.test.list.subList(from, to); -if (test.isSnapshot) -Assert.assertSame(subList.getClass(), subCollection.list.getClass()); +AbstractReplicaCollection.ReplicaList subList = this.test.list.subList(from, to); +if (!(test instanceof ReplicaCollection.Builder)) +Assert.assertSame(subList.contents, subCollection.list.contents); Assert.assertEquals(subList, subCollection.list); } } +private void assertSubSequence(Iterable subSequence, int from, int to) +{ +AbstractReplicaCollection.ReplicaList subList = this.test.list.subList(from, to); +if (!elementsEqual(subList, subSequence)) +{ +elementsEqual(subList, subSequence); +} +Assert.assertTrue(elementsEqual(subList, subSequence)); +} + void testSubList(int subListDepth, int filterDepth, int sortDepth) { -if (test.isSnapshot) +if (!(test instanceof ReplicaCollection.Builder)) --- End diff -- Should we make it `isBuilder`?
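A sketch of what the `isBuilder` suggestion could look like: a default method returns `false` and only builder types override it, so the test no longer needs `instanceof ReplicaCollection.Builder`. All type names below are hypothetical.

```java
/**
 * Hypothetical sketch of an isBuilder() query replacing an instanceof check.
 */
interface ReplicaCollectionLike
{
    // Ordinary (immutable) collections are not builders
    default boolean isBuilder() { return false; }
}

final class ImmutableReplicas implements ReplicaCollectionLike {}

final class ReplicasBuilder implements ReplicaCollectionLike
{
    // Only builders answer true
    @Override
    public boolean isBuilder() { return true; }
}
```

The trade-off is one extra method on the interface in exchange for test code that doesn't depend on a concrete builder type.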
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r220871228 --- Diff: src/java/org/apache/cassandra/locator/Endpoints.java --- @@ -36,13 +33,16 @@ */ public abstract class Endpoints> extends AbstractReplicaCollection { -static final Map EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>()); +static final ReplicaMap endpointMap(ReplicaList list) { return new ReplicaMap<>(list, Replica::endpoint); } --- End diff -- This is a method, so we can skip `final` here.
[GitHub] cassandra pull request #271: 14726
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/271#discussion_r220871785 --- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java --- @@ -141,28 +147,37 @@ public void testOrderOfIteration() { Assert.assertEquals(canonicalList, ImmutableList.copyOf(test)); Assert.assertEquals(canonicalList, test.stream().collect(Collectors.toList())); -Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints()); +Assert.assertTrue(Iterables.elementsEqual(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints())); } private void assertSubList(C subCollection, int from, int to) { -Assert.assertTrue(subCollection.isSnapshot); if (from == to) { Assert.assertTrue(subCollection.isEmpty()); } else { -List subList = this.test.list.subList(from, to); -if (test.isSnapshot) -Assert.assertSame(subList.getClass(), subCollection.list.getClass()); +AbstractReplicaCollection.ReplicaList subList = this.test.list.subList(from, to); +if (!(test instanceof ReplicaCollection.Builder)) +Assert.assertSame(subList.contents, subCollection.list.contents); Assert.assertEquals(subList, subCollection.list); } } +private void assertSubSequence(Iterable subSequence, int from, int to) +{ +AbstractReplicaCollection.ReplicaList subList = this.test.list.subList(from, to); +if (!elementsEqual(subList, subSequence)) +{ +elementsEqual(subList, subSequence); --- End diff -- Did you mean something like `Assert.assertSame(subList.contents, subSequence);`?
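The inner `if` in `assertSubSequence` re-evaluates `elementsEqual` and discards the result. A hypothetical cleaned-up helper follows, with `elementsEqual` written out to mirror Guava's `Iterables.elementsEqual` (same elements, same order, same length) so the sketch has no dependencies; `SubSequenceAssertions` is not part of the test under review.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical sketch of assertSubSequence without the dead branch.
 */
final class SubSequenceAssertions
{
    // Order-sensitive comparison: equal elements at every position and equal lengths
    static boolean elementsEqual(Iterable<?> a, Iterable<?> b)
    {
        Iterator<?> ia = a.iterator();
        Iterator<?> ib = b.iterator();
        while (ia.hasNext() && ib.hasNext())
            if (!Objects.equals(ia.next(), ib.next()))
                return false;
        return !ia.hasNext() && !ib.hasNext();
    }

    static <T> void assertSubSequence(List<T> full, Iterable<T> subSequence, int from, int to)
    {
        List<T> expected = full.subList(from, to);
        // A single check with a descriptive message replaces the duplicated elementsEqual calls
        if (!elementsEqual(expected, subSequence))
            throw new AssertionError("Expected " + expected + " but got " + subSequence);
    }
}
```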
[GitHub] cassandra pull request #281: Sep worker shutdown
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/281
[GitHub] cassandra pull request #259: Cleanup repair path after Transient Replication
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/259
[GitHub] cassandra pull request #261: Tr followup 4
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/261
[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/257
[GitHub] cassandra pull request #262: Replicalayout followup
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/262
[GitHub] cassandra pull request #269: Review tr range movements
GitHub user ifesdjeen opened a pull request: https://github.com/apache/cassandra/pull/269 Review tr range movements CASSANDRA-14756 You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifesdjeen/cassandra review-tr-range-movements Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/269.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #269 commit e51899d09a254e4a4d86c4fea26b6ada7eaebbcd Author: Alex Petrov Date: 2018-09-17T09:51:56Z Enable dtests commit 60927ae3a8c5c54b273a519110cc4c0540826d48 Author: Alex Petrov Date: 2018-09-13T08:39:01Z Simplify iteration in calculateRangesToFetchWithPreferredEndpoints Extract RangeRelocator Minor changes to calculateRangesToFetchWithPreferredEndpoints to improve readability: * short-circuit range.contains(), remove one level of nesting * remove and use isSufficient instead since it duplicates the source filters that are already applied (see usages of RangeStreamer.FailureDetectorSourceFilter and ExcludeLocalNodeFilter()) * use explicit fullness checks instead of ‘any’ check Simplify RangeRelocator code Further simplify calculateRangesToStreamWithEndpoints after Benedict’s comment Fix range relocation Simplify calculateStreamAndFetchRanges Unify request/transfer ranges interface (Added benefit of this change is that we have a check for non-intersecting ranges) Simplify iteration in calculateRangesToFetchWithPreferredEndpoints Extract RangeRelocator Simplify RangeRelocator code Minor changes to calculateRangesToFetchWithPreferredEndpoints to improve readability: * short-circuit range.contains(), remove one level of nesting * remove and use isSufficient instead since it duplicates the source filters that are already applied (see usages of RangeStreamer.FailureDetectorSourceFilter and ExcludeLocalNodeFilter()) * use explicit fullness checks
instead of ‘any’ check Simplify calculateStreamAndFetchRanges Unify request/transfer ranges interface (Added benefit of this change is that we have a check for non-intersecting ranges) Improve error messages commit 2b13c4a34b19a4de80eb60c9cd059b674a4b19d3 Author: Alex Petrov Date: 2018-09-17T12:17:33Z Switch dtest branch
[GitHub] cassandra-dtest pull request #38: Transient Replication and Cheap Quorums te...
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra-dtest/pull/38
[GitHub] cassandra-dtest issue #38: Transient Replication and Cheap Quorums tests
Github user ifesdjeen commented on the issue: https://github.com/apache/cassandra-dtest/pull/38 Merged, thank you!
[GitHub] cassandra pull request #266: Add a check for receiving digest response from ...
GitHub user ifesdjeen opened a pull request: https://github.com/apache/cassandra/pull/266 Add a check for receiving digest response from transient node for CASSANDRA-14750 You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifesdjeen/cassandra CASSANDRA-14750 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/266.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #266 commit 735b8da5379c7e3556b05dc450741bf6cae09f28 Author: Alex Petrov Date: 2018-09-12T21:20:34Z Add a check for receiving digest response from transient node Patch by Alex Petrov; reviewed by Benedict Elliot Smith for CASSANDRA-14750
[GitHub] cassandra pull request #269: Review tr range movements
Github user ifesdjeen commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r218432647 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -87,8 +85,8 @@ private final InetAddressAndPort address; /* streaming description */ private final String description; -private final Multimap> toFetch = HashMultimap.create(); -private final Set> sourceFilters = new HashSet<>(); +private final Map> toFetch = new HashMap<>(); +private final Set sourceFilters = new HashSet<>(); --- End diff -- You're right, it doesn't. Filtering is idempotent (i.e. the first op filters the element out and subsequent ones are no-ops).
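A minimal sketch of why a `Set` of source filters is sufficient: AND-ing the predicates and filtering is idempotent, so re-applying the same filters (or adding the same filter instance twice) changes nothing. `IdempotentFilterSketch` is hypothetical and uses strings in place of replicas.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: applying a set of source filters is idempotent.
 */
final class IdempotentFilterSketch
{
    static List<String> applyAll(List<String> candidates, Set<Predicate<String>> filters)
    {
        // Combine all filters into one predicate; a candidate survives only if every filter accepts it
        Predicate<String> all = s -> true;
        for (Predicate<String> filter : filters)
            all = all.and(filter);
        return candidates.stream().filter(all).collect(Collectors.toList());
    }
}
```

Since lambdas use identity equality, adding the very same filter instance to the set again is also a no-op.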