beobal commented on code in PR #4149:
URL: https://github.com/apache/cassandra/pull/4149#discussion_r2230736993


##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = 
Cli.builder(TOOL_NAME);
+
+        List<Class<? extends ClusterMetadataToolRunnable>> commands = new 
ArrayList<>()
+        {{
+            add(ClusterMetadataToolHelp.class);
+            add(AddToCMS.class);
+            add(AssignTokens.class);
+            add(Describe.class);
+            add(ForceJoin.class);
+            add(ForgetNode.class);
+            add(PrintDataPlacements.class);
+            add(PrintDirectoryCmd.class);
+        }};
+
+        builder.withDescription("Offline tool to print or update cluster 
metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable parse = parser.parse(args);
+            parse.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
+    public static abstract class ClusterMetadataToolCmd implements 
ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, 
description = "Cluster metadata dump file path", required = true)
+        protected String metadataDumpPath;
+
+        @Option(type = OptionType.COMMAND, name = { "-sv", 
"--serialization-version" }, description = "Serialization version to use")
+        private Version serializationVersion;
+
+
+        public ClusterMetadata parseClusterMetadata() throws IOException
+        {
+            File file = new File(metadataDumpPath);
+            if (!file.exists())
+            {
+                throw new IllegalArgumentException("Cluster metadata dump file 
" + metadataDumpPath + " does not exist");
+            }
+
+            Version serializationVersion = 
NodeVersion.CURRENT.serializationVersion();
+            // Make sure the partitioner we use to manipulate the metadata is 
the same one used to generate it
+            IPartitioner partitioner;
+            try (FileInputStreamPlus fisp = new 
FileInputStreamPlus(metadataDumpPath))
+            {
+                // skip over the prefix specifying the metadata version

Review Comment:
   I know we did this elsewhere in `TransformClusterMetadataHelper` but I think 
we should use this version to deserialize the partitioner. 
   We may want to specify a different version for the commands which write an updated CM back out, but I'm not sure whether the best default is `NodeVersion.CURRENT.serializationVersion()` or the one read from the input file. wdyt?
   



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = 
Cli.builder(TOOL_NAME);
+
+        List<Class<? extends ClusterMetadataToolRunnable>> commands = new 
ArrayList<>()
+        {{
+            add(ClusterMetadataToolHelp.class);
+            add(AddToCMS.class);
+            add(AssignTokens.class);
+            add(Describe.class);
+            add(ForceJoin.class);
+            add(ForgetNode.class);
+            add(PrintDataPlacements.class);
+            add(PrintDirectoryCmd.class);
+        }};
+
+        builder.withDescription("Offline tool to print or update cluster 
metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable parse = parser.parse(args);
+            parse.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
+    public static abstract class ClusterMetadataToolCmd implements 
ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, 
description = "Cluster metadata dump file path", required = true)
+        protected String metadataDumpPath;
+
+        @Option(type = OptionType.COMMAND, name = { "-sv", 
"--serialization-version" }, description = "Serialization version to use")
+        private Version serializationVersion;
+
+
+        public ClusterMetadata parseClusterMetadata() throws IOException
+        {
+            File file = new File(metadataDumpPath);
+            if (!file.exists())
+            {
+                throw new IllegalArgumentException("Cluster metadata dump file 
" + metadataDumpPath + " does not exist");
+            }
+
+            Version serializationVersion = 
NodeVersion.CURRENT.serializationVersion();
+            // Make sure the partitioner we use to manipulate the metadata is 
the same one used to generate it
+            IPartitioner partitioner;
+            try (FileInputStreamPlus fisp = new 
FileInputStreamPlus(metadataDumpPath))
+            {
+                // skip over the prefix specifying the metadata version
+                fisp.readUnsignedVInt32();
+                partitioner = ClusterMetadata.Serializer.getPartitioner(fisp, 
serializationVersion);
+            }
+            DatabaseDescriptor.toolInitialization();
+            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
+            ClusterMetadataService.initializeForTools(false);
+
+            return 
ClusterMetadataService.deserializeClusterMetadata(metadataDumpPath);
+        }
+
+        public void writeMetadata(Output output, ClusterMetadata metadata, 
String outputFilePath) throws IOException
+        {
+            Path p = outputFilePath != null ?
+                     Files.createFile(Path.of(outputFilePath)) :
+                     Files.createTempFile("clustermetadata", "dump");
+
+
+            try (FileOutputStreamPlus out = new FileOutputStreamPlus(p))
+            {
+                VerboseMetadataSerializer.serialize(ClusterMetadata.serializer,
+                                                    metadata,
+                                                    out,
+                                                    getSerializationVersion());
+                output.out.println("Updated cluster metadata written to file " 
+ p.toAbsolutePath());
+            }
+        }
+
+        Version getSerializationVersion()
+        {
+            return serializationVersion != null ? serializationVersion : 
NodeVersion.CURRENT.serializationVersion();
+        }
+    }
+
+    public static class ClusterMetadataToolHelp extends Help implements 
ClusterMetadataToolRunnable
+    {
+
+        @Override
+        public void run(Output output)
+        {
+            run();
+        }
+    }
+
+    @Command(name = "addtocms", description = "Makes a node as CMS member")
+    public static class AddToCMS extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of 
the target endpoint. Port can be optionally specified using a colon after the 
IP address (e.g., 127.0.0.1:9042).", required = true)

Review Comment:
   `addtocms` and `assigntokens` take an endpoint address to identify the node, 
whereas `forgetnode` and `forcejoin` take a node/host id. We should be 
consistent across the subcommands and I would recommend we use node ids 
everywhere. 
   At the moment, it's possible to add an endpoint to the CMS which doesn't 
belong to any node in the cluster, but that will change with 
https://issues.apache.org/jira/browse/CASSANDRA-20736 (patch for that is coming 
soon). 



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = 
Cli.builder(TOOL_NAME);
+
+        List<Class<? extends ClusterMetadataToolRunnable>> commands = new 
ArrayList<>()
+        {{
+            add(ClusterMetadataToolHelp.class);
+            add(AddToCMS.class);
+            add(AssignTokens.class);
+            add(Describe.class);
+            add(ForceJoin.class);
+            add(ForgetNode.class);
+            add(PrintDataPlacements.class);
+            add(PrintDirectoryCmd.class);
+        }};
+
+        builder.withDescription("Offline tool to print or update cluster 
metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable parse = parser.parse(args);
+            parse.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
+    public static abstract class ClusterMetadataToolCmd implements 
ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, 
description = "Cluster metadata dump file path", required = true)
+        protected String metadataDumpPath;
+
+        @Option(type = OptionType.COMMAND, name = { "-sv", 
"--serialization-version" }, description = "Serialization version to use")
+        private Version serializationVersion;
+
+
+        public ClusterMetadata parseClusterMetadata() throws IOException
+        {
+            File file = new File(metadataDumpPath);
+            if (!file.exists())
+            {
+                throw new IllegalArgumentException("Cluster metadata dump file 
" + metadataDumpPath + " does not exist");
+            }
+
+            Version serializationVersion = 
NodeVersion.CURRENT.serializationVersion();
+            // Make sure the partitioner we use to manipulate the metadata is 
the same one used to generate it
+            IPartitioner partitioner;
+            try (FileInputStreamPlus fisp = new 
FileInputStreamPlus(metadataDumpPath))
+            {
+                // skip over the prefix specifying the metadata version
+                fisp.readUnsignedVInt32();
+                partitioner = ClusterMetadata.Serializer.getPartitioner(fisp, 
serializationVersion);
+            }
+            DatabaseDescriptor.toolInitialization();
+            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
+            ClusterMetadataService.initializeForTools(false);
+
+            return 
ClusterMetadataService.deserializeClusterMetadata(metadataDumpPath);
+        }
+
+        public void writeMetadata(Output output, ClusterMetadata metadata, 
String outputFilePath) throws IOException
+        {
+            Path p = outputFilePath != null ?
+                     Files.createFile(Path.of(outputFilePath)) :
+                     Files.createTempFile("clustermetadata", "dump");
+
+
+            try (FileOutputStreamPlus out = new FileOutputStreamPlus(p))
+            {
+                VerboseMetadataSerializer.serialize(ClusterMetadata.serializer,
+                                                    metadata,
+                                                    out,
+                                                    getSerializationVersion());
+                output.out.println("Updated cluster metadata written to file " 
+ p.toAbsolutePath());
+            }
+        }
+
+        Version getSerializationVersion()
+        {
+            return serializationVersion != null ? serializationVersion : 
NodeVersion.CURRENT.serializationVersion();
+        }
+    }
+
+    public static class ClusterMetadataToolHelp extends Help implements 
ClusterMetadataToolRunnable
+    {
+
+        @Override
+        public void run(Output output)
+        {
+            run();
+        }
+    }
+
+    @Command(name = "addtocms", description = "Makes a node as CMS member")
+    public static class AddToCMS extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of 
the target endpoint. Port can be optionally specified using a colon after the 
IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ipAddress;
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, 
description = "Ouput file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            InetAddressAndPort nodeAddress = 
InetAddressAndPort.getByNameUnchecked(ipAddress);
+            metadata = makeCMS(metadata, nodeAddress);
+            writeMetadata(output, metadata, outputFilePath);
+        }
+
+        ClusterMetadata makeCMS(ClusterMetadata metadata, InetAddressAndPort 
endpoint)
+        {
+            ReplicationParams metaParams = ReplicationParams.meta(metadata);
+            DataPlacement.Builder builder = 
metadata.placements.get(metaParams).unbuild();
+
+            Replica newCMS = MetaStrategy.replica(endpoint);
+            builder.withReadReplica(metadata.epoch, newCMS)
+                   .withWriteReplica(metadata.epoch, newCMS);
+            return 
metadata.transformer().with(metadata.placements.unbuild().with(metaParams,
+                                                                               
   builder.build())
+                                                                  .build())
+                           .build().metadata;
+        }
+    }
+
+    @Command(name = "assigntokens", description = "Assigns a token for given 
instance")
+    public static class AssignTokens extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of 
the target endpoint. Port can be optionally specified using a colon after the 
IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ip;
+
+        @Option(name = { "-t", "--token" }, description = "Token to assign. 
Pass it multiple times to assign multiple tokens to node.", required = true)
+        private List<String> tokenList = new ArrayList<>();
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, 
description = "Ouput file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+
+            InetAddressAndPort nodeAddress = 
InetAddressAndPort.getByNameUnchecked(ip);
+            NodeId nodeId = metadata.directory.peerId(nodeAddress);
+            if (nodeId == null)
+            {
+                throw new IllegalArgumentException("Cassandra node with 
address " + ip + " does not exist.");
+            }
+
+            Token.TokenFactory tokenFactory = 
metadata.partitioner.getTokenFactory();
+            List<Token> tokens = 
tokenList.stream().map(tokenFactory::fromString).collect(Collectors.toList());
+            ClusterMetadata updateMetadata = 
metadata.transformer().proposeToken(nodeId, tokens).build().metadata;
+            writeMetadata(output, updateMetadata, outputFilePath);
+        }
+    }
+
+    @Command(name = "describe", description = "Describes the cluster metadata")
+    public static class Describe extends ClusterMetadataToolCmd
+    {
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();

Review Comment:
   It might be useful to add an option here to print the full `toString`. That 
would be overkill in the vast majority of cases, but could be handy for 
debugging. If we do add that, we should make it clear that the content/format 
of the output is not stable & shouldn't be relied on by tooling etc.  



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
    // CLI program name, used in help text and error messages.
    private static final String TOOL_NAME = "cmsofflinetool";
    // Destination streams for the tool's normal and error output.
    private final Output output;

    /**
     * @param output streams to which the tool writes its normal and error output
     */
    public CMSOfflineTool(Output output)
    {
        this.output = output;
    }
+
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = 
Cli.builder(TOOL_NAME);
+
+        List<Class<? extends ClusterMetadataToolRunnable>> commands = new 
ArrayList<>()
+        {{
+            add(ClusterMetadataToolHelp.class);
+            add(AddToCMS.class);
+            add(AssignTokens.class);
+            add(Describe.class);
+            add(ForceJoin.class);
+            add(ForgetNode.class);
+            add(PrintDataPlacements.class);
+            add(PrintDirectoryCmd.class);
+        }};
+
+        builder.withDescription("Offline tool to print or update cluster 
metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable parse = parser.parse(args);
+            parse.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
    /**
     * Common contract implemented by every subcommand of this tool.
     */
    interface ClusterMetadataToolRunnable
    {
        // Executes the command, writing human readable output to the supplied streams.
        void run(Output output) throws IOException;
    }
+
+    public static abstract class ClusterMetadataToolCmd implements 
ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, 
description = "Cluster metadata dump file path", required = true)
+        protected String metadataDumpPath;
+
+        @Option(type = OptionType.COMMAND, name = { "-sv", 
"--serialization-version" }, description = "Serialization version to use")
+        private Version serializationVersion;
+
+
+        public ClusterMetadata parseClusterMetadata() throws IOException
+        {
+            File file = new File(metadataDumpPath);
+            if (!file.exists())
+            {
+                throw new IllegalArgumentException("Cluster metadata dump file 
" + metadataDumpPath + " does not exist");
+            }
+
+            Version serializationVersion = 
NodeVersion.CURRENT.serializationVersion();
+            // Make sure the partitioner we use to manipulate the metadata is 
the same one used to generate it
+            IPartitioner partitioner;
+            try (FileInputStreamPlus fisp = new 
FileInputStreamPlus(metadataDumpPath))
+            {
+                // skip over the prefix specifying the metadata version
+                fisp.readUnsignedVInt32();
+                partitioner = ClusterMetadata.Serializer.getPartitioner(fisp, 
serializationVersion);
+            }
+            DatabaseDescriptor.toolInitialization();
+            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
+            ClusterMetadataService.initializeForTools(false);
+
+            return 
ClusterMetadataService.deserializeClusterMetadata(metadataDumpPath);
+        }
+
+        public void writeMetadata(Output output, ClusterMetadata metadata, 
String outputFilePath) throws IOException
+        {
+            Path p = outputFilePath != null ?
+                     Files.createFile(Path.of(outputFilePath)) :
+                     Files.createTempFile("clustermetadata", "dump");
+
+
+            try (FileOutputStreamPlus out = new FileOutputStreamPlus(p))
+            {
+                VerboseMetadataSerializer.serialize(ClusterMetadata.serializer,
+                                                    metadata,
+                                                    out,
+                                                    getSerializationVersion());
+                output.out.println("Updated cluster metadata written to file " 
+ p.toAbsolutePath());
+            }
+        }
+
+        Version getSerializationVersion()
+        {
+            return serializationVersion != null ? serializationVersion : 
NodeVersion.CURRENT.serializationVersion();
+        }
+    }
+
    /**
     * Default command: prints usage information for the tool or a specific subcommand.
     */
    public static class ClusterMetadataToolHelp extends Help implements ClusterMetadataToolRunnable
    {
        @Override
        public void run(Output output)
        {
            // Delegates to airline's Help.run(); note the supplied Output is not used here.
            run();
        }
    }
+
+    @Command(name = "addtocms", description = "Makes a node as CMS member")
+    public static class AddToCMS extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of 
the target endpoint. Port can be optionally specified using a colon after the 
IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ipAddress;
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, 
description = "Ouput file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            InetAddressAndPort nodeAddress = 
InetAddressAndPort.getByNameUnchecked(ipAddress);
+            metadata = makeCMS(metadata, nodeAddress);
+            writeMetadata(output, metadata, outputFilePath);
+        }
+
+        ClusterMetadata makeCMS(ClusterMetadata metadata, InetAddressAndPort 
endpoint)
+        {
+            ReplicationParams metaParams = ReplicationParams.meta(metadata);
+            DataPlacement.Builder builder = 
metadata.placements.get(metaParams).unbuild();
+
+            Replica newCMS = MetaStrategy.replica(endpoint);
+            builder.withReadReplica(metadata.epoch, newCMS)
+                   .withWriteReplica(metadata.epoch, newCMS);
+            return 
metadata.transformer().with(metadata.placements.unbuild().with(metaParams,
+                                                                               
   builder.build())
+                                                                  .build())
+                           .build().metadata;
+        }
+    }
+
+    @Command(name = "assigntokens", description = "Assigns a token for given 
instance")
+    public static class AssignTokens extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of 
the target endpoint. Port can be optionally specified using a colon after the 
IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ip;
+
+        @Option(name = { "-t", "--token" }, description = "Token to assign. 
Pass it multiple times to assign multiple tokens to node.", required = true)
+        private List<String> tokenList = new ArrayList<>();
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, 
description = "Ouput file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+
+            InetAddressAndPort nodeAddress = 
InetAddressAndPort.getByNameUnchecked(ip);
+            NodeId nodeId = metadata.directory.peerId(nodeAddress);
+            if (nodeId == null)
+            {
+                throw new IllegalArgumentException("Cassandra node with 
address " + ip + " does not exist.");
+            }
+
+            Token.TokenFactory tokenFactory = 
metadata.partitioner.getTokenFactory();
+            List<Token> tokens = 
tokenList.stream().map(tokenFactory::fromString).collect(Collectors.toList());
+            ClusterMetadata updateMetadata = 
metadata.transformer().proposeToken(nodeId, tokens).build().metadata;
+            writeMetadata(output, updateMetadata, outputFilePath);
+        }
+    }
+
+    @Command(name = "describe", description = "Describes the cluster metadata")
+    public static class Describe extends ClusterMetadataToolCmd
+    {
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            String members = 
metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(","));
+            output.out.printf("Cluster Metadata Service:%n");
+            output.out.printf("Members: %s%n", members);
+            output.out.printf("Needs reconfiguration: %s%n", 
needsReconfiguration(metadata));
+            output.out.printf("Service State: %s%n", 
ClusterMetadataService.state(metadata));
+            output.out.printf("Epoch: %s%n", metadata.epoch.getEpoch());
+            output.out.printf("Replication factor: %s%n", 
ReplicationParams.meta(metadata).toString());
+        }
+    }
+
+    @Command(name = "forcejoin", description = "Forces a node to move to 
JOINED stated")
+    public static class ForceJoin extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-id", "--node-id" }, description = "Node ID. It can 
be integer ID assigned to node or the node uuid", required = true)
+        private String id;
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, 
description = "Ouput file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            NodeId nodeId = NodeId.fromString(id);
+
+            if (!metadata.directory.peerIds().contains(nodeId))
+            {
+                throw new IllegalArgumentException("Node with id " + id + " 
does not exist.");
+            }
+
+            ClusterMetadata updatedMetadata = 
metadata.transformer().join(nodeId).build().metadata;
+            writeMetadata(output, updatedMetadata, outputFilePath);
+        }
+    }
+
+    @Command(name = "forgetnode", description = "Removes a nodes from given 
cluster metadata")
+    public static class ForgetNode extends ClusterMetadataToolCmd

Review Comment:
   This is 100% a tool for superusers, but I'm a bit concerned that this can 
put the CM/cluster into a bad state which wouldn't be recoverable using the 
tool itself as it removes the node from the directory, but it doesn't update 
data placements/CMS membership/inprogress sequences etc. If an operator 
inadvertently removed the wrong node here, the only way to repair CM would be 
to revert to an earlier version entirely. 
   To make this a bit safer, if the node is `JOINED`, we could make this behave 
like an offline `assassinate`.  If there are inflight operations for it (i.e. 
the node is moving/leaving/joining) we could cancel the op and revert any 
changes already made (i.e. what `CancelInProgressSequence::execute` does). 



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
    /**
     * Parses the command line and runs the selected subcommand.
     *
     * @param args command line arguments
     * @return 0 on success, 1 on a usage/parsing error, 2 on any other failure
     */
    public int execute(String... args)
    {
        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = Cli.builder(TOOL_NAME);

        // NOTE(review): double-brace initialization creates an anonymous ArrayList subclass;
        // a plain List.of(...) would be preferable here.
        List<Class<? extends ClusterMetadataToolRunnable>> commands = new ArrayList<>()
        {{
            add(ClusterMetadataToolHelp.class);
            add(AddToCMS.class);
            add(AssignTokens.class);
            add(Describe.class);
            add(ForceJoin.class);
            add(ForgetNode.class);
            add(PrintDataPlacements.class);
            add(PrintDirectoryCmd.class);
        }};

        builder.withDescription("Offline tool to print or update cluster metadata dump")
               .withDefaultCommand(ClusterMetadataToolHelp.class)
               .withCommands(commands);

        Cli<ClusterMetadataToolRunnable> parser = builder.build();
        int status = 0;
        try
        {
            ClusterMetadataToolRunnable parse = parser.parse(args);
            parse.run(output);
        }
        catch (ParseException pe)
        {
            // Malformed command line: report usage guidance, exit code 1.
            status = 1;
            badUse(pe);
        }
        catch (Exception e)
        {
            // Any other failure: dump message and stack trace, exit code 2.
            status = 2;
            err(e);
        }
        return status;
    }
+
+
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
    /**
     * Base class for subcommands: locates the metadata dump file, deserializes it,
     * and writes updated metadata back out.
     */
    public static abstract class ClusterMetadataToolCmd implements ClusterMetadataToolRunnable
    {
        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, description = "Cluster metadata dump file path", required = true)
        protected String metadataDumpPath;

        @Option(type = OptionType.COMMAND, name = { "-sv", "--serialization-version" }, description = "Serialization version to use")
        private Version serializationVersion;

        /**
         * Deserializes the cluster metadata dump supplied via {@code -f/--file}.
         *
         * @return the deserialized {@link ClusterMetadata}
         * @throws IOException if the dump file cannot be read
         * @throws IllegalArgumentException if the dump file does not exist
         */
        public ClusterMetadata parseClusterMetadata() throws IOException
        {
            File file = new File(metadataDumpPath);
            if (!file.exists())
            {
                throw new IllegalArgumentException("Cluster metadata dump file " + metadataDumpPath + " does not exist");
            }

            // NOTE(review): this local shadows the -sv option field, so the option is
            // ignored when reading the partitioner — confirm whether getSerializationVersion()
            // should be used instead.
            Version serializationVersion = NodeVersion.CURRENT.serializationVersion();
            // Make sure the partitioner we use to manipulate the metadata is the same one used to generate it
            IPartitioner partitioner;
            try (FileInputStreamPlus fisp = new FileInputStreamPlus(metadataDumpPath))
            {
                // skip over the prefix specifying the metadata version
                fisp.readUnsignedVInt32();
                partitioner = ClusterMetadata.Serializer.getPartitioner(fisp, serializationVersion);
            }
            DatabaseDescriptor.toolInitialization();
            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
            ClusterMetadataService.initializeForTools(false);

            return ClusterMetadataService.deserializeClusterMetadata(metadataDumpPath);
        }

        /**
         * Serializes {@code metadata} to {@code outputFilePath}, or to a freshly created
         * temp file when no path is given.
         *
         * @throws IOException if the output file cannot be created or written;
         *         note Files.createFile fails rather than overwrite an existing file
         */
        public void writeMetadata(Output output, ClusterMetadata metadata, String outputFilePath) throws IOException
        {
            Path p = outputFilePath != null ?
                     Files.createFile(Path.of(outputFilePath)) :
                     Files.createTempFile("clustermetadata", "dump");

            try (FileOutputStreamPlus out = new FileOutputStreamPlus(p))
            {
                VerboseMetadataSerializer.serialize(ClusterMetadata.serializer,
                                                    metadata,
                                                    out,
                                                    getSerializationVersion());
                output.out.println("Updated cluster metadata written to file " + p.toAbsolutePath());
            }
        }

        /**
         * @return the version supplied via {@code -sv/--serialization-version}, falling back
         *         to the current node's serialization version
         */
        Version getSerializationVersion()
        {
            return serializationVersion != null ? serializationVersion : NodeVersion.CURRENT.serializationVersion();
        }
    }
