jacek-lewandowski commented on code in PR #2807:
URL: https://github.com/apache/cassandra/pull/2807#discussion_r1362467675


##########
src/java/org/apache/cassandra/tcm/ClusterMetadata.java:
##########
@@ -0,0 +1,986 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.extensions.ExtensionKey;
+import org.apache.cassandra.tcm.extensions.ExtensionValue;
+import org.apache.cassandra.tcm.listeners.MetadataSnapshotListener;
+import org.apache.cassandra.tcm.log.LocalLog;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator;
+import org.apache.cassandra.tcm.ownership.PlacementForRange;
+import org.apache.cassandra.tcm.ownership.TokenMap;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
+import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.vint.VIntCoding;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+
+/**
+ * Represents all transactional metadata of the cluster. It is versioned, 
immutable and serializable.
+ * CMS guarantees that all the nodes in the cluster see the same cluster 
metadata for the given epoch.
+ * When the metadata gets updated by a node, the new version must be 
associated with the new epoch.
+ *
+ * Epochs are grouped into periods. The number of epochs that can fit into a 
period is defined by
+ * {@link DatabaseDescriptor#getMetadataSnapshotFrequency()}. When a period is 
completed, its number is incremented by
+ * {@link LocalLog#snapshotListener()}, and then, a snapshot is created by the 
{@link MetadataSnapshotListener}.
+ * Both are triggered by the {@link LocalLog#processPendingInternal()} method, 
which processes the log entries.
+ *
+ * @see MetadataSnapshots for more information about cluster metadata snapshots
+ */
+public class ClusterMetadata
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final Epoch epoch;
+    public final long period;
+    public final boolean lastInPeriod;
+    public final IPartitioner partitioner;       // Set during (initial) 
construction and not modifiable via Transformer
+
+    public final DistributedSchema schema;
+    public final Directory directory;
+    public final TokenMap tokenMap;
+    public final DataPlacements placements;
+    public final LockedRanges lockedRanges;
+    public final InProgressSequences inProgressSequences;
+    public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions;
+    private final Set<Replica> fullCMSReplicas;
+    private final Set<InetAddressAndPort> fullCMSEndpoints;
+
+    /**
+     * Creates cluster metadata for the given partitioner with an empty directory
+     * ({@link Directory#EMPTY}), delegating to the two-argument constructor.
+     */
+    public ClusterMetadata(IPartitioner partitioner)
+    {
+        this(partitioner, Directory.EMPTY);
+    }
+
+    /**
+     * Creates cluster metadata for the given partitioner and directory, using
+     * {@link DistributedSchema#first()} as the schema.
+     */
+    @VisibleForTesting
+    public ClusterMetadata(IPartitioner partitioner, Directory directory)
+    {
+        this(partitioner, directory, DistributedSchema.first());
+    }
+
+    /**
+     * Creates an initial metadata instance at {@code Epoch.EMPTY} and {@code Period.EMPTY},
+     * flagged as the last in its period, with a new {@link TokenMap} for the partitioner and
+     * empty placements, locked ranges, in-progress sequences and extensions.
+     */
+    @VisibleForTesting
+    public ClusterMetadata(IPartitioner partitioner, Directory directory, 
DistributedSchema schema)
+    {
+        this(Epoch.EMPTY,
+             Period.EMPTY,
+             true,
+             partitioner,
+             schema,
+             directory,
+             new TokenMap(partitioner),
+             DataPlacements.EMPTY,
+             LockedRanges.EMPTY,
+             InProgressSequences.EMPTY,
+             ImmutableMap.of());
+    }
+
+    /**
+     * Creates a metadata instance from explicitly supplied components.
+     *
+     * A non-null {@code tokenMap} must use the same partitioner class as {@code partitioner}
+     * (checked with an assertion). The {@code extensions} map is copied into an immutable map.
+     * The sets of full CMS replicas and endpoints are computed once here from the read
+     * placements registered under {@link ReplicationParams#meta()}, so {@code placements} is
+     * dereferenced and must not be null.
+     */
+    public ClusterMetadata(Epoch epoch,
+                           long period,
+                           boolean lastInPeriod,
+                           IPartitioner partitioner,
+                           DistributedSchema schema,
+                           Directory directory,
+                           TokenMap tokenMap,
+                           DataPlacements placements,
+                           LockedRanges lockedRanges,
+                           InProgressSequences inProgressSequences,
+                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
+    {
+        // TODO: token map is a feature of the specific placement strategy, 
and so may not be a relevant component of
+        //  ClusterMetadata in the long term. We need to consider how the 
actual components of metadata can be evolved
+        //  over time.
+        assert tokenMap == null || 
tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner 
for TokenMap doesn't match base partitioner";
+        this.epoch = epoch;
+        this.period = period;
+        this.lastInPeriod = lastInPeriod;
+        this.partitioner = partitioner;
+        this.schema = schema;
+        this.directory = directory;
+        this.tokenMap = tokenMap;
+        this.placements = placements;
+        this.lockedRanges = lockedRanges;
+        this.inProgressSequences = inProgressSequences;
+        this.extensions = ImmutableMap.copyOf(extensions);
+
+        // Both CMS views are derived from the same lookup: the read placements for the meta replication params.
+        this.fullCMSReplicas = 
ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().flattenValues());
+        this.fullCMSEndpoints = 
ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().keySet());
+    }
+
+    /**
+     * Returns the endpoint set computed at construction time from the read placements
+     * of {@link ReplicationParams#meta()}.
+     */
+    public Set<InetAddressAndPort> fullCMSMembers()
+    {
+        return fullCMSEndpoints;
+    }
+
+    /**
+     * Returns the replica set computed at construction time from the read placements
+     * of {@link ReplicationParams#meta()}.
+     */
+    public Set<Replica> fullCMSMembersAsReplicas()
+    {
+        return fullCMSReplicas;
+    }
+
+    /**
+     * Returns true if the given endpoint is contained in {@link #fullCMSMembers()}.
+     */
+    public boolean isCMSMember(InetAddressAndPort endpoint)
+    {
+        return fullCMSMembers().contains(endpoint);
+    }
+
+    /**
+     * Returns a {@link Transformer} based on this metadata, targeting {@link #nextEpoch()},
+     * with its last-in-period flag set to false.
+     */
+    public Transformer transformer()
+    {
+        return new Transformer(this, this.nextEpoch(), false);
+    }
+
+    /**
+     * Returns a {@link Transformer} based on this metadata, targeting {@link #nextEpoch()}.
+     *
+     * @param sealPeriod passed to the transformer as its last-in-period flag
+     */
+    public Transformer transformer(boolean sealPeriod)
+    {
+        return new Transformer(this, this.nextEpoch(), sealPeriod);
+    }
+
+    /**
+     * Returns a copy of this metadata whose epoch is the given one. The period, last-in-period
+     * flag and partitioner are carried over unchanged; every other component (and every
+     * extension value) whose last modified epoch is after {@code epoch} is replaced by a
+     * version whose last modified epoch is {@code epoch}.
+     */
+    public ClusterMetadata forceEpoch(Epoch epoch)
+    {
+        // In certain circumstances, the last modified epoch of the individual
+        // components may have been updated beyond the epoch we're specifying 
here.
+        // An example is the execution of an UnsafeJoin transformation, where 
the
+        // sub-steps (Start/Mid/Finish) are executed in series, each updating a
+        // single ClusterMetadata and its individual components. At the end of 
that
+        // sequence, the CM epoch is then set forcibly to ensure the 
UnsafeJoin only
+        // increments the published epoch by one. As each component has its 
own last
+        // modified epoch, we may also need to coerce those, but only if they 
are
+        // greater than the epoch we're forcing here.
+        return new ClusterMetadata(epoch,
+                                   period,
+                                   lastInPeriod,
+                                   partitioner,
+                                   capLastModified(schema, epoch),
+                                   capLastModified(directory, epoch),
+                                   capLastModified(tokenMap, epoch),
+                                   capLastModified(placements, epoch),
+                                   capLastModified(lockedRanges, epoch),
+                                   capLastModified(inProgressSequences, epoch),
+                                   capLastModified(extensions, epoch));
+    }
+
+    /**
+     * Returns a copy of this metadata with the given period. All other components are
+     * carried over unchanged, except that the last-in-period flag of the copy is always false.
+     */
+    public ClusterMetadata forcePeriod(long period)
+    {
+        return new ClusterMetadata(epoch,
+                                   period,
+                                   false,
+                                   partitioner,
+                                   schema,
+                                   directory,
+                                   tokenMap,
+                                   placements,
+                                   lockedRanges,
+                                   inProgressSequences,
+                                   extensions);
+    }
+
+    /**
+     * Builds a new map with the same keys as {@code original}. A value that is null or whose
+     * last modified epoch is equal to or before {@code maxEpoch} is kept as is; any other
+     * value is replaced by the result of {@code withLastModified(maxEpoch)}.
+     */
+    private static Map<ExtensionKey<?,?>, ExtensionValue<?>> 
capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch 
maxEpoch)
+    {
+        Map<ExtensionKey<?, ?>, ExtensionValue<?>> updated = new HashMap<>();
+        original.forEach((key, value) -> {
+            ExtensionValue<?> newValue = value == null || 
value.lastModified().isEqualOrBefore(maxEpoch)
+                                         ? value
+                                         : 
(ExtensionValue<?>)value.withLastModified(maxEpoch);
+            updated.put(key, newValue);
+        });
+        return updated;
+    }
+
+    /**
+     * Returns {@code value} itself (cast to {@code V}) when it is null or its last modified
+     * epoch is equal to or before {@code maxEpoch}; otherwise returns
+     * {@code value.withLastModified(maxEpoch)}.
+     */
+    @SuppressWarnings("unchecked")
+    private static <V> V capLastModified(MetadataValue<V> value, Epoch 
maxEpoch)
+    {
+        return value == null || value.lastModified().isEqualOrBefore(maxEpoch)
+               ? (V)value
+               : value.withLastModified(maxEpoch);
+    }
+
+    /**
+     * Returns the epoch following this metadata's epoch, as given by {@code epoch.nextEpoch()}.
+     */
+    public Epoch nextEpoch()
+    {
+        return epoch.nextEpoch();
+    }
+
+    /**
+     * Returns {@code period + 1} when this metadata is flagged as the last in its period,
+     * otherwise the current period.
+     */
+    public long nextPeriod()
+    {
+        return lastInPeriod ? period + 1 : period;
+    }
+
+    /**
+     * Computes the placement for the given keyspace against a proposed metadata in which the
+     * token changes of nodes currently in the BOOTSTRAPPING, LEAVING or MOVING state have
+     * been applied to the token map.
+     *
+     * The proposed metadata is built locally with a {@link Transformer} and used only as input
+     * to the placement provider's calculation in this method.
+     *
+     * @param ksm keyspace whose replication params select the returned placement
+     * @return the calculated placement for {@code ksm.params.replication}
+     */
+    public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm)
+    {
+        List<NodeId> joining = new ArrayList<>();
+        List<NodeId> leaving = new ArrayList<>();
+        List<NodeId> moving = new ArrayList<>();
+
+        // Bucket nodes by transitional state; nodes in any other state are ignored.
+        for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet())
+        {
+            switch (entry.getValue())
+            {
+                case BOOTSTRAPPING:
+                    joining.add(entry.getKey());
+                    break;
+                case LEAVING:
+                    leaving.add(entry.getKey());
+                    break;
+                case MOVING:
+                    moving.add(entry.getKey());
+                    break;
+            }
+        }
+
+        Transformer t = transformer();
+        // Joining nodes: assign the tokens recorded in the finishJoin step of their in-progress sequence.
+        for (NodeId node: joining)
+        {
+            InProgressSequence<?> joinSequence = inProgressSequences.get(node);
+            assert joinSequence instanceof BootstrapAndJoin;
+            Set<Token> tokens = 
((BootstrapAndJoin)joinSequence).finishJoin.tokens;
+            t = t.proposeToken(node, tokens);
+        }
+        // Leaving nodes: unassign their tokens.
+        for (NodeId node : leaving)
+            t = t.proposeRemoveNode(node);
+        // todo: add tests for move!
+        // NOTE(review): this unassigns and then re-assigns the tokens this metadata's token map
+        // currently holds for the node, not a move target; confirm that is intended.
+        for (NodeId node : moving)
+            t = t.proposeRemoveNode(node).proposeToken(node, 
tokenMap.tokens(node));
+
+        ClusterMetadata proposed = t.build().metadata;
+        return ClusterMetadataService.instance()
+                                     .placementProvider()
+                                     .calculatePlacements(epoch, 
proposed.tokenMap.toRanges(), proposed, Keyspaces.of(ksm))
+                                     .get(ksm.params.replication);
+    }
+
+    // TODO Remove this as it isn't really an equivalent to the previous 
concept of pending ranges
+    /**
+     * Returns true when, for the keyspace's replication params, the read placement and the
+     * write placement for the given token are not equal.
+     */
+    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token)
+    {
+        PlacementForRange writes = 
placements.get(ksm.params.replication).writes;
+        PlacementForRange reads = placements.get(ksm.params.replication).reads;
+        return !reads.forToken(token).equals(writes.forToken(token));
+    }
+
+    // TODO Remove this as it isn't really an equivalent to the previous 
concept of pending ranges
+    /**
+     * Returns true when, for the keyspace's replication params, the write placement's entry
+     * for the given endpoint is not equal to the read placement's entry for it.
+     *
+     * NOTE(review): the write-side lookup result is dereferenced directly; presumably
+     * {@code byEndpoint().get(endpoint)} never returns null for an unknown endpoint - confirm.
+     */
+    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, 
InetAddressAndPort endpoint)
+    {
+        PlacementForRange writes = 
placements.get(ksm.params.replication).writes;
+        PlacementForRange reads = placements.get(ksm.params.replication).reads;
+        return 
!writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint));
+    }
+
+    /**
+     * Returns the ranges of the write placement, for the keyspace's replication params, that
+     * are registered under this node's address ({@link FBUtilities#getBroadcastAddressAndPort()}).
+     */
+    public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata)
+    {
+        return 
placements.get(metadata.params.replication).writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()).ranges();
+    }
+
+    // TODO Remove this as it isn't really an equivalent to the previous 
concept of pending ranges
+    /**
+     * Builds a map from range to endpoints by comparing the write and read placements for the
+     * keyspace's replication params, in two steps:
+     * first, each range present in the write placement but not in the read placement is mapped
+     * to its write endpoints; then, for each write replica group that is not equal to the read
+     * group for the same range, the range is mapped to the write replicas that the read group
+     * does not contain. An entry put in the second step replaces any entry put for the same
+     * range in the first step.
+     */
+    public Map<Range<Token>, VersionedEndpoints.ForRange> 
pendingRanges(KeyspaceMetadata metadata)
+    {
+        Map<Range<Token>, VersionedEndpoints.ForRange> map = new HashMap<>();
+        PlacementForRange writes = 
placements.get(metadata.params.replication).writes;
+        PlacementForRange reads = 
placements.get(metadata.params.replication).reads;
+
+        // first, pending ranges as the result of range splitting or merging
+        // i.e. new ranges being created through join/leave
+        List<Range<Token>> pending = new ArrayList<>(writes.ranges());
+        pending.removeAll(reads.ranges());
+        for (Range<Token> p : pending)
+            map.put(p, 
placements.get(metadata.params.replication).writes.forRange(p));
+
+        // next, ranges where the ranges themselves are not changing, but the 
replicas are
+        // i.e. replacement or RF increase
+        writes.replicaGroups().forEach((range, endpoints) -> {
+            VersionedEndpoints.ForRange readGroup = reads.forRange(range);
+            if (!readGroup.equals(endpoints))
+                map.put(range, 
VersionedEndpoints.forRange(endpoints.lastModified(),
+                                                           
endpoints.get().filter(r -> !readGroup.get().contains(r))));
+        });
+
+        return map;
+    }
+
+    // TODO Remove this as it isn't really an equivalent to the previous 
concept of pending endpoints
+    /**
+     * Returns the write replicas for the given token that are not contained in the read
+     * replicas for that token, using the placements for the keyspace's replication params.
+     * The result carries the last modified epoch of the write endpoints.
+     */
+    public VersionedEndpoints.ForToken pendingEndpointsFor(KeyspaceMetadata 
metadata, Token t)
+    {
+        VersionedEndpoints.ForToken writeEndpoints = 
placements.get(metadata.params.replication).writes.forToken(t);
+        VersionedEndpoints.ForToken readEndpoints = 
placements.get(metadata.params.replication).reads.forToken(t);
+        // NOTE(review): the builder argument is (writes - reads) and would be negative if there
+        // are more read than write endpoints; confirm newBuilder tolerates that.
+        EndpointsForToken.Builder endpointsForToken = 
writeEndpoints.get().newBuilder(writeEndpoints.size() - readEndpoints.size());
+
+        for (Replica writeReplica : writeEndpoints.get())
+        {
+            if (!readEndpoints.get().contains(writeReplica))
+                endpointsForToken.add(writeReplica);
+        }
+        return VersionedEndpoints.forToken(writeEndpoints.lastModified(), 
endpointsForToken.build());
+    }
+
+    /**
+     * Builds a new cluster metadata based on top of the provided one, 
registering the keys of all the overridden
+     * items.
+     */
+    public static class Transformer
+    {
+        private final ClusterMetadata base;
+        private final Epoch epoch;
+        private final long period;
+        private final boolean lastInPeriod;
+        private final IPartitioner partitioner;
+        private DistributedSchema schema;
+        private Directory directory;
+        private TokenMap tokenMap;
+        private DataPlacements placements;
+        private LockedRanges lockedRanges;
+        private InProgressSequences inProgressSequences;
+        private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions;
+        private final Set<MetadataKey> modifiedKeys;
+
+        /**
+         * Starts a transformation from the given base metadata. All components are
+         * initialised from the base; the period is the base's {@code nextPeriod()}; the
+         * extensions are copied into a mutable map and the set of modified keys starts empty.
+         */
+        private Transformer(ClusterMetadata metadata, Epoch epoch, boolean 
lastInPeriod)
+        {
+            this.base = metadata;
+            this.epoch = epoch;
+            this.period = metadata.nextPeriod();
+            this.lastInPeriod = lastInPeriod;
+            this.partitioner = metadata.partitioner;
+            this.schema = metadata.schema;
+            this.directory = metadata.directory;
+            this.tokenMap = metadata.tokenMap;
+            this.placements = metadata.placements;
+            this.lockedRanges = metadata.lockedRanges;
+            this.inProgressSequences = metadata.inProgressSequences;
+            extensions = new HashMap<>(metadata.extensions);
+            modifiedKeys = new HashSet<>();
+        }
+
+        /** Replaces the schema held by this transformer and returns this transformer. */
+        public Transformer with(DistributedSchema schema)
+        {
+            this.schema = schema;
+            return this;
+        }
+
+        /** Replaces the directory held by this transformer and returns this transformer. */
+        public Transformer with(Directory directory)
+        {
+            this.directory = directory;
+            return this;
+        }
+
+        /**
+         * Replaces the directory with the result of
+         * {@code directory.with(addresses, location, version)} and returns this transformer.
+         */
+        public Transformer register(NodeAddresses addresses, Location 
location, NodeVersion version)
+        {
+            directory = directory.with(addresses, location, version);
+            return this;
+        }
+
+        /**
+         * Replaces the directory with the result of {@code directory.without(nodeId)} and
+         * returns this transformer.
+         */
+        public Transformer unregister(NodeId nodeId)
+        {
+            directory = directory.without(nodeId);
+            return this;
+        }
+
+        /**
+         * Replaces the directory with the result of
+         * {@code directory.withNodeAddresses(nodeId, addresses)} and returns this transformer.
+         */
+        public Transformer withNewAddresses(NodeId nodeId, NodeAddresses 
addresses)
+        {
+            directory = directory.withNodeAddresses(nodeId, addresses);
+            return this;
+        }
+
+        /**
+         * Replaces the directory with the result of
+         * {@code directory.withNodeVersion(nodeId, version)} and returns this transformer.
+         */
+        public Transformer withVersion(NodeId nodeId, NodeVersion version)
+        {
+            directory = directory.withNodeVersion(nodeId, version);
+            return this;
+        }
+
+        /**
+         * Replaces the directory with the result of {@code directory.withNodeState(id, state)}
+         * and returns this transformer.
+         */
+        public Transformer withNodeState(NodeId id, NodeState state)
+        {
+            directory = directory.withNodeState(id, state);
+            return this;
+        }
+
+        /**
+         * Replaces the token map with the result of {@code tokenMap.assignTokens(nodeId, tokens)}
+         * and returns this transformer. The directory is not touched.
+         */
+        public Transformer proposeToken(NodeId nodeId, Collection<Token> 
tokens)
+        {
+            tokenMap = tokenMap.assignTokens(nodeId, tokens);
+            return this;
+        }
+
+        /**
+         * Replaces the directory with the result of {@code directory.withRackAndDC(nodeId)}
+         * and returns this transformer.
+         */
+        public Transformer addToRackAndDC(NodeId nodeId)
+        {
+            directory = directory.withRackAndDC(nodeId);
+            return this;
+        }
+
+        /**
+         * Unassigns the node's tokens in the token map and also applies
+         * {@code directory.withoutRackAndDC(nodeId)} to the directory; returns this transformer.
+         */
+        public Transformer unproposeTokens(NodeId nodeId)
+        {
+            tokenMap = tokenMap.unassignTokens(nodeId);
+            directory = directory.withoutRackAndDC(nodeId);
+            return this;
+        }
+
+        /**
+         * Unassigns the node's tokens in the token map and then assigns it the given tokens;
+         * returns this transformer. The directory is not touched.
+         */
+        public Transformer moveTokens(NodeId nodeId, Collection<Token> tokens)
+        {
+            tokenMap = tokenMap.unassignTokens(nodeId)
+                               .assignTokens(nodeId, tokens);
+            return this;
+        }
+
+        /**
+         * Sets the node's state in the directory to {@link NodeState#JOINED} and returns this
+         * transformer.
+         */
+        public Transformer join(NodeId nodeId)
+        {
+            directory = directory.withNodeState(nodeId, NodeState.JOINED);
+            return this;
+        }
+
+        /**
+         * Transfers the tokens of {@code replaced} to {@code replacement} in the token map, and
+         * in the directory applies {@code without(replaced)}, {@code withRackAndDC(replacement)}
+         * and sets the replacement's state to {@link NodeState#JOINED}; returns this transformer.
+         */
+        public Transformer replaced(NodeId replaced, NodeId replacement)
+        {
+            // Read the tokens before unassigning them so they can be handed to the replacement.
+            Collection<Token> transferringTokens = tokenMap.tokens(replaced);
+            tokenMap = tokenMap.unassignTokens(replaced)
+                               .assignTokens(replacement, transferringTokens);
+            directory = directory.without(replaced)
+                                 .withRackAndDC(replacement)
+                                 .withNodeState(replacement, NodeState.JOINED);
+            return this;
+        }
+
+        /**
+         * Unassigns the node's tokens in the token map and returns this transformer. The
+         * directory is not touched.
+         */
+        public Transformer proposeRemoveNode(NodeId id)
+        {
+            tokenMap = tokenMap.unassignTokens(id);
+            return this;
+        }
+
+        /**
+         * Unassigns the node's tokens in the token map, sets its state in the directory to
+         * {@link NodeState#LEFT} and applies {@code withoutRackAndDC(id)}; returns this
+         * transformer.
+         */
+        public Transformer left(NodeId id)
+        {
+            tokenMap = tokenMap.unassignTokens(id);
+            directory = directory.withNodeState(id, NodeState.LEFT)
+                                 .withoutRackAndDC(id);
+            return this;
+        }
+
+        /** Replaces the placements held by this transformer and returns this transformer. */
+        public Transformer with(DataPlacements placements)
+        {
+            this.placements = placements;
+            return this;
+        }
+
+        /** Replaces the locked ranges held by this transformer and returns this transformer. */
+        public Transformer with(LockedRanges lockedRanges)
+        {
+            this.lockedRanges = lockedRanges;
+            return this;
+        }
+
+        /** Replaces the in-progress sequences held by this transformer and returns this transformer. */
+        public Transformer with(InProgressSequences sequences)
+        {
+            this.inProgressSequences = sequences;
+            return this;
+        }
+
+        /**
+         * Stores an extension value under the given key and records the key as modified.
+         *
+         * @param key extension key; must not be one of {@code MetadataKeys.CORE_METADATA}
+         * @param obj value to store; must be an instance of {@code key.valueType}
+         * @return this transformer
+         * @throws IllegalArgumentException if the key is a core metadata key, or if the value
+         *         (including a null value) is not an instance of the key's value type
+         */
+        public Transformer with(ExtensionKey<?, ?> key, ExtensionValue<?> obj)
+        {
+            if (MetadataKeys.CORE_METADATA.contains(key))
+                throw new IllegalArgumentException("Core cluster metadata objects should be addressed directly, " +
+                                                   "not using the associated MetadataKey");
+
+            if (!key.valueType.isInstance(obj))
+            {
+                // isInstance(null) is false, so obj may be null here; avoid dereferencing it
+                // so the caller gets the intended IllegalArgumentException rather than an NPE.
+                Object actualType = obj == null ? "null" : obj.getClass();
+                throw new IllegalArgumentException("Value of type " + actualType +
+                                                   " is incompatible with type for key " + key +
+                                                   " (" + key.valueType + ")");
+            }
+
+            extensions.put(key, obj);
+            modifiedKeys.add(key);
+            return this;
+        }
+
+        /**
+         * Stores the extension value via {@code with(key, obj)} only when the extensions map
+         * does not already contain the key; otherwise leaves the transformer unchanged.
+         * Returns this transformer in both cases.
+         */
+        public Transformer withIfAbsent(ExtensionKey<?, ?> key, 
ExtensionValue<?> obj)
+        {
+            if (extensions.containsKey(key))
+                return this;
+            return with(key, obj);
+        }
+
+        /**
+         * Removes the extension stored under the given key. The key is recorded as modified
+         * only when a non-null value was actually removed.
+         *
+         * @return this transformer
+         * @throws IllegalArgumentException if the key is one of {@code MetadataKeys.CORE_METADATA}
+         */
+        public Transformer without(ExtensionKey<?, ?> key)
+        {
+            if (MetadataKeys.CORE_METADATA.contains(key))
+                throw new IllegalArgumentException("Core cluster metadata 
objects should be addressed directly, " +
+                                                   "not using the associated 
MetadataKey");
+            if (extensions.remove(key) != null)
+                modifiedKeys.add(key);
+            return this;
+        }
+
+        public Transformed build()
+        {
+            // Process extension first as a) these are actually mutable and b) 
they are added to the set of
+            // modified keys when added/updated/removed
+            for (MetadataKey key : modifiedKeys)
+            {
+                ExtensionValue<?> mutable = extensions.get(key);
+                if (null != mutable)
+                    mutable.withLastModified(epoch);
+            }
+
+            if (schema != base.schema)

Review Comment:
   Why do we compare all of these by reference instead of by value (as 
dumpDiff does)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to