krummas commented on code in PR #4613:
URL: https://github.com/apache/cassandra/pull/4613#discussion_r2811093721


##########
src/java/org/apache/cassandra/tcm/CMSMembership.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+public class CMSMembership implements MetadataValue<CMSMembership>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final CMSMembership EMPTY = new CMSMembership();
+
+    private final Epoch lastModified;
+    private final BTreeSet<NodeId> fullMembers;
+    private final BTreeSet<NodeId> joiningMembers;
+
+    /**
+     * Used to derive a CMSMembership when deserializing a ClusterMetadata 
instance written with a metadata version
+     * prior to V7. At that time, CMS membership was always inferred from the 
data placements of the distributed

Review Comment:
   "prior to V9" I think?



##########
src/java/org/apache/cassandra/tcm/CMSLookup.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.listeners.ChangeListener;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.Pair;
+
+public class CMSLookup
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CMSLookup.class);
+
+    public enum State { PRE_INIT, ACTIVE, RETIRED };
+
+    public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, 
Epoch.EMPTY, new HashMap<>());
+    public static InitialBuilder builder(ClusterMetadata metadata)
+    {
+        return new InitialBuilder(metadata);
+    }
+
+    private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> 
overrides;
+    private final BiMap<InetAddressAndPort, InetAddressAndPort> addressMap;

Review Comment:
   `addressMap` is only used in the `toString` method



##########
src/java/org/apache/cassandra/tcm/CMSLookup.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.listeners.ChangeListener;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.Pair;
+
+public class CMSLookup
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CMSLookup.class);
+
+    public enum State { PRE_INIT, ACTIVE, RETIRED };
+
+    public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, 
Epoch.EMPTY, new HashMap<>());
+    public static InitialBuilder builder(ClusterMetadata metadata)
+    {
+        return new InitialBuilder(metadata);
+    }
+
+    private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> 
overrides;
+    private final BiMap<InetAddressAndPort, InetAddressAndPort> addressMap;
+    private final Epoch lastModified;
+    private final State state;
+
+    private CMSLookup(State state, Epoch epoch, Map<NodeId, 
Pair<InetAddressAndPort, InetAddressAndPort>> overrides)
+    {
+        this.state = state;
+        this.lastModified = epoch;
+        this.addressMap = HashBiMap.create(overrides.size());
+        this.overrides = Maps.newHashMapWithExpectedSize(overrides.size());
+        for (Map.Entry<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> e 
: overrides.entrySet())
+        {
+            this.overrides.put(e.getKey(), e.getValue());
+            this.addressMap.put(e.getValue().left, e.getValue().right);
+        }
+    }
+
+    public boolean isUninitialized()
+    {
+        return state == State.PRE_INIT;
+    }
+
+    public boolean isActive()
+    {
+        return state == State.ACTIVE;
+    }
+
+    public InetAddressAndPort getAddressOverride(NodeId id)

Review Comment:
   `getAddressOverride` is unused



##########
src/java/org/apache/cassandra/tcm/Startup.java:
##########
@@ -184,6 +239,128 @@ public static void 
initializeAsNonCmsNode(Function<Processor, Processor> wrapPro
         }
     }
 
+
+    /**
+     * If the broadcast address of this node has changed, we must verify the 
endpoints it knows for
+     * the members of the CMS are still reachable and valid. This is necessary 
for the node to submit
+     * a STARTUP transformation which updates its broadcast address in 
ClusterMetadata.
+     *
+     * If the node is itself a CMS member, it is also a requirement to be able 
to contact a
+     * majority of the other CMS members in order to perform the serial reads 
and writes which
+     * constitute committing to and fetching from the distributed metadata log.
+     *
+     * To do this, we use a simple protocol:
+     * 1. For each CMS member in our replayed ClusterMetadata, ping the 
associated broadcast address
+     *   to query for id of the node at that address. This determines whether 
the endpoint still
+     *   belongs to that same node (which is/was a CMS member).
+     * 2. While we don't have confirmed current addresses for a majority of 
CMS nodes:
+     * 2a. Run discovery to locate as many peer addresses as possible.
+     * 2b. Query every discovered endpoint and ask for its node id.
+     * If we still don't have confirmed addresses for a majority of CMS 
members, go to 2a and
+     * repeat as peers may themselves still be starting up and so may have 
become discoverable.
+     *
+     * This process builds up a mapping of id -> current address for CMS 
members which can then be
+     * used to construct a set of temporary redirects between addresses 
according to ClusterMetadata
+     * and the newly discovered ones.
+     *
+     * As each CMS node with a changed address goes through the startup 
process, it will commit its
+     * STARTUP transformation and the new broadcast address will be found in 
ClusterMetadata. A log
+     * listener is used to react to these transformations by removing 
redundant address overrides
+     * as they are enacted.
+     *
+     * @param nodeId derived from the persisted id of this node from the 
system.peers table
+     * @param replayed current ClusterMetadata after replaying the metadata 
log for startup
+     */
+    private static ClusterMetadata initializeCMSLookup(NodeId nodeId, 
ClusterMetadata replayed)
+    {
+        InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId);
+        InetAddressAndPort newAddress = 
FBUtilities.getBroadcastAddressAndPort();
+        if (newAddress.equals(oldAddress))
+            return replayed;
+
+        Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>();
+        replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, 
replayed.directory.endpoint(id)));
+        Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>();
+
+        Set<InetAddressAndPort> candidates = new 
HashSet<>(previousCMS.values());
+        candidates.add(newAddress);

Review Comment:
   Any reason we don't add the seeds to `candidates` here? Feels like it could 
save us a discovery round.



