Updated Branches: refs/heads/trunk 36aae612a -> d99a6f2a2
Gossiper.handleMajorStateChange can lose existing node ApplicationState patch by jasobrown; reviewed by jbellis for CASSANDRA-5665 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d99a6f2a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d99a6f2a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d99a6f2a Branch: refs/heads/trunk Commit: d99a6f2a2bda290404c6706e7b1db87c59dd04cb Parents: b8b96bd Author: Jason Brown <jasedbr...@gmail.com> Authored: Fri Jun 21 09:23:17 2013 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Fri Jun 21 09:55:34 2013 -0700 ---------------------------------------------------------------------- src/java/org/apache/cassandra/gms/Gossiper.java | 45 +++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d99a6f2a/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index d8918f3..d1a2ab5 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -874,6 +874,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (logger.isTraceEnabled()) logger.trace("Updating heartbeat state generation to " + remoteGeneration + " from " + localGeneration + " for " + ep); // major state change will handle the update by inserting the remote state directly + copyNewerApplicationStates(remoteState, localEpStatePtr); handleMajorStateChange(ep, remoteState); } else if (remoteGeneration == localGeneration) // generation has not changed, apply new states @@ -883,8 +884,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean int 
remoteMaxVersion = getMaxEndpointStateVersion(remoteState); if (remoteMaxVersion > localMaxVersion) { - // apply states, but do not notify since there is no major change - applyNewStates(ep, localEpStatePtr, remoteState); + if (logger.isTraceEnabled()) + { + logger.trace("Updating heartbeat state version to " + remoteState.getHeartBeatState().getHeartBeatVersion() + + " from " + localEpStatePtr.getHeartBeatState().getHeartBeatVersion() + " for " + ep); + } + localEpStatePtr.setHeartBeatState(remoteState.getHeartBeatState()); + Map<ApplicationState, VersionedValue> merged = copyNewerApplicationStates(localEpStatePtr, remoteState); + for (Entry<ApplicationState, VersionedValue> appState : merged.entrySet()) + doNotifications(ep, appState.getKey(), appState.getValue()); } else if (logger.isTraceEnabled()) logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep); @@ -906,28 +914,25 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState) + private Map<ApplicationState, VersionedValue> copyNewerApplicationStates(EndpointState toState, EndpointState fromState) { - // don't assert here, since if the node restarts the version will go back to zero - int oldVersion = localState.getHeartBeatState().getHeartBeatVersion(); - - localState.setHeartBeatState(remoteState.getHeartBeatState()); - if (logger.isTraceEnabled()) - logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ..."); - - // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received - for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) + Map<ApplicationState, VersionedValue> merged = new 
HashMap<ApplicationState, VersionedValue>(); + for (Entry<ApplicationState, VersionedValue> fromEntry : fromState.getApplicationStateMap().entrySet()) { - ApplicationState remoteKey = remoteEntry.getKey(); - VersionedValue remoteValue = remoteEntry.getValue(); + ApplicationState key = fromEntry.getKey(); + VersionedValue value = fromEntry.getValue(); + assert fromState.getHeartBeatState().getGeneration() == toState.getHeartBeatState().getGeneration(); - assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - localState.addApplicationState(remoteKey, remoteValue); - } - for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { - doNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); + if ( (toState.applicationState.containsKey(key) && toState.applicationState.get(key).compareTo(value) < 0) + || !toState.applicationState.containsKey(key) ) + { + if (logger.isTraceEnabled()) + logger.trace("merging {}:{} into ApplicationState", key, value); + toState.addApplicationState(key, value); + merged.put(key, value); + } } + return merged; } // notify that an application state has changed