Forward writes to replacement node when replace_address != broadcast_address
Patch by Paulo Motta; reviewed by Richard Low for CASSANDRA-8523 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b39d984f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b39d984f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b39d984f Branch: refs/heads/cassandra-3.9 Commit: b39d984f7bd682c7638415d65dcc4ac9bcb74e5f Parents: 6eff082 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Fri Jun 17 21:09:31 2016 -0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Aug 31 20:21:30 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 5 +- .../apache/cassandra/gms/VersionedValue.java | 6 + .../apache/cassandra/locator/TokenMetadata.java | 54 ++++++- .../cassandra/service/LoadBroadcaster.java | 2 +- .../cassandra/service/StorageService.java | 139 +++++++++++++++---- 6 files changed, 177 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0f7cf0e..d7e9394 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.8 + * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523) * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522) * Fail repair on non-existing table (CASSANDRA-12279) * cqlsh copy: fix missing counter values (CASSANDRA-12476) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 00e3da8..a8f9524 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -76,6 +76,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean static { SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES); SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING); + SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE); } private volatile ScheduledFuture<?> scheduledGossipTask; @@ -333,10 +334,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (epState == null) return; - logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive()); if (!epState.isAlive()) return; + logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive()); + + if (isShutdown(endpoint)) { markAsShutdown(endpoint); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 3ea7bb4..661d3ba 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -65,6 +65,7 @@ public class VersionedValue implements Comparable<VersionedValue> // values for ApplicationState.STATUS public final static String STATUS_BOOTSTRAPPING = "BOOT"; + public final static String STATUS_BOOTSTRAPPING_REPLACE = "BOOT_REPLACE"; public final static String STATUS_NORMAL = "NORMAL"; public final static String STATUS_LEAVING = "LEAVING"; public final static String STATUS_LEFT = "LEFT"; @@ -133,6 +134,11 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(value.value); } + public VersionedValue bootReplacing(InetAddress oldNode) + { + return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress())); + } + public VersionedValue bootstrapping(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING, http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index de16fda..b06c9c8 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.base.Optional; import com.google.common.collect.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -79,6 +80,9 @@ public class TokenMetadata // means we can detect and reject the addition of multiple nodes at the same token // before one becomes part of the ring. private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>(); + + private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create(); + // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) private final Set<InetAddress> leavingEndpoints = new HashSet<>(); // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} @@ -185,6 +189,7 @@ public class TokenMetadata tokenToEndpointMap.removeValue(endpoint); topology.addEndpoint(endpoint); leavingEndpoints.remove(endpoint); + replacementToOriginal.remove(endpoint); removeFromMoving(endpoint); // also removing this endpoint from moving for (Token token : tokens) @@ -297,13 +302,17 @@ public class TokenMetadata public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint) { + addBootstrapTokens(tokens, endpoint, null); + } + + private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original) + { assert tokens != null && !tokens.isEmpty(); assert endpoint != null; lock.writeLock().lock(); try { - InetAddress oldEndpoint; for (Token token : tokens) @@ -313,7 +322,7 @@ public class TokenMetadata throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); oldEndpoint = tokenToEndpointMap.get(token); - if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) + if (oldEndpoint != null && !oldEndpoint.equals(endpoint) && !oldEndpoint.equals(original)) throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); } @@ -328,6 +337,43 @@ public class TokenMetadata } } + public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode) + { + assert replacingTokens != null && !replacingTokens.isEmpty(); + assert newNode != null && oldNode != null; + + lock.writeLock().lock(); + try + { + Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode); + if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens)) + { + throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " + + "different set of tokens %s.", newNode, oldNode, oldNodeTokens, + replacingTokens)); + } + + logger.debug("Replacing {} with {}", newNode, oldNode); + replacementToOriginal.put(newNode, oldNode); + + addBootstrapTokens(replacingTokens, newNode, oldNode); + } + finally + { + lock.writeLock().unlock(); + } + } + + public Optional<InetAddress> getReplacementNode(InetAddress endpoint) + { + return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint)); + } + + public Optional<InetAddress> getReplacingNode(InetAddress endpoint) + { + return Optional.fromNullable((replacementToOriginal.get(endpoint))); + } + public void removeBootstrapTokens(Collection<Token> tokens) { assert tokens != null && !tokens.isEmpty(); @@ -391,6 +437,10 @@ public class TokenMetadata tokenToEndpointMap.removeValue(endpoint); topology.removeEndpoint(endpoint); leavingEndpoints.remove(endpoint); + if (replacementToOriginal.remove(endpoint) != null) + { + logger.debug("Node {} failed during replace.", endpoint); + } endpointToHostIdMap.remove(endpoint); sortedTokens = sortTokens(); invalidateCachedRings(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/LoadBroadcaster.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java index 69fa93d..945dd2f 100644 --- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java +++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java @@ -32,7 +32,7 @@ import org.apache.cassandra.gms.*; public class LoadBroadcaster implements IEndpointStateChangeSubscriber { - static final int BROADCAST_INTERVAL = 60 * 1000; + static final int BROADCAST_INTERVAL = Integer.getInteger("cassandra.broadcast_interval_ms", 60 * 1000); public static final LoadBroadcaster instance = new LoadBroadcaster(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 48a291b..9197ab1 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -32,6 +32,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.*; import com.google.common.util.concurrent.*; @@ -185,6 +186,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")); private static final boolean allowSimultaneousMoves = Boolean.valueOf(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false")); private boolean replacing; + private UUID replacingId; private final StreamStateStore streamStateStore = new StreamStateStore(); @@ -194,9 +196,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (logger.isDebugEnabled()) logger.debug("Setting tokens to {}", tokens); SystemKeyspace.updateTokens(tokens); - tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); Collection<Token> localTokens = getLocalTokens(); setGossipTokens(localTokens); + tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); setMode(Mode.NORMAL, false); } @@ -431,11 +433,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // make magic happen Gossiper.instance.doShadowRound(); - UUID hostId = null; // now that we've gossiped at least once, we should be able to find the node we're replacing if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null) throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); - hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); + replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); try { VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); @@ -443,7 +444,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); - SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc + if (isReplacingSameAddress()) + { + SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc + } Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need return tokens; } @@ -472,7 +476,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // ignore local node or empty status if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null) continue; - String[] pieces = entry.getValue().getApplicationState(ApplicationState.STATUS).value.split(VersionedValue.DELIMITER_STR, -1); + String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS)); assert (pieces.length > 0); String state = pieces[0]; if (state.equals(VersionedValue.STATUS_BOOTSTRAPPING) || state.equals(VersionedValue.STATUS_LEAVING) || state.equals(VersionedValue.STATUS_MOVING)) @@ -681,8 +685,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (!DatabaseDescriptor.isAutoBootstrap()) throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); bootstrapTokens = prepareReplacementInfo(); - appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens)); - appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); + if (isReplacingSameAddress()) + { + logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " + + "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " + + "repair must be run after the replacement process in order to make this node consistent.", + DatabaseDescriptor.getReplaceAddress()); + appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens)); + appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); + } } else if (shouldBootstrap()) { @@ -799,7 +810,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } else { - if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress())) + if (!isReplacingSameAddress()) { try { @@ -885,17 +896,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (dataAvailable) { - // start participating in the ring. - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - setTokens(bootstrapTokens); + finishJoiningRing(); + // remove the existing info about the replaced node. if (!current.isEmpty()) { for (InetAddress existing : current) Gossiper.instance.replacedEndpoint(existing); } - assert tokenMetadata.sortedTokens().size() > 0; - doAuthSetup(); } else { @@ -908,6 +916,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } + public static boolean isReplacingSameAddress() + { + return DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()); + } + public void gossipSnitchInfo() { IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); @@ -933,16 +946,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } else if (isSurveyMode) { - setTokens(SystemKeyspace.getSavedTokens()); - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); isSurveyMode = false; logger.info("Leaving write survey mode and joining ring at operator request"); - assert tokenMetadata.sortedTokens().size() > 0; - - doAuthSetup(); + finishJoiningRing(); } } + private void finishJoiningRing() + { + // start participating in the ring. + SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + setTokens(bootstrapTokens); + + assert tokenMetadata.sortedTokens().size() > 0; + doAuthSetup(); + } + private void doAuthSetup() { maybeAddOrUpdateKeyspace(AuthKeyspace.definition()); @@ -1000,7 +1019,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public boolean isJoined() { - return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()); + return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode; } public void rebuild(String sourceDc) @@ -1122,12 +1141,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { isBootstrapMode = true; SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping - if (!replacing) + + if (!replacing || !isReplacingSameAddress()) { // if not an existing token then bootstrap List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>(); states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); - states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens))); + states.add(Pair.create(ApplicationState.STATUS, replacing? + valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) : + valueFactory.bootstrapping(tokens))); Gossiper.instance.addLocalApplicationStates(states); setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true); Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS); @@ -1138,6 +1160,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress()); } + if (!Gossiper.instance.seenAnySeed()) throw new IllegalStateException("Unable to contact any seeds!"); @@ -1575,14 +1598,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (state == ApplicationState.STATUS) { - String apStateValue = value.value; - String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1); + String[] pieces = splitValue(value); assert (pieces.length > 0); String moveName = pieces[0]; switch (moveName) { + case VersionedValue.STATUS_BOOTSTRAPPING_REPLACE: + handleStateBootreplacing(endpoint, pieces); + break; case VersionedValue.STATUS_BOOTSTRAPPING: handleStateBootstrap(endpoint); break; @@ -1656,6 +1681,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } + private static String[] splitValue(VersionedValue value) + { + return value.value.split(VersionedValue.DELIMITER_STR, -1); + } + public void updateTopology(InetAddress endpoint) { if (getTokenMetadata().isMember(endpoint)) @@ -1820,6 +1850,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint); } + + private void handleStateBootreplacing(InetAddress newNode, String[] pieces) + { + InetAddress oldNode; + try + { + oldNode = InetAddress.getByName(pieces[1]); + } + catch (Exception e) + { + logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces[1], e); + return; + } + + if (FailureDetector.instance.isAlive(oldNode)) + { + throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode)); + } + + Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode); + if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode)) + { + throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.", + newNode, replacingNode.get(), oldNode)); + } + + Collection<Token> tokens = getTokensFor(newNode); + + if (logger.isDebugEnabled()) + logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens); + + tokenMetadata.addReplaceTokens(tokens, newNode, oldNode); + PendingRangeCalculatorService.instance.update(); + + tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode); + } + /** * Handle node move to normal state. That is, node is entering token ring and participating * in reads. @@ -1844,11 +1911,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE endpoint, Gossiper.instance.getEndpointStateForEndpoint(endpoint)); + Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint); + if (replacingNode.isPresent()) + { + assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported"; + logger.info("Node {} will complete replacement of {} for tokens {}", endpoint, replacingNode.get(), tokens); + if (FailureDetector.instance.isAlive(replacingNode.get())) + { + logger.error("Node {} cannot complete replacement of alive node {}.", endpoint, replacingNode.get()); + return; + } + endpointsToRemove.add(replacingNode.get()); + } + + Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint); + if (replacementNode.isPresent()) + { + logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get()); + } + updatePeerInfo(endpoint); // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). UUID hostId = Gossiper.instance.getHostId(endpoint); InetAddress existing = tokenMetadata.getEndpointForHostId(hostId); - if (replacing && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) + if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null + && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); else { @@ -1933,7 +2020,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 } if (!tokensToUpdateInSystemKeyspace.isEmpty()) - SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);; + SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace); if (isMoving || operationMode == Mode.MOVING) { @@ -2058,7 +2145,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE PendingRangeCalculatorService.instance.update(); // find the endpoint coordinating this removal that we need to notify when we're done - String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1); + String[] coordinator = splitValue(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR)); UUID hostId = UUID.fromString(coordinator[1]); // grab any data we are now responsible for and notify responsible node restoreReplicaCount(endpoint, tokenMetadata.getEndpointForHostId(hostId));