Cache local ranges when calculating repair neighbors. Patch by Mahdi Mohammadi; reviewed by Paulo Motta for CASSANDRA-11933.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f74831f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f74831f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f74831f Branch: refs/heads/cassandra-2.2 Commit: 2f74831f4218142f6e118678a3c74c79c1f7e1ed Parents: 1d2d074 Author: Mahdi Mohammadi <mah...@gmail.com> Authored: Wed Jun 15 11:43:27 2016 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Wed Jun 15 11:43:27 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/ActiveRepairService.java | 8 +++++--- .../org/apache/cassandra/service/StorageService.java | 6 +++++- .../service/AntiEntropyServiceTestAbstract.java | 14 ++++++++------ 4 files changed, 19 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7d70902..ec2b48e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.15 + * Cache local ranges when calculating repair neighbors (CASSANDRA-11933) * Allow LWT operation on static column with only partition keys (CASSANDRA-10532) * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886) * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index f8975f9..4c83c48 100644 --- 
a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -164,7 +164,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai RepairFuture submitArtificialRepairSession(RepairJobDesc desc) { Set<InetAddress> neighbours = new HashSet<>(); - neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null)); + Collection<Range<Token>> keyspaceLocalRanges = StorageService.instance.getLocalRanges(desc.keyspace); + neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, keyspaceLocalRanges, desc.range, null, null)); RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily}); sessions.put(session.getId(), session); RepairFuture futureTask = new RepairFuture(session); @@ -176,17 +177,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * Return all of the neighbors with whom we share the provided range. 
* * @param keyspaceName keyspace to repair + * @param keyspaceLocalRanges local-range for given keyspaceName * @param toRepair token to repair * @param dataCenters the data centers to involve in the repair * * @return neighbors with whom we share the provided range */ - public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts) + public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts) { StorageService ss = StorageService.instance; Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName); Range<Token> rangeSuperSet = null; - for (Range<Token> range : ss.getLocalRanges(keyspaceName)) + for (Range<Token> range : keyspaceLocalRanges) { if (range.contains(toRepair)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index eea4556..27939f9 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2978,13 +2978,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return; } + //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent + //calculation multiple times + Collection<Range<Token>> keyspaceLocalRanges = getLocalRanges(keyspace); + Set<InetAddress> allNeighbors = new HashSet<>(); Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>(); for (Range<Token> range : ranges) { try { - Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts); + 
Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, dataCenters, hosts); rangeToNeighbors.put(range, neighbors); allNeighbors.addAll(neighbors); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java index ac39de6..21eb492 100644 --- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java +++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java @@ -123,7 +123,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null)); } assertEquals(expected, neighbors); } @@ -146,7 +146,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null)); } assertEquals(expected, neighbors); } @@ -168,7 +168,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, 
Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); } assertEquals(expected, neighbors); } @@ -196,7 +196,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); } assertEquals(expected, neighbors); } @@ -218,7 +218,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader expected.remove(FBUtilities.getBroadcastAddress()); Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName()); - assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName); + assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts).iterator().next()); } @Test(expected = IllegalArgumentException.class) @@ -227,7 +228,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor()); //Dont give local endpoint Collection<String> hosts = Arrays.asList("127.0.0.3"); - ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName); + ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts); } Set<InetAddress> addTokens(int max) 
throws Throwable