Rework node replacement. Patch by brandonwilliams, reviewed by Tyler Hobbs for CASSANDRA-5916
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351d43ef Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351d43ef Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351d43ef Branch: refs/heads/trunk Commit: 351d43ef120a725cf83d29f80fd243ad9fc30fc2 Parents: c5368c7 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Thu Oct 17 20:59:36 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Thu Oct 17 20:59:36 2013 -0500 ---------------------------------------------------------------------- NEWS.txt | 8 +- .../cassandra/config/DatabaseDescriptor.java | 17 ++- .../org/apache/cassandra/db/SystemTable.java | 9 +- .../gms/GossipDigestAckVerbHandler.java | 10 +- .../gms/GossipDigestSynVerbHandler.java | 2 +- src/java/org/apache/cassandra/gms/Gossiper.java | 69 ++++++++++- .../cassandra/service/StorageService.java | 122 ++++++++++++------- 7 files changed, 182 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 000986f..a676aa1 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -14,15 +14,15 @@ restore snapshots created with the previous major version using the using the provided 'sstableupgrade' tool. -1.2.12 +1.2.11 ====== Features -------- - Added a new consistenct level, LOCAL_ONE, that forces all CL.ONE operations to execute only in the local datacenter. - -1.2.11 -====== + - New replace_address to supplant the (now removed) replace_token and + replace_node workflows to replace a dead node in place. Works like the + old options, but takes the IP address of the node to be replaced. 
Upgrading --------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 633ea9a..65a20cc 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -724,6 +724,21 @@ public class DatabaseDescriptor return conf.num_tokens; } + public static InetAddress getReplaceAddress() + { + try + { + if (System.getProperty("cassandra.replace_address", null) != null) + return InetAddress.getByName(System.getProperty("cassandra.replace_address", null)); + else + return null; + } + catch (UnknownHostException e) + { + return null; + } + } + public static Collection<String> getReplaceTokens() { return tokensFromString(System.getProperty("cassandra.replace_token", null)); @@ -742,7 +757,7 @@ public class DatabaseDescriptor public static boolean isReplacing() { - return 0 != getReplaceTokens().size() || getReplaceNode() != null; + return getReplaceAddress() != null; } public static String getClusterName() http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index a87ab50..432a434 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -654,8 +654,15 @@ public class SystemTable // ID not found, generate a new one, persist, and then return it. 
hostId = UUID.randomUUID(); logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId); + return setLocalHostId(hostId); + } - req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)"; + /** + * Sets the local host ID explicitly. Should only be called outside of SystemTable when replacing a node. + */ + public static UUID setLocalHostId(UUID hostId) + { + String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)"; processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId)); return hostId; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java index 2a03ff2..b2af3a2 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java @@ -39,7 +39,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> InetAddress from = message.from; if (logger.isTraceEnabled()) logger.trace("Received a GossipDigestAckMessage from {}", from); - if (!Gossiper.instance.isEnabled()) + if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) { if (logger.isTraceEnabled()) logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled"); @@ -49,6 +49,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> GossipDigestAck gDigestAckMessage = message.payload; List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList(); Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap(); + logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size()); if ( epStateMap.size() > 0 ) { @@ -58,6 +59,13 @@ 
public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> } Gossiper.instance.checkSeedContact(from); + if (Gossiper.instance.isInShadowRound()) + { + if (logger.isDebugEnabled()) + logger.debug("Finishing shadow round with {}", from); + Gossiper.instance.finishShadowRound(); + return; // don't bother doing anything else, we have what we came for + } /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java index 61d21ed..476cb72 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java @@ -76,7 +76,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>(); Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>(); Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap); - + logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size()); MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap), GossipDigestAck.serializer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java index acf40f3..cbb7d80 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -107,6 +107,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean // have we ever in our lifetime reached a seed? private boolean seedContacted = false; + private boolean inShadowRound = false; + private class GossipTask implements Runnable { public void run() @@ -662,6 +664,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return endpointStateMap.get(ep); } + // removes ALL endpoint states; should only be called after shadow gossip + public void resetEndpointStateMap() + { + endpointStateMap.clear(); + } + public Set<Entry<InetAddress, EndpointState>> getEndpointStates() { return endpointStateMap.entrySet(); @@ -874,7 +882,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet()) { InetAddress ep = entry.getKey(); - if ( ep.equals(FBUtilities.getBroadcastAddress())) + if ( ep.equals(FBUtilities.getBroadcastAddress()) && !isInShadowRound()) continue; if (justRemovedEndpoints.containsKey(ep)) { @@ -989,6 +997,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean */ void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap) { + if (gDigestList.size() == 0) + { + /* we've been sent a *completely* empty syn, which should normally never happen since an endpoint will at least send a syn with itself. + If this is happening then the node is attempting shadow gossip, and we should reply with everything we know. 
+ */ + logger.debug("Shadow request received, adding all states"); + for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet()) + { + gDigestList.add(new GossipDigest(entry.getKey(), 0, 0)); + } + } for ( GossipDigest gDigest : gDigestList ) { int remoteGeneration = gDigest.getGeneration(); @@ -1074,6 +1093,43 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean TimeUnit.MILLISECONDS); } + /** + * Do a single 'shadow' round of gossip, where we do not modify any state + * Only used when replacing a node, to get and assume its states + */ + public void doShadowRound() + { + buildSeedsList(); + // send a completely empty syn + List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); + GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), + DatabaseDescriptor.getPartitionerName(), + gDigests); + MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN, + digestSynMessage, + GossipDigestSyn.serializer); + inShadowRound = true; + for (InetAddress seed : seeds) + MessagingService.instance().sendOneWay(message, seed); + int slept = 0; + try + { + while (true) + { + Thread.sleep(1000); + if (!inShadowRound) + break; + slept += 1000; + if (slept > StorageService.RING_DELAY) + throw new RuntimeException("Unable to gossip with any seeds"); + } + } + catch (InterruptedException wtf) + { + throw new RuntimeException(wtf); + } + } + private void buildSeedsList() { for (InetAddress seed : DatabaseDescriptor.getSeeds()) @@ -1154,6 +1210,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } + protected void finishShadowRound() + { + if (inShadowRound) + inShadowRound = false; + } + + protected boolean isInShadowRound() + { + return inShadowRound; + } + @VisibleForTesting public void initializeNodeUnsafe(InetAddress addr, UUID uuid, 
int generationNbr) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 50719fd..d2c69d0 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -387,6 +387,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return initialized; } + public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException + { + logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress()); + MessagingService.instance().listen(FBUtilities.getLocalAddress()); + + // make magic happen + Gossiper.instance.doShadowRound(); + + Collection<Token> tokens = new ArrayList<Token>(); + UUID hostId = null; + // now that we've gossiped at least once, we should be able to find the node we're replacing + if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null) + throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); + hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); + try + { + if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null) + throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); + tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS)))); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + 
SystemTable.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc + MessagingService.instance().shutdown(); + Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need + return tokens; + } + public synchronized void initClient() throws IOException, ConfigurationException { // We don't wait, because we're going to actually try to work on @@ -561,22 +591,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void joinTokenRing(int delay) throws ConfigurationException { - logger.info("Starting up server gossip"); joined = true; - // Seed the host ID-to-endpoint map with our own ID. - getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress()); + Collection<Token> tokens = null; + Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>(); + if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null) + throw new RuntimeException("Replace method removed; use cassandra.replace_address instead"); + if (DatabaseDescriptor.isReplacing()) + { + if (!DatabaseDescriptor.isAutoBootstrap()) + throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); + tokens = prepareReplacementInfo(); + appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); + appStates.put(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + } // have to start the gossip service before we can see any info on other nodes. this is necessary // for bootstrap to get the load info it needs. // (we won't be part of the storage ring though until we add a counterId to our state, below.) - Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>(); + // Seed the host ID-to-endpoint map with our own ID. 
+ getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress()); appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion()); appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(SystemTable.getLocalHostId())); appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress())); - if (DatabaseDescriptor.isReplacing()) - appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion()); + logger.info("Starting up server gossip"); Gossiper.instance.register(this); Gossiper.instance.register(migrationManager); Gossiper.instance.start(SystemTable.incrementAndGetGeneration(), appStates); // needed for node-ring gathering. @@ -602,7 +641,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // We attempted to replace this with a schema-presence check, but you need a meaningful sleep // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details. Set<InetAddress> current = new HashSet<InetAddress>(); - Collection<Token> tokens; logger.debug("Bootstrap variables: {} {} {} {}", new Object[]{ DatabaseDescriptor.isAutoBootstrap(), SystemTable.bootstrapInProgress(), @@ -667,50 +705,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } else { - if (DatabaseDescriptor.getReplaceTokens().size() != 0 && DatabaseDescriptor.getReplaceNode() != null) - throw new UnsupportedOperationException("You cannot specify both replace_token and replace_node, choose one or the other"); - try - { - // Sleeping additionally to make sure that the server actually is not alive - // and giving it more time to gossip if alive. 
- Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL); - } - catch (InterruptedException e) + if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress())) { - throw new AssertionError(e); - } - tokens = new ArrayList<Token>(); - if (DatabaseDescriptor.getReplaceTokens().size() !=0) - { - for (String token : DatabaseDescriptor.getReplaceTokens()) - tokens.add(StorageService.getPartitioner().getTokenFactory().fromString(token)); - } - else - { - assert DatabaseDescriptor.getReplaceNode() != null; - InetAddress endpoint = tokenMetadata.getEndpointForHostId(DatabaseDescriptor.getReplaceNode()); - if (endpoint == null) - throw new UnsupportedOperationException("Cannot replace host id " + DatabaseDescriptor.getReplaceNode() + " because it does not exist!"); - tokens = tokenMetadata.getTokens(endpoint); - } - - // check for operator errors... - for (Token token : tokens) - { - InetAddress existing = tokenMetadata.getEndpoint(token); - if (existing != null) + try { - if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay)) - throw new UnsupportedOperationException("Cannnot replace a token for a Live node... "); - current.add(existing); + // Sleep additionally to make sure that the server actually is not alive + // and giving it more time to gossip if alive. + Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL); } - else + catch (InterruptedException e) { - throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!"); + throw new AssertionError(e); } - } - setMode(Mode.JOINING, "Replacing a node with token: " + tokens, true); + // check for operator errors... 
+ for (Token token : tokens) + { + InetAddress existing = tokenMetadata.getEndpoint(token); + if (existing != null) + { + if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay)) + throw new UnsupportedOperationException("Cannnot replace a live node... "); + current.add(existing); + } + else + { + throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!"); + } + } + } + setMode(Mode.JOINING, "Replacing a node with token(s): " + tokens, true); } bootstrap(tokens); @@ -1355,7 +1379,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). if (Gossiper.instance.usesHostId(endpoint)) - tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint); + { + UUID hostId = Gossiper.instance.getHostId(endpoint); + if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) + logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); + else + tokenMetadata.updateHostId(hostId, endpoint); + } Set<Token> tokensToUpdateInMetadata = new HashSet<Token>(); Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>();