Prevent continuous schema exchange between 3.0 and 3.11 nodes patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e646e503 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e646e503 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e646e503 Branch: refs/heads/trunk Commit: e646e5032b68622f7ec1dd0c53137be08baabed9 Parents: f6381db Author: Robert Stupp <sn...@snazy.de> Authored: Wed Dec 13 13:58:53 2017 +0100 Committer: Robert Stupp <sn...@snazy.de> Committed: Wed Dec 13 13:58:53 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 9 +++ .../org/apache/cassandra/config/Schema.java | 79 ++++++++++++++++++-- src/java/org/apache/cassandra/db/Columns.java | 7 ++ .../cassandra/db/SchemaCheckVerbHandler.java | 9 ++- .../apache/cassandra/db/rows/AbstractRow.java | 10 ++- .../db/rows/RangeTombstoneBoundMarker.java | 7 ++ .../db/rows/RangeTombstoneBoundaryMarker.java | 7 ++ .../apache/cassandra/db/rows/RowIterators.java | 21 +++++- .../apache/cassandra/db/rows/Unfiltered.java | 12 +++ .../org/apache/cassandra/gms/EndpointState.java | 23 ++++++ src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++++------- .../cassandra/hints/HintsDispatchTrigger.java | 3 +- .../apache/cassandra/schema/SchemaKeyspace.java | 15 +++- .../cassandra/service/MigrationManager.java | 58 +++++++++----- .../cassandra/service/StorageService.java | 25 +++++++ .../cassandra/utils/CassandraVersion.java | 5 ++ 17 files changed, 300 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index eaf312f..60794f0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11.2 + * Prevent continuous schema exchange between 3.0 and 3.11 nodes (CASSANDRA-14109) * Fix imbalanced disks when 
replacing node with same address with JBOD (CASSANDRA-14084) * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948) * Remove OpenJDK log warning (CASSANDRA-13916) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 0c32278..f4b15e7 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -25,6 +25,15 @@ Upgrading - Cassandra is not logging anymore by default an Heap histogram on OutOfMemoryError. To enable that behavior set the 'cassandra.printHeapHistogramOnOutOfMemoryError' System property to 'true'. See CASSANDRA-13006 for more details. + - Upgrades from 3.0 might have produced unnecessary schema migrations while + there was at least one 3.0 node in the cluster. It is therefore highly + recommended to upgrade from 3.0 to at least 3.11.2. The root cause of + this schema mismatch was a difference in the way schema digests were computed + in 3.0 and 3.11. To mitigate this issue, 3.11.2 and newer announce + 3.0-compatible digests as long as there is at least one 3.0 node in the + cluster. Once all nodes have been upgraded, the "real" schema version will be + announced. Note: this mitigation exists only in 3.11 (3.11.2 and later), since it + is only needed when upgrading from 3.0 to 3.11.
(CASSANDRA-14109) Materialized Views ------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index 8fc83df..253a66b 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -22,6 +22,7 @@ import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.locator.LocalStrategy; @@ -40,7 +42,6 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.ConcurrentBiMap; import org.apache.cassandra.utils.Pair; -import org.cliffc.high_scale_lib.NonBlockingHashMap; public class Schema { @@ -58,6 +59,7 @@ public class Schema private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new ConcurrentBiMap<>(); private volatile UUID version; + private volatile UUID altVersion; /** * Initialize empty schema object and load the hardcoded system tables @@ -518,30 +520,82 @@ public class Schema /* Version control */ /** - * @return current schema version + * The schema version to announce. 
+ * This will be either the "real" schema version including the {@code cdc} column, + * if gossip is disabled or no node in the cluster is running 3.0, or a 3.0 compatible + * schema version, with the {@code cdc} column excluded, if gossip is enabled and at + * least one node is running 3.0. + * + * @return "current" schema version */ public UUID getVersion() { + return Gossiper.instance.isEnabled() && Gossiper.instance.isAnyNodeOn30() + ? altVersion + : version; + } + + /** + * The 3.11 schema version, always includes the {@code cdc} column. + */ + public UUID getRealVersion() + { return version; } /** + * The "alternative" schema version, compatible with 3.0, always excludes the + * {@code cdc} column. + */ + public UUID getAltVersion() + { + return altVersion; + } + + /** + * Checks whether the given schema version is the same as the current local schema + * version, either the 3.0 compatible or "real" one. + */ + public boolean isSameVersion(UUID schemaVersion) + { + return schemaVersion != null + && (schemaVersion.equals(version) || schemaVersion.equals(altVersion)); + } + + /** + * Checks whether the current schema is empty. + */ + public boolean isEmpty() + { + return SchemaConstants.emptyVersion.equals(version); + } + + /** * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest * will be converted into UUID which would act as content-based version of the schema. + * + * 3.11 note: we calculate the "real" schema version and the 3.0 compatible schema + * version here.
*/ public void updateVersion() { - version = SchemaKeyspace.calculateSchemaDigest(); - SystemKeyspace.updateSchemaVersion(version); + Pair<UUID, UUID> mixedVersions = SchemaKeyspace.calculateSchemaDigest(); + version = mixedVersions.left; + altVersion = mixedVersions.right; + SystemKeyspace.updateSchemaVersion(getVersion()); } - /* + /** * Like updateVersion, but also announces via gossip + * + * 3.11 note: we announce the "current" schema version, which can be either the 3.0 + * compatible one, if at least one node is still running 3.0, or the "real" schema version. */ public void updateVersionAndAnnounce() { updateVersion(); - MigrationManager.passiveAnnounce(version); + UUID current = getVersion(); + MigrationManager.passiveAnnounce(current, current == getAltVersion()); } /** @@ -785,4 +839,17 @@ public class Schema return transformed; } + + /** + * Converts the given schema version to a string. Returns {@code "unknown"} if {@code version} is {@code null}, + * or {@code "(empty)"} if {@code version} refers to an {@link SchemaConstants#emptyVersion empty} schema. + */ + public static String schemaVersionToString(UUID version) + { + return version == null + ? "unknown" + : SchemaConstants.emptyVersion.equals(version) + ?
"(empty)" + : version.toString(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index 18729de..965e401 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -367,6 +367,13 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col digest.update(c.name.bytes.duplicate()); } + public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude) + { + for (ColumnDefinition c : this) + if (!columnsToExclude.contains(c.name.bytes)) + digest.update(c.name.bytes.duplicate()); + } + /** * Apply a function to each column definition in forwards or reversed order. * @param function http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java index 4270a24..be501de 100644 --- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java +++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java @@ -36,7 +36,14 @@ public class SchemaCheckVerbHandler implements IVerbHandler public void doVerb(MessageIn message, int id) { logger.trace("Received schema check request."); - MessageOut<UUID> response = new MessageOut<UUID>(MessagingService.Verb.INTERNAL_RESPONSE, Schema.instance.getVersion(), UUIDSerializer.serializer); + + /* + 3.11 is special here: We return the 3.0 compatible version, if the requesting node + is running 3.0. Otherwise the "real" schema version. 
+ */ + MessageOut<UUID> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE, + Schema.instance.getVersion(), + UUIDSerializer.serializer); MessagingService.instance().sendReply(response, id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/AbstractRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java index 847cb47..13e6502 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -19,7 +19,9 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.AbstractCollection; +import java.util.Collections; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -63,6 +65,11 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme public void digest(MessageDigest digest) { + digest(digest, Collections.emptySet()); + } + + public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude) + { FBUtilities.updateWithByte(digest, kind().ordinal()); clustering().digest(digest); @@ -70,7 +77,8 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme primaryKeyLivenessInfo().digest(digest); for (ColumnData cd : this) - cd.digest(digest); + if (!columnsToExclude.contains(cd.column.name.bytes)) + cd.digest(digest); } public void validateData(CFMetaData metadata) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java 
b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java index a82bb64..fb94da3 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.Objects; +import java.util.Set; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; @@ -132,6 +133,12 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus deletion.digest(digest); } + @Override + public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude) + { + digest(digest); + } + public String toString(CFMetaData metadata) { return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java index 70d6a9d..9190ecf 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.Objects; +import java.util.Set; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; @@ -151,6 +152,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C startDeletion.digest(digest); } + @Override + public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude) + { + digest(digest); + } + public String toString(CFMetaData metadata) { return 
String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java index bce6a7d..1463bf5 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java @@ -17,7 +17,9 @@ */ package org.apache.cassandra.db.rows; +import java.nio.ByteBuffer; import java.security.MessageDigest; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,7 @@ public abstract class RowIterators private RowIterators() {} - public static void digest(RowIterator iterator, MessageDigest digest) + public static void digest(RowIterator iterator, MessageDigest digest, MessageDigest altDigest, Set<ByteBuffer> columnsToExclude) { // TODO: we're not computing digest the same way that old nodes. This is // currently ok as this is only used for schema digest and the is no exchange @@ -48,8 +50,23 @@ public abstract class RowIterators FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder()); iterator.staticRow().digest(digest); + if (altDigest != null) + { + // Compute the "alternative digest" here. 
+ altDigest.update(iterator.partitionKey().getKey().duplicate()); + iterator.columns().regulars.digest(altDigest, columnsToExclude); + iterator.columns().statics.digest(altDigest, columnsToExclude); + FBUtilities.updateWithBoolean(altDigest, iterator.isReverseOrder()); + iterator.staticRow().digest(altDigest, columnsToExclude); + } + while (iterator.hasNext()) - iterator.next().digest(digest); + { + Row row = iterator.next(); + row.digest(digest); + if (altDigest != null) + row.digest(altDigest, columnsToExclude); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/Unfiltered.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java index e75c632..3d8a9b1 100644 --- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java +++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java @@ -17,7 +17,9 @@ */ package org.apache.cassandra.db.rows; +import java.nio.ByteBuffer; import java.security.MessageDigest; +import java.util.Set; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.Clusterable; @@ -46,6 +48,16 @@ public interface Unfiltered extends Clusterable public void digest(MessageDigest digest); /** + * Digest the atom using the provided {@code MessageDigest}. + * This method only exists in 3.11; the default implementation throws {@link UnsupportedOperationException}. + * Same as {@link #digest(MessageDigest)}, but excludes the given columns from digest calculation. + */ + public default void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude) + { + throw new UnsupportedOperationException("no no no - don't use this one - use digest(MessageDigest) instead"); + } + /** * Validate the data of this atom. * * @param metadata the metadata for the table this atom is part of.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 70f2a68..674b597 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -22,14 +22,19 @@ import java.util.Collections; import java.util.EnumMap; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.CassandraVersion; + /** * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState * instance. Any state for a given endpoint can be retrieved from this instance. @@ -154,6 +159,24 @@ public class EndpointState return pieces[0]; } + @Nullable + public UUID getSchemaVersion() + { + VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA); + return applicationState != null + ? UUID.fromString(applicationState.value) + : null; + } + + @Nullable + public CassandraVersion getReleaseVersion() + { + VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION); + return applicationState != null + ? 
new CassandraVersion(applicationState.value) + : null; + } + public String toString() { return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 14601d7..2dac5c2 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -24,7 +24,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; - +import javax.annotation.Nullable; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -32,9 +32,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; - -import org.apache.cassandra.utils.CassandraVersion; -import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,14 +40,17 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import 
org.apache.cassandra.utils.Pair; /** * This module is responsible for Gossiping information for the local endpoint. This abstraction @@ -75,6 +75,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE); static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>(); + static { SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES); @@ -130,6 +131,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>(); + private volatile boolean anyNodeOn30 = false; // we assume the regular case here - all nodes are on 3.11 private volatile boolean inShadowRound = false; // seeds gathered during shadow round that indicated to be in the shadow round phase as well private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator); @@ -852,20 +854,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return endpointStateMap.get(ep); } - public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as) - { - EndpointState state1 = getEndpointStateForEndpoint(ep1); - EndpointState state2 = getEndpointStateForEndpoint(ep2); - - if (state1 == null || state2 == null) - return false; - - VersionedValue value1 = state1.getApplicationState(as); - VersionedValue value2 = state2.getApplicationState(as); - - return !(value1 == null || value2 == null) && value1.value.equals(value2.value); - } - public Set<Entry<InetAddress, EndpointState>> getEndpointStates() { return endpointStateMap.entrySet(); @@ -1198,6 +1186,26 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean handleMajorStateChange(ep, remoteState); } } + + boolean any30 = anyEndpointOn30(); + if (any30 != anyNodeOn30) 
+ { + logger.info(any30 + ? "There is at least one 3.0 node in the cluster - will store and announce compatible schema version" + : "There are no 3.0 nodes in the cluster - will store and announce real schema version"); + + anyNodeOn30 = any30; + executor.submit(Schema.instance::updateVersionAndAnnounce); + } + } + + private boolean anyEndpointOn30() + { + return endpointStateMap.values() + .stream() + .map(EndpointState::getReleaseVersion) + .filter(Objects::nonNull) + .anyMatch(CassandraVersion::is30); } private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState) @@ -1547,6 +1555,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } + public boolean isAnyNodeOn30() + { + return anyNodeOn30; + } + protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap) { if (inShadowRound) @@ -1629,16 +1642,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return System.currentTimeMillis() + Gossiper.aVeryLongTime; } + @Nullable public CassandraVersion getReleaseVersion(InetAddress ep) { EndpointState state = getEndpointStateForEndpoint(ep); - if (state != null) - { - VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION); - if (applicationState != null) - return new CassandraVersion(applicationState.value); - } - return null; + return state != null ? state.getReleaseVersion() : null; + } + + @Nullable + public UUID getSchemaVersion(InetAddress ep) + { + EndpointState state = getEndpointStateForEndpoint(ep); + return state != null ? 
state.getSchemaVersion() : null; } public static void waitToSettle() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java index 47d986f..cc1c221 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java @@ -19,6 +19,7 @@ package org.apache.cassandra.hints; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; @@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable .filter(store -> !isScheduled(store)) .filter(HintsStore::isLive) .filter(store -> store.isWriting() || store.hasFiles()) - .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA)) + .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address()))) .forEach(this::schedule); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 7834b12..b6add96 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -311,18 +311,24 @@ public final class SchemaKeyspace /** * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest * will be converted into UUID which would act as content-based version of the schema. 
+ * + * This implementation is special cased for 3.11 as it returns the schema digests for 3.11 + * <em>and</em> 3.0 - i.e. with and without the beloved {@code cdc} column. */ - public static UUID calculateSchemaDigest() + public static Pair<UUID, UUID> calculateSchemaDigest() { MessageDigest digest; + MessageDigest digest30; try { digest = MessageDigest.getInstance("MD5"); + digest30 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } + Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc")); for (String table : ALL_FOR_DIGEST) { @@ -340,12 +346,15 @@ public final class SchemaKeyspace try (RowIterator partition = schema.next()) { if (!isSystemKeyspaceSchemaPartition(partition.partitionKey())) - RowIterators.digest(partition, digest); + { + RowIterators.digest(partition, digest, digest30, cdc); + } } } } } - return UUID.nameUUIDFromBytes(digest.digest()); + + return Pair.create(UUID.nameUUIDFromBytes(digest.digest()), UUID.nameUUIDFromBytes(digest30.digest())); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 3332d2c..a1b3597 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -31,7 +31,6 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.config.ViewDefinition; import org.apache.cassandra.cql3.functions.UDAggregate; import org.apache.cassandra.cql3.functions.UDFunction; @@ -78,10 +77,9 @@ public class 
MigrationManager public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state) { - VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); - - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) - maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); + UUID schemaVersion = state.getSchemaVersion(); + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null) + maybeScheduleSchemaPull(schemaVersion, endpoint); } /** @@ -90,16 +88,37 @@ public class MigrationManager */ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) { - if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) + if (Schema.instance.getVersion() == null) + { + logger.debug("Not pulling schema from {}, because local schama version is not known yet", + endpoint); + return; + } + if (Schema.instance.isSameVersion(theirVersion)) + { + logger.debug("Not pulling schema from {}, because schema versions match: " + + "local/real={}, local/compatible={}, remote={}", + endpoint, + Schema.schemaVersionToString(Schema.instance.getRealVersion()), + Schema.schemaVersionToString(Schema.instance.getAltVersion()), + Schema.schemaVersionToString(theirVersion)); + return; + } + if (!shouldPullSchemaFrom(endpoint)) { logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); return; } - if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) + if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) { // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately - logger.debug("Submitting migration task for {}", endpoint); + logger.debug("Immediately submitting migration task for {}, " + + "schema versions: local/real={}, local/compatible={}, 
remote={}", + endpoint, + Schema.schemaVersionToString(Schema.instance.getRealVersion()), + Schema.schemaVersionToString(Schema.instance.getAltVersion()), + Schema.schemaVersionToString(theirVersion)); submitMigrationTask(endpoint); } else @@ -109,20 +128,22 @@ public class MigrationManager Runnable runnable = () -> { // grab the latest version of the schema since it may have changed again since the initial scheduling - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - if (epState == null) + UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint); + if (epSchemaVersion == null) { logger.debug("epState vanished for {}, not submitting migration task", endpoint); return; } - VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); - UUID currentVersion = UUID.fromString(value.value); - if (Schema.instance.getVersion().equals(currentVersion)) + if (Schema.instance.isSameVersion(epSchemaVersion)) { - logger.debug("not submitting migration task for {} because our versions match", endpoint); + logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion); return; } - logger.debug("submitting migration task for {}", endpoint); + logger.debug("submitting migration task for {}, schema version mismatch: local/real={}, local/compatible={}, remote={}", + endpoint, + Schema.schemaVersionToString(Schema.instance.getRealVersion()), + Schema.schemaVersionToString(Schema.instance.getAltVersion()), + Schema.schemaVersionToString(epSchemaVersion)); submitMigrationTask(endpoint); }; ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); @@ -585,11 +606,14 @@ public class MigrationManager * Used to notify nodes as they arrive in the cluster. 
* * @param version The schema version to announce + * @param compatible flag whether {@code version} is a 3.0 compatible version */ - public static void passiveAnnounce(UUID version) + public static void passiveAnnounce(UUID version, boolean compatible) { Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); - logger.debug("Gossiping my schema version {}", version); + logger.debug("Gossiping my {} schema version {}", + compatible ? "3.0 compatible" : "3.11", + Schema.schemaVersionToString(version)); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 15027b2..c5e2912 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -831,6 +831,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } + public void waitForSchema(int delay) + { + logger.debug("Waiting for schema (max {} seconds)", delay); + // first sleep the delay to make sure we see all our peers + for (int i = 0; i < delay; i += 1000) + { + // if we see schema, we can proceed to the next check directly + if (!Schema.instance.isEmpty()) + { + logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion()); + break; + } + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + // if our schema hasn't matched yet, wait until it has + // we do this by waiting for all in-flight migration requests and responses to complete + // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) + if (!MigrationManager.isReadyForBootstrap()) + 
{ + setMode(Mode.JOINING, "waiting for schema information to complete", true); + MigrationManager.waitUntilReadyForBootstrap(); + } + logger.info("Has schema with version {}", Schema.instance.getVersion()); + } + private void joinTokenRing(int delay) throws ConfigurationException { joined = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/utils/CassandraVersion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java index aed0fe7..bf9fe6a 100644 --- a/src/java/org/apache/cassandra/utils/CassandraVersion.java +++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java @@ -118,6 +118,11 @@ public class CassandraVersion implements Comparable<CassandraVersion> return compareIdentifiers(build, other.build, -1); } + public boolean is30() + { + return major == 3 && minor == 0; + } + /** * Returns a version that is backward compatible with this version amongst a list * of provided version, or null if none can be found. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org