[GitHub] cassandra-dtest pull request #9: 10857 trunk

2017-11-07 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/9#discussion_r149322322
  
--- Diff: upgrade_tests/upgrade_compact_storage.py ---
@@ -0,0 +1,177 @@
+# coding: utf-8
+
+import time
+
+from cassandra.query import dict_factory
+from nose.tools import assert_equal, assert_true
+from ccmlib.node import NodeError
+
+from dtest import Tester, debug
+from cassandra.protocol import ConfigurationException
+from tools.decorators import since
+
+VERSION_311 = 'git:cassandra-3.11'
+VERSION_TRUNK = 'git:trunk'
+
+
+@since('4.0')
+class UpgradeSuperColumnsThrough(Tester):
+    def upgrade_to_version(self, tag, start_rpc=True, wait=True, nodes=None):
+        debug('Upgrading to ' + tag)
+        if nodes is None:
+            nodes = self.cluster.nodelist()
+
+        for node in nodes:
+            debug('Shutting down node: ' + node.name)
+            node.drain()
+            node.watch_log_for("DRAINED")
+            node.stop(wait_other_notice=False)
+
+        # Update Cassandra Directory
+        for node in nodes:
+            node.set_install_dir(version=tag)
+            debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
+        self.cluster.set_install_dir(version=tag)
+
+        # Restart nodes on new version
+        for node in nodes:
+            debug('Starting %s on new version (%s)' % (node.name, tag))
+            node.start(wait_other_notice=wait, wait_for_binary_proto=wait)
+
+    def prepare(self, num_nodes=1, cassandra_version="github:apache/cassandra-2.2"):
+        cluster = self.cluster
+
+        # Forcing cluster version on purpose
+        cluster.set_install_dir(version=cassandra_version)
+
+        cluster.populate(num_nodes)
+
+        cluster.start()
+        return cluster
+
+    def upgrade_compact_storage_test(self):
+        cluster = self.prepare(cassandra_version='github:apache/cassandra-3.0')
+        node = self.cluster.nodelist()[0]
+        cursor = self.patient_cql_connection(node, row_factory=dict_factory)
+
+        cursor.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
+        cursor.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
+
+        for i in xrange(1, 5):
+            cursor.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i))
+
+        self.upgrade_to_version(VERSION_TRUNK, wait=False)
+        self.allow_log_errors = True
+
+        time.sleep(5)
+        # After restart, it won't start
+        errors = len(node.grep_log("Compact Tables are not allowed in Cassandra starting with 4.0 version"))
+        assert_true(errors > 0)
+
+    def mixed_cluster_test(self):
+        cluster = self.prepare(num_nodes=2, cassandra_version=VERSION_311)
+        node1 = self.cluster.nodelist()[0]
+        node2 = self.cluster.nodelist()[1]
+
+        node1.drain()
+        node1.watch_log_for("DRAINED")
+        node1.stop(wait_other_notice=False)
+        node1.set_install_dir(version=VERSION_TRUNK)
+        node1.start(wait_other_notice=True, wait_for_binary_proto=True)
+
+        cursor = self.patient_cql_connection(node2, row_factory=dict_factory)
--- End diff --

Right, this was a copy-paste from `upgrade_tests/thrift_upgrade_test.py`; you might want to change it there, too.


---




[GitHub] cassandra-dtest pull request #9: 10857 trunk

2017-11-07 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/9#discussion_r149320134
  
--- Diff: requirements.txt ---
@@ -3,7 +3,7 @@
 # 
http://datastax.github.io/python-driver/installation.html#cython-based-extensions
 futures
 six
--e git+https://github.com/datastax/python-driver.git@cassandra-test#egg=cassandra-driver
+-e git+https://github.com/datastax/python-driver.git@master#egg=cassandra-driver
--- End diff --

By now it's already merged; I needed this one to run the tests.


---




[GitHub] cassandra-dtest pull request #9: 10857 trunk

2017-11-06 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra-dtest/pull/9

10857 trunk



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra-dtest 10857-trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra-dtest/pull/9.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9


commit 1a03ee1ff8f3a24aec2a6faba0b54611549bbb2d
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-10-24T11:41:35Z

Fix 4.0 tests

commit be4da63d8cba2c970acbbfcb7322ea5abc22aed5
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-10-24T15:28:47Z

Add upgrade compact storage tests

commit ef5de97e40f2d1cf5c9bb0ff33f9544169ae2555
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-10-25T11:25:42Z

Fix paths

commit 556255f15cb12bae17756ed20b00c92b80029f31
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-10-30T21:38:21Z

more tests

commit 2a686cab61786e34c763bd03d18b26ef556562b5
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-10-31T10:40:16Z

Looks like dtest branch is not up to date

commit 5b9daaee894640c298d0246ee3eca3b059103c98
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-11-06T08:23:24Z

Fix tests

commit 502fa6d0f33863f45af2d0ba4a2ab98c7eddd285
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-10-23T11:29:41Z

Add dtest for compact storage flag drop

commit 3b9ea6448048de076306ba6c38d4a5a498a9ecfd
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Date:   2017-11-06T15:46:18Z

Fix tests




---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197126256
  
--- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
@@ -242,11 +249,11 @@ public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints)
-return Iterables.size(liveEndpoints) >= blockFor(keyspace);
+return Iterables.size(liveReplicas) >= blockFor(keyspace);
 }
 }
 
-public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) throws UnavailableException
+public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<Replica> liveReplicas) throws UnavailableException
--- End diff --

We can use `Replicas` here.

Another question: should there be a distinction between sufficient live nodes for the read path and for the write path? Do we want to make sure there's a sufficient number of live full nodes here, or later?
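For illustration, a minimal sketch of counting only the full replicas; `Replica#isFull()` is an assumption here, and whether such a check belongs in `assureSufficientLiveNodes` or later in the write path is exactly the open question:

```
private boolean hasSufficientFullReplicas(Keyspace keyspace, Iterable<Replica> liveReplicas)
{
    int liveFull = 0;
    for (Replica replica : liveReplicas)
    {
        if (replica.isFull())
            liveFull++;
    }
    return liveFull >= blockFor(keyspace);
}
```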


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197156136
  
--- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
@@ -4231,53 +4211,53 @@ private void 
calculateToFromStreams(Collection newTokens, List ke
 InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
 IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
 TokenMetadata tokenMetaCloneAllSettled = 
tokenMetadata.cloneAfterAllSettled();
-// clone to avoid concurrent modification in 
calculateNaturalEndpoints
+// clone to avoid concurrent modification in 
calculateNaturalReplicas
 TokenMetadata tokenMetaClone = 
tokenMetadata.cloneOnlyTokenMap();
 
 for (String keyspace : keyspaceNames)
 {
 // replication strategy of the current keyspace
 AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
-Multimap> 
endpointToRanges = strategy.getAddressRanges();
+ReplicaMultimap 
endpointToRanges = strategy.getAddressReplicas();
 
 logger.debug("Calculating ranges to stream and request for 
keyspace {}", keyspace);
 for (Token newToken : newTokens)
 {
 // getting collection of the currently used ranges by 
this keyspace
-Collection> currentRanges = 
endpointToRanges.get(localAddress);
+ReplicaSet currentReplicas = 
endpointToRanges.get(localAddress);
--- End diff --

`currentReplicas` is only used inside the loop, so it could be declared there.


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197156892
  
--- Diff: src/java/org/apache/cassandra/locator/TokenMetadata.java ---
@@ -733,19 +733,19 @@ public InetAddressAndPort getEndpoint(Token token)
 return sortedTokens;
 }
 
-public Multimap, InetAddressAndPort> 
getPendingRangesMM(String keyspaceName)
+public ReplicaMultimap, ReplicaSet> 
getPendingRangesMM(String keyspaceName)
 {
-Multimap, InetAddressAndPort> map = 
HashMultimap.create();
+ReplicaMultimap, ReplicaSet> map = 
ReplicaMultimap.set();
 PendingRangeMaps pendingRangeMaps = 
this.pendingRanges.get(keyspaceName);
 
 if (pendingRangeMaps != null)
 {
-for (Map.Entry, List> entry : 
pendingRangeMaps)
+for (Map.Entry, ReplicaList> entry : 
pendingRangeMaps)
 {
 Range range = entry.getKey();
-for (InetAddressAndPort address : entry.getValue())
+for (Replica replica : entry.getValue())
 {
-map.put(range, address);
+map.put(range, replica);
--- End diff --

Since we're putting by `range` and iterating over `entry.getValue()` here, it seems like we can spare this inner iteration and add the whole value collection at once.
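For illustration, with a plain Guava `Multimap` the inner loop collapses to a single bulk put; whether `ReplicaMultimap` exposes an equivalent `putAll` is an assumption:

```
for (Map.Entry<Range<Token>, ReplicaList> entry : pendingRangeMaps)
{
    // add the whole replica collection for this range in one call instead of element by element
    map.putAll(entry.getKey(), entry.getValue());
}
```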


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197131214
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicationFactor.java ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class ReplicationFactor
+{
+public static final ReplicationFactor ZERO = new ReplicationFactor(0);
+
+public final int trans;
+public final int replicas;
+public transient final int full;
+
+private ReplicationFactor(int replicas, int trans)
+{
+validate(replicas, trans);
+this.replicas = replicas;
+this.trans = trans;
+this.full = replicas - trans;
+}
+
+private ReplicationFactor(int replicas)
+{
+this(replicas, 0);
+}
+
+static void validate(int replicas, int trans)
+{
+Preconditions.checkArgument(trans == 0 || 
DatabaseDescriptor.isTransientReplicationEnabled(),
+"Transient replication is not enabled 
on this node");
+Preconditions.checkArgument(replicas >= 0,
--- End diff --

To my understanding, the replication factor has to be strictly positive (i.e. a minimum of 1).
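For illustration, the stricter check would be just the following (note it would clash with the `ZERO` constant above, so that case would need special handling):

```
Preconditions.checkArgument(replicas >= 1,
                            "Replication factor must be strictly positive, found %s", replicas);
```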


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197130790
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaList.java ---
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class ReplicaList extends Replicas
+{
+static final ReplicaList EMPTY = new ReplicaList(ImmutableList.of());
+
+private final List replicaList;
+
+public ReplicaList()
+{
+this(new ArrayList<>());
+}
+
+public ReplicaList(int capacity)
+{
+this(new ArrayList<>(capacity));
+}
+
+public ReplicaList(ReplicaList from)
+{
+this(new ArrayList<>(from.replicaList));
+}
+
+public ReplicaList(Replicas from)
+{
+this(new ArrayList<>(from.size()));
+addAll(from);
+}
+
+public ReplicaList(Collection from)
+{
+this(new ArrayList<>(from));
+}
+
+private ReplicaList(List replicaList)
+{
+this.replicaList = replicaList;
+}
+
+@Override
+public String toString()
+{
+return replicaList.toString();
+}
+
+@Override
+public boolean add(Replica replica)
+{
+Preconditions.checkNotNull(replica);
+return replicaList.add(replica);
+}
+
+@Override
+public void addAll(Iterable replicas)
+{
+Iterables.addAll(replicaList, replicas);
+}
+
+@Override
+public int size()
+{
+return replicaList.size();
+}
+
+@Override
+protected Collection getUnmodifiableCollection()
+{
+return Collections.unmodifiableCollection(replicaList);
+}
+
+@Override
+public Iterator iterator()
+{
+return replicaList.iterator();
+}
+
+public Replica get(int idx)
+{
+return replicaList.get(idx);
+}
+
+@Override
+public void removeEndpoint(InetAddressAndPort endpoint)
+{
+for (int i=replicaList.size()-1; i>=0; i--)
+{
+if (replicaList.get(i).getEndpoint().equals(endpoint))
+{
+replicaList.remove(i);
+}
+}
+}
+
+@Override
+public void removeReplica(Replica replica)
+{
+replicaList.remove(replica);
+}
+
+@Override
+public boolean containsEndpoint(InetAddressAndPort endpoint)
+{
+for (int i=0; i predicate)
+{
+ArrayList newReplicaList = size() < 10 ? new 
ArrayList<>(size()) : new ArrayList<>();
+for (int i=0; i comparator)
+{
+replicaList.sort(comparator);
+}
+
+public static ReplicaList intersectEndpoints(ReplicaList l1, 
ReplicaList l2)
+{
+Replicas.checkFull(l1);
+Replicas.checkFull(l2);
+// Note: we don't use Guava Sets.intersection() for 3 reasons:
+//   1) retainAll would be inefficient if l1 and l2 are large but 
in practice both are the replicas for a range and
+//   so will be very small (< RF). In that case, retainAll is in 
fact more efficient.
+//   2) we do ultimately need a list so converting everything to 
sets 

[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197133241
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -344,47 +343,43 @@ private static void recordCasContention(int 
contentions)
 casWriteMetrics.contention.update(contentions);
 }
 
-private static Predicate sameDCPredicateFor(final 
String dc)
+private static Predicate sameDCPredicateFor(final String dc)
 {
 final IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
-return new Predicate()
-{
-public boolean apply(InetAddressAndPort host)
-{
-return dc.equals(snitch.getDatacenter(host));
-}
-};
+return replica -> dc.equals(snitch.getDatacenter(replica));
 }
 
 private static PaxosParticipants getPaxosParticipants(TableMetadata 
metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws 
UnavailableException
 {
 Token tk = key.getToken();
-List naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
-Collection pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
metadata.keyspace);
+ReplicaList naturalReplicas = 
StorageService.instance.getNaturalReplicas(metadata.keyspace, tk);
+ReplicaList pendingReplicas = new 
ReplicaList(StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
metadata.keyspace));
 if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
 {
-// Restrict naturalEndpoints and pendingEndpoints to node in 
the local DC only
+// Restrict naturalReplicas and pendingReplicas to node in the 
local DC only
 String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-Predicate isLocalDc = 
sameDCPredicateFor(localDc);
-naturalEndpoints = 
ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc));
-pendingEndpoints = 
ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc));
+Predicate isLocalDc = sameDCPredicateFor(localDc);
+naturalReplicas = 
ReplicaList.immutableCopyOf(naturalReplicas.filter(isLocalDc));
+pendingReplicas = 
ReplicaList.immutableCopyOf(pendingReplicas.filter(isLocalDc));
 }
-int participants = pendingEndpoints.size() + 
naturalEndpoints.size();
+int participants = pendingReplicas.size() + naturalReplicas.size();
 int requiredParticipants = participants / 2 + 1; // See 
CASSANDRA-8346, CASSANDRA-833
-List liveEndpoints = 
ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, 
pendingEndpoints), IAsyncCallback.isAlive));
-if (liveEndpoints.size() < requiredParticipants)
-throw new UnavailableException(consistencyForPaxos, 
requiredParticipants, liveEndpoints.size());
+
+Replicas concatenated = 
Replicas.concatNaturalAndPending(naturalReplicas, pendingReplicas);
+ReplicaList liveReplicas = 
ReplicaList.immutableCopyOf(Replicas.filter(concatenated, 
IAsyncCallback.isReplicaAlive));
--- End diff --

Same here (regarding the immutable copy).


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197149200
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1275,36 +1272,38 @@ private static WriteResponseHandlerWrapper 
wrapViewBatchResponseHandler(Mutation
  *
  * @throws OverloadedException if the hints cannot be written/enqueued
  */
-public static void sendToHintedEndpoints(final Mutation mutation,
- Iterable 
targets,
- 
AbstractWriteResponseHandler responseHandler,
- String localDataCenter,
- Stage stage)
+public static void sendToHintedReplicas(final Mutation mutation,
+Iterable targets,
+
AbstractWriteResponseHandler responseHandler,
+String localDataCenter,
+Stage stage)
 throws OverloadedException
 {
 int targetsSize = Iterables.size(targets);
 
 // this dc replicas:
-Collection localDc = null;
+Collection localDc = null;
--- End diff --

We can use `Replicas` here.


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197156422
  
--- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
@@ -4231,53 +4211,53 @@ private void 
calculateToFromStreams(Collection newTokens, List ke
 InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
 IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
 TokenMetadata tokenMetaCloneAllSettled = 
tokenMetadata.cloneAfterAllSettled();
-// clone to avoid concurrent modification in 
calculateNaturalEndpoints
+// clone to avoid concurrent modification in 
calculateNaturalReplicas
 TokenMetadata tokenMetaClone = 
tokenMetadata.cloneOnlyTokenMap();
 
 for (String keyspace : keyspaceNames)
 {
 // replication strategy of the current keyspace
 AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
-Multimap> 
endpointToRanges = strategy.getAddressRanges();
+ReplicaMultimap 
endpointToRanges = strategy.getAddressReplicas();
--- End diff --

Add a `getAddressReplicas` variant keyed by `InetAddressAndPort` to avoid materialising the whole map.


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197124802
  
--- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
@@ -148,40 +150,45 @@ public boolean isLocal(InetAddressAndPort endpoint)
 return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
 }
 
-public int countLocalEndpoints(Iterable<InetAddressAndPort> liveEndpoints)
+public boolean isLocal(Replica replica)
+{
+return isLocal(replica.getEndpoint());
+}
+
+public int countLocalEndpoints(Iterable<Replica> liveReplicas)
--- End diff --

This can be just `Replicas`.
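For illustration, the narrowed signature could look like this; the body is just a sketch that reuses the `isLocal(Replica)` overload added in this diff:

```
public int countLocalEndpoints(Replicas liveReplicas)
{
    int count = 0;
    for (Replica replica : liveReplicas)
    {
        if (isLocal(replica))
            count++;
    }
    return count;
}
```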


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197136141
  
--- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
@@ -4231,53 +4211,53 @@ private void 
calculateToFromStreams(Collection newTokens, List ke
 InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
 IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
 TokenMetadata tokenMetaCloneAllSettled = 
tokenMetadata.cloneAfterAllSettled();
-// clone to avoid concurrent modification in 
calculateNaturalEndpoints
+// clone to avoid concurrent modification in 
calculateNaturalReplicas
 TokenMetadata tokenMetaClone = 
tokenMetadata.cloneOnlyTokenMap();
 
 for (String keyspace : keyspaceNames)
 {
 // replication strategy of the current keyspace
 AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
-Multimap> 
endpointToRanges = strategy.getAddressRanges();
+ReplicaMultimap 
endpointToRanges = strategy.getAddressReplicas();
 
 logger.debug("Calculating ranges to stream and request for 
keyspace {}", keyspace);
 for (Token newToken : newTokens)
 {
 // getting collection of the currently used ranges by 
this keyspace
-Collection> currentRanges = 
endpointToRanges.get(localAddress);
+ReplicaSet currentReplicas = 
endpointToRanges.get(localAddress);
 // collection of ranges which this node will serve 
after move to the new token
-Collection> updatedRanges = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+ReplicaSet updatedReplicas = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
 
 // ring ranges and endpoints associated with them
 // this used to determine what nodes should we ping 
about range data
-Multimap, InetAddressAndPort> 
rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
+ReplicaMultimap, ReplicaSet> 
rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
 
 // calculated parts of the ranges to request/stream 
from/to nodes in the ring
-Pair>, Set>> 
rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
+Pair>, Set>> 
rangesPerKeyspace = calculateStreamAndFetchRanges(currentReplicas, 
updatedReplicas);
 
 /**
  * In this loop we are going through all ranges "to 
fetch" and determining
  * nodes in the ring responsible for data we are 
interested in
  */
-Multimap, InetAddressAndPort> 
rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create();
+ReplicaMultimap, ReplicaList> 
rangesToFetchWithPreferredEndpoints = ReplicaMultimap.list();
--- End diff --

We can iterate over `entrySet` rather than calling `get` below.
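For illustration, iterating the entries directly instead of keying back with `get` (variable names are taken from the surrounding diff; `asMap()` is used on `ReplicaMultimap` elsewhere in this PR):

```
for (Map.Entry<Range<Token>, ReplicaList> entry : rangesToFetchWithPreferredEndpoints.asMap().entrySet())
{
    Range<Token> toFetch = entry.getKey();
    ReplicaList preferredEndpoints = entry.getValue();
    // ... use toFetch / preferredEndpoints without a second lookup
}
```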


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197131513
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicationFactor.java ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class ReplicationFactor
+{
+public static final ReplicationFactor ZERO = new ReplicationFactor(0);
+
+public final int trans;
+public final int replicas;
+public transient final int full;
+
+private ReplicationFactor(int replicas, int trans)
+{
+validate(replicas, trans);
+this.replicas = replicas;
+this.trans = trans;
+this.full = replicas - trans;
+}
+
+private ReplicationFactor(int replicas)
+{
+this(replicas, 0);
+}
+
+static void validate(int replicas, int trans)
+{
+Preconditions.checkArgument(trans == 0 || 
DatabaseDescriptor.isTransientReplicationEnabled(),
+"Transient replication is not enabled 
on this node");
+Preconditions.checkArgument(replicas >= 0,
+"Replication factor must be 
non-negative, found %s", replicas);
+Preconditions.checkArgument(trans == 0 || trans < replicas,
--- End diff --

We should also check that `trans` is non-negative here, since `full` is calculated as `replicas - trans`.
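For illustration, the additional precondition in the same style as the existing checks (the message text is just a suggestion):

```
Preconditions.checkArgument(trans >= 0,
                            "Transient replica count must be non-negative, found %s", trans);
```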


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197129915
  
--- Diff: 
src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---
@@ -90,41 +98,53 @@ public NetworkTopologyStrategy(String keyspaceName, 
TokenMetadata tokenMetadata,
 /** Number of replicas left to fill from this DC. */
 int rfLeft;
 int acceptableRackRepeats;
+int transients;
 
-DatacenterEndpoints(int rf, int rackCount, int nodeCount, 
Set endpoints, Set> racks)
+DatacenterEndpoints(ReplicationFactor rf, int rackCount, int 
nodeCount, ReplicaSet replicas, Set> racks)
 {
-this.endpoints = endpoints;
+this.replicas = replicas;
 this.racks = racks;
 // If there aren't enough nodes in this DC to fill the RF, the 
number of nodes is the effective RF.
-this.rfLeft = Math.min(rf, nodeCount);
+this.rfLeft = Math.min(rf.replicas, nodeCount);
 // If there aren't enough racks in this DC to fill the RF, 
we'll still use at least one node from each rack,
 // and the difference is to be filled by the first encountered 
nodes.
-acceptableRackRepeats = rf - rackCount;
+acceptableRackRepeats = rf.replicas - rackCount;
+
+// if we have fewer replicas than rf calls for, reduce 
transients accordingly
+int reduceTransients = rf.replicas - this.rfLeft;
+transients = Math.max(rf.trans - reduceTransients, 0);
+ReplicationFactor.validate(rfLeft, transients);
 }
 
 /**
- * Attempts to add an endpoint to the replicas for this 
datacenter, adding to the endpoints set if successful.
+ * Attempts to add an endpoint to the replicas for this 
datacenter, adding to the replicas set if successful.
  * Returns true if the endpoint was added, and this datacenter 
does not require further replicas.
  */
-boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, 
Pair location)
+boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, 
Pair location, Range replicatedRange)
 {
 if (done())
 return false;
 
+if (replicas.containsEndpoint(ep))
--- End diff --

Previously this was handled with `endpoints.add`, which also used a `Set`, and it failed with an `assert` rather than skipping.

Did we change some subtle semantics here?
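For reference, the two behaviours being compared, spelled out (both snippets are illustrative, using the variable names from the surrounding diff):

```
// before: adding a duplicate endpoint was treated as a programming error
boolean added = endpoints.add(ep);
assert added : "Endpoint " + ep + " added twice";

// after: a duplicate endpoint is silently skipped
if (replicas.containsEndpoint(ep))
    return false;
```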


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197126709
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -259,36 +266,36 @@ private boolean useStrictSourcesForRanges(String 
keyspaceName)
  *
  * @throws java.lang.IllegalStateException when there is no source to 
get data streamed, or more than 1 source found.
  */
-private Multimap, InetAddressAndPort> 
getAllRangesWithStrictSourcesFor(String keyspace, Collection> 
desiredRanges)
+private ReplicaMultimap, ReplicaList> 
getAllRangesWithStrictSourcesFor(String keyspace, Iterable> 
desiredRanges)
 {
 assert tokens != null;
 AbstractReplicationStrategy strat = 
Keyspace.open(keyspace).getReplicationStrategy();
 
 // Active ranges
 TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
-Multimap, InetAddressAndPort> addressRanges = 
strat.getRangeAddresses(metadataClone);
+ReplicaMultimap, ReplicaSet> addressRanges = 
strat.getRangeAddresses(metadataClone);
 
 // Pending ranges
 metadataClone.updateNormalTokens(tokens, address);
-Multimap, InetAddressAndPort> pendingRangeAddresses = 
strat.getRangeAddresses(metadataClone);
+ReplicaMultimap, ReplicaSet> pendingRangeAddresses = 
strat.getRangeAddresses(metadataClone);
 
 // Collects the source that will have its range moved to the new 
node
-Multimap, InetAddressAndPort> rangeSources = 
ArrayListMultimap.create();
+ReplicaMultimap, ReplicaList> rangeSources = 
ReplicaMultimap.list();
 
 for (Range desiredRange : desiredRanges)
 {
-for (Map.Entry, Collection> 
preEntry : addressRanges.asMap().entrySet())
+for (Map.Entry, ReplicaSet> preEntry : 
addressRanges.asMap().entrySet())
 {
 if (preEntry.getKey().contains(desiredRange))
 {
-Set oldEndpoints = 
Sets.newHashSet(preEntry.getValue());
--- End diff --

Here we're creating a new `Set` and then removing things from it. We could do the equivalent (and avoid the copy of `newEndpoints`) by just running `filter` on `preEntry.getValue()`.
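For illustration, roughly what the `filter` variant could look like, assuming a `filter(Predicate<Replica>)` on the replica collections (as `ReplicaList` already has) and with `newEndpoints` standing for the freshly calculated natural replicas:

```
// keep only the old endpoints that are no longer natural endpoints, without mutating a copy
Replicas oldEndpoints = preEntry.getValue()
                                .filter(replica -> !newEndpoints.containsEndpoint(replica.getEndpoint()));
```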


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197125691
  
--- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
@@ -190,50 +197,50 @@ public int countLocalEndpoints(Iterable<InetAddressAndPort> liveEndpoints)
  * the blockFor first ones).
  */
 if (isDCLocal)
-liveEndpoints.sort(DatabaseDescriptor.getLocalComparator());
+liveReplicas.sort(DatabaseDescriptor.getLocalComparator());
 
-return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(keyspace)));
+return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace)));
 }
 
-private List<InetAddressAndPort> filterForEachQuorum(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints)
+private ReplicaList filterForEachQuorum(Keyspace keyspace, ReplicaList liveReplicas)
--- End diff --

Here, the sorted order is not preserved (even though we return a sorted collection from `filterForQuery`).

We could retain the sorted invariant if we used `filter`, maybe something like:

```
private ReplicaList filterForEachQuorum(Keyspace keyspace, ReplicaList liveReplicas)
{
    NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
    Map<String, Integer> dcsReplicas = new HashMap<>();
    for (String dc : strategy.getDatacenters())
    {
        // we put _up to_ dc replicas only
        dcsReplicas.put(dc, localQuorumFor(keyspace, dc));
    }

    return liveReplicas.filter((replica) -> {
        String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
        int replicas = dcsReplicas.get(dc);
        if (replicas > 0)
        {
            dcsReplicas.put(dc, --replicas);
            return true;
        }
        return false;
    });
}
```

(This would also avoid three iterations in favour of just two.)


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197161771
  
--- Diff: 
test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java ---
@@ -97,13 +103,13 @@ public void searchTokenForOldPendingRanges(final 
Blackhole bh)
 {
 int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 
10 + 5);
 Token searchToken = new 
RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
-Set endpoints = new HashSet<>();
-for (Map.Entry, Collection> entry 
: oldPendingRanges.asMap().entrySet())
+Set replicas = new HashSet<>();
+for (Map.Entry, Collection> entry : 
oldPendingRanges.asMap().entrySet())
--- End diff --

Since all usages of `asMap` go through `entrySet`, should we just expose `entrySet` instead and keep the map private?
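For illustration, a hypothetical accessor on `ReplicaMultimap` (the generic parameters `K`/`R` and the private `map` field are assumptions about its internals):

```
public Set<Map.Entry<K, R>> entrySet()
{
    // callers iterate entries directly; the backing map itself stays private
    return Collections.unmodifiableMap(map).entrySet();
}
```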


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197153506
  
--- Diff: src/java/org/apache/cassandra/db/view/ViewUtils.java ---
@@ -58,46 +57,55 @@ private ViewUtils()
  *
  * @return Optional.empty() if this method is called using a base 
token which does not belong to this replica
  */
-public static Optional 
getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+public static Optional getViewNaturalEndpoint(String 
keyspaceName, Token baseToken, Token viewToken)
 {
 AbstractReplicationStrategy replicationStrategy = 
Keyspace.open(keyspaceName).getReplicationStrategy();
 
 String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-List baseEndpoints = new ArrayList<>();
-List viewEndpoints = new ArrayList<>();
-for (InetAddressAndPort baseEndpoint : 
replicationStrategy.getNaturalEndpoints(baseToken))
+ReplicaList baseReplicas = new ReplicaList();
+ReplicaList viewReplicas = new ReplicaList();
+for (Replica baseEndpoint : 
replicationStrategy.getNaturalReplicas(baseToken))
 {
 // An endpoint is local if we're not using Net
 if (!(replicationStrategy instanceof NetworkTopologyStrategy) 
||
 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
-baseEndpoints.add(baseEndpoint);
+baseReplicas.add(baseEndpoint);
 }
 
-for (InetAddressAndPort viewEndpoint : 
replicationStrategy.getNaturalEndpoints(viewToken))
+for (Replica viewEndpoint : 
replicationStrategy.getNaturalReplicas(viewToken))
 {
 // If we are a base endpoint which is also a view replica, we 
use ourselves as our view replica
-if 
(viewEndpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+if (viewEndpoint.isLocal())
 return Optional.of(viewEndpoint);
 
 // We have to remove any endpoint which is shared between the 
base and the view, as it will select itself
 // and throw off the counts otherwise.
-if (baseEndpoints.contains(viewEndpoint))
-baseEndpoints.remove(viewEndpoint);
+if (baseReplicas.containsEndpoint(viewEndpoint.getEndpoint()))
+baseReplicas.removeEndpoint(viewEndpoint.getEndpoint());
 else if (!(replicationStrategy instanceof 
NetworkTopologyStrategy) ||
  
DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
-viewEndpoints.add(viewEndpoint);
+viewReplicas.add(viewEndpoint);
 }
 
 // The replication strategy will be the same for the base and the 
view, as they must belong to the same keyspace.
 // Since the same replication strategy is used, the same placement 
should be used and we should get the same
 // number of replicas for all of the tokens in the ring.
-assert baseEndpoints.size() == viewEndpoints.size() : "Replication 
strategy should have the same number of endpoints for the base and the view";
-int baseIdx = 
baseEndpoints.indexOf(FBUtilities.getBroadcastAddressAndPort());
+assert baseReplicas.size() == viewReplicas.size() : "Replication 
strategy should have the same number of endpoints for the base and the view";
--- End diff --

We could simplify this slightly by using a set + iterator instead of `get` here:

```diff
 String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-ReplicaList baseReplicas = new ReplicaList();
-ReplicaList viewReplicas = new ReplicaList();
+ReplicaSet baseReplicas = new ReplicaSet();
+ReplicaSet viewReplicas = new ReplicaSet();
+// We might add a method that filters natural endpoints by dc
 for (Replica baseEndpoint : replicationStrategy.getNaturalReplicas(baseToken))
 {
     // An endpoint is local if we're not using Net
@@ -92,20 +95,18 @@ public final class ViewUtils
     // number of replicas for all of the tokens in the ring.
     assert baseReplicas.size() == viewReplicas.size() : "Replication strategy should have the same number of endpoints for the base and the view";

-int baseIdx = -1;
-for (int i=0; i<baseReplicas.size(); i++)
+Iterator<Replica> baseReplicaIteraror = baseReplicas.iterator();
+Iterator<Replica> viewReplicaIteraror = v
```
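A sketch of the iterator-based pairing the diff above points at, for the tail of `getViewNaturalEndpoint` (the iteration-order guarantees of `ReplicaSet` and the exact types are assumptions; `Replica#isLocal()` is used earlier in the diff):

```
Iterator<Replica> baseIter = baseReplicas.iterator();
Iterator<Replica> viewIter = viewReplicas.iterator();
while (baseIter.hasNext() && viewIter.hasNext())
{
    Replica base = baseIter.next();
    Replica view = viewIter.next();
    // the view replica paired with the local base replica serves the view write
    if (base.isLocal())
        return Optional.of(view);
}
return Optional.empty();
```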

[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197132925
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -344,47 +343,43 @@ private static void recordCasContention(int 
contentions)
 casWriteMetrics.contention.update(contentions);
 }
 
-private static Predicate sameDCPredicateFor(final 
String dc)
+private static Predicate sameDCPredicateFor(final String dc)
 {
 final IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
-return new Predicate()
-{
-public boolean apply(InetAddressAndPort host)
-{
-return dc.equals(snitch.getDatacenter(host));
-}
-};
+return replica -> dc.equals(snitch.getDatacenter(replica));
 }
 
 private static PaxosParticipants getPaxosParticipants(TableMetadata 
metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws 
UnavailableException
 {
 Token tk = key.getToken();
-List naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
-Collection pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
metadata.keyspace);
+ReplicaList naturalReplicas = 
StorageService.instance.getNaturalReplicas(metadata.keyspace, tk);
--- End diff --

Looks like we can just concatenate natural (as a list) and pending (as a set) without additional copying in between.
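For illustration, using the `concatNaturalAndPending` helper that already appears further down in this diff; whether `pendingEndpointsFor` can hand back a `Replicas`-compatible collection without the intermediate `ReplicaList` copy is the assumption here:

```
ReplicaList naturalReplicas = StorageService.instance.getNaturalReplicas(metadata.keyspace, tk);
Replicas pendingReplicas = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace);
Replicas participants = Replicas.concatNaturalAndPending(naturalReplicas, pendingReplicas);
```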


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197159962
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaList.java ---
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class ReplicaList extends Replicas
+{
+static final ReplicaList EMPTY = new ReplicaList(ImmutableList.of());
+
+private final List replicaList;
+
+public ReplicaList()
+{
+this(new ArrayList<>());
+}
+
+public ReplicaList(int capacity)
+{
+this(new ArrayList<>(capacity));
+}
+
+public ReplicaList(ReplicaList from)
+{
+this(new ArrayList<>(from.replicaList));
+}
+
+public ReplicaList(Replicas from)
+{
+this(new ArrayList<>(from.size()));
+addAll(from);
+}
+
+public ReplicaList(Collection from)
+{
+this(new ArrayList<>(from));
+}
+
+private ReplicaList(List replicaList)
+{
+this.replicaList = replicaList;
+}
+
+@Override
+public String toString()
+{
+return replicaList.toString();
+}
+
+@Override
+public boolean add(Replica replica)
+{
+Preconditions.checkNotNull(replica);
+return replicaList.add(replica);
+}
+
+@Override
+public void addAll(Iterable replicas)
+{
+Iterables.addAll(replicaList, replicas);
+}
+
+@Override
+public int size()
+{
+return replicaList.size();
+}
+
+@Override
+protected Collection getUnmodifiableCollection()
+{
+return Collections.unmodifiableCollection(replicaList);
+}
+
+@Override
+public Iterator iterator()
+{
+return replicaList.iterator();
+}
+
+public Replica get(int idx)
+{
+return replicaList.get(idx);
+}
+
+@Override
+public void removeEndpoint(InetAddressAndPort endpoint)
+{
+for (int i=replicaList.size()-1; i>=0; i--)
+{
+if (replicaList.get(i).getEndpoint().equals(endpoint))
+{
+replicaList.remove(i);
+}
+}
+}
+
+@Override
+public void removeReplica(Replica replica)
+{
+replicaList.remove(replica);
+}
+
+@Override
+public boolean containsEndpoint(InetAddressAndPort endpoint)
+{
+for (int i=0; i predicate)
+{
+ArrayList newReplicaList = size() < 10 ? new 
ArrayList<>(size()) : new ArrayList<>();
+for (int i=0; i comparator)
+{
+replicaList.sort(comparator);
+}
+
+public static ReplicaList intersectEndpoints(ReplicaList l1, 
ReplicaList l2)
+{
+Replicas.checkFull(l1);
+Replicas.checkFull(l2);
+// Note: we don't use Guava Sets.intersection() for 3 reasons:
+//   1) retainAll would be inefficient if l1 and l2 are large but 
in practice both are the replicas for a range and
+//   so will be very small (< RF). In that case, retainAll is in 
fact more efficient.
+//   2) we do ultimately need a list so converting everything to 
sets 

[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197136522
  
--- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
@@ -4231,53 +4211,53 @@ private void 
calculateToFromStreams(Collection newTokens, List ke
 InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
 IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
 TokenMetadata tokenMetaCloneAllSettled = 
tokenMetadata.cloneAfterAllSettled();
-// clone to avoid concurrent modification in 
calculateNaturalEndpoints
+// clone to avoid concurrent modification in 
calculateNaturalReplicas
 TokenMetadata tokenMetaClone = 
tokenMetadata.cloneOnlyTokenMap();
 
 for (String keyspace : keyspaceNames)
 {
 // replication strategy of the current keyspace
 AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
-Multimap> 
endpointToRanges = strategy.getAddressRanges();
+ReplicaMultimap 
endpointToRanges = strategy.getAddressReplicas();
 
 logger.debug("Calculating ranges to stream and request for 
keyspace {}", keyspace);
 for (Token newToken : newTokens)
 {
 // getting collection of the currently used ranges by 
this keyspace
-Collection> currentRanges = 
endpointToRanges.get(localAddress);
+ReplicaSet currentReplicas = 
endpointToRanges.get(localAddress);
 // collection of ranges which this node will serve 
after move to the new token
-Collection> updatedRanges = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+ReplicaSet updatedReplicas = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
 
 // ring ranges and endpoints associated with them
 // this used to determine what nodes should we ping 
about range data
-Multimap, InetAddressAndPort> 
rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
+ReplicaMultimap, ReplicaSet> 
rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
 
 // calculated parts of the ranges to request/stream 
from/to nodes in the ring
-Pair>, Set>> 
rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
+Pair>, Set>> 
rangesPerKeyspace = calculateStreamAndFetchRanges(currentReplicas, 
updatedReplicas);
 
 /**
  * In this loop we are going through all ranges "to 
fetch" and determining
  * nodes in the ring responsible for data we are 
interested in
  */
-Multimap, InetAddressAndPort> 
rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create();
+ReplicaMultimap, ReplicaList> 
rangesToFetchWithPreferredEndpoints = ReplicaMultimap.list();
 for (Range toFetch : rangesPerKeyspace.right)
 {
 for (Range range : rangeAddresses.keySet())
 {
 if (range.contains(toFetch))
 {
-List endpoints = null;
+ReplicaList endpoints = null;
 
 if (useStrictConsistency)
 {
-Set oldEndpoints = 
Sets.newHashSet(rangeAddresses.get(range));
-Set newEndpoints = 
Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, 
tokenMetaCloneAllSettled));
+ReplicaSet oldEndpoints = new 
ReplicaSet(rangeAddresses.get(range));
--- End diff --

Might be better to filter `oldEndpoints` rather than remove from them (especially since we already copy).

An additional benefit: if we ever decide to switch to an immutable interface, we'll have to do this anyway.


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197128495
  
--- Diff: 
src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---
@@ -329,6 +335,10 @@ public static void validateReplicationStrategy(String 
keyspaceName,
 AbstractReplicationStrategy strategy = 
createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, 
strategyOptions);
 strategy.validateExpectedOptions();
 strategy.validateOptions();
+if (strategy.getReplicationFactor().trans > 0 && 
!DatabaseDescriptor.isTransientReplicationEnabled())
--- End diff --

We should also check the cluster version. Previously we had allNodesAtLeast22/30 for some internal handling, so we might have to implement a similar mechanism for this.


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197129440
  
--- Diff: 
src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---
@@ -90,41 +98,53 @@ public NetworkTopologyStrategy(String keyspaceName, 
TokenMetadata tokenMetadata,
 /** Number of replicas left to fill from this DC. */
 int rfLeft;
 int acceptableRackRepeats;
+int transients;
 
-DatacenterEndpoints(int rf, int rackCount, int nodeCount, 
Set endpoints, Set> racks)
+DatacenterEndpoints(ReplicationFactor rf, int rackCount, int 
nodeCount, ReplicaSet replicas, Set> racks)
--- End diff --

Should we call it `DatacenterReplicas` now?


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197154559
  
--- Diff: 
src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---
@@ -202,61 +204,65 @@ private Keyspace getKeyspace()
  *
  * @return the replication factor
  */
-public abstract int getReplicationFactor();
+public abstract ReplicationFactor getReplicationFactor();
 
 /*
  * NOTE: this is pretty inefficient. also the inverse 
(getRangeAddresses) below.
  * this is fine as long as we don't use this on any critical path.
  * (fixing this would probably require merging tokenmetadata into 
replicationstrategy,
  * so we could cache/invalidate cleanly.)
  */
-public Multimap> 
getAddressRanges(TokenMetadata metadata)
+public ReplicaMultimap 
getAddressReplicas(TokenMetadata metadata)
--- End diff --

Because of how it's used, we can add another, non-map variant, since in the majority of usages we're just calling `get` on a just-materialised map.


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197125767
  
--- Diff: src/java/org/apache/cassandra/db/ConsistencyLevel.java ---
@@ -190,50 +197,50 @@ public int 
countLocalEndpoints(Iterable liveEndpoints)
  * the blockFor first ones).
  */
 if (isDCLocal)
-liveEndpoints.sort(DatabaseDescriptor.getLocalComparator());
+liveReplicas.sort(DatabaseDescriptor.getLocalComparator());
 
-return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), 
blockFor(keyspace)));
+return liveReplicas.subList(0, Math.min(liveReplicas.size(), 
blockFor(keyspace)));
 }
 
-private List filterForEachQuorum(Keyspace 
keyspace, List liveEndpoints)
+private ReplicaList filterForEachQuorum(Keyspace keyspace, ReplicaList 
liveReplicas)
 {
 NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
 
-Map> dcsEndpoints = new 
HashMap<>();
+Map dcsReplicas = new HashMap<>();
 for (String dc: strategy.getDatacenters())
-dcsEndpoints.put(dc, new ArrayList<>());
+dcsReplicas.put(dc, 
ReplicaList.withMaxSize(liveReplicas.size()));
 
-for (InetAddressAndPort add : liveEndpoints)
+for (Replica replica : liveReplicas)
 {
-String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(add);
-dcsEndpoints.get(dc).add(add);
+String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+dcsReplicas.get(dc).add(replica);
 }
 
-List waitSet = new ArrayList<>();
-for (Map.Entry> dcEndpoints : 
dcsEndpoints.entrySet())
+ReplicaList waitSet = new 
ReplicaList(ReplicaList.withMaxSize(liveReplicas.size()));
+for (Map.Entry dcEndpoints : 
dcsReplicas.entrySet())
 {
-List dcEndpoint = dcEndpoints.getValue();
+ReplicaList dcEndpoint = dcEndpoints.getValue();
 waitSet.addAll(dcEndpoint.subList(0, 
Math.min(localQuorumFor(keyspace, dcEndpoints.getKey()), dcEndpoint.size(;
 }
 
 return waitSet;
 }
 
-public boolean isSufficientLiveNodes(Keyspace keyspace, 
Iterable liveEndpoints)
+public boolean isSufficientLiveNodes(Keyspace keyspace, 
Iterable liveReplicas)
--- End diff --

We can use `Replicas` here.


---




[GitHub] cassandra pull request #224: 14405 replicas

2018-06-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/224#discussion_r197130296
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaList.java ---
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class ReplicaList extends Replicas
+{
+static final ReplicaList EMPTY = new ReplicaList(ImmutableList.of());
+
+private final List replicaList;
+
+public ReplicaList()
+{
+this(new ArrayList<>());
+}
+
+public ReplicaList(int capacity)
+{
+this(new ArrayList<>(capacity));
+}
+
+public ReplicaList(ReplicaList from)
+{
+this(new ArrayList<>(from.replicaList));
+}
+
+public ReplicaList(Replicas from)
+{
+this(new ArrayList<>(from.size()));
+addAll(from);
+}
+
+public ReplicaList(Collection from)
+{
+this(new ArrayList<>(from));
+}
+
+private ReplicaList(List replicaList)
+{
+this.replicaList = replicaList;
+}
+
+@Override
+public String toString()
+{
+return replicaList.toString();
+}
+
+@Override
+public boolean add(Replica replica)
+{
+Preconditions.checkNotNull(replica);
+return replicaList.add(replica);
+}
+
+@Override
+public void addAll(Iterable replicas)
+{
+Iterables.addAll(replicaList, replicas);
+}
+
+@Override
+public int size()
+{
+return replicaList.size();
+}
+
+@Override
+protected Collection getUnmodifiableCollection()
+{
+return Collections.unmodifiableCollection(replicaList);
+}
+
+@Override
+public Iterator iterator()
+{
+return replicaList.iterator();
+}
+
+public Replica get(int idx)
+{
+return replicaList.get(idx);
+}
+
+@Override
+public void removeEndpoint(InetAddressAndPort endpoint)
+{
+for (int i=replicaList.size()-1; i>=0; i--)
+{
+if (replicaList.get(i).getEndpoint().equals(endpoint))
+{
+replicaList.remove(i);
+}
+}
+}
+
+@Override
+public void removeReplica(Replica replica)
+{
+replicaList.remove(replica);
+}
+
+@Override
+public boolean containsEndpoint(InetAddressAndPort endpoint)
+{
+for (int i = 0; i < replicaList.size(); i++)
+{
+    if (replicaList.get(i).getEndpoint().equals(endpoint))
+        return true;
+}
+return false;
+}
+
+public ReplicaList filter(Predicate<Replica> predicate)
+{
+    ArrayList<Replica> newReplicaList = size() < 10 ? new ArrayList<>(size()) : new ArrayList<>();
+    for (int i = 0; i < size(); i++)
+    {
+        if (predicate.test(get(i)))
+            newReplicaList.add(get(i));
+    }
+    return new ReplicaList(newReplicaList);
+}
+
+public void sort(Comparator<Replica> comparator)
+{
+    replicaList.sort(comparator);
+}
+
+public static ReplicaList intersectEndpoints(ReplicaList l1, 
ReplicaList l2)
+{
+Replicas.checkFull(l1);
+Replicas.checkFull(l2);
+// Note: we don't use Guava Sets.intersection() for 3 reasons:
+//   1) retainAll would be inefficient if l1 and l2 are large but 
in practice both are the replicas for a range and
+//   so will be very small (< RF). In that case, retainAll is in 
fact more efficient.
+//   2) we do ultimately need a list so converting everything to 
sets 
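
(As a toy illustration of the trade-off described in that comment, and not the patch itself: for per-range replica lists smaller than RF, a plain `retainAll` intersection is cheap even though it is quadratic, and it avoids building intermediate sets.)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IntersectSketch
{
    public static void main(String[] args)
    {
        // Tiny per-range "replica" lists, as in the comment above (size < RF).
        List<String> l1 = new ArrayList<>(Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
        List<String> l2 = Arrays.asList("127.0.0.2", "127.0.0.3", "127.0.0.4");

        List<String> intersection = new ArrayList<>(l1);
        intersection.retainAll(l2); // O(|l1| * |l2|), which is fine for < RF elements

        System.out.println(intersection); // [127.0.0.2, 127.0.0.3]
    }
}
```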

[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...

2018-09-05 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/257

Allow transient range owner to serve as repair coordinator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra tr-followup-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #257


commit aadc3c6dcd65ef915c1743cb133f1065ee2618c3
Author: Alex Petrov 
Date:   2018-09-04T17:38:27Z

Improve DiskBoundaryManager, bring it into consistency with the rest of the classes

commit c1a73a5eb799ee4d12a580af848d72eda21fd7b2
Author: Alex Petrov 
Date:   2018-09-04T17:39:19Z

Improve repair scheduling

commit e47019f838f69694d943220e3fa2f42839dd8019
Author: Alex Petrov 
Date:   2018-09-04T17:40:08Z

Avoid filtering already

commit 1d19e26fbbb97d76590d8b164feb61d185d74ba2
Author: Alex Petrov 
Date:   2018-09-05T08:16:51Z

enable dtests

commit 3e61a44a6e4bc4ba0dd32b588905b01bb64aff7f
Author: Alex Petrov 
Date:   2018-09-05T11:11:51Z

Allow transient range owner to be repair coordinator

commit c9d4e02aeb9f527e4584721881de2794085c594f
Author: Alex Petrov 
Date:   2018-09-05T13:14:24Z

Switch to branch that has TR tests

commit 07acea8e98ca762d06cc61affad371049a49943b
Author: Alex Petrov 
Date:   2018-09-05T15:14:01Z

Use Sets#difference instead of per-element equality




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...

2018-09-05 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/257#discussion_r215442476
  
--- Diff: src/java/org/apache/cassandra/repair/RepairRunnable.java ---
@@ -651,7 +651,7 @@ private static void 
addRangeToNeighbors(List neighborRangeList, Ran
 for (CommonRange cr : neighborRangeList)
 {
 // Use strict equality here, as worst thing that can happen is 
we generate one more stream
-if (Iterables.elementsEqual(cr.endpoints, endpoints) && 
Iterables.elementsEqual(cr.transEndpoints, transEndpoints))
+if (Sets.difference(cr.endpoints, endpoints).isEmpty() && 
Sets.difference(cr.transEndpoints, transEndpoints).isEmpty())
--- End diff --

Right, `return s.size() == o.size() && s.containsAll(o);`, but with a quick hashCode check up front so non-equal sets bail out early.
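
(A minimal self-contained sketch of that idea; `setsEqual` is a hypothetical helper name, not the method in the patch.)

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SetEqualitySketch
{
    // Same as size() + containsAll(), with a hashCode comparison first: equal sets must have
    // equal hash codes, so differing hash codes prove non-equality without element-by-element
    // containment checks (computing the hash is still O(n), but it is a single cheap pass).
    static <T> boolean setsEqual(Set<T> s, Set<T> o)
    {
        if (s == o)
            return true;
        if (s.size() != o.size() || s.hashCode() != o.hashCode())
            return false;
        return s.containsAll(o);
    }

    public static void main(String[] args)
    {
        Set<String> a = new HashSet<>(Arrays.asList("127.0.0.1", "127.0.0.2"));
        Set<String> b = new HashSet<>(Arrays.asList("127.0.0.2", "127.0.0.1"));
        Set<String> c = new HashSet<>(Arrays.asList("127.0.0.1", "127.0.0.3"));
        System.out.println(setsEqual(a, b)); // true
        System.out.println(setsEqual(a, c)); // false
    }
}
```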


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #261: Tr followup 4

2018-09-07 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/261

Tr followup 4

cc @belliottsmith 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra tr-followup-4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #261


commit 9e5f71b934b5e6ffb30361e4c375d2312cd097f4
Author: Alex Petrov 
Date:   2018-09-05T08:16:51Z

enable dtests

commit 362b6e70ca096525def59012c3a27fa9a00abf3e
Author: Alex Petrov 
Date:   2018-09-07T09:02:55Z

Validate transient status on query

commit 04e3857811e49c39f1c41d0a74c9e7c6a0463f86
Author: Alex Petrov 
Date:   2018-09-07T11:00:26Z

Comment to check tests




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #259: Cleanup repair path after Transient Replication

2018-09-06 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/259

Cleanup repair path after Transient Replication



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra tr-followup-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/259.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #259


commit 3562933028ba33c2975f6c96a683d2d71eabbb51
Author: Alex Petrov 
Date:   2018-09-06T10:37:13Z

Cleanup repair path after TR

commit 8fef57a2c5931891bfe848e3788b3360a2bd4778
Author: Alex Petrov 
Date:   2018-09-05T08:16:51Z

enable dtests




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...

2018-09-06 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/257#discussion_r215776606
  
--- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java ---
@@ -39,43 +40,60 @@
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
- * SymmetricLocalSyncTask performs streaming between local(coordinator) 
node and remote replica.
+ * LocalSyncTask performs streaming between local(coordinator) node and 
remote replica.
  */
-public class SymmetricLocalSyncTask extends SymmetricSyncTask implements 
StreamEventHandler
+public class LocalSyncTask extends SymmetricSyncTask implements 
StreamEventHandler
 {
 private final TraceState state = Tracing.instance.get();
 
-private static final Logger logger = 
LoggerFactory.getLogger(SymmetricLocalSyncTask.class);
+private static final Logger logger = 
LoggerFactory.getLogger(LocalSyncTask.class);
 
-private final boolean remoteIsTransient;
 private final UUID pendingRepair;
-private final boolean pullRepair;
+private final boolean requestRanges;
+private final boolean transferRanges;
 
-public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, 
TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean 
pullRepair, PreviewKind previewKind)
+public LocalSyncTask(RepairJobDesc desc, TreeResponse local, 
TreeResponse remote, UUID pendingRepair,
+ boolean requestRanges, boolean transferRanges, 
PreviewKind previewKind)
 {
-super(desc, r1, r2, previewKind);
-this.remoteIsTransient = remoteIsTransient;
+this(desc, local.endpoint, remote.endpoint, 
MerkleTrees.difference(local.trees, remote.trees),
+ pendingRepair, requestRanges, transferRanges, previewKind);
+}
+
+public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
+ List<Range<Token>> diff, UUID pendingRepair,
+ boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
+{
+super(desc, local, remote, diff, previewKind);
+Preconditions.checkArgument(requestRanges || transferRanges, 
"Nothing to do in a sync job");
+
Preconditions.checkArgument(local.equals(FBUtilities.getBroadcastAddressAndPort()));
+
 this.pendingRepair = pendingRepair;
-this.pullRepair = pullRepair;
+this.requestRanges = requestRanges;
+this.transferRanges = transferRanges;
 }
 
 @VisibleForTesting
-StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences)
+StreamPlan createStreamPlan(InetAddressAndPort remote, List<Range<Token>> differences)
 {
 StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, 
pendingRepair, previewKind)
   .listeners(this)
-  .flushBeforeTransfer(pendingRepair == null)
-  // see comment on RangesAtEndpoint.toDummyList 
for why we synthesize replicas here
-  .requestRanges(dst, desc.keyspace, 
RangesAtEndpoint.toDummyList(differences),
-  
RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);  // 
request ranges from the remote node
+  .flushBeforeTransfer(pendingRepair == null);
+
+if (requestRanges)
+{
+// see comment on RangesAtEndpoint.toDummyList for why we 
synthesize replicas here
+plan.requestRanges(remote, desc.keyspace, 
RangesAtEndpoint.toDummyList(differences),
+   
RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
+}
 
-if (!pullRepair && !remoteIsTransient)
+if (transferRanges)
 {
 // send ranges to the remote node if we are not performing a 
pull repair
 // see comment on RangesAtEndpoint.toDummyList for why we 
synthesize replicas here
-plan.transferRanges(dst, desc.keyspace, 
RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
+plan.transferRanges(remote, desc.keyspace, 
RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
--- End diff --

I felt this is a side-effect-generating, imperative method, so ignoring its return value is fine; the value is returned only to facilitate chaining.
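
(A toy sketch of that pattern, not StreamPlan's real API: the method mutates the receiver and returns `this` purely so calls can be chained, so discarding the return value loses nothing.)

```java
public class FluentPlanSketch
{
    private final StringBuilder ops = new StringBuilder();

    // Imperative, side-effecting method; the return value exists purely for chaining.
    FluentPlanSketch addStep(String step)
    {
        ops.append(step).append(' ');
        return this;
    }

    @Override
    public String toString()
    {
        return ops.toString().trim();
    }

    public static void main(String[] args)
    {
        FluentPlanSketch plan = new FluentPlanSketch();
        plan.addStep("requestRanges");                    // return value ignored, state still updated
        plan.addStep("transferRanges").addStep("flush");  // or chained, same effect
        System.out.println(plan); // requestRanges transferRanges flush
    }
}
```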


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandr

[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...

2018-09-06 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/257#discussion_r215776645
  
--- Diff: src/java/org/apache/cassandra/repair/RepairJob.java ---
@@ -165,107 +166,116 @@ private boolean isTransient(InetAddressAndPort ep)
 return session.commonRange.transEndpoints.contains(ep);
 }
 
-private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing()
+private ListenableFuture<List<SyncStat>> standardSyncing(List<TreeResponse> trees)
 {
-return trees ->
-{
-InetAddressAndPort local = 
FBUtilities.getLocalAddressAndPort();
+InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
-List syncTasks = new ArrayList<>();
-// We need to difference all trees one against another
-for (int i = 0; i < trees.size() - 1; ++i)
+List syncTasks = new ArrayList<>();
+// We need to difference all trees one against another
+for (int i = 0; i < trees.size() - 1; ++i)
+{
+TreeResponse r1 = trees.get(i);
+for (int j = i + 1; j < trees.size(); ++j)
 {
-TreeResponse r1 = trees.get(i);
-for (int j = i + 1; j < trees.size(); ++j)
+TreeResponse r2 = trees.get(j);
+
+// Avoid streming between two tansient replicas
+if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+continue;
+
+AbstractSyncTask task;
+if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
 {
-TreeResponse r2 = trees.get(j);
+TreeResponse self = r1.endpoint.equals(local) ? r1 : 
r2;
+TreeResponse remote = r2.endpoint.equals(local) ? r1 : 
r2;
 
-if (isTransient(r1.endpoint) && 
isTransient(r2.endpoint))
+// pull only if local is full
+boolean requestRanges = !isTransient(self.endpoint);
+// push only if remote is full; additionally check for 
pull repair
+boolean transfterRanges = 
!isTransient(remote.endpoint) && !session.pullRepair;
--- End diff --

Thanks for noticing! 
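
(For context, a sketch of the flag computation being reviewed here, with the spelling fixed; `localIsTransient`, `remoteIsTransient` and `pullRepair` stand in for the session state, and this is a simplification rather than the actual patch.)

```java
public class SyncFlagsSketch
{
    // Pull only if the local replica is full; push only if the remote replica is full
    // and this is not a pull repair.
    static boolean requestRanges(boolean localIsTransient)
    {
        return !localIsTransient;
    }

    static boolean transferRanges(boolean remoteIsTransient, boolean pullRepair)
    {
        return !remoteIsTransient && !pullRepair;
    }

    public static void main(String[] args)
    {
        // local full, remote transient, normal repair: pull, do not push
        System.out.println(requestRanges(false) + " " + transferRanges(true, false)); // true false
        // local transient, remote full, normal repair: do not pull, push
        System.out.println(requestRanges(true) + " " + transferRanges(false, false)); // false true
    }
}
```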


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...

2018-09-06 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/257#discussion_r215776814
  
--- Diff: src/java/org/apache/cassandra/service/ActiveRepairService.java ---
@@ -20,10 +20,25 @@
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
--- End diff --

Yes, I'll remove it on commit when it's squashed.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra issue #257: Allow transient range owner to serve as repair coordin...

2018-09-06 Thread ifesdjeen
Github user ifesdjeen commented on the issue:

https://github.com/apache/cassandra/pull/257
  
@aweisberg fixed imports and spelling. Thank you for spotting those.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #262: Replicalayout followup

2018-09-07 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/262

Replicalayout followup



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/belliottsmith/cassandra replicalayout-followup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/262.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #262


commit 949ec9e9965c84c3c80daf677422acb23719212f
Author: Benedict Elliott Smith 
Date:   2018-09-07T10:41:28Z

[rR]eplicaLayout->[rR]eplicaPlan, selected() -> contact()

commit 8190c79d6c8bc3e1cfcc41ea0ee222fb3fc21231
Author: Benedict Elliott Smith 
Date:   2018-09-07T10:45:28Z

assureSufficientLiveNodes -> assureSufficientReplicas

commit 70d615f8c1831831d97582b6b526621fc3d0bbf0
Author: Benedict Elliott Smith 
Date:   2018-09-07T11:28:01Z

main refactor

commit 45230008ce9f12639d28b304531bd315b5637fd0
Author: Benedict Elliott Smith 
Date:   2018-09-07T11:38:31Z

remove get(Live)?(Sorted)?Natural(AndPending)?Replicas

commit e384d189c98877bfbf02cd8f168694bb3c012403
Author: Benedict Elliott Smith 
Date:   2018-09-07T11:41:10Z

fix speculation->rr_extra_read->rr_extra_write

commit f4eb2d245cf52f86fd9ef71ca2de0df3037fb3bf
Author: Benedict Elliott Smith 
Date:   2018-09-07T11:43:00Z

cleanup BatchLog.sendSingleReplayMutation

commit d3eb71cf3265055be739089bf34c0fed64dac24b
Author: Benedict Elliott Smith 
Date:   2018-09-07T11:44:13Z

fix: could speculate to nodes that would not increase consistencyLevel, 
resulting in timeout instead of overload

commit f59dfa83ad6de829f79406c46acc82acb84fb645
Author: Benedict Elliott Smith 
Date:   2018-09-07T11:45:36Z

initialise Mutable HashMap with capacity

commit 14fc69fe50178fffe6d8c0d79e60cc671aa29a88
Author: Benedict Elliott Smith 
Date:   2018-09-07T11:47:55Z

circleci

commit d45a68a319e488ed7ee016a747df1dfd734753bd
Author: Benedict Elliott Smith 
Date:   2018-09-07T13:00:18Z

comment nit

commit b49a088578292c807bc568e8a6481a7ac75fe489
Author: Benedict Elliott Smith 
Date:   2018-09-07T13:04:17Z

remove unused ForRangeWrite




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #271: 14726

2018-09-27 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r220951620
  
--- Diff: 
src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---
@@ -58,45 +63,332 @@
 };
 }
 
-protected final List<Replica> list;
-protected final boolean isSnapshot;
-protected AbstractReplicaCollection(List<Replica> list, boolean isSnapshot)
+/**
+ * A simple list with no comodification checks and immutability by 
default (only append permitted, and only one initial copy)
+ * this permits us to reduce the amount of garbage generated, by not 
wrapping iterators or unnecessarily copying
+ * and reduces the amount of indirection necessary, as well as 
ensuring monomorphic callsites
+ */
+protected static class ReplicaList implements Iterable<Replica>
 {
-this.list = list;
-this.isSnapshot = isSnapshot;
+private static final Replica[] EMPTY = new Replica[0];
+Replica[] contents;
+int begin, size;
+
+public ReplicaList() { this(0); }
+public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; }
+public ReplicaList(Replica[] contents, int begin, int size) { 
this.contents = contents; this.begin = begin; this.size = size; }
+
+public boolean isSubList(ReplicaList subList)
+{
+return subList.contents == contents;
+}
+
+public Replica get(int index)
+{
+if (index > size)
+throw new IndexOutOfBoundsException();
+return contents[begin + index];
+}
+
+public void add(Replica replica)
+{
+// can only add to full array - if we have sliced it, we must 
be a snapshot
+assert begin == 0;
+if (size == contents.length)
+{
+int newSize;
+if (size < 3) newSize = 3;
+else if (size < 9) newSize = 9;
+else newSize = size * 2;
+contents = Arrays.copyOf(contents, newSize);
+}
+contents[size++] = replica;
+}
+
+public int size()
+{
+return size;
+}
+
+public boolean isEmpty()
+{
+return size == 0;
+}
+
+public ReplicaList subList(int begin, int end)
+{
+if (end > size || begin > end) throw new 
IndexOutOfBoundsException();
+return new ReplicaList(contents, this.begin + begin, end - 
begin);
+}
+
+public ReplicaList sorted(Comparator<Replica> comparator)
+{
+Replica[] copy = Arrays.copyOfRange(contents, begin, begin + 
size);
+Arrays.sort(copy, comparator);
+return new ReplicaList(copy, 0, copy.length);
+}
+
+public Stream<Replica> stream()
+{
+return Arrays.stream(contents, begin, begin + size);
+}
+
+@Override
+public Iterator<Replica> iterator()
+{
+return new Iterator<Replica>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public Replica next()
+{
+return contents[i++];
+}
+};
+}
+
+public <K> Iterator<K> transformIterator(Function<Replica, K> function)
+{
+return new Iterator<K>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public K next()
+{
+return function.apply(contents[i++]);
+}
+};
+}
+
+private Iterator<Replica> filterIterator(Predicate<Replica> predicate, int limit)
+{
+return new Iterator<Replica>()
--- End diff --

I know these iterators are quite simple to implement, but we could also wrap the iterator with `Iterator#filter` here.
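
(For comparison, a minimal sketch of the wrapped-iterator alternative, assuming the comment refers to Guava's `Iterators.filter` and `Iterators.limit`, which the codebase already pulls in; this is an illustration only, not a claim about the relative cost of either approach.)

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import com.google.common.collect.Iterators;

public class FilterIteratorSketch
{
    public static void main(String[] args)
    {
        List<String> replicas = Arrays.asList("full-a", "transient-b", "full-c");

        // Roughly what filterIterator(predicate, limit) does, expressed as wrapped iterators.
        Iterator<String> filtered =
            Iterators.limit(Iterators.filter(replicas.iterator(), r -> r.startsWith("full")), 2);

        filtered.forEachRemaining(System.out::println); // full-a, full-c
    }
}
```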


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #271: 14726

2018-09-27 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r220966312
  
--- Diff: 
src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---
@@ -58,45 +63,332 @@
 };
 }
 
-protected final List<Replica> list;
-protected final boolean isSnapshot;
-protected AbstractReplicaCollection(List<Replica> list, boolean isSnapshot)
+/**
+ * A simple list with no comodification checks and immutability by 
default (only append permitted, and only one initial copy)
+ * this permits us to reduce the amount of garbage generated, by not 
wrapping iterators or unnecessarily copying
+ * and reduces the amount of indirection necessary, as well as 
ensuring monomorphic callsites
+ */
+protected static class ReplicaList implements Iterable<Replica>
 {
-this.list = list;
-this.isSnapshot = isSnapshot;
+private static final Replica[] EMPTY = new Replica[0];
+Replica[] contents;
+int begin, size;
+
+public ReplicaList() { this(0); }
+public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; }
+public ReplicaList(Replica[] contents, int begin, int size) { 
this.contents = contents; this.begin = begin; this.size = size; }
+
+public boolean isSubList(ReplicaList subList)
+{
+return subList.contents == contents;
+}
+
+public Replica get(int index)
+{
+if (index > size)
+throw new IndexOutOfBoundsException();
+return contents[begin + index];
+}
+
+public void add(Replica replica)
+{
+// can only add to full array - if we have sliced it, we must 
be a snapshot
+assert begin == 0;
+if (size == contents.length)
+{
+int newSize;
+if (size < 3) newSize = 3;
+else if (size < 9) newSize = 9;
+else newSize = size * 2;
+contents = Arrays.copyOf(contents, newSize);
+}
+contents[size++] = replica;
+}
+
+public int size()
+{
+return size;
+}
+
+public boolean isEmpty()
+{
+return size == 0;
+}
+
+public ReplicaList subList(int begin, int end)
+{
+if (end > size || begin > end) throw new 
IndexOutOfBoundsException();
+return new ReplicaList(contents, this.begin + begin, end - 
begin);
+}
+
+public ReplicaList sorted(Comparator<Replica> comparator)
+{
+Replica[] copy = Arrays.copyOfRange(contents, begin, begin + 
size);
+Arrays.sort(copy, comparator);
+return new ReplicaList(copy, 0, copy.length);
+}
+
+public Stream<Replica> stream()
+{
+return Arrays.stream(contents, begin, begin + size);
+}
+
+@Override
+public Iterator<Replica> iterator()
+{
+return new Iterator<Replica>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public Replica next()
+{
+return contents[i++];
+}
+};
+}
+
+public <K> Iterator<K> transformIterator(Function<Replica, K> function)
+{
+return new Iterator<K>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public K next()
+{
+return function.apply(contents[i++]);
+}
+};
+}
+
+private Iterator<Replica> filterIterator(Predicate<Replica> predicate, int limit)
+{
+return new Iterator<Replica>()
+{
+final int end = begin + size;
+int next = begin;
+int count = 0;
+{ updateNext(); }
+void updateNext()
+{
+if (count == limit) next = end;
+

[GitHub] cassandra pull request #271: 14726

2018-09-27 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r220986153
  
--- Diff: 
src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---
@@ -58,45 +63,332 @@
 };
 }
 
-protected final List<Replica> list;
-protected final boolean isSnapshot;
-protected AbstractReplicaCollection(List<Replica> list, boolean isSnapshot)
+/**
+ * A simple list with no comodification checks and immutability by 
default (only append permitted, and only one initial copy)
+ * this permits us to reduce the amount of garbage generated, by not 
wrapping iterators or unnecessarily copying
+ * and reduces the amount of indirection necessary, as well as 
ensuring monomorphic callsites
+ */
+protected static class ReplicaList implements Iterable<Replica>
 {
-this.list = list;
-this.isSnapshot = isSnapshot;
+private static final Replica[] EMPTY = new Replica[0];
+Replica[] contents;
+int begin, size;
+
+public ReplicaList() { this(0); }
+public ReplicaList(int capacity) { contents = capacity == 0 ? 
EMPTY : new Replica[capacity]; }
+public ReplicaList(Replica[] contents, int begin, int size) { 
this.contents = contents; this.begin = begin; this.size = size; }
+
+public boolean isSubList(ReplicaList subList)
+{
+return subList.contents == contents;
+}
+
+public Replica get(int index)
+{
+if (index > size)
+throw new IndexOutOfBoundsException();
+return contents[begin + index];
+}
+
+public void add(Replica replica)
+{
+// can only add to full array - if we have sliced it, we must 
be a snapshot
+assert begin == 0;
+if (size == contents.length)
+{
+int newSize;
+if (size < 3) newSize = 3;
+else if (size < 9) newSize = 9;
+else newSize = size * 2;
+contents = Arrays.copyOf(contents, newSize);
+}
+contents[size++] = replica;
+}
+
+public int size()
+{
+return size;
+}
+
+public boolean isEmpty()
+{
+return size == 0;
+}
+
+public ReplicaList subList(int begin, int end)
+{
+if (end > size || begin > end) throw new 
IndexOutOfBoundsException();
+return new ReplicaList(contents, this.begin + begin, end - 
begin);
+}
+
+public ReplicaList sorted(Comparator<Replica> comparator)
+{
+Replica[] copy = Arrays.copyOfRange(contents, begin, begin + 
size);
+Arrays.sort(copy, comparator);
+return new ReplicaList(copy, 0, copy.length);
+}
+
+public Stream<Replica> stream()
+{
+return Arrays.stream(contents, begin, begin + size);
+}
+
+@Override
+public Iterator<Replica> iterator()
+{
+return new Iterator<Replica>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public Replica next()
+{
+return contents[i++];
+}
+};
+}
+
+public <K> Iterator<K> transformIterator(Function<Replica, K> function)
+{
+return new Iterator<K>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public K next()
+{
+return function.apply(contents[i++]);
+}
+};
+}
+
+private Iterator<Replica> filterIterator(Predicate<Replica> predicate, int limit)
+{
+return new Iterator<Replica>()
--- End diff --

Sure, if this was a deliberate decision, I have nothing against it. 


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra-dtest pull request #39: Add no-read assert to read-repair test

2018-10-05 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra-dtest/pull/39

Add no-read assert to read-repair test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra-dtest 
avoid-querying-self-through-ms

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra-dtest/pull/39.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #39


commit ed1055ec550a2dcd92424980d5ab8b05d9f3dfb5
Author: Alex Petrov 
Date:   2018-10-05T14:49:51Z

Add no-read assert to read-repair test




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra issue #276: Repair job tests

2018-10-05 Thread ifesdjeen
Github user ifesdjeen commented on the issue:

https://github.com/apache/cassandra/pull/276
  
I've applied your suggestions @krummas, should I commit?


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #276: Repair job tests

2018-10-08 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/276#discussion_r223013279
  
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+private static final IPartitioner PARTITIONER = 
ByteOrderedPartitioner.instance;
+
+static InetAddressAndPort addr1;
+static InetAddressAndPort addr2;
+static InetAddressAndPort addr3;
+static InetAddressAndPort addr4;
+static InetAddressAndPort addr5;
+
+static Range<Token> range1 = range(0, 1);
+static Range<Token> range2 = range(2, 3);
+static Range<Token> range3 = range(4, 5);
+static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+@AfterClass
+public static void reset()
+{
+FBUtilities.reset();
+}
+
+static
+{
+try
+{
+addr1 = InetAddressAndPort.getByName("127.0.0.1");
+addr2 = InetAddressAndPort.getByName("127.0.0.2");
+addr3 = InetAddressAndPort.getByName("127.0.0.3");
+addr4 = InetAddressAndPort.getByName("127.0.0.4");
+addr5 = InetAddressAndPort.getByName("127.0.0.5");
+DatabaseDescriptor.setBroadcastAddress(addr1.address);
+}
+catch (UnknownHostException e)
+{
+e.printStackTrace();
+}
+}
+
+@Test
+public void testCreateStandardSyncTasks()
+{
+testCreateStandardSyncTasks(false);
+}
+
+@Test
+public void testCreateStandardSyncTasksPullRepair()
+{
+testCreateStandardSyncTasks(true);
+}
+
+public static void testCreateStandardSyncTasks(boolean pullRepair)
+{
+List<TreeResponse> treeResponses = 
Arrays.asList(treeResponse(addr1, range1, "same",  range2, "same", range3, 
"same"),
+ 
treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ 
treeResponse(addr3, range1, "same",  range2, "same", range3, "same"));
+
+Map tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
+   
 treeResponses,
+  

[GitHub] cassandra pull request #281: Sep worker shutdown

2018-10-11 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/281

Sep worker shutdown



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra sep-worker-shutdown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #281


commit 4948c8116e0568eeae5d5c4197d4b12e2f6072e6
Author: Alex Petrov 
Date:   2018-10-11T16:23:20Z

SEP Worker shutdown

commit daaaeb40adf23647318f2253a26ea010de2c
Author: Alex Petrov 
Date:   2018-10-11T16:27:34Z

Get rid of useless latch, cleanup the test

commit e2f9392823fe325f20a89c63e49fbeda617c0342
Author: Alex Petrov 
Date:   2018-10-11T16:48:14Z

Get rid of exceptions

commit 41f56a0df846cf22bce7291fd8a3258782a31b91
Author: Alex Petrov 
Date:   2018-10-11T16:50:32Z

Cleanup imports, elaborate on “kill” call

commit 92d6a8f80c5133ff4709c961c2c7659c3de2f638
Author: Alex Petrov 
Date:   2018-10-11T17:26:53Z

Cleanup




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #282: Implement in-jvm distributed tests prototype / ...

2018-10-12 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/282

Implement in-jvm distributed tests prototype / MVP



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra in-jvm-distributed-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #282


commit d245a777e93a4ec76c88ac6caa6b3051d571474f
Author: Alex Petrov 
Date:   2018-10-01T14:25:28Z

Implement in-jvm distributed tests

commit 1f5b93bdb9f32108b22c61d1319c5010bfd59b04
Author: Alex Petrov 
Date:   2018-10-12T14:14:23Z

Fixes required to properly shutdown (most of the) threads

commit 147520e9c9c0b8055bd165ab455907730c77f597
Author: Alex Petrov 
Date:   2018-10-12T14:27:40Z

Changes required for no-op MBean registration

commit 4133ece48aea4d9665bcfa1432cf4a941d68a9cc
Author: Alex Petrov 
Date:   2018-10-12T16:40:01Z

Separate loggers per instance




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra issue #278: Avoid running query to self through messaging service

2018-10-12 Thread ifesdjeen
Github user ifesdjeen commented on the issue:

https://github.com/apache/cassandra/pull/278
  
Thank you for the review!


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...

2018-10-12 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/278


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra-dtest issue #39: Add no-read assert to read-repair test

2018-10-12 Thread ifesdjeen
Github user ifesdjeen commented on the issue:

https://github.com/apache/cassandra-dtest/pull/39
  
Thank you for the review; I committed the latest version manually.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra-dtest pull request #39: Add no-read assert to read-repair test

2018-10-12 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra-dtest/pull/39


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #282: Implement in-jvm distributed tests prototype / ...

2018-10-12 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/282


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #276: Repair job tests

2018-10-05 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/276#discussion_r222952584
  
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+private static final IPartitioner PARTITIONER = 
ByteOrderedPartitioner.instance;
+
+static InetAddressAndPort addr1;
+static InetAddressAndPort addr2;
+static InetAddressAndPort addr3;
+static InetAddressAndPort addr4;
+static InetAddressAndPort addr5;
+
+static Range<Token> range1 = range(0, 1);
+static Range<Token> range2 = range(2, 3);
+static Range<Token> range3 = range(4, 5);
+static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+@AfterClass
+public static void reset()
+{
+FBUtilities.reset();
+}
+
+static
+{
+try
+{
+addr1 = InetAddressAndPort.getByName("127.0.0.1");
+addr2 = InetAddressAndPort.getByName("127.0.0.2");
+addr3 = InetAddressAndPort.getByName("127.0.0.3");
+addr4 = InetAddressAndPort.getByName("127.0.0.4");
+addr5 = InetAddressAndPort.getByName("127.0.0.5");
+DatabaseDescriptor.setBroadcastAddress(addr1.address);
+}
+catch (UnknownHostException e)
+{
+e.printStackTrace();
+}
+}
+
+@Test
+public void testCreateStandardSyncTasks()
+{
+testCreateStandardSyncTasks(false);
+}
+
+@Test
+public void testCreateStandardSyncTasksPullRepair()
+{
+testCreateStandardSyncTasks(true);
+}
+
+public static void testCreateStandardSyncTasks(boolean pullRepair)
+{
+List<TreeResponse> treeResponses = 
Arrays.asList(treeResponse(addr1, range1, "same",  range2, "same", range3, 
"same"),
+ 
treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ 
treeResponse(addr3, range1, "same",  range2, "same", range3, "same"));
+
+Map tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
+   
 treeResponses,
+  

[GitHub] cassandra pull request #276: Repair job tests

2018-10-05 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/276#discussion_r222954234
  
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+private static final IPartitioner PARTITIONER = 
ByteOrderedPartitioner.instance;
+
+static InetAddressAndPort addr1;
+static InetAddressAndPort addr2;
+static InetAddressAndPort addr3;
+static InetAddressAndPort addr4;
+static InetAddressAndPort addr5;
+
+static Range<Token> range1 = range(0, 1);
+static Range<Token> range2 = range(2, 3);
+static Range<Token> range3 = range(4, 5);
+static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+@AfterClass
+public static void reset()
+{
+FBUtilities.reset();
+}
+
+static
+{
+try
+{
+addr1 = InetAddressAndPort.getByName("127.0.0.1");
+addr2 = InetAddressAndPort.getByName("127.0.0.2");
+addr3 = InetAddressAndPort.getByName("127.0.0.3");
+addr4 = InetAddressAndPort.getByName("127.0.0.4");
+addr5 = InetAddressAndPort.getByName("127.0.0.5");
+DatabaseDescriptor.setBroadcastAddress(addr1.address);
+}
+catch (UnknownHostException e)
+{
+e.printStackTrace();
+}
+}
+
+@Test
+public void testCreateStandardSyncTasks()
+{
+testCreateStandardSyncTasks(false);
+}
+
+@Test
+public void testCreateStandardSyncTasksPullRepair()
+{
+testCreateStandardSyncTasks(true);
+}
+
+public static void testCreateStandardSyncTasks(boolean pullRepair)
+{
+List<TreeResponse> treeResponses = 
Arrays.asList(treeResponse(addr1, range1, "same",  range2, "same", range3, 
"same"),
+ 
treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ 
treeResponse(addr3, range1, "same",  range2, "same", range3, "same"));
+
+Map tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
+   
 treeResponses,
+  

[GitHub] cassandra pull request #276: Repair job tests

2018-10-05 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/276#discussion_r222953530
  
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+private static final IPartitioner PARTITIONER = 
ByteOrderedPartitioner.instance;
+
+static InetAddressAndPort addr1;
+static InetAddressAndPort addr2;
+static InetAddressAndPort addr3;
+static InetAddressAndPort addr4;
+static InetAddressAndPort addr5;
+
+static Range<Token> range1 = range(0, 1);
+static Range<Token> range2 = range(2, 3);
+static Range<Token> range3 = range(4, 5);
+static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+@AfterClass
+public static void reset()
+{
+FBUtilities.reset();
+}
+
+static
+{
+try
+{
+addr1 = InetAddressAndPort.getByName("127.0.0.1");
+addr2 = InetAddressAndPort.getByName("127.0.0.2");
+addr3 = InetAddressAndPort.getByName("127.0.0.3");
+addr4 = InetAddressAndPort.getByName("127.0.0.4");
+addr5 = InetAddressAndPort.getByName("127.0.0.5");
+DatabaseDescriptor.setBroadcastAddress(addr1.address);
+}
+catch (UnknownHostException e)
+{
+e.printStackTrace();
+}
+}
+
+@Test
+public void testCreateStandardSyncTasks()
+{
+testCreateStandardSyncTasks(false);
+}
+
+@Test
+public void testCreateStandardSyncTasksPullRepair()
+{
+testCreateStandardSyncTasks(true);
+}
+
+public static void testCreateStandardSyncTasks(boolean pullRepair)
+{
+List<TreeResponse> treeResponses = 
Arrays.asList(treeResponse(addr1, range1, "same",  range2, "same", range3, 
"same"),
+ 
treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ 
treeResponse(addr3, range1, "same",  range2, "same", range3, "same"));
+
+Map tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
+   
 treeResponses,
+  

[GitHub] cassandra pull request #276: Repair job tests

2018-10-05 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/276#discussion_r222953487
  
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+private static final IPartitioner PARTITIONER = 
ByteOrderedPartitioner.instance;
+
+static InetAddressAndPort addr1;
+static InetAddressAndPort addr2;
+static InetAddressAndPort addr3;
+static InetAddressAndPort addr4;
+static InetAddressAndPort addr5;
+
+static Range<Token> range1 = range(0, 1);
+static Range<Token> range2 = range(2, 3);
+static Range<Token> range3 = range(4, 5);
+static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+@AfterClass
+public static void reset()
+{
+FBUtilities.reset();
+}
+
+static
+{
+try
+{
+addr1 = InetAddressAndPort.getByName("127.0.0.1");
+addr2 = InetAddressAndPort.getByName("127.0.0.2");
+addr3 = InetAddressAndPort.getByName("127.0.0.3");
+addr4 = InetAddressAndPort.getByName("127.0.0.4");
+addr5 = InetAddressAndPort.getByName("127.0.0.5");
+DatabaseDescriptor.setBroadcastAddress(addr1.address);
+}
+catch (UnknownHostException e)
+{
+e.printStackTrace();
+}
+}
+
+@Test
+public void testCreateStandardSyncTasks()
+{
+testCreateStandardSyncTasks(false);
+}
+
+@Test
+public void testCreateStandardSyncTasksPullRepair()
+{
+testCreateStandardSyncTasks(true);
+}
+
+public static void testCreateStandardSyncTasks(boolean pullRepair)
+{
+List<TreeResponse> treeResponses = 
Arrays.asList(treeResponse(addr1, range1, "same",  range2, "same", range3, 
"same"),
+ 
treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ 
treeResponse(addr3, range1, "same",  range2, "same", range3, "same"));
+
+Map tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
+   
 treeResponses,
+  

[GitHub] cassandra pull request #276: Repair job tests

2018-10-05 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/276#discussion_r222952614
  
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+private static final IPartitioner PARTITIONER = 
ByteOrderedPartitioner.instance;
+
+static InetAddressAndPort addr1;
+static InetAddressAndPort addr2;
+static InetAddressAndPort addr3;
+static InetAddressAndPort addr4;
+static InetAddressAndPort addr5;
+
+static Range<Token> range1 = range(0, 1);
+static Range<Token> range2 = range(2, 3);
+static Range<Token> range3 = range(4, 5);
+static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+@AfterClass
+public static void reset()
+{
+FBUtilities.reset();
+}
+
+static
+{
+try
+{
+addr1 = InetAddressAndPort.getByName("127.0.0.1");
+addr2 = InetAddressAndPort.getByName("127.0.0.2");
+addr3 = InetAddressAndPort.getByName("127.0.0.3");
+addr4 = InetAddressAndPort.getByName("127.0.0.4");
+addr5 = InetAddressAndPort.getByName("127.0.0.5");
+DatabaseDescriptor.setBroadcastAddress(addr1.address);
+}
+catch (UnknownHostException e)
+{
+e.printStackTrace();
+}
+}
+
+@Test
+public void testCreateStandardSyncTasks()
+{
+testCreateStandardSyncTasks(false);
+}
+
+@Test
+public void testCreateStandardSyncTasksPullRepair()
+{
+testCreateStandardSyncTasks(true);
+}
+
+public static void testCreateStandardSyncTasks(boolean pullRepair)
+{
+List<TreeResponse> treeResponses = 
Arrays.asList(treeResponse(addr1, range1, "same",  range2, "same", range3, 
"same"),
+ 
treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ 
treeResponse(addr3, range1, "same",  range2, "same", range3, "same"));
+
+Map tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
+   
 treeResponses,
+  

[GitHub] cassandra pull request #276: Repair job tests

2018-10-05 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/276#discussion_r222952423
  
--- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java ---
@@ -52,15 +52,9 @@
 private static final Logger logger = 
LoggerFactory.getLogger(LocalSyncTask.class);
 
 private final UUID pendingRepair;
-private final boolean requestRanges;
-private final boolean transferRanges;
 
-public LocalSyncTask(RepairJobDesc desc, TreeResponse local, 
TreeResponse remote, UUID pendingRepair,
- boolean requestRanges, boolean transferRanges, 
PreviewKind previewKind)
-{
-this(desc, local.endpoint, remote.endpoint, 
MerkleTrees.difference(local.trees, remote.trees),
- pendingRepair, requestRanges, transferRanges, previewKind);
-}
+protected final boolean requestRanges;
--- End diff --

+1, made a change 


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra-dtest pull request #39: Add no-read assert to read-repair test

2018-10-09 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/39#discussion_r223734988
  
--- Diff: read_repair_test.py ---
@@ -495,6 +495,9 @@ def test_normal_read_repair(self):
 assert storage_proxy.speculated_rr_read == 0
 assert storage_proxy.speculated_rr_write == 0
 
+warn = node2.grep_log("Message-to-self TYPE:READ")
--- End diff --

Ultimately, yes. But unfortunately there are still a couple of places where we do that. Maybe we should widen the scope of the ticket and check the other places where this happens, or open a separate issue.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #271: 14726

2018-10-01 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r221529863
  
--- Diff: 
src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---
@@ -58,45 +63,332 @@
 };
 }
 
-protected final List<Replica> list;
-protected final boolean isSnapshot;
-protected AbstractReplicaCollection(List<Replica> list, boolean isSnapshot)
+/**
+ * A simple list with no comodification checks and immutability by default (only append permitted, and only one initial copy)
+ * this permits us to reduce the amount of garbage generated, by not wrapping iterators or unnecessarily copying
+ * and reduces the amount of indirection necessary, as well as ensuring monomorphic callsites
+ */
+protected static class ReplicaList implements Iterable<Replica>
 {
-this.list = list;
-this.isSnapshot = isSnapshot;
+private static final Replica[] EMPTY = new Replica[0];
+Replica[] contents;
+int begin, size;
+
+public ReplicaList() { this(0); }
+public ReplicaList(int capacity) { contents = capacity == 0 ? EMPTY : new Replica[capacity]; }
+public ReplicaList(Replica[] contents, int begin, int size) { this.contents = contents; this.begin = begin; this.size = size; }
+
+public boolean isSubList(ReplicaList subList)
+{
+return subList.contents == contents;
+}
+
+public Replica get(int index)
+{
+if (index > size)
+throw new IndexOutOfBoundsException();
+return contents[begin + index];
+}
+
+public void add(Replica replica)
+{
+// can only add to full array - if we have sliced it, we must be a snapshot
+assert begin == 0;
+if (size == contents.length)
+{
+int newSize;
+if (size < 3) newSize = 3;
+else if (size < 9) newSize = 9;
+else newSize = size * 2;
+contents = Arrays.copyOf(contents, newSize);
+}
+contents[size++] = replica;
+}
+
+public int size()
+{
+return size;
+}
+
+public boolean isEmpty()
+{
+return size == 0;
+}
+
+public ReplicaList subList(int begin, int end)
+{
+if (end > size || begin > end) throw new IndexOutOfBoundsException();
+return new ReplicaList(contents, this.begin + begin, end - begin);
+}
+
+public ReplicaList sorted(Comparator<Replica> comparator)
+{
+Replica[] copy = Arrays.copyOfRange(contents, begin, begin + size);
+Arrays.sort(copy, comparator);
+return new ReplicaList(copy, 0, copy.length);
+}
+
+public Stream<Replica> stream()
+{
+return Arrays.stream(contents, begin, begin + size);
+}
+
+@Override
+public Iterator<Replica> iterator()
+{
+return new Iterator<Replica>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public Replica next()
+{
+return contents[i++];
+}
+};
+}
+
+public <K> Iterator<K> transformIterator(Function<Replica, K> function)
+{
+return new Iterator<K>()
+{
+final int end = begin + size;
+int i = begin;
+@Override
+public boolean hasNext()
+{
+return i < end;
+}
+
+@Override
+public K next()
+{
+return function.apply(contents[i++]);
+}
+};
+}
+
+private Iterator<Replica> filterIterator(Predicate<Replica> predicate, int limit)
+{
+return new Iterator<Replica>()
+{
+final int end = begin + size;
+int next = begin;
+int count = 0;
+{ updateNext(); }
+void updateNext()
+{
+if (count == limit) next = end;
+
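
The design described in the javadoc above (an append-only, array-backed list whose slices share the backing array) can be sketched standalone roughly like this. Everything below is illustrative: `AppendOnlyList` and `Item` are made-up names standing in for `ReplicaList` and `Replica`, and only the growth policy and the copy-free `subList` mirror the diff.

```java
import java.util.Arrays;

// Toy sketch, not the actual AbstractReplicaCollection.ReplicaList.
final class AppendOnlyList<Item>
{
    private Object[] contents;
    private final int begin;
    private int size;

    AppendOnlyList() { this(new Object[0], 0, 0); }

    private AppendOnlyList(Object[] contents, int begin, int size)
    {
        this.contents = contents;
        this.begin = begin;
        this.size = size;
    }

    void add(Item item)
    {
        assert begin == 0 : "slices are snapshots; only the original list may grow";
        if (size == contents.length)
        {
            // same growth policy as the diff above: 0 -> 3 -> 9 -> doubling
            int newSize = size < 3 ? 3 : (size < 9 ? 9 : size * 2);
            contents = Arrays.copyOf(contents, newSize);
        }
        contents[size++] = item;
    }

    @SuppressWarnings("unchecked")
    Item get(int index)
    {
        if (index < 0 || index >= size)
            throw new IndexOutOfBoundsException();
        return (Item) contents[begin + index];
    }

    // No copying: a slice points into the same backing array, which is where the
    // garbage reduction mentioned in the javadoc comes from.
    AppendOnlyList<Item> subList(int from, int to)
    {
        if (from < 0 || to > size || from > to)
            throw new IndexOutOfBoundsException();
        return new AppendOnlyList<>(contents, begin + from, to - from);
    }

    int size() { return size; }
}
```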

[GitHub] cassandra pull request #271: 14726

2018-10-01 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r221529971
  

[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...

2018-10-10 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/278#discussion_r224060251
  
--- Diff: src/java/org/apache/cassandra/net/MessagingService.java ---
@@ -1078,7 +1078,7 @@ public void sendOneWay(MessageOut message, int id, 
InetAddressAndPort to)
 logger.trace("{} sending {} to {}@{}", 
FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to);
 
 if (to.equals(FBUtilities.getBroadcastAddressAndPort()))
-logger.trace("Message-to-self {} going over MessagingService", 
message);
+logger.debug("Message-to-self {} going over MessagingService", 
message);
--- End diff --

To be honest, I think relying on the logging might have been a bad idea. 
I've left this statement with `trace` and have rewritten the tests instead.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #276: Repair job tests

2018-10-02 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/276

Repair job tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra repair-job-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #276


commit 31df761c82855522bf56f371da7b2f475fe413f2
Author: Alex Petrov 
Date:   2018-10-01T13:30:58Z

Add tests for repair job

commit 79c7574d68c22faec67673fa425b710c9cb594e9
Author: Alex Petrov 
Date:   2018-10-01T14:25:28Z

Enable dtests

commit 992baf72ad140aa10a9433e51457b7be5f383639
Author: Alex Petrov 
Date:   2018-10-01T14:31:45Z

Remove debug statement

commit dcbf317273b09cf8a3fc810351d046addb4379ff
Author: Alex Petrov 
Date:   2018-10-01T15:22:44Z

Fix tests




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #267: Consolidate batch write code

2018-10-04 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/267


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...

2018-10-08 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/278#discussion_r223421365
  
--- Diff: src/java/org/apache/cassandra/net/MessagingService.java ---
@@ -1078,7 +1078,7 @@ public void sendOneWay(MessageOut message, int id, 
InetAddressAndPort to)
 logger.trace("{} sending {} to {}@{}", 
FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to);
 
 if (to.equals(FBUtilities.getBroadcastAddressAndPort()))
-logger.trace("Message-to-self {} going over MessagingService", 
message);
+logger.warn("Message-to-self {} going over MessagingService", 
message);
--- End diff --

Right, we can use debug here. I initially wanted to throw in this case, but then 
decided it's more useful to find all the cases where we still do that and 
eliminate them, since failing here gains us more or less nothing.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #278: Avoid running query to self through messaging s...

2018-10-09 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/278#discussion_r223868092
  
--- Diff: 
src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java ---
@@ -102,12 +104,24 @@ void sendReadCommand(Replica to, ReadCallback 
readCallback, boolean speculative)
 else type = to.isFull() ? "full" : "transient";
 Tracing.trace("Enqueuing {} data read to {}", type, to);
 }
-MessageOut message = command.createMessage();
-// if enabled, request additional info about repaired data from 
any full replicas
-if (command.isTrackingRepairedStatus() && to.isFull())
-message = 
message.withParameter(ParameterType.TRACK_REPAIRED_DATA, 
MessagingService.ONE_BYTE);
 
-MessagingService.instance().sendRRWithFailure(message, 
to.endpoint(), readCallback);
+if (to.isSelf())
+{
+try (ReadExecutionController executionController = 
command.executionController();
--- End diff --

Yes, you're absolutely right: it is wrong to block this thread for I/O. We 
in fact have a pattern for dealing with these things on the local execution 
path: 
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java#L157
 
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java#L186
 

Thanks for pointing that out!
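
In spirit it boils down to something like the following self-contained sketch. All of the names below (`LocalReadSketch`, `READ_STAGE`, `readFromLocalStorage`) are made up for illustration and are not the actual trunk code; the point is only that local work is handed to a dedicated stage and completed asynchronously, the same way a remote response would arrive.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LocalReadSketch
{
    // stand-in for the read stage
    private static final ExecutorService READ_STAGE = Executors.newFixedThreadPool(4);

    static CompletableFuture<String> read(boolean targetIsSelf, String key)
    {
        CompletableFuture<String> response = new CompletableFuture<>();
        if (targetIsSelf)
            READ_STAGE.execute(() -> response.complete(readFromLocalStorage(key))); // no blocking I/O on the caller
        else
            sendReadOverMessaging(key, response); // normal messaging path
        return response;
    }

    private static String readFromLocalStorage(String key)
    {
        return "value-for-" + key; // stand-in for the actual local read
    }

    private static void sendReadOverMessaging(String key, CompletableFuture<String> response)
    {
        response.complete("remote-value-for-" + key); // stand-in for the messaging service
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(read(true, "k1").get());
        READ_STAGE.shutdown();
    }
}
```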


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #276: Repair job tests

2018-10-09 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/276


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r219410357
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -446,16 +447,14 @@ else if (useStrictConsistency)
  //The old behavior where we might be asked to 
fetch ranges we don't need shouldn't occur anymore.
  //So it's an error if we don't find what we need.
  if (oldEndpoints.isEmpty() && 
toFetch.isTransient())
- {
  throw new AssertionError("If there are no 
endpoints to fetch from then we must be transitioning from transient to full 
for range " + toFetch);
- }
 
- if (!any(oldEndpoints, isSufficient))
+ if (toFetch.isFull() && (oldEndpoints.isEmpty() 
|| oldEndpoints.get(0).isTransient()))
  {
  // need an additional replica
  EndpointsForRange endpointsForRange = 
sorted.apply(rangeAddresses.get(range));
  // include all our filters, to ensure we 
include a matching node
- Optional fullReplica = 
Iterables.tryFind(endpointsForRange, and(isSufficient, 
testSourceFilters)).toJavaUtil();
+ Optional fullReplica = 
Iterables.tryFind(endpointsForRange, and(Replica::isFull, 
testSourceFilters)).toJavaUtil();
--- End diff --

Switched back to `isSufficient` here


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r219488342
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -436,6 +436,7 @@ else if (useStrictConsistency)
  Set endpointsStillReplicated 
= newEndpoints.endpoints();
  // Remove new endpoints from old endpoints based 
on address
  oldEndpoints = oldEndpoints.filter(r -> 
!endpointsStillReplicated.contains(r.endpoint()));
+ oldEndpoints.filter(testSourceFilters);
--- End diff --

Fixed


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r219488550
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -446,16 +447,14 @@ else if (useStrictConsistency)
  //The old behavior where we might be asked to 
fetch ranges we don't need shouldn't occur anymore.
  //So it's an error if we don't find what we need.
  if (oldEndpoints.isEmpty() && 
toFetch.isTransient())
- {
  throw new AssertionError("If there are no 
endpoints to fetch from then we must be transitioning from transient to full 
for range " + toFetch);
- }
 
- if (!any(oldEndpoints, isSufficient))
+ if (toFetch.isFull() && (oldEndpoints.isEmpty() 
|| oldEndpoints.get(0).isTransient()))
--- End diff --

Ok, reverted to the old one


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r219488307
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -265,10 +318,11 @@ public void addRanges(String keyspaceName, 
ReplicaCollection replicas)
 workMap = getOptimizedWorkMap(fetchMap, sourceFilters, 
keyspaceName);
 }
 
-toFetch.put(keyspaceName, workMap);
-for (Map.Entry> entry 
: workMap.asMap().entrySet())
+assert toFetch.put(keyspaceName, workMap) == null : "Keyspace is 
already added to fetch map";
--- End diff --

Fixed


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #266: Add a check for receiving digest response from ...

2018-09-21 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/266


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r219488654
  
--- Diff: src/java/org/apache/cassandra/service/RangeRelocator.java ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+@VisibleForTesting
+public class RangeRelocator
+{
+private static final Logger logger = 
LoggerFactory.getLogger(StorageService.class);
+
+private final StreamPlan streamPlan = new 
StreamPlan(StreamOperation.RELOCATION);
+private final InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
+private final TokenMetadata tokenMetaCloneAllSettled;
+// clone to avoid concurrent modification in calculateNaturalReplicas
+private final TokenMetadata tokenMetaClone;
+private final Collection tokens;
+private final List keyspaceNames;
+
+
+RangeRelocator(Collection tokens, List keyspaceNames, 
TokenMetadata tmd)
+{
+this.tokens = tokens;
+this.keyspaceNames = keyspaceNames;
+this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
+// clone to avoid concurrent modification in 
calculateNaturalReplicas
+this.tokenMetaClone = tmd.cloneOnlyTokenMap();
+}
+
+@VisibleForTesting
+public RangeRelocator()
+{
+this.tokens = null;
+this.keyspaceNames = null;
+this.tokenMetaCloneAllSettled = null;
+this.tokenMetaClone = null;
+}
+
+/**
+ * Wrapper that supplies accessors to the real implementations of the 
various dependencies for this method
+ */
+private static Multimap 
calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges,
+   
  AbstractReplicationStrategy 
strategy,
+   
  String keyspace,
+   
  TokenMetadata tmdBefore,
+   
  TokenMetadata tmdAfter)
+{
+EndpointsByReplica preferredEndpoints =
+
RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch(

[GitHub] cassandra pull request #269: Review tr range movements

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r219488682
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -338,165 +386,152 @@ else if (useStrictConsistency)
   boolean 
useStrictConsistency,
   TokenMetadata tmdBefore,
   TokenMetadata tmdAfter,
-  Predicate 
isAlive,
   String keyspace,
-  
Collection> sourceFilters)
-{
-EndpointsByRange rangeAddresses = 
strat.getRangeAddresses(tmdBefore);
-
-InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
-logger.debug ("Keyspace: {}", keyspace);
-logger.debug("To fetch RN: {}", fetchRanges);
-logger.debug("Fetch ranges: {}", rangeAddresses);
-
-Predicate testSourceFilters = and(sourceFilters);
-Function sorted =
-endpoints -> 
snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
-//This list of replicas is just candidates. With strict 
consistency it's going to be a narrow list.
-EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = 
new EndpointsByReplica.Mutable();
-for (Replica toFetch : fetchRanges)
-{
-//Replica that is sufficient to provide the data we need
-//With strict consistency and transient replication we may end 
up with multiple types
-//so this isn't used with strict consistency
-Predicate isSufficient = r -> (toFetch.isTransient() 
|| r.isFull());
-Predicate accept = r ->
-   isSufficient.test(r) // is 
sufficient
-&& !r.endpoint().equals(localAddress)   // is not self
-&& isAlive.test(r); // is alive
-
-logger.debug("To fetch {}", toFetch);
-for (Range range : rangeAddresses.keySet())
-{
-if (range.contains(toFetch.range()))
-{
-EndpointsForRange oldEndpoints = 
rangeAddresses.get(range);
-
-//Ultimately we populate this with whatever is going 
to be fetched from to satisfy toFetch
-//It could be multiple endpoints and we must fetch 
from all of them if they are there
-//With transient replication and strict consistency 
this is to get the full data from a full replica and
-//transient data from the transient replica losing data
-EndpointsForRange sources;
-if (useStrictConsistency)
-{
-//Start with two sets of who replicates the range 
before and who replicates it after
-EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
-logger.debug("Old endpoints {}", oldEndpoints);
-logger.debug("New endpoints {}", newEndpoints);
-
-//Due to CASSANDRA-5953 we can have a higher RF 
then we have endpoints.
-//So we need to be careful to only be strict when 
endpoints == RF
-if (oldEndpoints.size() == 
strat.getReplicationFactor().allReplicas)
-{
-Set 
endpointsStillReplicated = newEndpoints.endpoints();
-// Remove new endpoints from old endpoints 
based on address
-oldEndpoints = oldEndpoints.filter(r -> 
!endpointsStillReplicated.contains(r.endpoint()));
-
-if (!all(oldEndpoints, isAlive))
-throw new IllegalStateException("A node 
required to move the data consistently is down: "
-+ 
oldEndpoints.filter(not(isAlive)));
-
-if (oldEndpoints.size() > 1)
-throw new AssertionError("Expected <= 1 
endpoint but found " + oldEndpoints);
-
-//If we are transitioning from transient to 
full and and the set of replicas for the range is not changing
-//we might end up with no endpoints to fetch 
from by address. In that

[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219508033
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
--- End diff --

> Either way, while we're here we should document in the caller the fact 
that the provided consistencyLevel is not used, and that the 
batchlogConsistencyLevel is used to clear the entries from the remote batch 
logs once the real write has reached that consistency level.

Not sure I understand what you mean, as we're using it both for cleanup and 
for writing to the batchlog.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219508092
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -780,8 +780,9 @@ public static void mutateMV(ByteBuffer dataKey, 
Collection mutations,
 ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
 
 //Since the base -> view replication is 1:1 we only need 
to store the BL locally
-final Collection batchlogEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
-BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> 
asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forLocalBatchlogWrite();
+BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
--- End diff --


Fixed


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219518994
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
 {
 // A single case we write not for range or token, but multiple 
mutations to many tokens
 Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+Multimap localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+// Replicas are picked manually:
+//  - replicas should be alive according to the failure detector
+//  - replicas should be in the local datacenter
+//  - choose min(2, number of qualifying candiates above)
+//  - allow the local node to be the only replica only if it's a 
single-node DC
+Collection chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
--- End diff --

Moved it
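
For reference, the selection rules spelled out in the comment above can be sketched standalone like this. This is illustrative only: the real logic lives in `BatchlogManager.EndpointFilter`, which additionally prefers endpoints from distinct racks; the helper and names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

public class BatchlogEndpointSketch
{
    // live, local-DC candidates; min(2, candidates); local node only when it is the sole option
    static List<String> chooseBatchlogEndpoints(List<String> localDcEndpoints,
                                                Predicate<String> isAlive,
                                                String localNode)
    {
        List<String> candidates = new ArrayList<>();
        for (String endpoint : localDcEndpoints)
            if (isAlive.test(endpoint) && !endpoint.equals(localNode))
                candidates.add(endpoint);

        if (candidates.isEmpty())
            return Collections.singletonList(localNode); // single-node DC fallback

        return candidates.subList(0, Math.min(2, candidates.size()));
    }

    public static void main(String[] args)
    {
        List<String> dc = List.of("127.0.0.1", "127.0.0.2", "127.0.0.3");
        // only 127.0.0.2 is both alive and non-local, so it is the single chosen endpoint
        System.out.println(chooseBatchlogEndpoints(dc, e -> !e.endsWith(".3"), "127.0.0.1"));
    }
}
```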


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-21 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219518928
  
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1021,18 +1021,18 @@ private static void 
syncWriteToBatchlog(Collection mutations, Collecti
 handler.get();
 }
 
-private static void 
asyncRemoveFromBatchlog(Collection endpoints, UUID uuid)
+private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite 
replicaPlan, UUID uuid)
 {
 MessageOut message = new 
MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, 
UUIDSerializer.serializer);
-for (InetAddressAndPort target : endpoints)
+for (Replica target : replicaPlan.contacts())
 {
 if (logger.isTraceEnabled())
 logger.trace("Sending batchlog remove request {} to {}", 
uuid, target);
 
-if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
-performLocally(Stage.MUTATION, 
SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+if (target.isLocal())
--- End diff --

Renamed


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra issue #267: Consolidate batch write code

2018-09-24 Thread ifesdjeen
Github user ifesdjeen commented on the issue:

https://github.com/apache/cassandra/pull/267
  
@belliottsmith I've addressed your comments, rebased and pushed for one 
more round of CI.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-26 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/269


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #275: 14727

2018-09-26 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/275

14727



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/belliottsmith/cassandra 14727

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #275


commit 150f4236f43593ed6a832ada5490c5b6ea64b88c
Author: Benedict Elliott Smith 
Date:   2018-09-19T11:52:27Z

Transient Replication support for EACH_QUORUM, and correction of behaviour 
for LOCAL_QUORUM

commit 3bd3f7b8ede99c69a810e75eff762deffdc8cf72
Author: Benedict Elliott Smith 
Date:   2018-09-26T10:08:37Z

circleci




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-26 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r220455656
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -424,62 +440,58 @@ else if (useStrictConsistency)
  EndpointsForRange sources;
  if (useStrictConsistency)
  {
- //Start with two sets of who replicates the range 
before and who replicates it after
- EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
- logger.debug("Old endpoints {}", oldEndpoints);
- logger.debug("New endpoints {}", newEndpoints);
-
+ EndpointsForRange strictEndpoints;
  //Due to CASSANDRA-5953 we can have a higher RF then 
we have endpoints.
  //So we need to be careful to only be strict when 
endpoints == RF
  if (oldEndpoints.size() == 
strat.getReplicationFactor().allReplicas)
  {
- Set endpointsStillReplicated 
= newEndpoints.endpoints();
+ //Start with two sets of who replicates the range 
before and who replicates it after
+ EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+ logger.debug("Old endpoints {}", oldEndpoints);
+ logger.debug("New endpoints {}", newEndpoints);
+
  // Remove new endpoints from old endpoints based 
on address
- oldEndpoints = oldEndpoints.filter(r -> 
!endpointsStillReplicated.contains(r.endpoint()));
- oldEndpoints.filter(testSourceFilters);
+ strictEndpoints = 
oldEndpoints.without(newEndpoints.endpoints());
 
- if (oldEndpoints.size() > 1)
- throw new AssertionError("Expected <= 1 
endpoint but found " + oldEndpoints);
+ //We have to check the source filters here to see 
if they will remove any replicas
+ //required for strict consistency
+ if (!all(strictEndpoints, testSourceFilters))
+ throw new IllegalStateException("Necessary 
replicas for strict consistency were removed by source filters: " + 
buildErrorMessage(sourceFilters, strictEndpoints));
+
+ if (strictEndpoints.size() > 1)
--- End diff --

We're not really filtering at all anymore. We can get here only if the 
source filters didn't filter out any nodes (otherwise we'd fail with the same 
assertion later). We've asserted that the resulting endpoint is not going to be 
yanked out from under us.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219137918
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
 {
 // A single case we write not for range or token, but multiple 
mutations to many tokens
 Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+Multimap localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+// Replicas are picked manually:
+//  - replicas should be alive according to the failure detector
+//  - replicas should be in the local datacenter
+//  - choose min(2, number of qualifying candiates above)
+//  - allow the local node to be the only replica only if it's a 
single-node DC
+Collection chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+if (chosenEndpoints.isEmpty())
+{
+if (consistencyLevel == ConsistencyLevel.ANY)
+chosenEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+else
+throw UnavailableException.create(ConsistencyLevel.ONE, 1, 
0);
--- End diff --

We don't strictly need it: I included it only to short-circuit. We can skip 
it.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #267: Consolidate batch write code

2018-09-20 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/267#discussion_r219140446
  
--- Diff: src/java/org/apache/cassandra/locator/ReplicaPlans.java ---
@@ -61,26 +73,62 @@
 return forSingleReplicaWrite(keyspace, token, replica);
 }
 
+public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+{
+Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+Keyspace systemKeypsace = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
+EndpointsForToken.of(token, localSystemReplica),
+EndpointsForToken.empty(token)
+);
+
+return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, 
liveAndDown, writeAll);
+}
+
 /**
  * Requires that the provided endpoints are alive.  Converts them to 
their relevant system replicas.
  * Note that the liveAndDown collection and live are equal to the 
provided endpoints.
- *
- * The semantics are a bit weird, in that CL=ONE iff we have one node 
provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live 
node is available, are both unclear.
  */
-public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace 
keyspace, Collection endpoints) throws UnavailableException
+public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String 
localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
 {
 // A single case we write not for range or token, but multiple 
mutations to many tokens
 Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+Multimap localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+// Replicas are picked manually:
+//  - replicas should be alive according to the failure detector
+//  - replicas should be in the local datacenter
+//  - choose min(2, number of qualifying candiates above)
+//  - allow the local node to be the only replica only if it's a 
single-node DC
+Collection chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+if (chosenEndpoints.isEmpty())
+{
+if (consistencyLevel == ConsistencyLevel.ANY)
+chosenEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+else
+throw UnavailableException.create(ConsistencyLevel.ONE, 1, 
0);
+}
+
 ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(
-
SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+
SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
 EndpointsForToken.empty(token)
 );
-ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 
? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+// Batchlog is hosted by either one node or two nodes from 
different racks.
+consistencyLevel = liveAndDown.all().size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO;
--- End diff --

Right, I had similar thoughts when writing that. I've also been wondering whether 
it's enough to block on only two nodes in the local DC for batchlog writes. I 
wanted to change it to the original CL, but that might reduce availability 
(for instance when it's a quorum). From looking at batch replay, though, we should 
be able to do it.
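
To make the current rule explicit, a toy sketch (not the real `ReplicaPlans` code): block on ONE when the batchlog landed on a single endpoint, otherwise block on TWO, i.e. on both rack-distinct batchlog replicas.

```java
public class BatchlogClSketch
{
    enum BatchlogCL { ONE, TWO }

    static BatchlogCL batchlogBlockFor(int chosenEndpoints)
    {
        if (chosenEndpoints < 1)
            throw new IllegalArgumentException("batchlog needs at least one endpoint");
        return chosenEndpoints == 1 ? BatchlogCL.ONE : BatchlogCL.TWO;
    }

    public static void main(String[] args)
    {
        System.out.println(batchlogBlockFor(1)); // ONE (single-node DC)
        System.out.println(batchlogBlockFor(2)); // TWO (two rack-distinct replicas)
    }
}
```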


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #271: 14726

2018-09-27 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r220869142
  
--- Diff: src/java/org/apache/cassandra/locator/EndpointsForRange.java ---
@@ -40,13 +37,13 @@
 public class EndpointsForRange extends Endpoints
 {
 private final Range range;
-private EndpointsForRange(Range range, List list, 
boolean isSnapshot)
+private EndpointsForRange(Range range, ReplicaList list)
--- End diff --

Looks like the intention was to have `byEndpoint` calculated lazily.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #271: 14726

2018-09-27 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r220874274
  
--- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java 
---
@@ -141,28 +147,37 @@ public void testOrderOfIteration()
 {
 Assert.assertEquals(canonicalList, ImmutableList.copyOf(test));
 Assert.assertEquals(canonicalList, 
test.stream().collect(Collectors.toList()));
-Assert.assertEquals(new 
LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), 
test.endpoints());
+Assert.assertTrue(Iterables.elementsEqual(new 
LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), 
test.endpoints()));
 }
 
 private void assertSubList(C subCollection, int from, int to)
 {
-Assert.assertTrue(subCollection.isSnapshot);
 if (from == to)
 {
 Assert.assertTrue(subCollection.isEmpty());
 }
 else
 {
-List subList = this.test.list.subList(from, to);
-if (test.isSnapshot)
-Assert.assertSame(subList.getClass(), 
subCollection.list.getClass());
+AbstractReplicaCollection.ReplicaList subList = 
this.test.list.subList(from, to);
+if (!(test instanceof ReplicaCollection.Builder))
+Assert.assertSame(subList.contents, 
subCollection.list.contents);
 Assert.assertEquals(subList, subCollection.list);
 }
 }
 
+private void assertSubSequence(Iterable subSequence, int 
from, int to)
+{
+AbstractReplicaCollection.ReplicaList subList = 
this.test.list.subList(from, to);
+if (!elementsEqual(subList, subSequence))
+{
+elementsEqual(subList, subSequence);
+}
+Assert.assertTrue(elementsEqual(subList, subSequence));
+}
+
 void testSubList(int subListDepth, int filterDepth, int sortDepth)
 {
-if (test.isSnapshot)
+if (!(test instanceof ReplicaCollection.Builder))
--- End diff --

Should we make it `isBuilder`?


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #271: 14726

2018-09-27 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r220871228
  
--- Diff: src/java/org/apache/cassandra/locator/Endpoints.java ---
@@ -36,13 +33,16 @@
  */
 public abstract class Endpoints> extends 
AbstractReplicaCollection
 {
-static final Map EMPTY_MAP = 
Collections.unmodifiableMap(new LinkedHashMap<>());
+static final ReplicaMap endpointMap(ReplicaList 
list) { return new ReplicaMap<>(list, Replica::endpoint); }
--- End diff --

This is a method, so we can skip `final` here.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #271: 14726

2018-09-27 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/271#discussion_r220871785
  
--- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java 
---
@@ -141,28 +147,37 @@ public void testOrderOfIteration()
 {
 Assert.assertEquals(canonicalList, ImmutableList.copyOf(test));
 Assert.assertEquals(canonicalList, 
test.stream().collect(Collectors.toList()));
-Assert.assertEquals(new 
LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), 
test.endpoints());
+Assert.assertTrue(Iterables.elementsEqual(new 
LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), 
test.endpoints()));
 }
 
 private void assertSubList(C subCollection, int from, int to)
 {
-Assert.assertTrue(subCollection.isSnapshot);
 if (from == to)
 {
 Assert.assertTrue(subCollection.isEmpty());
 }
 else
 {
-List subList = this.test.list.subList(from, to);
-if (test.isSnapshot)
-Assert.assertSame(subList.getClass(), 
subCollection.list.getClass());
+AbstractReplicaCollection.ReplicaList subList = 
this.test.list.subList(from, to);
+if (!(test instanceof ReplicaCollection.Builder))
+Assert.assertSame(subList.contents, 
subCollection.list.contents);
 Assert.assertEquals(subList, subCollection.list);
 }
 }
 
+private void assertSubSequence(Iterable subSequence, int 
from, int to)
+{
+AbstractReplicaCollection.ReplicaList subList = 
this.test.list.subList(from, to);
+if (!elementsEqual(subList, subSequence))
+{
+elementsEqual(subList, subSequence);
--- End diff --

Did you mean something like `Assert.assertSame(subList.contents, 
subSequence);` ?


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #281: Sep worker shutdown

2019-01-08 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/281


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #259: Cleanup repair path after Transient Replication

2018-09-12 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/259


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #261: Tr followup 4

2018-09-12 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/261


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #257: Allow transient range owner to serve as repair ...

2018-09-12 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/257


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #262: Replicalayout followup

2018-09-12 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/262


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-17 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/269

Review tr range movements

CASSANDRA-14756

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra review-tr-range-movements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #269


commit e51899d09a254e4a4d86c4fea26b6ada7eaebbcd
Author: Alex Petrov 
Date:   2018-09-17T09:51:56Z

Enable dtests

commit 60927ae3a8c5c54b273a519110cc4c0540826d48
Author: Alex Petrov 
Date:   2018-09-13T08:39:01Z

Simplify iteration in calculateRangesToFetchWithPreferredEndpoints

Extract RangeRelocator

Minor changes to calculateRangesToFetchWithPreferredEndpoints to improve 
readability:

  * short-circuit range.contains(), remove one level of nesting
  * remove and use isSufficient instead since it duplicates the source 
filters that are already applied (see usages of 
RangeStreamer.FailureDetectorSourceFilter and ExcludeLocalNodeFilter())
  * use explicit fullness checks instead of “any” check

Simplify RangeRelocator code

Further simplify calculateRangesToStreamWithEndpoints after Benedict’s 
comment

Fix range relocation

Simplify calculateStreamAndFetchRanges

Unify request/transfer ranges interface (Added benefit of this change is 
that we have a check for non-intersecting ranges)

Simplify iteration in calculateRangesToFetchWithPreferredEndpoints

Extract RangeRelocator
Simplify RangeRelocator code

Minor changes to calculateRangesToFetchWithPreferredEndpoints to improve 
readability:

  * short-circuit range.contains(), remove one level of nesting
  * remove and use isSufficient instead since it duplicates the source 
filters that are already applied (see usages of 
RangeStreamer.FailureDetectorSourceFilter and ExcludeLocalNodeFilter())
  * use explicit fullness checks instead of “any” check

Simplify calculateStreamAndFetchRanges

Unify request/transfer ranges interface (Added benefit of this change is 
that we have a check for non-intersecting ranges)

Improve error messages

commit 2b13c4a34b19a4de80eb60c9cd059b674a4b19d3
Author: Alex Petrov 
Date:   2018-09-17T12:17:33Z

Switch dtest branch




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra-dtest pull request #38: Transient Replication and Cheap Quorums te...

2018-09-17 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra-dtest/pull/38


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra-dtest issue #38: Transient Replication and Cheap Quorums tests

2018-09-17 Thread ifesdjeen
Github user ifesdjeen commented on the issue:

https://github.com/apache/cassandra-dtest/pull/38
  
Merged, thank you!


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #266: Add a check for receiving digest response from ...

2018-09-14 Thread ifesdjeen
GitHub user ifesdjeen opened a pull request:

https://github.com/apache/cassandra/pull/266

Add a check for receiving digest response from transient node

for CASSANDRA-14750

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ifesdjeen/cassandra CASSANDRA-14750

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #266


commit 735b8da5379c7e3556b05dc450741bf6cae09f28
Author: Alex Petrov 
Date:   2018-09-12T21:20:34Z

Add a check for receiving digest response from transient node

Patch by Alex Petrov; reviewed by Benedict Elliot Smith for CASSANDRA-14750




---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #269: Review tr range movements

2018-09-18 Thread ifesdjeen
Github user ifesdjeen commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/269#discussion_r218432647
  
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -87,8 +85,8 @@
 private final InetAddressAndPort address;
 /* streaming description */
 private final String description;
-private final Multimap> toFetch = HashMultimap.create();
-private final Set> sourceFilters = new HashSet<>();
+private final Map> 
toFetch = new HashMap<>();
+private final Set sourceFilters = new HashSet<>();
--- End diff --

You're right, it doesn't. Filtering is idempotent (the first op will filter 
the element out and subsequent ones will just be no-ops).
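
A trivial standalone illustration of that idempotence (plain Java collections, nothing to do with the `RangeStreamer` types):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterIdempotenceSketch
{
    public static void main(String[] args)
    {
        List<Integer> sources = List.of(1, 2, 3, 4, 5);
        Predicate<Integer> keep = x -> x % 2 == 0;

        List<Integer> once = sources.stream().filter(keep).collect(Collectors.toList());
        List<Integer> twice = once.stream().filter(keep).collect(Collectors.toList());

        // the second application removes nothing further
        System.out.println(once.equals(twice)); // true
    }
}
```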


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org