Calculate pending ranges asynchronously. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-5135
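In short, pending-range recalculation moves off the ring-change code path onto a dedicated single-threaded executor whose one-slot queue and DiscardPolicy coalesce bursts of requests into at most one running and one queued calculation, while operations that need up-to-date results (joining, decommission, unbootstrap) wait on a blocking barrier. The following is a minimal sketch of that pattern for orientation only; it mirrors the update()/blockUntilFinished() shape of the new service in the diff below, but uses a plain ThreadPoolExecutor and a hypothetical CoalescingRecalculator/recalculate() stand-in rather than Cassandra's JMXEnabledThreadPoolExecutor and per-keyspace calculation.

import java.util.concurrent.*;

// Illustrative sketch only: not part of this commit.
public class CoalescingRecalculator
{
    // one worker thread, a one-slot queue, and DiscardPolicy: at most one calculation runs
    // and at most one more is queued; further update() calls are silently dropped, which is
    // safe because the queued run will observe the latest ring state anyway.
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1),
            new ThreadPoolExecutor.DiscardPolicy());

    public Future<?> update()
    {
        return executor.submit(new Runnable()
        {
            public void run()
            {
                recalculate(); // placeholder for the real per-keyspace pending-range calculation
            }
        });
    }

    // crude barrier analogous to blockUntilFinished() in the diff: poll until nothing is
    // running or queued, sleeping briefly between checks
    public void blockUntilFinished() throws InterruptedException
    {
        while (executor.getActiveCount() + executor.getQueue().size() > 0)
            Thread.sleep(100);
    }

    protected void recalculate()
    {
        // hypothetical stand-in for calculatePendingRanges(strategy, table)
    }
}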
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/713bba5a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/713bba5a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/713bba5a

Branch: refs/heads/cassandra-1.2
Commit: 713bba5af085273650b8202b4a7c225b5a094b5f
Parents: 3cc8656
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Mon Jan 14 06:26:22 2013 -0600
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Mon Jan 14 06:26:22 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                         |    1 +
 .../service/PendingRangeCalculatorService.java      |  194 +++++++++++++++
 .../PendingRangeCalculatorServiceMBean.java         |   23 ++
 .../apache/cassandra/service/StorageService.java    |  135 ++---------
 .../cassandra/locator/SimpleStrategyTest.java       |    3 +-
 5 files changed, 236 insertions(+), 120 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/713bba5a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c76151..0b3ab2b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Fix CQL3 BATCH authorization caching (CASSANDRA-5145)
  * fix get_count returns incorrect value with TTL (CASSANDRA-5099)
  * better handling for mid-compaction failure (CASSANDRA-5137)
+ * calculate pending ranges asynchronously (CASSANDRA-5135)
 1.1.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/713bba5a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
new file mode 100644
index 0000000..d06f2d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+
+public class PendingRangeCalculatorService extends PendingRangeCalculatorServiceMBean
+{
+    public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
+
+    private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
+
+    public PendingRangeCalculatorService()
+    {
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+    }
+
+    private class PendingRangeTask implements Runnable
+    {
+        public void run()
+        {
+            long start = System.currentTimeMillis();
+            for (String table : Schema.instance.getNonSystemTables())
+            {
+                calculatePendingRanges(Table.open(table).getReplicationStrategy(), table);
+            }
+            logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemTables().size(), System.currentTimeMillis() - start);
+        }
+    }
+
+    public Future<?> update()
+    {
+        return executor.submit(new PendingRangeTask());
+    }
+
+    public void blockUntilFinished()
+    {
+        while (true)
+        {
+            if (executor.getActiveCount() + executor.getPendingTasks() == 0)
+                break;
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Calculate pending ranges according to bootstrapping and leaving nodes. Reasoning is:
+     *
+     * (1) When in doubt, it is better to write too much to a node than too little. That is, if
+     * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
+     * up unneeded data afterwards is better than missing writes during movement.
+     * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
+     * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
+     * we will first remove _all_ leaving tokens for the sake of calculation and then check what
+     * ranges would go where if all nodes are to leave. This way we get the biggest possible
+     * ranges with regard to current leave operations, covering all subsets of possible final range
+     * values.
+     * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
+     * complex calculations to see if multiple bootstraps overlap, we simply base calculations
+     * on the same token ring used before (reflecting the situation after all leave operations have
+     * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
+     * checked what their ranges would be.
+     * This will give us the biggest possible ranges the node could have. It might be that other
+     * bootstraps make our actual final ranges smaller, but it does not matter as we can clean up
+     * the data afterwards.
+     *
+     * NOTE: This is a heavy and inefficient operation. It will be done only once when a node
+     * changes state in the cluster, so it should be manageable.
+     */
+
+    // public & static for testing purposes
+    public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String table)
+    {
+        TokenMetadata tm = StorageService.instance.getTokenMetadata();
+        Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
+        Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+        Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
+
+        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", table);
+            tm.setPendingRanges(table, pendingRanges);
+            return;
+        }
+
+        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
+
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
+
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
+        }
+
+        // At this stage pendingRanges has been updated according to leave operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        synchronized (bootstrapTokens)
+        {
+            for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+            {
+                InetAddress endpoint = entry.getValue();
+
+                allLeftMetadata.updateNormalToken(entry.getKey(), endpoint);
+                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                    pendingRanges.put(range, endpoint);
+                allLeftMetadata.removeEndpoint(endpoint);
+            }
+        }
+
+        // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving nodes.
+
+        // For each of the moving nodes, we do the same thing we did for bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
+        {
+            InetAddress endpoint = moving.right; // address of the moving node
+
+            // moving.left is a new token of the endpoint
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                pendingRanges.put(range, endpoint);
+            }
+
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
+
+        tm.setPendingRanges(table, pendingRanges);
+
+        if (logger.isDebugEnabled())
+            logger.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/713bba5a/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
new file mode 100644
index 0000000..c9b04f0
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.cassandra.service; + +public class PendingRangeCalculatorServiceMBean +{ +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/713bba5a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ad05ce2..54d1c0b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -625,7 +625,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new AssertionError(e); } } - setMode(Mode.JOINING, "schema complete, ready to bootstrap", true); + setMode(Mode.JOINING, "schema complete", true); + setMode(Mode.JOINING, "waiting for pending range calculation", true); + PendingRangeCalculatorService.instance.blockUntilFinished(); + setMode(Mode.JOINING, "calculation complete, ready to bootstrap", true); if (logger_.isDebugEnabled()) @@ -1108,7 +1111,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } tokenMetadata_.addBootstrapToken(token, endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } /** @@ -1162,7 +1165,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (tokenMetadata_.isMoving(endpoint)) // if endpoint was moving to a new token tokenMetadata_.removeFromMoving(endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } /** @@ -1197,7 +1200,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // at this point the endpoint is certainly a member with this token, so let's proceed // normally tokenMetadata_.addLeavingEndpoint(endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } /** @@ -1233,7 +1236,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE tokenMetadata_.addMovingEndpoint(token, endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } /** @@ -1275,7 +1278,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // Note that the endpoint is being removed tokenMetadata_.addLeavingEndpoint(endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); // find the endpoint coordinating this removal that we need to notify when we're done String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1); @@ -1294,7 +1297,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Gossiper.instance.removeEndpoint(endpoint); tokenMetadata_.removeEndpoint(endpoint); tokenMetadata_.removeBootstrapToken(token); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); if (!isClientMode) { logger_.info("Removing token " + token + " for " + endpoint); @@ -1327,114 +1330,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** - * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is: - * - * (1) When in doubt, it is better to write too much to a node than too little. That is, if - * there are multiple nodes moving, calculate the biggest ranges a node could have. 
Cleaning - * up unneeded data afterwards is better than missing writes during movement. - * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional - * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore - * we will first remove _all_ leaving tokens for the sake of calculation and then check what - * ranges would go where if all nodes are to leave. This way we get the biggest possible - * ranges with regard current leave operations, covering all subsets of possible final range - * values. - * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing - * complex calculations to see if multiple bootstraps overlap, we simply base calculations - * on the same token ring used before (reflecting situation after all leave operations have - * completed). Bootstrapping nodes will be added and removed one by one to that metadata and - * checked what their ranges would be. This will give us the biggest possible ranges the - * node could have. It might be that other bootstraps make our actual final ranges smaller, - * but it does not matter as we can clean up the data afterwards. - * - * NOTE: This is heavy and ineffective operation. This will be done only once when a node - * changes state in the cluster, so it should be manageable. - */ - private void calculatePendingRanges() - { - for (String table : Schema.instance.getNonSystemTables()) - calculatePendingRanges(Table.open(table).getReplicationStrategy(), table); - } - - // public & static for testing purposes - public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String table) - { - TokenMetadata tm = StorageService.instance.getTokenMetadata(); - Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create(); - Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens(); - Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints(); - - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty()) - { - if (logger_.isDebugEnabled()) - logger_.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", table); - tm.setPendingRanges(table, pendingRanges); - return; - } - - Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(); - - // Copy of metadata reflecting the situation after all leave operations are finished. - TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft(); - - // get all ranges that will be affected by leaving nodes - Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); - for (InetAddress endpoint : leavingEndpoints) - affectedRanges.addAll(addressRanges.get(endpoint)); - - // for each of those ranges, find what new nodes will be responsible for the range when - // all leaving nodes are gone. - for (Range<Token> range : affectedRanges) - { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm)); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); - } - - // At this stage pendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes. - - // For each of the bootstrapping nodes, simply add and remove them one by one to - // allLeftMetadata and check in between what their ranges would be. 
- synchronized (bootstrapTokens) - { - for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) - { - InetAddress endpoint = entry.getValue(); - - allLeftMetadata.updateNormalToken(entry.getKey(), endpoint); - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - pendingRanges.put(range, endpoint); - allLeftMetadata.removeEndpoint(endpoint); - } - } - - // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving nodes. - - // For each of the moving nodes, we do the same thing we did for bootstrapping: - // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints()) - { - InetAddress endpoint = moving.right; // address of the moving node - - // moving.left is a new token of the endpoint - allLeftMetadata.updateNormalToken(moving.left, endpoint); - - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - { - pendingRanges.put(range, endpoint); - } - - allLeftMetadata.removeEndpoint(endpoint); - } - - tm.setPendingRanges(table, pendingRanges); - - if (logger_.isDebugEnabled()) - logger_.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges())); - } - - /** * Finds living endpoints responsible for the given ranges * * @param table the table ranges belong to @@ -1623,7 +1518,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void onRemove(InetAddress endpoint) { tokenMetadata_.removeEndpoint(endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } public void onDead(InetAddress endpoint, EndpointState state) @@ -2367,7 +2262,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalToken())); tokenMetadata_.addLeavingEndpoint(FBUtilities.getBroadcastAddress()); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } public void decommission() throws InterruptedException @@ -2376,6 +2271,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new UnsupportedOperationException("local node is not a member of the token ring yet"); if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2) throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless"); + PendingRangeCalculatorService.instance.blockUntilFinished(); for (String table : Schema.instance.getNonSystemTables()) { if (tokenMetadata_.getPendingRanges(table, FBUtilities.getBroadcastAddress()).size() > 0) @@ -2407,7 +2303,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { SystemTable.setBootstrapState(SystemTable.BootstrapState.NEEDS_BOOTSTRAP); tokenMetadata_.removeEndpoint(FBUtilities.getBroadcastAddress()); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken(),Gossiper.computeExpireTime())); int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2); @@ -2480,6 +2376,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE InetAddress localAddress = FBUtilities.getBroadcastAddress(); List<String> tablesToProcess = 
Schema.instance.getNonSystemTables(); + PendingRangeCalculatorService.instance.blockUntilFinished(); // checking if data is moving to this node for (String table : tablesToProcess) { @@ -2687,7 +2584,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE removingNode = endpoint; tokenMetadata_.addLeavingEndpoint(endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us // we add our own token so other nodes to let us know when they're done Gossiper.instance.advertiseRemoving(endpoint, token, localToken); http://git-wip-us.apache.org/repos/asf/cassandra/blob/713bba5a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java index 8c17f6d..a0143aa 100644 --- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.service.PendingRangeCalculatorService; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -139,7 +140,7 @@ public class SimpleStrategyTest extends SchemaLoader { strategy = getStrategy(table, tmd); - StorageService.calculatePendingRanges(strategy, table); + PendingRangeCalculatorService.calculatePendingRanges(strategy, table); int replicationFactor = strategy.getReplicationFactor();
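For reference, the call-site pattern this diff establishes looks like the sketch below; the class and method names (PendingRangeUsageSketch, onRingChange, beforeReadingPendingRanges) are hypothetical stand-ins for the StorageService handlers changed above, and only the PendingRangeCalculatorService calls themselves come from this commit.

package org.apache.cassandra.example; // hypothetical package, for illustration only

import org.apache.cassandra.service.PendingRangeCalculatorService;

// Hedged usage sketch: not part of this commit.
public class PendingRangeUsageSketch
{
    // ring-change handlers (node bootstrapping, leaving, moving, removed) fire and forget:
    public void onRingChange()
    {
        // before this commit, StorageService's private calculatePendingRanges() ran synchronously here
        PendingRangeCalculatorService.instance.update();
    }

    // operations that must observe up-to-date pending ranges (joining, decommission,
    // unbootstrap in the diff) drain the calculator first:
    public void beforeReadingPendingRanges()
    {
        PendingRangeCalculatorService.instance.blockUntilFinished();
        // ... then consult TokenMetadata.getPendingRanges(table, endpoint) as before
    }
}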