Gossiper.handleMajorStateChange can lose existing node ApplicationState patch by jasobrown; reviewed by jbellis for CASSANDRA-5665
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b7e13b89 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b7e13b89 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b7e13b89 Branch: refs/heads/trunk Commit: b7e13b89c265c28acfb624a984b97a06a837c3ea Parents: 110d283 Author: Jason Brown <jasedbr...@gmail.com> Authored: Fri Jun 21 09:23:17 2013 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Fri Jun 21 09:23:52 2013 -0700 ---------------------------------------------------------------------- src/java/org/apache/cassandra/gms/Gossiper.java | 47 +++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7e13b89/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index efa9865..6b0bbe9 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -871,6 +871,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (logger.isTraceEnabled()) logger.trace("Updating heartbeat state generation to " + remoteGeneration + " from " + localGeneration + " for " + ep); // major state change will handle the update by inserting the remote state directly + copyNewerApplicationStates(remoteState, localEpStatePtr); handleMajorStateChange(ep, remoteState); } else if ( remoteGeneration == localGeneration ) // generation has not changed, apply new states @@ -880,11 +881,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean int remoteMaxVersion = getMaxEndpointStateVersion(remoteState); if ( remoteMaxVersion > localMaxVersion ) { - // apply states, but do 
not notify since there is no major change - applyNewStates(ep, localEpStatePtr, remoteState); + if (logger.isTraceEnabled()) + { + logger.trace("Updating heartbeat state version to " + remoteState.getHeartBeatState().getHeartBeatVersion() + + " from " + localEpStatePtr.getHeartBeatState().getHeartBeatVersion() + " for " + ep); + } + localEpStatePtr.setHeartBeatState(remoteState.getHeartBeatState()); + Map<ApplicationState, VersionedValue> merged = copyNewerApplicationStates(localEpStatePtr, remoteState); + for (Entry<ApplicationState, VersionedValue> appState : merged.entrySet()) + doNotifications(ep, appState.getKey(), appState.getValue()); } else if (logger.isTraceEnabled()) - logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep); + logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep); if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead markAlive(ep, localEpStatePtr); } @@ -903,28 +911,25 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState) + private Map<ApplicationState, VersionedValue> copyNewerApplicationStates(EndpointState toState, EndpointState fromState) { - // don't assert here, since if the node restarts the version will go back to zero - int oldVersion = localState.getHeartBeatState().getHeartBeatVersion(); - - localState.setHeartBeatState(remoteState.getHeartBeatState()); - if (logger.isTraceEnabled()) - logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ..."); - - // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received - for (Entry<ApplicationState, VersionedValue> 
remoteEntry : remoteState.getApplicationStateMap().entrySet()) + Map<ApplicationState, VersionedValue> merged = new HashMap<ApplicationState, VersionedValue>(); + for (Entry<ApplicationState, VersionedValue> fromEntry : fromState.getApplicationStateMap().entrySet()) { - ApplicationState remoteKey = remoteEntry.getKey(); - VersionedValue remoteValue = remoteEntry.getValue(); + ApplicationState key = fromEntry.getKey(); + VersionedValue value = fromEntry.getValue(); + assert fromState.getHeartBeatState().getGeneration() == toState.getHeartBeatState().getGeneration(); - assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - localState.addApplicationState(remoteKey, remoteValue); - } - for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { - doNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); + if ( (toState.applicationState.containsKey(key) && toState.applicationState.get(key).compareTo(value) < 0) + || !toState.applicationState.containsKey(key) ) + { + if (logger.isTraceEnabled()) + logger.trace("merging {}:{} into ApplicationState", key, value); + toState.addApplicationState(key, value); + merged.put(key, value); + } } + return merged; } // notify that an application state has changed