krummas commented on code in PR #4613: URL: https://github.com/apache/cassandra/pull/4613#discussion_r2811093721
########## src/java/org/apache/cassandra/tcm/CMSMembership.java: ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm; + +import java.io.IOException; +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.MetaStrategy; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.VersionedEndpoints; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.btree.BTreeSet; + +public class CMSMembership implements MetadataValue<CMSMembership> +{ + public static final Serializer serializer = new Serializer(); + public static final CMSMembership EMPTY 
= new CMSMembership(); + + private final Epoch lastModified; + private final BTreeSet<NodeId> fullMembers; + private final BTreeSet<NodeId> joiningMembers; + + /** + * Used to derive a CMSMembership when deserializing a ClusterMetadata instance written with a metadata version + * prior to V7. At that time, CMS membership was always inferred from the data placements of the distributed Review Comment: "prior to V9" I think? ########## src/java/org/apache/cassandra/tcm/CMSLookup.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.listeners.ChangeListener; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.utils.Pair; + +public class CMSLookup +{ + private static final Logger logger = LoggerFactory.getLogger(CMSLookup.class); + + public enum State { PRE_INIT, ACTIVE, RETIRED }; + + public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, Epoch.EMPTY, new HashMap<>()); + public static InitialBuilder builder(ClusterMetadata metadata) + { + return new InitialBuilder(metadata); + } + + private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> overrides; + private final BiMap<InetAddressAndPort, InetAddressAndPort> addressMap; Review Comment: `addressMap` is only used in the `toString` method ########## src/java/org/apache/cassandra/tcm/CMSLookup.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.listeners.ChangeListener; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.utils.Pair; + +public class CMSLookup +{ + private static final Logger logger = LoggerFactory.getLogger(CMSLookup.class); + + public enum State { PRE_INIT, ACTIVE, RETIRED }; + + public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, Epoch.EMPTY, new HashMap<>()); + public static InitialBuilder builder(ClusterMetadata metadata) + { + return new InitialBuilder(metadata); + } + + private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> overrides; + private final BiMap<InetAddressAndPort, InetAddressAndPort> addressMap; + private final Epoch lastModified; + private final State state; + + private CMSLookup(State state, Epoch epoch, Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> overrides) + { + this.state = state; + this.lastModified = epoch; + this.addressMap = HashBiMap.create(overrides.size()); + this.overrides = Maps.newHashMapWithExpectedSize(overrides.size()); + for 
(Map.Entry<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> e : overrides.entrySet()) + { + this.overrides.put(e.getKey(), e.getValue()); + this.addressMap.put(e.getValue().left, e.getValue().right); + } + } + + public boolean isUninitialized() + { + return state == State.PRE_INIT; + } + + public boolean isActive() + { + return state == State.ACTIVE; + } + + public InetAddressAndPort getAddressOverride(NodeId id) Review Comment: unused ########## src/java/org/apache/cassandra/tcm/Startup.java: ########## @@ -184,6 +239,128 @@ public static void initializeAsNonCmsNode(Function<Processor, Processor> wrapPro } } + + /** + * If the broadcast address of this node has changed, we must verify the endpoints it knows for + * the members of the CMS are still reachable and valid. This is necessary for the node to submit + * a STARTUP transformation which updates its broadcast address in ClusterMetadata. + * + * If the node is itself a CMS member, it is also a requirement to be able to contact a + * majority of the other CMS members in order to perform the serial reads and writes which + * constitute committing to and fetching from the distributed metadata log. + * + * To do this, we use a simple protocol: + * 1. For each CMS member in our replayed ClusterMetadata, ping the associated broadcast address + * to query for id of the node at that address. This determines whether the endpoint still + * belongs to that same node (which is/was a CMS member). + * 2. While we don't have confirmed current addresses for a majority of CMS nodes: + * 2a. Run discovery to locate as many peer addresses as possible. + * 2b. Query every discovered endpoint and ask for its node id. + * If we still don't have confirmed addresses for a majority of CMS members, go to 2a and + * repeat as peers may themselves still be starting up and so may have become discoverable. 
+ * + * This process builds up a mapping of id -> current address for CMS members which can then be + * used to construct a set of temporary redirects between addresses according to ClusterMetadata + * and the newly discovered ones. + * + * As each CMS node with a changed address goes through the startup process, it will commit its + * STARTUP transformation and the new broadcast address will be found in ClusterMetadata. A log + * listener is used to react to these transformations by removing redundant address overrides + * as they are enacted. + * + * @param nodeId derived from the persisted id of this node from the system.peers table + * @param replayed current ClusterMetadata after replaying the metadata log for startup + */ + private static ClusterMetadata initializeCMSLookup(NodeId nodeId, ClusterMetadata replayed) + { + InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId); + InetAddressAndPort newAddress = FBUtilities.getBroadcastAddressAndPort(); + if (newAddress.equals(oldAddress)) + return replayed; + + Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>(); + replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, replayed.directory.endpoint(id))); + Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>(); + + Set<InetAddressAndPort> candidates = new HashSet<>(previousCMS.values()); + candidates.add(newAddress); Review Comment: any reason we don't add the seeds to candidates here? Feels like it could save us a discovery round ########## src/java/org/apache/cassandra/tcm/CMSLookup.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.listeners.ChangeListener; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.utils.Pair; + +public class CMSLookup +{ + private static final Logger logger = LoggerFactory.getLogger(CMSLookup.class); + + public enum State { PRE_INIT, ACTIVE, RETIRED }; + + public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, Epoch.EMPTY, new HashMap<>()); + public static InitialBuilder builder(ClusterMetadata metadata) + { + return new InitialBuilder(metadata); + } + + private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> overrides; + private final BiMap<InetAddressAndPort, InetAddressAndPort> addressMap; + private final Epoch lastModified; + private final State state; + + private CMSLookup(State state, Epoch epoch, Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> overrides) + { + this.state = state; + this.lastModified = epoch; + this.addressMap = HashBiMap.create(overrides.size()); + this.overrides = Maps.newHashMapWithExpectedSize(overrides.size()); + for 
(Map.Entry<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> e : overrides.entrySet()) + { + this.overrides.put(e.getKey(), e.getValue()); + this.addressMap.put(e.getValue().left, e.getValue().right); + } + } + + public boolean isUninitialized() + { + return state == State.PRE_INIT; + } + + public boolean isActive() + { + return state == State.ACTIVE; + } + + public InetAddressAndPort getAddressOverride(NodeId id) + { + Pair<InetAddressAndPort, InetAddressAndPort> override = overrides.get(id); + return override != null ? override.right : null; + } + + public EndpointLookup asNodeLookup(EndpointLookup lookup) + { + return new EndpointLookup() + { + @Override + public InetAddressAndPort endpoint(NodeId id) + { + if (overrides.containsKey(id)) + return overrides.get(id).right; + return lookup.endpoint(id); + } + }; + } + + public CMSLookup rebuild(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot) + { + logger.debug("Rebuilding CMS lookup {} with metadata from epoch {}", this, next.epoch.getEpoch()); + + // All address changes have been enacted, nothing to do + if (state == State.RETIRED) + return this; + + if (!next.epoch.isEqualOrBefore(Epoch.FIRST) + && !fromSnapshot + && next.directory.lastModified().equals(prev.directory.lastModified())) + return this; + + // Filters from the override list those which are no longer necessary as a transformation has now + // replaced the old address with the new one for that node id + Predicate<Map.Entry<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>>> overrideNowEnacted = entry -> { + NodeId nodeId = entry.getKey(); + if (!Objects.equals(prev.directory.getNodeAddresses(nodeId), next.directory.getNodeAddresses(nodeId))) + { + Pair<InetAddressAndPort, InetAddressAndPort> override = overrides.get(nodeId); + if (override != null) + { + // If there was an override for this nodeId && the old/new addresses match the prev/next directory + // entries, filter the override from the map which will be used to build 
the new version + return !override.left.equals(prev.directory.endpoint(nodeId)) + || !override.right.equals(next.directory.endpoint(nodeId)); + } + } + return true; + }; + + logger.debug("Current endpoint overrides: {}", overrides); + Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> nextOverrides + = overrides.entrySet() + .stream() + .filter(overrideNowEnacted) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + logger.debug("Proposed endpoint overrides: {}", nextOverrides); + State state = nextOverrides.isEmpty() ? State.RETIRED : State.ACTIVE; Review Comment: should we remove the log listener if `state == retired`? ########## src/java/org/apache/cassandra/tcm/Discovery.java: ########## @@ -94,12 +118,14 @@ public Discovery(Supplier<MessageDelivery> messaging, Supplier<Set<InetAddressAn public DiscoveredNodes discover() { - return discover(5); + return discover(5, false); } - public DiscoveredNodes discover(int rounds) + public DiscoveredNodes discover(int rounds, boolean allPeers) { boolean res = state.compareAndSet(State.NOT_STARTED, State.IN_PROGRESS); + if (!res) Review Comment: I think we could replace state with an `inProgress` `AtomicBoolean` - `State.FOUND_CMS` is not used and `State.FINISHED` and `State.NOT_STARTED` are equivalent with this change ########## src/java/org/apache/cassandra/tcm/ClusterMetadata.java: ########## @@ -202,40 +215,164 @@ private ClusterMetadata(int metadataIdentifier, this.schema = schema; this.directory = directory; this.tokenMap = tokenMap; - this.placements = placements; this.accordFastPath = accordFastPath; + this.placements = placements; this.lockedRanges = lockedRanges; this.inProgressSequences = inProgressSequences; this.consensusMigrationState = consensusMigrationState; this.extensions = ImmutableMap.copyOf(extensions); this.locator = Locator.usingDirectory(directory); this.accordStaleReplicas = accordStaleReplicas; + this.cmsMembership = cmsMembership; + // Build CMS placement using no-op 
CMS lookup, i.e. using only committed node addresses + this.cmsDataPlacement = calculateCMSPlacement(placements, cmsMembership, CMSLookup.NO_OP); } - public Set<InetAddressAndPort> fullCMSMembers() + public Set<NodeId> fullCMSMemberIds() { - if (fullCMSEndpoints == null) - this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet()); - return fullCMSEndpoints; + return cmsMembership.fullMembers(); } - public Set<NodeId> fullCMSMemberIds() + public boolean isCMSMember(InetAddressAndPort endpoint) Review Comment: most of the callers of this method use `FBUtilities.getBroadcastAddressAndPort()` - should we add a convenience method to look this up using `myNodeId()` and `cmsMembership`? Or maybe set a boolean in `cmsMembership` constructor to avoid the lookups ########## src/java/org/apache/cassandra/tcm/CMSMembership.java: ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm; + +import java.io.IOException; +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.MetaStrategy; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.VersionedEndpoints; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.btree.BTreeSet; + +public class CMSMembership implements MetadataValue<CMSMembership> +{ + public static final Serializer serializer = new Serializer(); + public static final CMSMembership EMPTY = new CMSMembership(); + + private final Epoch lastModified; + private final BTreeSet<NodeId> fullMembers; + private final BTreeSet<NodeId> joiningMembers; + + /** + * Used to derive a CMSMembership when deserializing a ClusterMetadata instance written with a metadata version + * prior to V7. At that time, CMS membership was always inferred from the data placements of the distributed + * cluster metadata keyspace. Read replicas are full members of the CMS and write-only replicas are in the process + * of joining. Note: every read replica must also be a write replica, leaving the CMS is atomic in respect of the + * placements. 
+ * @param placement + * @param directory + * @return + */ + public static CMSMembership reconstruct(DataPlacement placement, Directory directory) + { + SortedSet<NodeId> full = new TreeSet<>(); + SortedSet<NodeId> joining = new TreeSet<>(); + Epoch lm = Epoch.EMPTY; + for (VersionedEndpoints.ForRange endpoints : placement.reads.endpoints) + { + lm = endpoints.lastModified().isAfter(lm) ? endpoints.lastModified() : lm; + endpoints.get().endpoints().forEach(e -> full.add(directory.peerId(e))); + } + + for (VersionedEndpoints.ForRange endpoints : placement.writes.endpoints) + { + lm = endpoints.lastModified().isAfter(lm) ? endpoints.lastModified() : lm; + endpoints.get().endpoints().forEach(e -> { + NodeId id = directory.peerId(e); + if (!full.contains(id)) + joining.add(id); + }); + } + return new CMSMembership(lm, BTreeSet.of(full), BTreeSet.of(joining)); + } + + public DataPlacement toPlacement(EndpointLookup lookup) + { + DataPlacement.Builder builder = DataPlacement.builder(); + for (NodeId id : fullMembers) + { + Replica replica = MetaStrategy.replica(lookup.endpoint(id)); + builder.withReadReplica(lastModified, replica); + builder.withWriteReplica(lastModified, replica); + } + for(NodeId id : joiningMembers) + { + builder.withWriteReplica(lastModified, MetaStrategy.replica(lookup.endpoint(id))); + } + return builder.build(); + } + + private CMSMembership() + { + this(Epoch.EMPTY, + BTreeSet.empty(NodeId::compareTo), + BTreeSet.empty(NodeId::compareTo)); + } + + private CMSMembership(Epoch lastModified, BTreeSet<NodeId> fullMembers, BTreeSet<NodeId> joiningMembers) + { + this.lastModified = lastModified; + this.fullMembers = fullMembers; + this.joiningMembers = joiningMembers; + } + + + @Override + public CMSMembership withLastModified(Epoch epoch) + { + return lastModified.is(epoch) ? 
this : new CMSMembership(epoch, fullMembers, joiningMembers); + } + + @Override + public Epoch lastModified() + { + return lastModified; + } + + public ImmutableSet<NodeId> joiningMembers() + { + return ImmutableSet.copyOf(joiningMembers); Review Comment: nit; BTreeSet is already immutable, we probably don't need to copy it ########## src/java/org/apache/cassandra/tcm/ClusterMetadata.java: ########## @@ -202,40 +215,164 @@ private ClusterMetadata(int metadataIdentifier, this.schema = schema; this.directory = directory; this.tokenMap = tokenMap; - this.placements = placements; this.accordFastPath = accordFastPath; + this.placements = placements; this.lockedRanges = lockedRanges; this.inProgressSequences = inProgressSequences; this.consensusMigrationState = consensusMigrationState; this.extensions = ImmutableMap.copyOf(extensions); this.locator = Locator.usingDirectory(directory); this.accordStaleReplicas = accordStaleReplicas; + this.cmsMembership = cmsMembership; + // Build CMS placement using no-op CMS lookup, i.e. 
using only committed node addresses + this.cmsDataPlacement = calculateCMSPlacement(placements, cmsMembership, CMSLookup.NO_OP); } - public Set<InetAddressAndPort> fullCMSMembers() + public Set<NodeId> fullCMSMemberIds() { - if (fullCMSEndpoints == null) - this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet()); - return fullCMSEndpoints; + return cmsMembership.fullMembers(); } - public Set<NodeId> fullCMSMemberIds() + public boolean isCMSMember(InetAddressAndPort endpoint) { - if (fullCMSIds == null) - this.fullCMSIds = placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet().stream().map(directory::peerId).collect(toImmutableSet()); - return fullCMSIds; + if (epoch.isAfter(Epoch.FIRST)) + return fullCMSMembers().contains(endpoint); + + // special case to handle initialization of the CMS for the first time + return epoch.isEqualOrBefore(Epoch.FIRST) && cmsDataPlacement.reads.byEndpoint() Review Comment: nit; guess we can remove the `isEqualOrBefore` check here ########## src/java/org/apache/cassandra/tcm/CMSLookup.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.listeners.ChangeListener; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.utils.Pair; + +public class CMSLookup +{ + private static final Logger logger = LoggerFactory.getLogger(CMSLookup.class); + + public enum State { PRE_INIT, ACTIVE, RETIRED }; + + public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, Epoch.EMPTY, new HashMap<>()); + public static InitialBuilder builder(ClusterMetadata metadata) + { + return new InitialBuilder(metadata); + } + + private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> overrides; Review Comment: this should probably be an `ImmutableMap` for clarity? and if we make `InitialBuilder` and `rebuild` below build immutablemaps we can avoid the copying ########## src/java/org/apache/cassandra/tcm/CMSMembership.java: ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm; + +import java.io.IOException; +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.MetaStrategy; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.VersionedEndpoints; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.btree.BTreeSet; + +public class CMSMembership implements MetadataValue<CMSMembership> +{ + public static final Serializer serializer = new Serializer(); + public static final CMSMembership EMPTY = new CMSMembership(); + + private final Epoch lastModified; + private final BTreeSet<NodeId> fullMembers; + private final BTreeSet<NodeId> joiningMembers; + + /** + * Used to derive a CMSMembership when deserializing a ClusterMetadata instance written with a metadata version + * prior to V7. At that time, CMS membership was always inferred from the data placements of the distributed + * cluster metadata keyspace. 
Read replicas are full members of the CMS and write-only replicas are in the process + * of joining. Note: every read replica must also be a write replica, leaving the CMS is atomic in respect of the + * placements. + * @param placement + * @param directory + * @return + */ + public static CMSMembership reconstruct(DataPlacement placement, Directory directory) + { + SortedSet<NodeId> full = new TreeSet<>(); Review Comment: we could probably build the BTreeSets directly and avoid the copying below? ########## src/java/org/apache/cassandra/tcm/CMSMembership.java: ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm; + +import java.io.IOException; +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.MetaStrategy; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.EndpointLookup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.VersionedEndpoints; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.btree.BTreeSet; + +public class CMSMembership implements MetadataValue<CMSMembership> +{ + public static final Serializer serializer = new Serializer(); + public static final CMSMembership EMPTY = new CMSMembership(); + + private final Epoch lastModified; + private final BTreeSet<NodeId> fullMembers; + private final BTreeSet<NodeId> joiningMembers; + + /** + * Used to derive a CMSMembership when deserializing a ClusterMetadata instance written with a metadata version + * prior to V7. At that time, CMS membership was always inferred from the data placements of the distributed + * cluster metadata keyspace. Read replicas are full members of the CMS and write-only replicas are in the process + * of joining. Note: every read replica must also be a write replica, leaving the CMS is atomic in respect of the + * placements. 
+ * @param placement + * @param directory + * @return + */ + public static CMSMembership reconstruct(DataPlacement placement, Directory directory) + { + SortedSet<NodeId> full = new TreeSet<>(); + SortedSet<NodeId> joining = new TreeSet<>(); + Epoch lm = Epoch.EMPTY; + for (VersionedEndpoints.ForRange endpoints : placement.reads.endpoints) + { + lm = endpoints.lastModified().isAfter(lm) ? endpoints.lastModified() : lm; + endpoints.get().endpoints().forEach(e -> full.add(directory.peerId(e))); + } + + for (VersionedEndpoints.ForRange endpoints : placement.writes.endpoints) + { + lm = endpoints.lastModified().isAfter(lm) ? endpoints.lastModified() : lm; + endpoints.get().endpoints().forEach(e -> { + NodeId id = directory.peerId(e); + if (!full.contains(id)) + joining.add(id); + }); + } + return new CMSMembership(lm, BTreeSet.of(full), BTreeSet.of(joining)); + } + + public DataPlacement toPlacement(EndpointLookup lookup) + { + DataPlacement.Builder builder = DataPlacement.builder(); + for (NodeId id : fullMembers) + { + Replica replica = MetaStrategy.replica(lookup.endpoint(id)); + builder.withReadReplica(lastModified, replica); + builder.withWriteReplica(lastModified, replica); + } + for(NodeId id : joiningMembers) + { + builder.withWriteReplica(lastModified, MetaStrategy.replica(lookup.endpoint(id))); + } + return builder.build(); + } + + private CMSMembership() + { + this(Epoch.EMPTY, + BTreeSet.empty(NodeId::compareTo), + BTreeSet.empty(NodeId::compareTo)); + } + + private CMSMembership(Epoch lastModified, BTreeSet<NodeId> fullMembers, BTreeSet<NodeId> joiningMembers) + { + this.lastModified = lastModified; + this.fullMembers = fullMembers; + this.joiningMembers = joiningMembers; + } + + + @Override + public CMSMembership withLastModified(Epoch epoch) + { + return lastModified.is(epoch) ? 
this : new CMSMembership(epoch, fullMembers, joiningMembers); + } + + @Override + public Epoch lastModified() + { + return lastModified; + } + + public ImmutableSet<NodeId> joiningMembers() + { + return ImmutableSet.copyOf(joiningMembers); + } + + public ImmutableSet<NodeId> fullMembers() + { + return ImmutableSet.copyOf(fullMembers); + } + + public CMSMembership startJoining(NodeId id) + { + if (joiningMembers.contains(id)) + throw new IllegalStateException(id + " is already joining the CMS"); + if (fullMembers.contains(id)) + throw new IllegalStateException(id + " has already fully joined the CMS"); + + return new CMSMembership(lastModified, fullMembers, joiningMembers.with(id)); + } + + public CMSMembership cancelJoining(NodeId id) + { + if (!joiningMembers.contains(id)) + throw new IllegalStateException(id + " is not currently joining the CMS"); + if (fullMembers.contains(id)) + throw new IllegalStateException(id + " has already fully joined the CMS"); + + return new CMSMembership(lastModified, fullMembers, joiningMembers.without(id)); + } + + public CMSMembership finishJoining(NodeId id) + { + if (!joiningMembers.contains(id)) + throw new IllegalStateException(id + " is not currently joining the CMS"); + if (fullMembers.contains(id)) + throw new IllegalStateException(id + " has already fully joined the CMS"); + + return new CMSMembership(lastModified, fullMembers.with(id), joiningMembers.without(id)); + } + + public CMSMembership leave(NodeId id) + { + if (joiningMembers.contains(id)) + throw new IllegalStateException(id + " is currently joining the CMS, "); + if (!fullMembers.contains(id)) + throw new IllegalStateException(id + " is not a CMS member"); + + return new CMSMembership(lastModified, fullMembers.without(id), joiningMembers); + } + + @Override + public String toString() + { + return "CMSMembership{" + + "lastModified=" + lastModified + + ", fullMembers=" + fullMembers + + ", joiningMembers=" + joiningMembers + + '}'; + } + + @Override + public final 
boolean equals(Object o) + { + if (!(o instanceof CMSMembership)) return false; + + CMSMembership that = (CMSMembership) o; + return Objects.equals(lastModified, that.lastModified) && + Objects.equals(fullMembers, that.fullMembers) && + Objects.equals(joiningMembers, that.joiningMembers); + } + + @Override + public int hashCode() + { + int result = Objects.hashCode(lastModified); + result = 31 * result + Objects.hashCode(fullMembers); + result = 31 * result + Objects.hashCode(joiningMembers); + return result; + } + + public static class Serializer implements MetadataSerializer<CMSMembership> + { + @Override + public void serialize(CMSMembership t, DataOutputPlus out, Version version) throws IOException + { + Epoch.serializer.serialize(t.lastModified, out); + + out.writeUnsignedVInt32(t.fullMembers.size()); + for (NodeId id : t.fullMembers) + NodeId.serializer.serialize(id, out, version); + + out.writeUnsignedVInt32(t.joiningMembers.size()); + for (NodeId id : t.joiningMembers) + NodeId.serializer.serialize(id, out, version); + } + + @Override + public CMSMembership deserialize(DataInputPlus in, Version version) throws IOException + { + Epoch lastModified = Epoch.serializer.deserialize(in, version); + + int fullMemberCount = in.readUnsignedVInt32(); + SortedSet<NodeId> fullMembers = new TreeSet<>(); Review Comment: maybe build the btreesets directly ########## src/java/org/apache/cassandra/tcm/Startup.java: ########## @@ -184,6 +239,128 @@ public static void initializeAsNonCmsNode(Function<Processor, Processor> wrapPro } } + + /** + * If the broadcast address of this node has changed, we must verify the endpoints it knows for + * the members of the CMS are still reachable and valid. This is necessary for the node to submit + * a STARTUP transformation which updates its broadcast address in ClusterMetadata. 
+ * + * If the node is itself a CMS member, it is also a requirement to be able to contact a + * majority of the other CMS members in order to perform the serial reads and writes which + * constitute committing to and fetching from the distributed metadata log. + * + * To do this, we use a simple protocol: + * 1. For each CMS member in our replayed ClusterMetadata, ping the associated broadcast address + * to query for id of the node at that address. This determines whether the endpoint still + * belongs to that same node (which is/was a CMS member). + * 2. While we don't have confirmed current addresses for a majority of CMS nodes: + * 2a. Run discovery to locate as many peer addresses as possible. + * 2b. Query every discovered endpoint and ask for its node id. + * If we still don't have confirmed addresses for a majority of CMS members, go to 2a and + * repeat as peers may themselves still be starting up and so may have become discoverable. + * + * This process builds up a mapping of id -> current address for CMS members which can then be + * used to construct a set of temporary redirects between addresses according to ClusterMetadata + * and the newly discovered ones. + * + * As each CMS node with a changed address goes through the startup process, it will commit its + * STARTUP transformation and the new broadcast address will be found in ClusterMetadata. A log + * listener is used to react to these transformations by removing redundant address overrides + * as they are enacted. 
+ * + * @param nodeId derived from the persisted id of this node from the system.peers table + * @param replayed current ClusterMetadata after replaying the metadata log for startup + */ + private static ClusterMetadata initializeCMSLookup(NodeId nodeId, ClusterMetadata replayed) + { + InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId); + InetAddressAndPort newAddress = FBUtilities.getBroadcastAddressAndPort(); + if (newAddress.equals(oldAddress)) + return replayed; + + Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>(); + replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, replayed.directory.endpoint(id))); + Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>(); + + Set<InetAddressAndPort> candidates = new HashSet<>(previousCMS.values()); + candidates.add(newAddress); + + int maxRounds = 5; + int currentRound = 0; + long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4), + DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS) / maxRounds); + // TODO a non-CMS node only needs to be able to contact a single CMS member to commit its STARTUP + int quorum = (previousCMS.size() / 2) + 1; + logger.info("Running survey and discovery for CMS nodes {} (quorum = {})", previousCMS, quorum); + while (confirmedCMS.size() < quorum && currentRound < maxRounds) + { + logger.info("In round {} sending survey to {}", currentRound, candidates); + Collection<Pair<InetAddressAndPort, NodeId>> surveyed = + MessageDelivery.fanoutAndWait(MessagingService.instance(), + candidates, + Verb.TCM_DISCOVER_SURVEY_REQ, + NoPayload.noPayload, + roundTimeNanos, + TimeUnit.NANOSECONDS); + logger.info("Survey of {} discovered {}", candidates, surveyed); + surveyed.forEach(pair -> { + if (previousCMS.containsKey(pair.right)) + confirmedCMS.put(pair.right, pair.left); + + }); + + logger.info("Confirmed CMS members {}", confirmedCMS); + if (confirmedCMS.size() < quorum || (previousCMS.size() == 1 && confirmedCMS.containsKey(nodeId))) + { + // In 
the single node CMS case, run discovery simply to propagate this node's new address to the rest of + // the cluster via the seeds & discovery meshing. Otherwise, if every node has a new address the non-CMS + // members have no way to discover the CMS and it has no way to know the new places to push updates to. + logger.info("Running discovery round; either CMS quorum was not confirmed or this is the only CMS member"); + Discovery.DiscoveredNodes nodes = Discovery.instance.discover(5, true); + candidates.addAll(nodes.nodes()); + logger.info("Rediscovery completed, discovered nodes: {}", nodes); + } + currentRound++; + } + + if (confirmedCMS.size() >= quorum) + { + logger.info("Identified a quorum of CMS members (found {}, required {}).", confirmedCMS.size(), quorum); + if (confirmedCMS.values().containsAll(previousCMS.values())) + { + logger.info("No endpoint changes found for CMS members"); + return replayed; + } + else + { + logger.info("Applying temporary address overrides for uncommitted CMS endpoint changes"); + CMSLookup.InitialBuilder builder = CMSLookup.builder(replayed); + for (NodeId confirmed : confirmedCMS.keySet()) + { + InetAddressAndPort prev = previousCMS.get(confirmed); + InetAddressAndPort next = confirmedCMS.get(confirmed); + if (!next.equals(prev)) + { + logger.info("Added override for {}, ({} -> {})", confirmed, previousCMS.get(confirmed), confirmedCMS.get(confirmed)); + builder = builder.withOverride(confirmed, previousCMS.get(confirmed), confirmedCMS.get(confirmed)); Review Comment: nit: use the `prev`/`next` locals declared just above here (and in the preceding log statement) instead of repeating the map lookups ########## src/java/org/apache/cassandra/tcm/Startup.java: ########## @@ -184,6 +239,128 @@ public static void initializeAsNonCmsNode(Function<Processor, Processor> wrapPro } } + + /** + * If the broadcast address of this node has changed, we must verify the endpoints it knows for + * the members of the CMS are still reachable and valid. 
This is necessary for the node to submit + * a STARTUP transformation which updates its broadcast address in ClusterMetadata. + * + * If the node is itself a CMS member, it is also a requirement to be able to contact a + * majority of the other CMS members in order to perform the serial reads and writes which + * constitute committing to and fetching from the distributed metadata log. + * + * To do this, we use a simple protocol: + * 1. For each CMS member in our replayed ClusterMetadata, ping the associated broadcast address + * to query for id of the node at that address. This determines whether the endpoint still + * belongs to that same node (which is/was a CMS member). + * 2. While we don't have confirmed current addresses for a majority of CMS nodes: + * 2a. Run discovery to locate as many peer addresses as possible. + * 2b. Query every discovered endpoint and ask for its node id. + * If we still don't have confirmed addresses for a majority of CMS members, go to 2a and + * repeat as peers may themselves still be starting up and so may have become discoverable. + * + * This process builds up a mapping of id -> current address for CMS members which can then be + * used to construct a set of temporary redirects between addresses according to ClusterMetadata + * and the newly discovered ones. + * + * As each CMS node with a changed address goes through the startup process, it will commit its + * STARTUP transformation and the new broadcast address will be found in ClusterMetadata. A log + * listener is used to react to these transformations by removing redundant address overrides + * as they are enacted. 
+ * + * @param nodeId derived from the persisted id of this node from the system.peers table + * @param replayed current ClusterMetadata after replaying the metadata log for startup + */ + private static ClusterMetadata initializeCMSLookup(NodeId nodeId, ClusterMetadata replayed) + { + InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId); + InetAddressAndPort newAddress = FBUtilities.getBroadcastAddressAndPort(); + if (newAddress.equals(oldAddress)) + return replayed; + + Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>(); + replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, replayed.directory.endpoint(id))); + Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>(); + + Set<InetAddressAndPort> candidates = new HashSet<>(previousCMS.values()); + candidates.add(newAddress); + + int maxRounds = 5; + int currentRound = 0; + long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4), Review Comment: is 4s enough here? Should we add another "discover survey" config setting? ########## src/java/org/apache/cassandra/tcm/ClusterMetadata.java: ########## @@ -202,40 +215,164 @@ private ClusterMetadata(int metadataIdentifier, this.schema = schema; this.directory = directory; this.tokenMap = tokenMap; - this.placements = placements; this.accordFastPath = accordFastPath; + this.placements = placements; this.lockedRanges = lockedRanges; this.inProgressSequences = inProgressSequences; this.consensusMigrationState = consensusMigrationState; this.extensions = ImmutableMap.copyOf(extensions); this.locator = Locator.usingDirectory(directory); this.accordStaleReplicas = accordStaleReplicas; + this.cmsMembership = cmsMembership; + // Build CMS placement using no-op CMS lookup, i.e. 
using only committed node addresses + this.cmsDataPlacement = calculateCMSPlacement(placements, cmsMembership, CMSLookup.NO_OP); } - public Set<InetAddressAndPort> fullCMSMembers() + public Set<NodeId> fullCMSMemberIds() { - if (fullCMSEndpoints == null) - this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet()); - return fullCMSEndpoints; + return cmsMembership.fullMembers(); } - public Set<NodeId> fullCMSMemberIds() + public boolean isCMSMember(InetAddressAndPort endpoint) { - if (fullCMSIds == null) - this.fullCMSIds = placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet().stream().map(directory::peerId).collect(toImmutableSet()); - return fullCMSIds; + if (epoch.isAfter(Epoch.FIRST)) + return fullCMSMembers().contains(endpoint); + + // special case to handle initialization of the CMS for the first time + return epoch.isEqualOrBefore(Epoch.FIRST) && cmsDataPlacement.reads.byEndpoint() + .keySet() + .contains(endpoint); + } + + public Set<InetAddressAndPort> fullCMSMembers() + { + if (fullCMSEndpoints == null) + { + EndpointLookup lookup = endpointLookup(); + fullCMSEndpoints = ImmutableSet.copyOf(cmsMembership.fullMembers() + .stream() + .map(lookup::endpoint) + .collect(Collectors.toSet())); + } + return fullCMSEndpoints; } public EndpointsForRange fullCMSMembersAsReplicas() { if (fullCMSReplicas == null) - fullCMSReplicas = placements.get(ReplicationParams.meta(this)).reads.forRange(MetaStrategy.entireRange).get(); + { + EndpointLookup lookup = endpointLookup(); + EndpointsForRange.Builder builder = EndpointsForRange.builder(MetaStrategy.entireRange); + for (NodeId nodeId : fullCMSMemberIds()) + builder.add(MetaStrategy.replica(lookup.endpoint(nodeId))); + fullCMSReplicas = builder.build(); + } return fullCMSReplicas; } - public boolean isCMSMember(InetAddressAndPort endpoint) + // Synchronization is probably not necessary as this should only be called by a log listener + public 
synchronized void refreshCMSLookup(ClusterMetadata prev, boolean fromSnapshot) + { + CMSLookup prevLookup = prev.cmsLookup; + CMSLookup proposedLookup = prevLookup.rebuild(prev, this, fromSnapshot); + // rebuild returns `this` if no changes + if (prevLookup == proposedLookup) + logger.debug("No changes to CMSLookup between epochs {} and {}", prev.epoch.getEpoch(), epoch.getEpoch()); + else + { + logger.debug("Replacing CMSLookup, Current: {}, Proposed: {}", prevLookup, proposedLookup); + // recalculate CMS placement using new lookup + cmsDataPlacement = calculateCMSPlacement(placements, cmsMembership, proposedLookup); + // We shouldn't need to null out the other CMS fields when we refresh the CMSLookup as that is done in a + // precommit listener, meaning nothing should be accessing the ClusterMetadata instance yet. + } + + // either way, set the lookup for _this_ epoch + cmsLookup = proposedLookup; + } + + // Synchronization is probably not necessary as this should only be called once, during startup + public synchronized boolean initCMSLookup(CMSLookup lookup) + { + logger.info("Initializing CMS lookup on ClusterMetadata at epoch {}", epoch.getEpoch()); + boolean isInitial = cmsLookup.isUninitialized(); + logger.debug("Current CMS lookup: {}, proposed: {}", cmsLookup, lookup); + if (isInitial) + { + cmsLookup = lookup; + cmsDataPlacement = calculateCMSPlacement(placements, cmsMembership, lookup); + // We need to null out these fields when we init the CMSLookup because post-commit listeners may have + // accessed them during log replay, which happens before this. 
+ fullCMSReplicas = null; + fullCMSEndpoints = null; + } + else + logger.warn("Invalid CMSLookup state for initialization: {}", cmsLookup); + return isInitial; + } + + private DataPlacement calculateCMSPlacement(DataPlacements placements, CMSMembership cms, CMSLookup lookup) { - return fullCMSMembers().contains(endpoint); + if (epoch.isBefore(Epoch.FIRST) || schema.getKeyspaces().get(SchemaConstants.METADATA_KEYSPACE_NAME).isEmpty()) + return DataPlacement.empty(); + + if (directory.isEmpty()) + { + if (epoch.is(Epoch.FIRST)) + { + // PRE_INITIALIZE_CMS: placements need to be hardcoded to the local address so that the subsequent + // INITIALIZE_CMS can be committed + Replica localReplica = MetaStrategy.replica(FBUtilities.getBroadcastAddressAndPort()); + return DataPlacement.builder() + .withReadReplica(Epoch.FIRST, localReplica) + .withWriteReplica(Epoch.FIRST, localReplica) + .build(); + } + else + { + // This cluster did not previously upgrade from a gossip based version (i.e. pre-6.0) but did at some point + // run a version prior to MetadataVersion.V7 where we started to encode CMS membership directly. This Review Comment: this comment should say V9 rather than V7 - CMS membership is only encoded directly from V9 onwards ########## src/java/org/apache/cassandra/tcm/ClusterMetadata.java: ########## @@ -1108,6 +1339,26 @@ public ClusterMetadata deserialize(DataInputPlus in, Version version) throws IOE value.deserialize(in, version); extensions.put(key, value); } + + CMSMembership cmsMembership = CMSMembership.EMPTY; + if (version.isAtLeast(Version.V9)) + cmsMembership = CMSMembership.serializer.deserialize(in, version); + else + { + KeyspaceMetadata metadataKs = schema.getKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME); + if (!dir.isEmpty()) + { + // Pre-V9 the membership of the CMS was always inferred from the placement of the distributed + // metadata keyspace. This is true for the initial cluster metadata created during upgrade from + // gossip and for subsequent epochs. 
The endpoints in the placement must belong to registered nodes, + // so we can derive the CMSMembership using the data placement and directory. + DataPlacement placement = placements.get(metadataKs.params.replication); + cmsMembership = CMSMembership.reconstruct(placement, dir); + placements = placements.unbuild().without(metadataKs.params.replication).build(); Review Comment: I think this is unnecessary - we do the same thing directly after the if stmt ########## src/java/org/apache/cassandra/tcm/Startup.java: ########## @@ -184,6 +239,128 @@ public static void initializeAsNonCmsNode(Function<Processor, Processor> wrapPro } } + + /** + * If the broadcast address of this node has changed, we must verify the endpoints it knows for + * the members of the CMS are still reachable and valid. This is necessary for the node to submit + * a STARTUP transformation which updates its broadcast address in ClusterMetadata. + * + * If the node is itself a CMS member, it is also a requirement to be able to contact a + * majority of the other CMS members in order to perform the serial reads and writes which + * constitute committing to and fetching from the distributed metadata log. + * + * To do this, we use a simple protocol: + * 1. For each CMS member in our replayed ClusterMetadata, ping the associated broadcast address + * to query for id of the node at that address. This determines whether the endpoint still + * belongs to that same node (which is/was a CMS member). + * 2. While we don't have confirmed current addresses for a majority of CMS nodes: + * 2a. Run discovery to locate as many peer addresses as possible. + * 2b. Query every discovered endpoint and ask for its node id. + * If we still don't have confirmed addresses for a majority of CMS members, go to 2a and + * repeat as peers may themselves still be starting up and so may have become discoverable. 
+ * + * This process builds up a mapping of id -> current address for CMS members which can then be + * used to construct a set of temporary redirects between addresses according to ClusterMetadata + * and the newly discovered ones. + * + * As each CMS node with a changed address goes through the startup process, it will commit its + * STARTUP transformation and the new broadcast address will be found in ClusterMetadata. A log + * listener is used to react to these transformations by removing redundant address overrides + * as they are enacted. + * + * @param nodeId derived from the persisted id of this node from the system.peers table + * @param replayed current ClusterMetadata after replaying the metadata log for startup + */ + private static ClusterMetadata initializeCMSLookup(NodeId nodeId, ClusterMetadata replayed) + { + InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId); + InetAddressAndPort newAddress = FBUtilities.getBroadcastAddressAndPort(); + if (newAddress.equals(oldAddress)) + return replayed; + + Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>(); + replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, replayed.directory.endpoint(id))); + Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>(); + + Set<InetAddressAndPort> candidates = new HashSet<>(previousCMS.values()); + candidates.add(newAddress); + + int maxRounds = 5; + int currentRound = 0; + long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4), + DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS) / maxRounds); + // TODO a non-CMS node only needs to be able to contact a single CMS member to commit its STARTUP Review Comment: Should we fix this? It feels like we'll most often discover the full CMS if it's up, and if it is not yet up, it might be better to wait here before trying to commit Startup? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