##########
src/java/org/apache/cassandra/tcm/CMSLookup.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.listeners.ChangeListener;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.Pair;
+
+public class CMSLookup
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CMSLookup.class);
+
+    public enum State { PRE_INIT, ACTIVE, RETIRED };
+
+    public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, 
Epoch.EMPTY, new HashMap<>());
+    public static InitialBuilder builder(ClusterMetadata metadata)
+    {
+        return new InitialBuilder(metadata);
+    }
+
+    private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> 
overrides;
+    private final BiMap<InetAddressAndPort, InetAddressAndPort> addressMap;
+    private final Epoch lastModified;
+    private final State state;
+
+    private CMSLookup(State state, Epoch epoch, Map<NodeId, 
Pair<InetAddressAndPort, InetAddressAndPort>> overrides)
+    {
+        this.state = state;
+        this.lastModified = epoch;
+        this.addressMap = HashBiMap.create(overrides.size());
+        this.overrides = Maps.newHashMapWithExpectedSize(overrides.size());
+        for (Map.Entry<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> e 
: overrides.entrySet())
+        {
+            this.overrides.put(e.getKey(), e.getValue());
+            this.addressMap.put(e.getValue().left, e.getValue().right);
+        }
+    }
+
+    public boolean isUninitialized()
+    {
+        return state == State.PRE_INIT;
+    }
+
+    public boolean isActive()
+    {
+        return state == State.ACTIVE;
+    }
+
+    public InetAddressAndPort getAddressOverride(NodeId id)
+    {
+        Pair<InetAddressAndPort, InetAddressAndPort> override = 
overrides.get(id);
+        return override != null ? override.right : null;
+    }
+
+    public EndpointLookup asNodeLookup(EndpointLookup lookup)
+    {
+        return new EndpointLookup()
+        {
+            @Override
+            public InetAddressAndPort endpoint(NodeId id)
+            {
+                if (overrides.containsKey(id))
+                    return overrides.get(id).right;
+                return lookup.endpoint(id);
+            }
+        };
+    }
+
+    public CMSLookup rebuild(ClusterMetadata prev, ClusterMetadata next, 
boolean fromSnapshot)
+    {
+        logger.debug("Rebuilding CMS lookup {} with metadata from epoch {}", 
this, next.epoch.getEpoch());
+
+        // All address changes have been enacted, nothing to do
+        if (state == State.RETIRED)
+            return this;
+
+        if (!next.epoch.isEqualOrBefore(Epoch.FIRST)
+            && !fromSnapshot
+            && 
next.directory.lastModified().equals(prev.directory.lastModified()))
+            return this;
+
+        // Filters from the override list those which are no longer necessary 
as a transformation has now
+        // replaced the old address with the new one for that node id
+        Predicate<Map.Entry<NodeId, Pair<InetAddressAndPort, 
InetAddressAndPort>>> overrideNowEnacted = entry -> {
+            NodeId nodeId = entry.getKey();
+            if (!Objects.equals(prev.directory.getNodeAddresses(nodeId), 
next.directory.getNodeAddresses(nodeId)))
+            {
+                Pair<InetAddressAndPort, InetAddressAndPort> override = 
overrides.get(nodeId);
+                if (override != null)
+                {
+                    // If there was an override for this nodeId && the old/new 
addresses match the prev/next directory
+                    // entries, filter the override from the map which will be 
used to build the new version
+                    return 
!override.left.equals(prev.directory.endpoint(nodeId))
+                           || 
!override.right.equals(next.directory.endpoint(nodeId));
+                }
+            }
+            return true;
+        };
+
+        logger.debug("Current endpoint overrides: {}", overrides);
+        Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> nextOverrides
+            = overrides.entrySet()
+                       .stream()
+                       .filter(overrideNowEnacted)
+                       .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+        logger.debug("Proposed endpoint overrides: {}", nextOverrides);
+        State state = nextOverrides.isEmpty() ? State.RETIRED : State.ACTIVE;

Review Comment:
   Should we remove the log listener if `state == State.RETIRED`?



##########
src/java/org/apache/cassandra/tcm/Discovery.java:
##########
@@ -94,12 +118,14 @@ public Discovery(Supplier<MessageDelivery> messaging, 
Supplier<Set<InetAddressAn
 
     public DiscoveredNodes discover()
     {
-        return discover(5);
+        return discover(5, false);
     }
 
-    public DiscoveredNodes discover(int rounds)
+    public DiscoveredNodes discover(int rounds, boolean allPeers)
     {
         boolean res = state.compareAndSet(State.NOT_STARTED, 
State.IN_PROGRESS);
+        if (!res)

Review Comment:
   I think we could replace `state` with an `inProgress` `AtomicBoolean` – 
`State.FOUND_CMS` is not used, and `State.FINISHED` and `State.NOT_STARTED` are 
equivalent with this change.



##########
src/java/org/apache/cassandra/tcm/ClusterMetadata.java:
##########
@@ -202,40 +215,164 @@ private ClusterMetadata(int metadataIdentifier,
         this.schema = schema;
         this.directory = directory;
         this.tokenMap = tokenMap;
-        this.placements = placements;
         this.accordFastPath = accordFastPath;
+        this.placements = placements;
         this.lockedRanges = lockedRanges;
         this.inProgressSequences = inProgressSequences;
         this.consensusMigrationState = consensusMigrationState;
         this.extensions = ImmutableMap.copyOf(extensions);
         this.locator = Locator.usingDirectory(directory);
         this.accordStaleReplicas = accordStaleReplicas;
+        this.cmsMembership = cmsMembership;
+        // Build CMS placement using no-op CMS lookup, i.e. using only 
committed node addresses
+        this.cmsDataPlacement = calculateCMSPlacement(placements, 
cmsMembership, CMSLookup.NO_OP);
     }
 
-    public Set<InetAddressAndPort> fullCMSMembers()
+    public Set<NodeId> fullCMSMemberIds()
     {
-        if (fullCMSEndpoints == null)
-            this.fullCMSEndpoints = 
ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet());
-        return fullCMSEndpoints;
+        return cmsMembership.fullMembers();
     }
 
-    public Set<NodeId> fullCMSMemberIds()
+    public boolean isCMSMember(InetAddressAndPort endpoint)

Review Comment:
   most of the callers of this method use 
`FBUtilities.getBroadcastAddressAndPort()` - should we add a convenience method 
to look this up using `myNodeId()` and `cmsMembership`? Or maybe set a boolean 
in `cmsMembership` constructor to avoid the lookups



##########
src/java/org/apache/cassandra/tcm/CMSMembership.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+public class CMSMembership implements MetadataValue<CMSMembership>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final CMSMembership EMPTY = new CMSMembership();
+
+    private final Epoch lastModified;
+    private final BTreeSet<NodeId> fullMembers;
+    private final BTreeSet<NodeId> joiningMembers;
+
+    /**
+     * Used to derive a CMSMembership when deserializing a ClusterMetadata 
instance written with a metadata version
+     * prior to V7. At that time, CMS membership was always inferred from the 
data placements of the distributed
+     * cluster metadata keyspace. Read replicas are full members of the CMS 
and write-only replicas are in the process
+     * of joining. Note: every read replica must also be a write replica, 
leaving the CMS is atomic in respect of the
+     * placements.
+     * @param placement
+     * @param directory
+     * @return
+     */
+    public static CMSMembership reconstruct(DataPlacement placement, Directory 
directory)
+    {
+        SortedSet<NodeId> full = new TreeSet<>();
+        SortedSet<NodeId> joining = new TreeSet<>();
+        Epoch lm = Epoch.EMPTY;
+        for (VersionedEndpoints.ForRange endpoints : placement.reads.endpoints)
+        {
+            lm = endpoints.lastModified().isAfter(lm) ? 
endpoints.lastModified() : lm;
+            endpoints.get().endpoints().forEach(e -> 
full.add(directory.peerId(e)));
+        }
+
+        for (VersionedEndpoints.ForRange endpoints : 
placement.writes.endpoints)
+        {
+            lm = endpoints.lastModified().isAfter(lm) ? 
endpoints.lastModified() : lm;
+            endpoints.get().endpoints().forEach(e -> {
+                NodeId id = directory.peerId(e);
+                if (!full.contains(id))
+                    joining.add(id);
+            });
+        }
+        return new CMSMembership(lm, BTreeSet.of(full), BTreeSet.of(joining));
+    }
+
+    public DataPlacement toPlacement(EndpointLookup lookup)
+    {
+        DataPlacement.Builder builder = DataPlacement.builder();
+        for (NodeId id : fullMembers)
+        {
+            Replica replica = MetaStrategy.replica(lookup.endpoint(id));
+            builder.withReadReplica(lastModified, replica);
+            builder.withWriteReplica(lastModified, replica);
+        }
+        for(NodeId id : joiningMembers)
+        {
+            builder.withWriteReplica(lastModified, 
MetaStrategy.replica(lookup.endpoint(id)));
+        }
+        return builder.build();
+    }
+
+    private CMSMembership()
+    {
+        this(Epoch.EMPTY,
+             BTreeSet.empty(NodeId::compareTo),
+             BTreeSet.empty(NodeId::compareTo));
+    }
+
+    private CMSMembership(Epoch lastModified, BTreeSet<NodeId> fullMembers, 
BTreeSet<NodeId> joiningMembers)
+    {
+        this.lastModified = lastModified;
+        this.fullMembers = fullMembers;
+        this.joiningMembers = joiningMembers;
+    }
+
+
+    @Override
+    public CMSMembership withLastModified(Epoch epoch)
+    {
+        return lastModified.is(epoch) ? this : new CMSMembership(epoch, 
fullMembers, joiningMembers);
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+
+    public ImmutableSet<NodeId> joiningMembers()
+    {
+        return ImmutableSet.copyOf(joiningMembers);

Review Comment:
   nit: `BTreeSet` is already immutable, so we probably don't need to copy it



##########
src/java/org/apache/cassandra/tcm/ClusterMetadata.java:
##########
@@ -202,40 +215,164 @@ private ClusterMetadata(int metadataIdentifier,
         this.schema = schema;
         this.directory = directory;
         this.tokenMap = tokenMap;
-        this.placements = placements;
         this.accordFastPath = accordFastPath;
+        this.placements = placements;
         this.lockedRanges = lockedRanges;
         this.inProgressSequences = inProgressSequences;
         this.consensusMigrationState = consensusMigrationState;
         this.extensions = ImmutableMap.copyOf(extensions);
         this.locator = Locator.usingDirectory(directory);
         this.accordStaleReplicas = accordStaleReplicas;
+        this.cmsMembership = cmsMembership;
+        // Build CMS placement using no-op CMS lookup, i.e. using only 
committed node addresses
+        this.cmsDataPlacement = calculateCMSPlacement(placements, 
cmsMembership, CMSLookup.NO_OP);
     }
 
-    public Set<InetAddressAndPort> fullCMSMembers()
+    public Set<NodeId> fullCMSMemberIds()
     {
-        if (fullCMSEndpoints == null)
-            this.fullCMSEndpoints = 
ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet());
-        return fullCMSEndpoints;
+        return cmsMembership.fullMembers();
     }
 
-    public Set<NodeId> fullCMSMemberIds()
+    public boolean isCMSMember(InetAddressAndPort endpoint)
     {
-        if (fullCMSIds == null)
-            this.fullCMSIds = 
placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet().stream().map(directory::peerId).collect(toImmutableSet());
-        return fullCMSIds;
+        if (epoch.isAfter(Epoch.FIRST))
+            return fullCMSMembers().contains(endpoint);
+
+        // special case to handle initialization of the CMS for the first time
+        return epoch.isEqualOrBefore(Epoch.FIRST) && 
cmsDataPlacement.reads.byEndpoint()

Review Comment:
   nit: I guess we can remove the `isEqualOrBefore` check here, since the 
`isAfter(Epoch.FIRST)` branch above has already returned



##########
src/java/org/apache/cassandra/tcm/CMSLookup.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.listeners.ChangeListener;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.Pair;
+
+public class CMSLookup
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CMSLookup.class);
+
+    public enum State { PRE_INIT, ACTIVE, RETIRED };
+
+    public final static CMSLookup NO_OP = new CMSLookup(State.PRE_INIT, 
Epoch.EMPTY, new HashMap<>());
+    public static InitialBuilder builder(ClusterMetadata metadata)
+    {
+        return new InitialBuilder(metadata);
+    }
+
+    private final Map<NodeId, Pair<InetAddressAndPort, InetAddressAndPort>> 
overrides;

Review Comment:
   This should probably be an `ImmutableMap` for clarity?
   
   And if we make `InitialBuilder` and `rebuild` below build `ImmutableMap`s, we 
can avoid the copying.



##########
src/java/org/apache/cassandra/tcm/CMSMembership.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+public class CMSMembership implements MetadataValue<CMSMembership>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final CMSMembership EMPTY = new CMSMembership();
+
+    private final Epoch lastModified;
+    private final BTreeSet<NodeId> fullMembers;
+    private final BTreeSet<NodeId> joiningMembers;
+
+    /**
+     * Used to derive a CMSMembership when deserializing a ClusterMetadata 
instance written with a metadata version
+     * prior to V7. At that time, CMS membership was always inferred from the 
data placements of the distributed
+     * cluster metadata keyspace. Read replicas are full members of the CMS 
and write-only replicas are in the process
+     * of joining. Note: every read replica must also be a write replica, 
leaving the CMS is atomic in respect of the
+     * placements.
+     * @param placement
+     * @param directory
+     * @return
+     */
+    public static CMSMembership reconstruct(DataPlacement placement, Directory 
directory)
+    {
+        SortedSet<NodeId> full = new TreeSet<>();

Review Comment:
   we could probably build the BTreeSets directly and avoid the copying below?



##########
src/java/org/apache/cassandra/tcm/CMSMembership.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.EndpointLookup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+public class CMSMembership implements MetadataValue<CMSMembership>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final CMSMembership EMPTY = new CMSMembership();
+
+    private final Epoch lastModified;
+    private final BTreeSet<NodeId> fullMembers;
+    private final BTreeSet<NodeId> joiningMembers;
+
+    /**
+     * Used to derive a CMSMembership when deserializing a ClusterMetadata 
instance written with a metadata version
+     * prior to V7. At that time, CMS membership was always inferred from the 
data placements of the distributed
+     * cluster metadata keyspace. Read replicas are full members of the CMS 
and write-only replicas are in the process
+     * of joining. Note: every read replica must also be a write replica, 
leaving the CMS is atomic in respect of the
+     * placements.
+     * @param placement
+     * @param directory
+     * @return
+     */
+    public static CMSMembership reconstruct(DataPlacement placement, Directory 
directory)
+    {
+        SortedSet<NodeId> full = new TreeSet<>();
+        SortedSet<NodeId> joining = new TreeSet<>();
+        Epoch lm = Epoch.EMPTY;
+        for (VersionedEndpoints.ForRange endpoints : placement.reads.endpoints)
+        {
+            lm = endpoints.lastModified().isAfter(lm) ? 
endpoints.lastModified() : lm;
+            endpoints.get().endpoints().forEach(e -> 
full.add(directory.peerId(e)));
+        }
+
+        for (VersionedEndpoints.ForRange endpoints : 
placement.writes.endpoints)
+        {
+            lm = endpoints.lastModified().isAfter(lm) ? 
endpoints.lastModified() : lm;
+            endpoints.get().endpoints().forEach(e -> {
+                NodeId id = directory.peerId(e);
+                if (!full.contains(id))
+                    joining.add(id);
+            });
+        }
+        return new CMSMembership(lm, BTreeSet.of(full), BTreeSet.of(joining));
+    }
+
+    public DataPlacement toPlacement(EndpointLookup lookup)
+    {
+        DataPlacement.Builder builder = DataPlacement.builder();
+        for (NodeId id : fullMembers)
+        {
+            Replica replica = MetaStrategy.replica(lookup.endpoint(id));
+            builder.withReadReplica(lastModified, replica);
+            builder.withWriteReplica(lastModified, replica);
+        }
+        for(NodeId id : joiningMembers)
+        {
+            builder.withWriteReplica(lastModified, 
MetaStrategy.replica(lookup.endpoint(id)));
+        }
+        return builder.build();
+    }
+
+    private CMSMembership()
+    {
+        this(Epoch.EMPTY,
+             BTreeSet.empty(NodeId::compareTo),
+             BTreeSet.empty(NodeId::compareTo));
+    }
+
+    private CMSMembership(Epoch lastModified, BTreeSet<NodeId> fullMembers, 
BTreeSet<NodeId> joiningMembers)
+    {
+        this.lastModified = lastModified;
+        this.fullMembers = fullMembers;
+        this.joiningMembers = joiningMembers;
+    }
+
+
+    @Override
+    public CMSMembership withLastModified(Epoch epoch)
+    {
+        return lastModified.is(epoch) ? this : new CMSMembership(epoch, 
fullMembers, joiningMembers);
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+
+    public ImmutableSet<NodeId> joiningMembers()
+    {
+        return ImmutableSet.copyOf(joiningMembers);
+    }
+
+    public ImmutableSet<NodeId> fullMembers()
+    {
+        return ImmutableSet.copyOf(fullMembers);
+    }
+
+    public CMSMembership startJoining(NodeId id)
+    {
+        if (joiningMembers.contains(id))
+            throw new IllegalStateException(id + " is already joining the 
CMS");
+        if (fullMembers.contains(id))
+            throw new IllegalStateException(id + " has already fully joined 
the CMS");
+
+        return new CMSMembership(lastModified, fullMembers, 
joiningMembers.with(id));
+    }
+
+    public CMSMembership cancelJoining(NodeId id)
+    {
+        if (!joiningMembers.contains(id))
+            throw new IllegalStateException(id + " is not currently joining 
the CMS");
+        if (fullMembers.contains(id))
+            throw new IllegalStateException(id + " has already fully joined 
the CMS");
+
+        return new CMSMembership(lastModified, fullMembers, 
joiningMembers.without(id));
+    }
+
+    public CMSMembership finishJoining(NodeId id)
+    {
+        if (!joiningMembers.contains(id))
+            throw new IllegalStateException(id + " is not currently joining 
the CMS");
+        if (fullMembers.contains(id))
+            throw new IllegalStateException(id + " has already fully joined 
the CMS");
+
+        return new CMSMembership(lastModified, fullMembers.with(id), 
joiningMembers.without(id));
+    }
+
+    public CMSMembership leave(NodeId id)
+    {
+        if (joiningMembers.contains(id))
+            throw new IllegalStateException(id + " is currently joining the 
CMS, ");
+        if (!fullMembers.contains(id))
+            throw new IllegalStateException(id + " is not a CMS member");
+
+        return new CMSMembership(lastModified, fullMembers.without(id), 
joiningMembers);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CMSMembership{" +
+               "lastModified=" + lastModified +
+               ", fullMembers=" + fullMembers +
+               ", joiningMembers=" + joiningMembers +
+               '}';
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if (!(o instanceof CMSMembership)) return false;
+
+        CMSMembership that = (CMSMembership) o;
+        return Objects.equals(lastModified, that.lastModified) &&
+               Objects.equals(fullMembers, that.fullMembers) &&
+               Objects.equals(joiningMembers, that.joiningMembers);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = Objects.hashCode(lastModified);
+        result = 31 * result + Objects.hashCode(fullMembers);
+        result = 31 * result + Objects.hashCode(joiningMembers);
+        return result;
+    }
+
+    public static class Serializer implements MetadataSerializer<CMSMembership>
+    {
+        @Override
+        public void serialize(CMSMembership t, DataOutputPlus out, Version 
version) throws IOException
+        {
+            Epoch.serializer.serialize(t.lastModified, out);
+
+            out.writeUnsignedVInt32(t.fullMembers.size());
+            for (NodeId id : t.fullMembers)
+                NodeId.serializer.serialize(id, out, version);
+
+            out.writeUnsignedVInt32(t.joiningMembers.size());
+            for (NodeId id : t.joiningMembers)
+                NodeId.serializer.serialize(id, out, version);
+        }
+
+        @Override
+        public CMSMembership deserialize(DataInputPlus in, Version version) 
throws IOException
+        {
+            Epoch lastModified = Epoch.serializer.deserialize(in, version);
+
+            int fullMemberCount = in.readUnsignedVInt32();
+            SortedSet<NodeId> fullMembers = new TreeSet<>();

Review Comment:
   Maybe build the BTreeSets directly here as well?



##########
src/java/org/apache/cassandra/tcm/Startup.java:
##########
@@ -184,6 +239,128 @@ public static void 
initializeAsNonCmsNode(Function<Processor, Processor> wrapPro
         }
     }
 
+
+    /**
+     * If the broadcast address of this node has changed, we must verify the 
endpoints it knows for
+     * the members of the CMS are still reachable and valid. This is necessary 
for the node to submit
+     * a STARTUP transformation which updates its broadcast address in 
ClusterMetadata.
+     *
+     * If the node is itself a CMS member, it is also a requirement to be able 
to contact a
+     * majority of the other CMS members in order to perform the serial reads 
and writes which
+     * constitute committing to and fetching from the distributed metadata log.
+     *
+     * To do this, we use a simple protocol:
+     * 1. For each CMS member in our replayed ClusterMetadata, ping the 
associated broadcast address
+     *   to query for id of the node at that address. This determines whether 
the endpoint still
+     *   belongs to that same node (which is/was a CMS member).
+     * 2. While we don't have confirmed current addresses for a majority of 
CMS nodes:
+     * 2a. Run discovery to locate as many peer addresses as possible.
+     * 2b. Query every discovered endpoint and ask for its node id.
+     * If we still don't have confirmed addresses for a majority of CMS 
members, go to 2a and
+     * repeat as peers may themselves still be starting up and so may have 
become discoverable.
+     *
+     * This process builds up a mapping of id -> current address for CMS 
members which can then be
+     * used to construct a set of temporary redirects between addresses 
according to ClusterMetadata
+     * and the newly discovered ones.
+     *
+     * As each CMS node with a changed address goes through the startup 
process, it will commit its
+     * STARTUP transformation and the new broadcast address will be found in 
ClusterMetadata. A log
+     * listener is used to react to these transformations by removing 
redundant address overrides
+     * as they are enacted.
+     *
+     * @param nodeId derived from the persisted id of this node from the 
system.peers table
+     * @param replayed current ClusterMetadata after replaying the metadata 
log for startup
+     */
+    private static ClusterMetadata initializeCMSLookup(NodeId nodeId, 
ClusterMetadata replayed)
+    {
+        InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId);
+        InetAddressAndPort newAddress = 
FBUtilities.getBroadcastAddressAndPort();
+        if (newAddress.equals(oldAddress))
+            return replayed;
+
+        Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>();
+        replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, 
replayed.directory.endpoint(id)));
+        Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>();
+
+        Set<InetAddressAndPort> candidates = new 
HashSet<>(previousCMS.values());
+        candidates.add(newAddress);
+
+        int maxRounds = 5;
+        int currentRound = 0;
+        long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4),
+                                       
DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS) / maxRounds);
+        // TODO a non-CMS node only needs to be able to contact a single CMS 
member to commit its STARTUP
+        int quorum = (previousCMS.size() / 2) + 1;
+        logger.info("Running survey and discovery for CMS nodes {} (quorum = 
{})", previousCMS, quorum);
+        while (confirmedCMS.size() < quorum && currentRound < maxRounds)
+        {
+            logger.info("In round {} sending survey to {}", currentRound, 
candidates);
+            Collection<Pair<InetAddressAndPort, NodeId>> surveyed =
+            MessageDelivery.fanoutAndWait(MessagingService.instance(),
+                                          candidates,
+                                          Verb.TCM_DISCOVER_SURVEY_REQ,
+                                          NoPayload.noPayload,
+                                          roundTimeNanos,
+                                          TimeUnit.NANOSECONDS);
+            logger.info("Survey of {} discovered {}", candidates, surveyed);
+            surveyed.forEach(pair -> {
+                if (previousCMS.containsKey(pair.right))
+                    confirmedCMS.put(pair.right, pair.left);
+
+            });
+
+            logger.info("Confirmed CMS members {}", confirmedCMS);
+            if (confirmedCMS.size() < quorum || (previousCMS.size() == 1 && 
confirmedCMS.containsKey(nodeId)))
+            {
+                // In the single node CMS case, run discovery simply to 
propagate this node's new address to the rest of
+                // the cluster via the seeds & discovery meshing. Otherwise, 
if every node has a new address the non-CMS
+                // members have no way to discover the CMS and it has no way 
to know the new places to push updates to.
+                logger.info("Running discovery round; either CMS quorum was 
not confirmed or this is the only CMS member");
+                Discovery.DiscoveredNodes nodes = 
Discovery.instance.discover(5, true);
+                candidates.addAll(nodes.nodes());
+                logger.info("Rediscovery completed, discovered nodes: {}", 
nodes);
+            }
+            currentRound++;
+        }
+
+        if (confirmedCMS.size() >= quorum)
+        {
+            logger.info("Identified a quorum of CMS members (found {}, 
required {}).", confirmedCMS.size(), quorum);
+            if (confirmedCMS.values().containsAll(previousCMS.values()))
+            {
+                logger.info("No endpoint changes found for CMS members");
+                return replayed;
+            }
+            else
+            {
+                logger.info("Applying temporary address overrides for 
uncommitted CMS endpoint changes");
+                CMSLookup.InitialBuilder builder = CMSLookup.builder(replayed);
+                for (NodeId confirmed : confirmedCMS.keySet())
+                {
+                    InetAddressAndPort prev = previousCMS.get(confirmed);
+                    InetAddressAndPort next = confirmedCMS.get(confirmed);
+                    if (!next.equals(prev))
+                    {
+                        logger.info("Added override for {}, ({} -> {})", 
confirmed, previousCMS.get(confirmed), confirmedCMS.get(confirmed));
+                        builder = builder.withOverride(confirmed, 
previousCMS.get(confirmed), confirmedCMS.get(confirmed));

Review Comment:
   nit: use the `prev`/`next` locals here instead of repeating the map lookups



##########
src/java/org/apache/cassandra/tcm/Startup.java:
##########
@@ -184,6 +239,128 @@ public static void 
initializeAsNonCmsNode(Function<Processor, Processor> wrapPro
         }
     }
 
+
+    /**
+     * If the broadcast address of this node has changed, we must verify the 
endpoints it knows for
+     * the members of the CMS are still reachable and valid. This is necessary 
for the node to submit
+     * a STARTUP transformation which updates its broadcast address in 
ClusterMetadata.
+     *
+     * If the node is itself a CMS member, it is also a requirement to be able 
to contact a
+     * majority of the other CMS members in order to perform the serial reads 
and writes which
+     * constitute committing to and fetching from the distributed metadata log.
+     *
+     * To do this, we use a simple protocol:
+     * 1. For each CMS member in our replayed ClusterMetadata, ping the 
associated broadcast address
+     *   to query for id of the node at that address. This determines whether 
the endpoint still
+     *   belongs to that same node (which is/was a CMS member).
+     * 2. While we don't have confirmed current addresses for a majority of 
CMS nodes:
+     * 2a. Run discovery to locate as many peer addresses as possible.
+     * 2b. Query every discovered endpoint and ask for its node id.
+     * If we still don't have confirmed addresses for a majority of CMS 
members, go to 2a and
+     * repeat as peers may themselves still be starting up and so may have 
become discoverable.
+     *
+     * This process builds up a mapping of id -> current address for CMS 
members which can then be
+     * used to construct a set of temporary redirects between addresses 
according to ClusterMetadata
+     * and the newly discovered ones.
+     *
+     * As each CMS node with a changed address goes through the startup 
process, it will commit its
+     * STARTUP transformation and the new broadcast address will be found in 
ClusterMetadata. A log
+     * listener is used to react to these transformations by removing 
redundant address overrides
+     * as they are enacted.
+     *
+     * @param nodeId derived from the persisted id of this node from the 
system.peers table
+     * @param replayed current ClusterMetadata after replaying the metadata 
log for startup
+     */
+    private static ClusterMetadata initializeCMSLookup(NodeId nodeId, 
ClusterMetadata replayed)
+    {
+        InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId);
+        InetAddressAndPort newAddress = 
FBUtilities.getBroadcastAddressAndPort();
+        if (newAddress.equals(oldAddress))
+            return replayed;
+
+        Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>();
+        replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, 
replayed.directory.endpoint(id)));
+        Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>();
+
+        Set<InetAddressAndPort> candidates = new 
HashSet<>(previousCMS.values());
+        candidates.add(newAddress);
+
+        int maxRounds = 5;
+        int currentRound = 0;
+        long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4),

Review Comment:
   is 4s enough here? Should we add another "discover survey" config setting?



##########
src/java/org/apache/cassandra/tcm/ClusterMetadata.java:
##########
@@ -202,40 +215,164 @@ private ClusterMetadata(int metadataIdentifier,
         this.schema = schema;
         this.directory = directory;
         this.tokenMap = tokenMap;
-        this.placements = placements;
         this.accordFastPath = accordFastPath;
+        this.placements = placements;
         this.lockedRanges = lockedRanges;
         this.inProgressSequences = inProgressSequences;
         this.consensusMigrationState = consensusMigrationState;
         this.extensions = ImmutableMap.copyOf(extensions);
         this.locator = Locator.usingDirectory(directory);
         this.accordStaleReplicas = accordStaleReplicas;
+        this.cmsMembership = cmsMembership;
+        // Build CMS placement using no-op CMS lookup, i.e. using only 
committed node addresses
+        this.cmsDataPlacement = calculateCMSPlacement(placements, 
cmsMembership, CMSLookup.NO_OP);
     }
 
-    public Set<InetAddressAndPort> fullCMSMembers()
+    public Set<NodeId> fullCMSMemberIds()
     {
-        if (fullCMSEndpoints == null)
-            this.fullCMSEndpoints = 
ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet());
-        return fullCMSEndpoints;
+        return cmsMembership.fullMembers();
     }
 
-    public Set<NodeId> fullCMSMemberIds()
+    public boolean isCMSMember(InetAddressAndPort endpoint)
     {
-        if (fullCMSIds == null)
-            this.fullCMSIds = 
placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet().stream().map(directory::peerId).collect(toImmutableSet());
-        return fullCMSIds;
+        if (epoch.isAfter(Epoch.FIRST))
+            return fullCMSMembers().contains(endpoint);
+
+        // special case to handle initialization of the CMS for the first time
+        return epoch.isEqualOrBefore(Epoch.FIRST) && 
cmsDataPlacement.reads.byEndpoint()
+                                                                           
.keySet()
+                                                                           
.contains(endpoint);
+    }
+
+    public Set<InetAddressAndPort> fullCMSMembers()
+    {
+        if (fullCMSEndpoints == null)
+        {
+            EndpointLookup lookup = endpointLookup();
+            fullCMSEndpoints = ImmutableSet.copyOf(cmsMembership.fullMembers()
+                                                                .stream()
+                                                                
.map(lookup::endpoint)
+                                                                
.collect(Collectors.toSet()));
+        }
+        return fullCMSEndpoints;
     }
 
     public EndpointsForRange fullCMSMembersAsReplicas()
     {
         if (fullCMSReplicas == null)
-            fullCMSReplicas = 
placements.get(ReplicationParams.meta(this)).reads.forRange(MetaStrategy.entireRange).get();
+        {
+            EndpointLookup lookup = endpointLookup();
+            EndpointsForRange.Builder builder = 
EndpointsForRange.builder(MetaStrategy.entireRange);
+            for (NodeId nodeId : fullCMSMemberIds())
+                builder.add(MetaStrategy.replica(lookup.endpoint(nodeId)));
+            fullCMSReplicas = builder.build();
+        }
         return fullCMSReplicas;
     }
 
-    public boolean isCMSMember(InetAddressAndPort endpoint)
+    // Synchronization is probably not necessary as this should only be called 
by a log listener
+    public synchronized void refreshCMSLookup(ClusterMetadata prev, boolean 
fromSnapshot)
+    {
+        CMSLookup prevLookup = prev.cmsLookup;
+        CMSLookup proposedLookup = prevLookup.rebuild(prev, this, 
fromSnapshot);
+        // rebuild returns `this` if no changes
+        if (prevLookup == proposedLookup)
+            logger.debug("No changes to CMSLookup between epochs {} and {}", 
prev.epoch.getEpoch(), epoch.getEpoch());
+        else
+        {
+            logger.debug("Replacing CMSLookup, Current: {}, Proposed: {}", 
prevLookup, proposedLookup);
+            // recalculate CMS placement using new lookup
+            cmsDataPlacement = calculateCMSPlacement(placements, 
cmsMembership, proposedLookup);
+            // We shouldn't need to null out the other CMS fields when we 
refresh the CMSLookup as that is done in a
+            // precommit listener, meaning nothing should be accessing the 
ClusterMetadata instance yet.
+        }
+
+        // either way, set the lookup for _this_ epoch
+        cmsLookup = proposedLookup;
+    }
+
+    // Synchronization is probably not necessary as this should only be called 
once, during startup
+    public synchronized boolean initCMSLookup(CMSLookup lookup)
+    {
+        logger.info("Initializing CMS lookup on ClusterMetadata at epoch {}", 
epoch.getEpoch());
+        boolean isInitial = cmsLookup.isUninitialized();
+        logger.debug("Current CMS lookup: {}, proposed: {}", cmsLookup, 
lookup);
+        if (isInitial)
+        {
+            cmsLookup = lookup;
+            cmsDataPlacement = calculateCMSPlacement(placements, 
cmsMembership, lookup);
+            // We need to null out these fields when we init the CMSLookup 
because post-commit listeners may have
+            // accessed them during log replay, which happens before this.
+            fullCMSReplicas = null;
+            fullCMSEndpoints = null;
+        }
+        else
+            logger.warn("Invalid CMSLookup state for initialization: {}", 
cmsLookup);
+        return isInitial;
+    }
+
+    private DataPlacement calculateCMSPlacement(DataPlacements placements, 
CMSMembership cms, CMSLookup lookup)
     {
-        return fullCMSMembers().contains(endpoint);
+        if (epoch.isBefore(Epoch.FIRST) || 
schema.getKeyspaces().get(SchemaConstants.METADATA_KEYSPACE_NAME).isEmpty())
+            return DataPlacement.empty();
+
+        if (directory.isEmpty())
+        {
+            if (epoch.is(Epoch.FIRST))
+            {
+                // PRE_INITIALIZE_CMS: placements need to be hardcoded to the 
local address so that the subsequent
+                // INITIALIZE_CMS can be committed
+                Replica localReplica = 
MetaStrategy.replica(FBUtilities.getBroadcastAddressAndPort());
+                return DataPlacement.builder()
+                                    .withReadReplica(Epoch.FIRST, localReplica)
+                                    .withWriteReplica(Epoch.FIRST, 
localReplica)
+                                    .build();
+            }
+            else
+            {
+                // This cluster did not previously upgrade from a gossip based 
version (i.e. pre-6.0) but did at some point
+                // run a version prior to MetadataVersion.V7 where we started 
to encode CMS membership directly. This

Review Comment:
   This comment should say V9, not V7.



##########
src/java/org/apache/cassandra/tcm/ClusterMetadata.java:
##########
@@ -1108,6 +1339,26 @@ public ClusterMetadata deserialize(DataInputPlus in, 
Version version) throws IOE
                 value.deserialize(in, version);
                 extensions.put(key, value);
             }
+
+            CMSMembership cmsMembership = CMSMembership.EMPTY;
+            if (version.isAtLeast(Version.V9))
+                cmsMembership = CMSMembership.serializer.deserialize(in, 
version);
+            else
+            {
+                KeyspaceMetadata metadataKs = 
schema.getKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME);
+                if (!dir.isEmpty())
+                {
+                    // Pre-V9 the membership of the CMS was always inferred 
from the placement of the distributed
+                    // metadata keyspace. This is true for the initial cluster 
metadata created during upgrade from
+                    // gossip and for subsequent epochs. The endpoints in the 
placement must belong to registered nodes,
+                    // so we can derive the CMSMembership using the data 
placement and directory.
+                    DataPlacement placement = 
placements.get(metadataKs.params.replication);
+                    cmsMembership = CMSMembership.reconstruct(placement, dir);
+                    placements = 
placements.unbuild().without(metadataKs.params.replication).build();

Review Comment:
   I think this is unnecessary - we do the same thing directly after the if statement



##########
src/java/org/apache/cassandra/tcm/Startup.java:
##########
@@ -184,6 +239,128 @@ public static void 
initializeAsNonCmsNode(Function<Processor, Processor> wrapPro
         }
     }
 
+
+    /**
+     * If the broadcast address of this node has changed, we must verify the 
endpoints it knows for
+     * the members of the CMS are still reachable and valid. This is necessary 
for the node to submit
+     * a STARTUP transformation which updates its broadcast address in 
ClusterMetadata.
+     *
+     * If the node is itself a CMS member, it is also a requirement to be able 
to contact a
+     * majority of the other CMS members in order to perform the serial reads 
and writes which
+     * constitute committing to and fetching from the distributed metadata log.
+     *
+     * To do this, we use a simple protocol:
+     * 1. For each CMS member in our replayed ClusterMetadata, ping the 
associated broadcast address
+     *   to query for id of the node at that address. This determines whether 
the endpoint still
+     *   belongs to that same node (which is/was a CMS member).
+     * 2. While we don't have confirmed current addresses for a majority of 
CMS nodes:
+     * 2a. Run discovery to locate as many peer addresses as possible.
+     * 2b. Query every discovered endpoint and ask for its node id.
+     * If we still don't have confirmed addresses for a majority of CMS 
members, go to 2a and
+     * repeat as peers may themselves still be starting up and so may have 
become discoverable.
+     *
+     * This process builds up a mapping of id -> current address for CMS 
members which can then be
+     * used to construct a set of temporary redirects between addresses 
according to ClusterMetadata
+     * and the newly discovered ones.
+     *
+     * As each CMS node with a changed address goes through the startup 
process, it will commit its
+     * STARTUP transformation and the new broadcast address will be found in 
ClusterMetadata. A log
+     * listener is used to react to these transformations by removing 
redundant address overrides
+     * as they are enacted.
+     *
+     * @param nodeId derived from the persisted id of this node from the 
system.peers table
+     * @param replayed current ClusterMetadata after replaying the metadata 
log for startup
+     */
+    private static ClusterMetadata initializeCMSLookup(NodeId nodeId, 
ClusterMetadata replayed)
+    {
+        InetAddressAndPort oldAddress = replayed.directory.endpoint(nodeId);
+        InetAddressAndPort newAddress = 
FBUtilities.getBroadcastAddressAndPort();
+        if (newAddress.equals(oldAddress))
+            return replayed;
+
+        Map<NodeId, InetAddressAndPort> previousCMS = new HashMap<>();
+        replayed.fullCMSMemberIds().forEach(id -> previousCMS.put(id, 
replayed.directory.endpoint(id)));
+        Map<NodeId, InetAddressAndPort> confirmedCMS = new HashMap<>();
+
+        Set<InetAddressAndPort> candidates = new 
HashSet<>(previousCMS.values());
+        candidates.add(newAddress);
+
+        int maxRounds = 5;
+        int currentRound = 0;
+        long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4),
+                                       
DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS) / maxRounds);
+        // TODO a non-CMS node only needs to be able to contact a single CMS 
member to commit its STARTUP

Review Comment:
   Should we fix this? It feels like we'll most often discover the full CMS if 
it's up
   
   and if it is not yet up, it might be better to wait here before trying to 
commit Startup?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to