Fix thundering herd on endpoint cache invalidation; patch by rbranson and jbellis for CASSANDRA-6345
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8145c835 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8145c835 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8145c835 Branch: refs/heads/trunk Commit: 8145c83566450feb68a12352ac88efe9983ec266 Parents: fce1735 Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Nov 26 14:09:56 2013 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Nov 26 14:09:56 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../locator/AbstractReplicationStrategy.java | 58 +++++++++++++------- .../apache/cassandra/locator/TokenMetadata.java | 47 +++++++--------- 3 files changed, 59 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 57c1896..8d443f9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.13 + * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345) * Optimize FD phi calculation (CASSANDRA-6386) * Improve initial FD phi estimate when starting up (CASSANDRA-6385) * Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index e17b0b4..51c4119 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -20,10 +20,12 
@@ package org.apache.cassandra.locator; import java.lang.reflect.Constructor; import java.net.InetAddress; import java.util.*; +import java.util.concurrent.locks.Lock; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.Striped; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,15 @@ public abstract class AbstractReplicationStrategy public final Map<String, String> configOptions; private final TokenMetadata tokenMetadata; + // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance, + // so that our ownership calculations never block Gossip from processing an ownership change. + // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345), + // so we keep our own copy here. + // + // Writes to tokenMetadataClone should be synchronized. + private volatile TokenMetadata tokenMetadataClone = null; + private volatile long clonedTokenMetadataVersion = 0; + public IEndpointSnitch snitch; AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) @@ -63,7 +74,6 @@ public abstract class AbstractReplicationStrategy assert tokenMetadata != null; this.tokenMetadata = tokenMetadata; this.snitch = snitch; - this.tokenMetadata.register(this); this.configOptions = configOptions == null ? 
Collections.<String, String>emptyMap() : configOptions; this.tableName = tableName; // lazy-initialize table itself since we don't create them until after the replication strategies @@ -73,18 +83,23 @@ public abstract class AbstractReplicationStrategy public ArrayList<InetAddress> getCachedEndpoints(Token t) { - return cachedEndpoints.get(t); - } + long lastVersion = tokenMetadata.getRingVersion(); - public void cacheEndpoint(Token t, ArrayList<InetAddress> addr) - { - cachedEndpoints.put(t, addr); - } + if (lastVersion > clonedTokenMetadataVersion) + { + synchronized (this) + { + if (lastVersion > clonedTokenMetadataVersion) + { + logger.debug("clearing cached endpoints"); + tokenMetadataClone = null; + cachedEndpoints.clear(); + clonedTokenMetadataVersion = lastVersion; + } + } + } - public void clearEndpointCache() - { - logger.debug("clearing cached endpoints"); - cachedEndpoints.clear(); + return cachedEndpoints.get(t); } /** @@ -101,10 +116,20 @@ public abstract class AbstractReplicationStrategy ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken); if (endpoints == null) { - TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap(); - keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken); + if (tokenMetadataClone == null) + { + // synchronize to prevent thundering herd post-invalidation + synchronized (this) + { + if (tokenMetadataClone == null) + tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap(); + } + // if our clone got invalidated, it's possible there is a new token to account for too + keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken); + } + endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone)); - cacheEndpoint(keyToken, endpoints); + cachedEndpoints.put(keyToken, endpoints); } return new ArrayList<InetAddress>(endpoints); @@ -204,11 +229,6 @@ public abstract class AbstractReplicationStrategy return 
getAddressRanges(temp).get(pendingAddress); } - public void invalidateCachedTokenEndpointValues() - { - clearEndpointCache(); - } - public abstract void validateOptions() throws ConfigurationException; /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 5ab1b3f..818ca8f 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -22,15 +22,10 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.collect.*; - -import org.apache.cassandra.utils.BiMultiValMap; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.SortedBiMultiValMap; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +35,9 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.BiMultiValMap; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.SortedBiMultiValMap; public class TokenMetadata { @@ -96,8 +94,6 @@ public class TokenMetadata private volatile ArrayList<Token> sortedTokens; private final Topology topology; - /* list of subscribers that are notified when the tokenToEndpointMap changed */ - private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new 
CopyOnWriteArrayList<AbstractReplicationStrategy>(); private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>() { @@ -107,6 +103,9 @@ public class TokenMetadata } }; + // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership + private volatile long ringVersion = 0; + public TokenMetadata() { this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), @@ -428,7 +427,7 @@ public class TokenMetadata leavingEndpoints.remove(endpoint); endpointToHostIdMap.remove(endpoint); sortedTokens = sortTokens(); - invalidateCaches(); + invalidateCachedRings(); } finally { @@ -456,7 +455,7 @@ public class TokenMetadata } } - invalidateCaches(); + invalidateCachedRings(); } finally { @@ -885,7 +884,7 @@ public class TokenMetadata leavingEndpoints.clear(); pendingRanges.clear(); endpointToHostIdMap.clear(); - invalidateCaches(); + invalidateCachedRings(); } finally { @@ -977,24 +976,6 @@ public class TokenMetadata return sb.toString(); } - public void invalidateCaches() - { - for (AbstractReplicationStrategy subscriber : subscribers) - { - subscriber.invalidateCachedTokenEndpointValues(); - } - } - - public void register(AbstractReplicationStrategy subscriber) - { - subscribers.add(subscriber); - } - - public void unregister(AbstractReplicationStrategy subscriber) - { - subscribers.remove(subscriber); - } - public Collection<InetAddress> pendingEndpointsFor(Token token, String table) { Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table); @@ -1068,6 +1049,16 @@ public class TokenMetadata return topology; } + public long getRingVersion() + { + return ringVersion; + } + + private void invalidateCachedRings() + { + ringVersion++; + } + /** * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.