+
+    public static class ClusterMetadataToolHelp extends Help implements 
ClusterMetadataToolRunnable
+    {
+
+        @Override
+        public void run(Output output)
+        {
+            run();
+        }
+    }
+
    // TODO(review): "Makes a node as CMS member" and "Ouput" below are help-text typos.
    @Command(name = "addtocms", description = "Makes a node as CMS member")
    public static class AddToCMS extends ClusterMetadataToolCmd
    {
        @Option(name = { "-ip", "--ip-address" }, description = "IP address of the target endpoint. Port can be optionally specified using a colon after the IP address (e.g., 127.0.0.1:9042).", required = true)
        private String ipAddress;

        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, description = "Ouput file path for storing the updated Cluster Metadata")
        private String outputFilePath;

        @Override
        public void run(Output output) throws IOException
        {
            ClusterMetadata metadata = parseClusterMetadata();
            InetAddressAndPort nodeAddress = InetAddressAndPort.getByNameUnchecked(ipAddress);
            metadata = makeCMS(metadata, nodeAddress);
            writeMetadata(output, metadata, outputFilePath);
        }

        /**
         * Adds {@code endpoint} as a read and write replica of the metadata keyspace
         * placement, making it a CMS member in the returned metadata.
         */
        ClusterMetadata makeCMS(ClusterMetadata metadata, InetAddressAndPort endpoint)
        {
            ReplicationParams metaParams = ReplicationParams.meta(metadata);
            DataPlacement.Builder builder = metadata.placements.get(metaParams).unbuild();

            Replica newCMS = MetaStrategy.replica(endpoint);
            builder.withReadReplica(metadata.epoch, newCMS)
                   .withWriteReplica(metadata.epoch, newCMS);
            return metadata.transformer().with(metadata.placements.unbuild().with(metaParams,
                                                                                  builder.build())
                                                                  .build())
                           .build().metadata;
        }
    }
