10089 - 2.2 patch
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bb6bb00 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bb6bb00 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bb6bb00 Branch: refs/heads/trunk Commit: 6bb6bb005197c33fa94026d472ff78d4f36613cc Parents: 87fe1e0 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Wed Nov 11 15:04:25 2015 -0500 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Wed Nov 11 15:04:25 2015 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/gms/EndpointState.java | 76 ++++++--- .../apache/cassandra/gms/FailureDetector.java | 7 +- src/java/org/apache/cassandra/gms/Gossiper.java | 47 +++--- .../apache/cassandra/gms/VersionedValue.java | 5 + .../cassandra/service/StorageService.java | 65 ++++---- .../apache/cassandra/gms/EndpointStateTest.java | 159 +++++++++++++++++++ .../cassandra/locator/CloudstackSnitchTest.java | 4 +- .../apache/cassandra/locator/EC2SnitchTest.java | 4 +- .../locator/GoogleCloudSnitchTest.java | 4 +- 9 files changed, 283 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 0e6985a..931da8d 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -18,7 +18,11 @@ package org.apache.cassandra.gms; import java.io.*; +import java.util.Collections; +import java.util.EnumMap; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +31,6 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.cliffc.high_scale_lib.NonBlockingHashMap; - /** * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState * instance. Any state for a given endpoint can be retrieved from this instance. @@ -42,7 +44,7 @@ public class EndpointState public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer(); private volatile HeartBeatState hbState; - final Map<ApplicationState, VersionedValue> applicationState = new NonBlockingHashMap<ApplicationState, VersionedValue>(); + private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState; /* fields below do not get serialized */ private volatile long updateTimestamp; @@ -50,7 +52,13 @@ public class EndpointState EndpointState(HeartBeatState initialHbState) { + this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class)); + } + + EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states) + { hbState = initialHbState; + applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<>(states)); updateTimestamp = System.nanoTime(); isAlive = true; } @@ -68,21 +76,37 @@ public class EndpointState public VersionedValue getApplicationState(ApplicationState key) { - return applicationState.get(key); + return applicationState.get().get(key); } - /** - * TODO replace this with operations that don't expose private state - */ - @Deprecated - public Map<ApplicationState, VersionedValue> getApplicationStateMap() + public Set<Map.Entry<ApplicationState, VersionedValue>> states() + { + return applicationState.get().entrySet(); + } + + public void addApplicationState(ApplicationState key, VersionedValue value) { - return applicationState; + addApplicationStates(Collections.singletonMap(key, value)); } - void addApplicationState(ApplicationState key, VersionedValue value) + public void addApplicationStates(Map<ApplicationState, VersionedValue> values) { - applicationState.put(key, value); + addApplicationStates(values.entrySet()); + } + + public void addApplicationStates(Set<Map.Entry<ApplicationState, VersionedValue>> values) + { + while (true) + { + Map<ApplicationState, VersionedValue> orig = applicationState.get(); + Map<ApplicationState, VersionedValue> copy = new EnumMap<>(orig); + + for (Map.Entry<ApplicationState, VersionedValue> value : values) + copy.put(value.getKey(), value.getValue()); + + if (applicationState.compareAndSet(orig, copy)) + return; + } } /* getters and setters */ @@ -133,7 +157,7 @@ public class EndpointState public String toString() { - return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState; + return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get(); } } @@ -146,12 +170,12 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState> HeartBeatState.serializer.serialize(hbState, out, version); /* serialize the map of ApplicationState objects */ - int size = epState.applicationState.size(); - out.writeInt(size); - for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet()) + Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states(); + out.writeInt(states.size()); + for (Map.Entry<ApplicationState, VersionedValue> state : states) { - VersionedValue value = entry.getValue(); - out.writeInt(entry.getKey().ordinal()); + VersionedValue value = state.getValue(); + out.writeInt(state.getKey().ordinal()); VersionedValue.serializer.serialize(value, out, version); } } @@ -159,26 +183,28 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState> public EndpointState deserialize(DataInput in, int version) throws IOException { HeartBeatState hbState = HeartBeatState.serializer.deserialize(in, version); - EndpointState epState = new EndpointState(hbState); int appStateSize = in.readInt(); + Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class); for (int i = 0; i < appStateSize; ++i) { int key = in.readInt(); VersionedValue value = VersionedValue.serializer.deserialize(in, version); - epState.addApplicationState(Gossiper.STATES[key], value); + states.put(Gossiper.STATES[key], value); } - return epState; + + return new EndpointState(hbState, states); } public long serializedSize(EndpointState epState, int version) { long size = HeartBeatState.serializer.serializedSize(epState.getHeartBeatState(), version); - size += TypeSizes.NATIVE.sizeof(epState.applicationState.size()); - for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet()) + Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states(); + size += TypeSizes.NATIVE.sizeof(states.size()); + for (Map.Entry<ApplicationState, VersionedValue> state : states) { - VersionedValue value = entry.getValue(); - size += TypeSizes.NATIVE.sizeof(entry.getKey().ordinal()); + VersionedValue value = state.getValue(); + size += TypeSizes.NATIVE.sizeof(state.getKey().ordinal()); size += VersionedValue.serializer.serializedSize(value, version); } return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index c563872..a0754b1 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -192,15 +192,16 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean { sb.append(" generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n"); sb.append(" heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n"); - for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.applicationState.entrySet()) + for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.states()) { if (state.getKey() == ApplicationState.TOKENS) continue; sb.append(" ").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n"); } - if (endpointState.applicationState.containsKey(ApplicationState.TOKENS)) + VersionedValue tokens = endpointState.getApplicationState(ApplicationState.TOKENS); + if (tokens != null) { - sb.append(" TOKENS:").append(endpointState.applicationState.get(ApplicationState.TOKENS).version).append(":<hidden>\n"); + sb.append(" TOKENS:").append(tokens.version).append(":<hidden>\n"); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index f78dc7a..86fdab2 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -224,7 +224,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return true; try { - if (entry.getValue().getApplicationStateMap().containsKey(ApplicationState.INTERNAL_IP) && seeds.contains(InetAddress.getByName(entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP).value))) + VersionedValue internalIp = entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP); + if (internalIp != null && seeds.contains(InetAddress.getByName(internalIp.value))) return true; } catch (UnknownHostException e) @@ -371,8 +372,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean int getMaxEndpointStateVersion(EndpointState epState) { int maxVersion = epState.getHeartBeatState().getHeartBeatVersion(); - for (VersionedValue value : epState.getApplicationStateMap().values()) - maxVersion = Math.max(maxVersion, value.version); + for (Map.Entry<ApplicationState, VersionedValue> state : epState.states()) + maxVersion = Math.max(maxVersion, state.getValue().version); return maxVersion; } @@ -525,8 +526,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean logger.info("Advertising removal for {}", endpoint); epState.updateTimestamp(); // make sure we don't evict it too soon epState.getHeartBeatState().forceNewerGenerationUnsafe(); - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); - epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); + Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); + states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); + epState.addApplicationStates(states); endpointStateMap.put(endpoint, epState); } @@ -853,7 +856,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean logger.trace("local heartbeat version {} greater than {} for {}", localHbVersion, version, forEndpoint); } /* Accumulate all application states whose versions are greater than "version" variable */ - for (Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet()) + Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class); + for (Entry<ApplicationState, VersionedValue> entry : epState.states()) { VersionedValue value = entry.getValue(); if (value.version > version) @@ -865,9 +869,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean final ApplicationState key = entry.getKey(); if (logger.isTraceEnabled()) logger.trace("Adding state {}: {}" , key, value.value); - reqdEndpointState.addApplicationState(key, value); + + states.put(key, value); } } + reqdEndpointState.addApplicationStates(states); } return reqdEndpointState; } @@ -1147,19 +1153,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean localState.setHeartBeatState(remoteState.getHeartBeatState()); if (logger.isTraceEnabled()) logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr); - // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received - for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { - ApplicationState remoteKey = remoteEntry.getKey(); - VersionedValue remoteValue = remoteEntry.getValue(); - assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - localState.addApplicationState(remoteKey, remoteValue); - } - for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { + Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states(); + assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); + localState.addApplicationStates(remoteStates); + + for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates) doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); - } } // notify that a local application state is going to change (doesn't get triggered for remote changes) @@ -1273,7 +1273,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void start(int generationNumber) { - start(generationNumber, new HashMap<ApplicationState, VersionedValue>()); + start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class)); } /** @@ -1285,8 +1285,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /* initialize the heartbeat state for this localEndpoint */ maybeInitializeLocalState(generationNbr); EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); - for (Map.Entry<ApplicationState, VersionedValue> entry : preloadLocalStates.entrySet()) - localState.addApplicationState(entry.getKey(), entry.getValue()); + localState.addApplicationStates(preloadLocalStates); //notify snitches that Gossiper is about to start DatabaseDescriptor.getEndpointSnitch().gossiperStarting(); @@ -1475,8 +1474,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean EndpointState localState = oldState == null ? newState : oldState; // always add the version state - localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); - localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid)); + Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); + states.put(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid)); + localState.addApplicationStates(states); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index a142f41..3ea7bb4 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -109,6 +109,11 @@ public class VersionedValue implements Comparable<VersionedValue> return "Value(" + value + "," + version + ")"; } + public byte[] toBytes() + { + return value.getBytes(ISO_8859_1); + } + private static String versionString(String... args) { return StringUtils.join(args, VersionedValue.DELIMITER); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ad209fc..3ea261e 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -521,9 +522,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); try { - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null) + VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); + if (tokensVersionedValue == null) throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); - Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS)))); + Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need @@ -740,7 +742,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (!joined) { - Map<ApplicationState, VersionedValue> appStates = new HashMap<>(); + Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class); if (replacing && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node"); @@ -1655,8 +1657,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE handleStateBootstrap(endpoint); break; case VersionedValue.STATUS_NORMAL: + handleStateNormal(endpoint, VersionedValue.STATUS_NORMAL); + break; case VersionedValue.SHUTDOWN: - handleStateNormal(endpoint); + handleStateNormal(endpoint, VersionedValue.SHUTDOWN); break; case VersionedValue.REMOVING_TOKEN: case VersionedValue.REMOVED_TOKEN: @@ -1738,7 +1742,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void updatePeerInfo(InetAddress endpoint) { EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet()) + for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states()) { switch (entry.getKey()) { @@ -1771,12 +1775,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate) - { - String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value; - return vvalue.getBytes(ISO_8859_1); - } - private void notifyRpcChange(InetAddress endpoint, boolean ready) { if (ready) @@ -1846,7 +1844,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { try { - return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS)))); + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + return Collections.emptyList(); + + VersionedValue versionedValue = state.getApplicationState(ApplicationState.TOKENS); + if (versionedValue == null) + return Collections.emptyList(); + + return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(versionedValue.toBytes()))); } catch (IOException e) { @@ -1895,22 +1901,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @param endpoint node */ - private void handleStateNormal(final InetAddress endpoint) + private void handleStateNormal(final InetAddress endpoint, final String status) { - Collection<Token> tokens; - - tokens = getTokensFor(endpoint); - + Collection<Token> tokens = getTokensFor(endpoint); Set<Token> tokensToUpdateInMetadata = new HashSet<>(); Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>(); Set<InetAddress> endpointsToRemove = new HashSet<>(); - if (logger.isDebugEnabled()) - logger.debug("Node {} state normal, token {}", endpoint, tokens); + logger.debug("Node {} state {}, token {}", endpoint, status, tokens); if (tokenMetadata.isMember(endpoint)) - logger.info("Node {} state jump to normal", endpoint); + logger.info("Node {} state jump to {}", endpoint, status); + + if (tokens.isEmpty() && status.equals(VersionedValue.STATUS_NORMAL)) + logger.error("Node {} is in state normal but it has no tokens, state: {}", + endpoint, + Gossiper.instance.getEndpointStateForEndpoint(endpoint)); updatePeerInfo(endpoint); // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). @@ -2021,8 +2028,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ private void handleStateLeaving(InetAddress endpoint) { - Collection<Token> tokens; - tokens = getTokensFor(endpoint); + Collection<Token> tokens = getTokensFor(endpoint); if (logger.isDebugEnabled()) logger.debug("Node {} state leaving, tokens {}", endpoint, tokens); @@ -2056,16 +2062,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void handleStateLeft(InetAddress endpoint, String[] pieces) { assert pieces.length >= 2; - Collection<Token> tokens = null; - try - { - tokens = getTokensFor(endpoint); - } - catch (Throwable th) - { - JVMStabilityInspector.inspectThrowable(th); - logger.warn("Unable to calculate tokens for {}.", endpoint); - } + Collection<Token> tokens = getTokensFor(endpoint); if (logger.isDebugEnabled()) logger.debug("Node {} state left, tokens {}", endpoint, tokens); @@ -2154,7 +2151,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint); removeEndpoint(endpoint); tokenMetadata.removeEndpoint(endpoint); - if (tokens != null) + if (!tokens.isEmpty()) tokenMetadata.removeBootstrapTokens(tokens); notifyLeft(endpoint); @@ -2358,7 +2355,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void onJoin(InetAddress endpoint, EndpointState epState) { - for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet()) + for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states()) { onChange(endpoint, entry.getKey(), entry.getValue()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/gms/EndpointStateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/EndpointStateTest.java b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java new file mode 100644 index 0000000..b06c435 --- /dev/null +++ b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Token; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class EndpointStateTest +{ + public volatile VersionedValue.VersionedValueFactory valueFactory = + new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + + @Test + public void testMultiThreadedReadConsistency() throws InterruptedException + { + for (int i = 0; i < 500; i++) + innerTestMultiThreadedReadConsistency(); + } + + /** + * Test that a thread reading values whilst they are updated by another thread will + * not see an entry unless it sees the entry previously added as well, even though + * we are accessing the map via an iterator backed by the underlying map. This + * works because EndpointState copies the map each time values are added. + */ + private void innerTestMultiThreadedReadConsistency() throws InterruptedException + { + final Token token = DatabaseDescriptor.getPartitioner().getRandomToken(); + final List<Token> tokens = Collections.singletonList(token); + final HeartBeatState hb = new HeartBeatState(0); + final EndpointState state = new EndpointState(hb); + final AtomicInteger numFailures = new AtomicInteger(); + + Thread t1 = new Thread(new Runnable() + { + public void run() + { + state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens)); + } + }); + + Thread t2 = new Thread(new Runnable() + { + public void run() + { + for (int i = 0; i < 50; i++) + { + Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class); + for (Map.Entry<ApplicationState, VersionedValue> entry : state.states()) + values.put(entry.getKey(), entry.getValue()); + + if (values.containsKey(ApplicationState.STATUS) && !values.containsKey(ApplicationState.TOKENS)) + { + numFailures.incrementAndGet(); + System.out.println(String.format("Failed: %s", values)); + } + } + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + assertTrue(numFailures.get() == 0); + } + + @Test + public void testMultiThreadWriteConsistency() throws InterruptedException + { + for (int i = 0; i < 500; i++) + innerTestMultiThreadWriteConsistency(); + } + + /** + * Test that two threads can update the state map concurrently. + */ + private void innerTestMultiThreadWriteConsistency() throws InterruptedException + { + final Token token = DatabaseDescriptor.getPartitioner().getRandomToken(); + final List<Token> tokens = Collections.singletonList(token); + final String ip = "127.0.0.1"; + final UUID hostId = UUID.randomUUID(); + final HeartBeatState hb = new HeartBeatState(0); + final EndpointState state = new EndpointState(hb); + + Thread t1 = new Thread(new Runnable() + { + public void run() + { + Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + states.put(ApplicationState.STATUS, valueFactory.normal(tokens)); + state.addApplicationStates(states); + } + }); + + Thread t2 = new Thread(new Runnable() + { + public void run() + { + Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.INTERNAL_IP, valueFactory.internalIP(ip)); + states.put(ApplicationState.HOST_ID, valueFactory.hostId(hostId)); + state.addApplicationStates(states); + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + Set<Map.Entry<ApplicationState, VersionedValue>> states = state.states(); + assertEquals(4, states.size()); + + Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class); + for (Map.Entry<ApplicationState, VersionedValue> entry : states) + values.put(entry.getKey(), entry.getValue()); + + assertTrue(values.containsKey(ApplicationState.STATUS)); + assertTrue(values.containsKey(ApplicationState.TOKENS)); + assertTrue(values.containsKey(ApplicationState.INTERNAL_IP)); + assertTrue(values.containsKey(ApplicationState.HOST_ID)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java index d9a4ef1..90e63e0 100644 --- a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; +import java.util.EnumMap; import java.util.Map; import org.junit.AfterClass; @@ -77,9 +78,10 @@ public class CloudstackSnitchTest InetAddress nonlocal = InetAddress.getByName("127.0.0.7"); Gossiper.instance.addSavedEndpoint(nonlocal); - Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap(); + Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class); stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("ch-zrh")); stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.rack("2")); + Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap); assertEquals("ch-zrh", snitch.getDatacenter(nonlocal)); assertEquals("2", snitch.getRack(nonlocal)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java index 6015adf..56bbb77 100644 --- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java @@ -24,6 +24,7 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.EnumMap; import java.util.Map; import org.junit.AfterClass; @@ -79,9 +80,10 @@ public class EC2SnitchTest InetAddress nonlocal = InetAddress.getByName("127.0.0.7"); Gossiper.instance.addSavedEndpoint(nonlocal); - Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap(); + Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class); stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("us-west")); stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("1a")); + Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap); assertEquals("us-west", snitch.getDatacenter(nonlocal)); assertEquals("1a", snitch.getRack(nonlocal)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java index 54ea722..1521454 100644 --- a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java @@ -23,6 +23,7 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; +import java.util.EnumMap; import java.util.Map; import org.junit.AfterClass; @@ -75,9 +76,10 @@ public class GoogleCloudSnitchTest InetAddress nonlocal = InetAddress.getByName("127.0.0.7"); Gossiper.instance.addSavedEndpoint(nonlocal); - Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap(); + Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class); stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("europe-west1")); stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("a")); + Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap); assertEquals("europe-west1", snitch.getDatacenter(nonlocal)); assertEquals("a", snitch.getRack(nonlocal));