+
+    @Command(name = "assigntokens", description = "Assigns a token for given 
instance")
+    public static class AssignTokens extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of 
the target endpoint. Port can be optionally specified using a colon after the 
IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ip;
+
+        @Option(name = { "-t", "--token" }, description = "Token to assign. 
Pass it multiple times to assign multiple tokens to node.", required = true)
+        private List<String> tokenList = new ArrayList<>();
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, 
description = "Ouput file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+
+            InetAddressAndPort nodeAddress = 
InetAddressAndPort.getByNameUnchecked(ip);
+            NodeId nodeId = metadata.directory.peerId(nodeAddress);
+            if (nodeId == null)
+            {
+                throw new IllegalArgumentException("Cassandra node with 
address " + ip + " does not exist.");
+            }
+
+            Token.TokenFactory tokenFactory = 
metadata.partitioner.getTokenFactory();
+            List<Token> tokens = 
tokenList.stream().map(tokenFactory::fromString).collect(Collectors.toList());
+            ClusterMetadata updateMetadata = 
metadata.transformer().proposeToken(nodeId, tokens).build().metadata;

Review Comment:
   This adds a token to any existing tokens already owned by the node. So in a 
single-token config, using this command will result in the target node owning 
multiple tokens. There is no way to remove/unassign tokens at the moment, so 
that state isn't really fixable with the tooling alone.
   I'm not sure this is going to be very intuitive, maybe it should fully 
replace the tokens currently assigned?
   e.g. if the intention is to emulate a single-token move, the operator would 
just supply the new token. To add a new token to an existing set, *all* of the 
desired tokens, including those already assigned to the node, would need to be 
passed.



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = 
Cli.builder(TOOL_NAME);
+
+        List<Class<? extends ClusterMetadataToolRunnable>> commands = new 
ArrayList<>()
+        {{
+            add(ClusterMetadataToolHelp.class);
+            add(AddToCMS.class);
+            add(AssignTokens.class);
+            add(Describe.class);
+            add(ForceJoin.class);
+            add(ForgetNode.class);
+            add(PrintDataPlacements.class);
+            add(PrintDirectoryCmd.class);
+        }};
+
+        builder.withDescription("Offline tool to print or update cluster 
metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable parse = parser.parse(args);
+            parse.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
+    public static abstract class ClusterMetadataToolCmd implements 
ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, 
description = "Cluster metadata dump file path", required = true)

Review Comment:
   nit: as every subcommand requires an input file, does it make sense to make 
this an `@Argument` instead of an `@Option`?



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    /**
+     * CLI entry point: runs the tool against System.out/System.err and exits with the
+     * status code returned by execute (0 = success, 1 = bad usage, 2 = other error).
+     */
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    /**
+     * Builds the airline CLI, parses {@code args} and runs the selected subcommand.
+     *
+     * @return 0 on success, 1 on a usage/parse error, 2 on any other failure
+     */
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = Cli.builder(TOOL_NAME);
+
+        // List.of avoids the double-brace initialization anti-pattern (an anonymous
+        // ArrayList subclass retaining a reference to the enclosing instance).
+        List<Class<? extends ClusterMetadataToolRunnable>> commands =
+            List.of(ClusterMetadataToolHelp.class,
+                    AddToCMS.class,
+                    AssignTokens.class,
+                    Describe.class,
+                    ForceJoin.class,
+                    ForgetNode.class,
+                    PrintDataPlacements.class,
+                    PrintDirectoryCmd.class);
+
+        builder.withDescription("Offline tool to print or update cluster metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable command = parser.parse(args);
+            command.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    /** Reports a CLI parse/usage failure on stderr and points the user at the help command. Exit status 1. */
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    /** Reports an unexpected failure on stderr, including the full stack trace. Exit status 2. */
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    /** A runnable subcommand of this tool; implementations write their results to the given Output. */
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
+    /**
+     * Base class for subcommands that operate on a serialized cluster metadata dump.
+     * Provides deserialization of the input dump and serialization of an updated copy.
+     */
+    public static abstract class ClusterMetadataToolCmd implements ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, description = "Cluster metadata dump file path", required = true)
+        protected String metadataDumpPath;
+
+        @Option(type = OptionType.COMMAND, name = { "-sv", "--serialization-version" }, description = "Serialization version to use")
+        private Version serializationVersion;
+
+        /**
+         * Deserializes the cluster metadata dump named by {@code --file}, first initializing
+         * DatabaseDescriptor and ClusterMetadataService for offline (tool) use with the
+         * partitioner recorded in the dump itself.
+         *
+         * @throws IllegalArgumentException if the dump file does not exist
+         */
+        public ClusterMetadata parseClusterMetadata() throws IOException
+        {
+            File file = new File(metadataDumpPath);
+            if (!file.exists())
+            {
+                throw new IllegalArgumentException("Cluster metadata dump file " + metadataDumpPath + " does not exist");
+            }
+
+            // Renamed from "serializationVersion" so it no longer shadows the -sv option field.
+            // NOTE(review): the dump is always read with the current node's serialization
+            // version, ignoring -sv; confirm -sv is intended to apply to output only.
+            Version readVersion = NodeVersion.CURRENT.serializationVersion();
+            // Make sure the partitioner we use to manipulate the metadata is the same one used to generate it
+            IPartitioner partitioner;
+            try (FileInputStreamPlus fisp = new FileInputStreamPlus(metadataDumpPath))
+            {
+                // skip over the prefix specifying the metadata version
+                fisp.readUnsignedVInt32();
+                partitioner = ClusterMetadata.Serializer.getPartitioner(fisp, readVersion);
+            }
+            DatabaseDescriptor.toolInitialization();
+            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
+            ClusterMetadataService.initializeForTools(false);
+
+            return ClusterMetadataService.deserializeClusterMetadata(metadataDumpPath);
+        }
+
+        /**
+         * Serializes {@code metadata} to {@code outputFilePath}, or to a fresh temp file
+         * when no path was given, and reports the destination path on {@code output}.
+         */
+        public void writeMetadata(Output output, ClusterMetadata metadata, String outputFilePath) throws IOException
+        {
+            Path p = outputFilePath != null ?
+                     Files.createFile(Path.of(outputFilePath)) :
+                     Files.createTempFile("clustermetadata", "dump");
+
+            try (FileOutputStreamPlus out = new FileOutputStreamPlus(p))
+            {
+                VerboseMetadataSerializer.serialize(ClusterMetadata.serializer,
+                                                    metadata,
+                                                    out,
+                                                    getSerializationVersion());
+                output.out.println("Updated cluster metadata written to file " + p.toAbsolutePath());
+            }
+        }
+
+        /** @return the version given via -sv, defaulting to the current node's serialization version. */
+        Version getSerializationVersion()
+        {
+            return serializationVersion != null ? serializationVersion : NodeVersion.CURRENT.serializationVersion();
+        }
+    }
+
+    /** Default command: prints the airline-generated help text (ignores the Output argument). */
+    public static class ClusterMetadataToolHelp extends Help implements 
ClusterMetadataToolRunnable
+    {
+
+        @Override
+        public void run(Output output)
+        {
+            run();
+        }
+    }
+
+    @Command(name = "addtocms", description = "Makes a node a CMS member")
+    public static class AddToCMS extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of the target endpoint. Port can be optionally specified using a colon after the IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ipAddress;
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, description = "Output file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        /** Adds the target endpoint to the CMS placements and writes the updated metadata. */
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            InetAddressAndPort nodeAddress = InetAddressAndPort.getByNameUnchecked(ipAddress);
+            metadata = makeCMS(metadata, nodeAddress);
+            writeMetadata(output, metadata, outputFilePath);
+        }
+
+        /** Returns a copy of {@code metadata} with {@code endpoint} added as a read and write replica of the metadata keyspace. */
+        ClusterMetadata makeCMS(ClusterMetadata metadata, InetAddressAndPort endpoint)
+        {
+            ReplicationParams metaParams = ReplicationParams.meta(metadata);
+            DataPlacement.Builder builder = metadata.placements.get(metaParams).unbuild();
+
+            Replica newCMS = MetaStrategy.replica(endpoint);
+            builder.withReadReplica(metadata.epoch, newCMS)
+                   .withWriteReplica(metadata.epoch, newCMS);
+            return metadata.transformer().with(metadata.placements.unbuild().with(metaParams, builder.build()).build())
+                           .build().metadata;
+        }
+    }
+
+    @Command(name = "assigntokens", description = "Assigns tokens to a given instance")
+    public static class AssignTokens extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of the target endpoint. Port can be optionally specified using a colon after the IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ip;
+
+        @Option(name = { "-t", "--token" }, description = "Token to assign. Pass it multiple times to assign multiple tokens to node.", required = true)
+        private List<String> tokenList = new ArrayList<>();
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, description = "Output file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        /** Proposes the given tokens for the node at {@code -ip} and writes the updated metadata. */
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+
+            InetAddressAndPort nodeAddress = InetAddressAndPort.getByNameUnchecked(ip);
+            NodeId nodeId = metadata.directory.peerId(nodeAddress);
+            if (nodeId == null)
+            {
+                throw new IllegalArgumentException("Cassandra node with address " + ip + " does not exist.");
+            }
+
+            Token.TokenFactory tokenFactory = metadata.partitioner.getTokenFactory();
+            List<Token> tokens = tokenList.stream().map(tokenFactory::fromString).collect(Collectors.toList());
+            ClusterMetadata updateMetadata = metadata.transformer().proposeToken(nodeId, tokens).build().metadata;
+            writeMetadata(output, updateMetadata, outputFilePath);
+        }
+    }
+
+    @Command(name = "describe", description = "Describes the cluster metadata")
+    public static class Describe extends ClusterMetadataToolCmd
+    {
+        /** Prints a human-readable CMS summary: members, reconfiguration need, state, epoch, replication. */
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            String members = 
metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(","));
+            output.out.printf("Cluster Metadata Service:%n");
+            output.out.printf("Members: %s%n", members);
+            output.out.printf("Needs reconfiguration: %s%n", 
needsReconfiguration(metadata));
+            output.out.printf("Service State: %s%n", 
ClusterMetadataService.state(metadata));
+            output.out.printf("Epoch: %s%n", metadata.epoch.getEpoch());
+            output.out.printf("Replication factor: %s%n", 
ReplicationParams.meta(metadata).toString());
+        }
+    }
+
+    @Command(name = "forcejoin", description = "Forces a node to move to 
JOINED stated")

Review Comment:
   How/when do you see this subcommand being used?
    
   Forcing the states like this could put the cluster into a state which makes 
it harder to recover as it allows a node to be `JOINED` without actually having 
any tokens which is an unexpected state. Although that doesn't seem to actually 
cause any problems for C* , it does result in a  missing `STATUS_WITH_PORT` 
gossip state for the node which was forcibly joined, which may cause issues 
elsewhere.
   
   One thing to note, if the node in question is in the middle of a 
join/move/leave operation, I suspect this will leave the operation in a state 
where it can't progress as it will not expect the node to be `JOINED`. This is 
probably easier to mitigate than the similar issue with `forgetnode` as the 
in-flight operation should be cancellable after the cluster has been 
re-initialised with the modified CM. 
     
   If you have a specific scenario in mind where this would be useful, perhaps 
we should add restrictions tailored to that?  



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    /**
+     * CLI entry point: runs the tool against System.out/System.err and exits with the
+     * status code returned by execute (0 = success, 1 = bad usage, 2 = other error).
+     */
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    /**
+     * Builds the airline CLI, parses {@code args} and runs the selected subcommand.
+     *
+     * @return 0 on success, 1 on a usage/parse error, 2 on any other failure
+     */
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = Cli.builder(TOOL_NAME);
+
+        // List.of avoids the double-brace initialization anti-pattern (an anonymous
+        // ArrayList subclass retaining a reference to the enclosing instance).
+        List<Class<? extends ClusterMetadataToolRunnable>> commands =
+            List.of(ClusterMetadataToolHelp.class,
+                    AddToCMS.class,
+                    AssignTokens.class,
+                    Describe.class,
+                    ForceJoin.class,
+                    ForgetNode.class,
+                    PrintDataPlacements.class,
+                    PrintDirectoryCmd.class);
+
+        builder.withDescription("Offline tool to print or update cluster metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable command = parser.parse(args);
+            command.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    /** Reports a CLI parse/usage failure on stderr and points the user at the help command. Exit status 1. */
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    /** Reports an unexpected failure on stderr, including the full stack trace. Exit status 2. */
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    /** A runnable subcommand of this tool; implementations write their results to the given Output. */
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
+    /**
+     * Base class for subcommands that operate on a serialized cluster metadata dump.
+     * Provides deserialization of the input dump and serialization of an updated copy.
+     */
+    public static abstract class ClusterMetadataToolCmd implements ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, description = "Cluster metadata dump file path", required = true)
+        protected String metadataDumpPath;
+
+        @Option(type = OptionType.COMMAND, name = { "-sv", "--serialization-version" }, description = "Serialization version to use")
+        private Version serializationVersion;
+
+        /**
+         * Deserializes the cluster metadata dump named by {@code --file}, first initializing
+         * DatabaseDescriptor and ClusterMetadataService for offline (tool) use with the
+         * partitioner recorded in the dump itself.
+         *
+         * @throws IllegalArgumentException if the dump file does not exist
+         */
+        public ClusterMetadata parseClusterMetadata() throws IOException
+        {
+            File file = new File(metadataDumpPath);
+            if (!file.exists())
+            {
+                throw new IllegalArgumentException("Cluster metadata dump file " + metadataDumpPath + " does not exist");
+            }
+
+            // Renamed from "serializationVersion" so it no longer shadows the -sv option field.
+            // NOTE(review): the dump is always read with the current node's serialization
+            // version, ignoring -sv; confirm -sv is intended to apply to output only.
+            Version readVersion = NodeVersion.CURRENT.serializationVersion();
+            // Make sure the partitioner we use to manipulate the metadata is the same one used to generate it
+            IPartitioner partitioner;
+            try (FileInputStreamPlus fisp = new FileInputStreamPlus(metadataDumpPath))
+            {
+                // skip over the prefix specifying the metadata version
+                fisp.readUnsignedVInt32();
+                partitioner = ClusterMetadata.Serializer.getPartitioner(fisp, readVersion);
+            }
+            DatabaseDescriptor.toolInitialization();
+            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
+            ClusterMetadataService.initializeForTools(false);
+
+            return ClusterMetadataService.deserializeClusterMetadata(metadataDumpPath);
+        }
+
+        /**
+         * Serializes {@code metadata} to {@code outputFilePath}, or to a fresh temp file
+         * when no path was given, and reports the destination path on {@code output}.
+         */
+        public void writeMetadata(Output output, ClusterMetadata metadata, String outputFilePath) throws IOException
+        {
+            Path p = outputFilePath != null ?
+                     Files.createFile(Path.of(outputFilePath)) :
+                     Files.createTempFile("clustermetadata", "dump");
+
+            try (FileOutputStreamPlus out = new FileOutputStreamPlus(p))
+            {
+                VerboseMetadataSerializer.serialize(ClusterMetadata.serializer,
+                                                    metadata,
+                                                    out,
+                                                    getSerializationVersion());
+                output.out.println("Updated cluster metadata written to file " + p.toAbsolutePath());
+            }
+        }
+
+        /** @return the version given via -sv, defaulting to the current node's serialization version. */
+        Version getSerializationVersion()
+        {
+            return serializationVersion != null ? serializationVersion : NodeVersion.CURRENT.serializationVersion();
+        }
+    }
+
+    /** Default command: prints the airline-generated help text (ignores the Output argument). */
+    public static class ClusterMetadataToolHelp extends Help implements 
ClusterMetadataToolRunnable
+    {
+
+        @Override
+        public void run(Output output)
+        {
+            run();
+        }
+    }
+
+    @Command(name = "addtocms", description = "Makes a node a CMS member")
+    public static class AddToCMS extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of the target endpoint. Port can be optionally specified using a colon after the IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ipAddress;
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, description = "Output file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        /** Adds the target endpoint to the CMS placements and writes the updated metadata. */
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            InetAddressAndPort nodeAddress = InetAddressAndPort.getByNameUnchecked(ipAddress);
+            metadata = makeCMS(metadata, nodeAddress);
+            writeMetadata(output, metadata, outputFilePath);
+        }
+
+        /** Returns a copy of {@code metadata} with {@code endpoint} added as a read and write replica of the metadata keyspace. */
+        ClusterMetadata makeCMS(ClusterMetadata metadata, InetAddressAndPort endpoint)
+        {
+            ReplicationParams metaParams = ReplicationParams.meta(metadata);
+            DataPlacement.Builder builder = metadata.placements.get(metaParams).unbuild();
+
+            Replica newCMS = MetaStrategy.replica(endpoint);
+            builder.withReadReplica(metadata.epoch, newCMS)
+                   .withWriteReplica(metadata.epoch, newCMS);
+            return metadata.transformer().with(metadata.placements.unbuild().with(metaParams, builder.build()).build())
+                           .build().metadata;
+        }
+    }
+
+    @Command(name = "assigntokens", description = "Assigns tokens to a given instance")
+    public static class AssignTokens extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of the target endpoint. Port can be optionally specified using a colon after the IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ip;
+
+        @Option(name = { "-t", "--token" }, description = "Token to assign. Pass it multiple times to assign multiple tokens to node.", required = true)
+        private List<String> tokenList = new ArrayList<>();
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, description = "Output file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        /** Proposes the given tokens for the node at {@code -ip} and writes the updated metadata. */
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+
+            InetAddressAndPort nodeAddress = InetAddressAndPort.getByNameUnchecked(ip);
+            NodeId nodeId = metadata.directory.peerId(nodeAddress);
+            if (nodeId == null)
+            {
+                throw new IllegalArgumentException("Cassandra node with address " + ip + " does not exist.");
+            }
+
+            Token.TokenFactory tokenFactory = metadata.partitioner.getTokenFactory();
+            List<Token> tokens = tokenList.stream().map(tokenFactory::fromString).collect(Collectors.toList());
+            ClusterMetadata updateMetadata = metadata.transformer().proposeToken(nodeId, tokens).build().metadata;
+            writeMetadata(output, updateMetadata, outputFilePath);
+        }
+    }
+
+    @Command(name = "describe", description = "Describes the cluster metadata")
+    public static class Describe extends ClusterMetadataToolCmd
+    {
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();

Review Comment:
   ```
   @Option(name = { "-r", "--raw" }, description = "Output in raw string form. 
This is not a stable output format and is highly likely to change between 
releases")
   private boolean raw;
   ```
   



##########
src/java/org/apache/cassandra/tools/CMSOfflineTool.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Command;
+import io.airlift.airline.Help;
+import io.airlift.airline.Option;
+import io.airlift.airline.OptionType;
+import io.airlift.airline.ParseException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
+/**
+ * Offline tool to print or update cluster metadata dump.
+ */
+public class CMSOfflineTool
+{
+
+    private static final String TOOL_NAME = "cmsofflinetool";
+    private final Output output;
+
+    public CMSOfflineTool(Output output)
+    {
+        this.output = output;
+    }
+
+    /**
+     * CLI entry point: runs the tool against System.out/System.err and exits with the
+     * status code returned by execute (0 = success, 1 = bad usage, 2 = other error).
+     */
+    public static void main(String[] args) throws IOException
+    {
+        //noinspection UseOfSystemOutOrSystemErr
+        System.exit(new CMSOfflineTool(new Output(System.out, 
System.err)).execute(args));
+    }
+
+    /**
+     * Builds the airline CLI, parses {@code args} and runs the selected subcommand.
+     *
+     * @return 0 on success, 1 on a usage/parse error, 2 on any other failure
+     */
+    public int execute(String... args)
+    {
+        Cli.CliBuilder<ClusterMetadataToolRunnable> builder = Cli.builder(TOOL_NAME);
+
+        // List.of avoids the double-brace initialization anti-pattern (an anonymous
+        // ArrayList subclass retaining a reference to the enclosing instance).
+        List<Class<? extends ClusterMetadataToolRunnable>> commands =
+            List.of(ClusterMetadataToolHelp.class,
+                    AddToCMS.class,
+                    AssignTokens.class,
+                    Describe.class,
+                    ForceJoin.class,
+                    ForgetNode.class,
+                    PrintDataPlacements.class,
+                    PrintDirectoryCmd.class);
+
+        builder.withDescription("Offline tool to print or update cluster metadata dump")
+               .withDefaultCommand(ClusterMetadataToolHelp.class)
+               .withCommands(commands);
+
+        Cli<ClusterMetadataToolRunnable> parser = builder.build();
+        int status = 0;
+        try
+        {
+            ClusterMetadataToolRunnable command = parser.parse(args);
+            command.run(output);
+        }
+        catch (ParseException pe)
+        {
+            status = 1;
+            badUse(pe);
+        }
+        catch (Exception e)
+        {
+            status = 2;
+            err(e);
+        }
+        return status;
+    }
+
+
+    /** Reports a CLI parse/usage failure on stderr and points the user at the help command. Exit status 1. */
+    private void badUse(Exception e)
+    {
+        output.err.println(TOOL_NAME + ": " + e.getMessage());
+        output.err.printf("See '%s help' or '%s help <command>'.%n", 
TOOL_NAME, TOOL_NAME);
+    }
+
+    /** Reports an unexpected failure on stderr, including the full stack trace. Exit status 2. */
+    private void err(Exception e)
+    {
+        output.err.println("error: " + e.getMessage());
+        output.err.println("-- StackTrace --");
+        output.err.println(getStackTraceAsString(e));
+    }
+
+
+    /** A runnable subcommand of this tool; implementations write their results to the given Output. */
+    interface ClusterMetadataToolRunnable
+    {
+        void run(Output output) throws IOException;
+    }
+
+    /**
+     * Base class for subcommands that operate on a serialized cluster metadata dump.
+     * Provides deserialization of the input dump and serialization of an updated copy.
+     */
+    public static abstract class ClusterMetadataToolCmd implements ClusterMetadataToolRunnable
+    {
+        @Option(type = OptionType.COMMAND, name = { "-f", "--file" }, description = "Cluster metadata dump file path", required = true)
+        protected String metadataDumpPath;
+
+        @Option(type = OptionType.COMMAND, name = { "-sv", "--serialization-version" }, description = "Serialization version to use")
+        private Version serializationVersion;
+
+        /**
+         * Deserializes the cluster metadata dump named by {@code --file}, first initializing
+         * DatabaseDescriptor and ClusterMetadataService for offline (tool) use with the
+         * partitioner recorded in the dump itself.
+         *
+         * @throws IllegalArgumentException if the dump file does not exist
+         */
+        public ClusterMetadata parseClusterMetadata() throws IOException
+        {
+            File file = new File(metadataDumpPath);
+            if (!file.exists())
+            {
+                throw new IllegalArgumentException("Cluster metadata dump file " + metadataDumpPath + " does not exist");
+            }
+
+            // Renamed from "serializationVersion" so it no longer shadows the -sv option field.
+            // NOTE(review): the dump is always read with the current node's serialization
+            // version, ignoring -sv; confirm -sv is intended to apply to output only.
+            Version readVersion = NodeVersion.CURRENT.serializationVersion();
+            // Make sure the partitioner we use to manipulate the metadata is the same one used to generate it
+            IPartitioner partitioner;
+            try (FileInputStreamPlus fisp = new FileInputStreamPlus(metadataDumpPath))
+            {
+                // skip over the prefix specifying the metadata version
+                fisp.readUnsignedVInt32();
+                partitioner = ClusterMetadata.Serializer.getPartitioner(fisp, readVersion);
+            }
+            DatabaseDescriptor.toolInitialization();
+            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
+            ClusterMetadataService.initializeForTools(false);
+
+            return ClusterMetadataService.deserializeClusterMetadata(metadataDumpPath);
+        }
+
+        /**
+         * Serializes {@code metadata} to {@code outputFilePath}, or to a fresh temp file
+         * when no path was given, and reports the destination path on {@code output}.
+         */
+        public void writeMetadata(Output output, ClusterMetadata metadata, String outputFilePath) throws IOException
+        {
+            Path p = outputFilePath != null ?
+                     Files.createFile(Path.of(outputFilePath)) :
+                     Files.createTempFile("clustermetadata", "dump");
+
+            try (FileOutputStreamPlus out = new FileOutputStreamPlus(p))
+            {
+                VerboseMetadataSerializer.serialize(ClusterMetadata.serializer,
+                                                    metadata,
+                                                    out,
+                                                    getSerializationVersion());
+                output.out.println("Updated cluster metadata written to file " + p.toAbsolutePath());
+            }
+        }
+
+        /** @return the version given via -sv, defaulting to the current node's serialization version. */
+        Version getSerializationVersion()
+        {
+            return serializationVersion != null ? serializationVersion : NodeVersion.CURRENT.serializationVersion();
+        }
+    }
+
+    public static class ClusterMetadataToolHelp extends Help implements 
ClusterMetadataToolRunnable
+    {
+
+        @Override
+        public void run(Output output)
+        {
+            run();
+        }
+    }
+
+    @Command(name = "addtocms", description = "Makes a node as CMS member")
+    public static class AddToCMS extends ClusterMetadataToolCmd
+    {
+        @Option(name = { "-ip", "--ip-address" }, description = "IP address of 
the target endpoint. Port can be optionally specified using a colon after the 
IP address (e.g., 127.0.0.1:9042).", required = true)
+        private String ipAddress;
+
+        @Option(type = OptionType.COMMAND, name = { "-o", "--output-file" }, 
description = "Ouput file path for storing the updated Cluster Metadata")
+        private String outputFilePath;
+
+        @Override
+        public void run(Output output) throws IOException
+        {
+            ClusterMetadata metadata = parseClusterMetadata();
+            InetAddressAndPort nodeAddress = 
InetAddressAndPort.getByNameUnchecked(ipAddress);
+            metadata = makeCMS(metadata, nodeAddress);
+            writeMetadata(output, metadata, outputFilePath);
+        }
+
+        ClusterMetadata makeCMS(ClusterMetadata metadata, InetAddressAndPort 
endpoint)
+        {
+            ReplicationParams metaParams = ReplicationParams.meta(metadata);
+            DataPlacement.Builder builder = 
metadata.placements.get(metaParams).unbuild();
+
+            Replica newCMS = MetaStrategy.replica(endpoint);
+            builder.withReadReplica(metadata.epoch, newCMS)

Review Comment:
   It may be more useful (at least in the first instance) to have this work 
like `TransformClusterMetadataHelper` and replace the CMS membership, rather 
than adding to it. In a disaster recovery scenario, you will usually want to 
start a single node first and make this the sole CMS node by initialising it 
with the modified CM. Then bring the rest of the cluster up and finally 
reconfigure the CMS to an appropriate size. Trying to start that first node while 
the CMS still lists multiple members will fail: the node attempts to trigger a 
metadata snapshot, which cannot complete because the other CMS nodes are unavailable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org


Reply via email to