yifan-c commented on code in PR #58:
URL: https://github.com/apache/cassandra-sidecar/pull/58#discussion_r1306204877


##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java:
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Representation of a token range (exclusive start and inclusive end - (start, end]) and the
+ * corresponding mapping to replica-set hosts. Static factory ensures that ranges are always unwrapped.
+ * Note: Range comparisons are used for ordering of ranges. eg. A.compareTo(B) <= 0 implies that
+ * range A occurs before range B, not their sizes.
+ */
+public class TokenRangeReplicas implements Comparable<TokenRangeReplicas>
+{
+    private final BigInteger start;
+    private final BigInteger end;
+
+    private final Partitioner partitioner;
+
+    private final Set<String> replicaSet;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicas.class);
+
+    private TokenRangeReplicas(BigInteger start, BigInteger end, Partitioner partitioner, Set<String> replicaSet)
+    {
+        this.start = start;
+        this.end = end;
+        this.partitioner = partitioner;
+        this.replicaSet = replicaSet;
+    }
+
+    public static List<TokenRangeReplicas> generateTokenRangeReplicas(BigInteger start,
+                                                                      BigInteger end,
+                                                                      Partitioner partitioner,
+                                                                      Set<String> replicaSet)
+    {
+        if (start.compareTo(end) > 0)
+        {
+            return unwrapRange(start, end, partitioner, replicaSet);
+        }
+
+        return Collections.singletonList(new TokenRangeReplicas(start, end, partitioner, replicaSet));
+    }
+
+
+    public BigInteger start()
+    {
+        return start;
+    }
+
+    public BigInteger end()
+    {
+        return end;
+    }
+
+    public Set<String> replicaSet()
+    {
+        return replicaSet;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int compareTo(@NotNull TokenRangeReplicas other)
+    {
+        validateRangesForComparison(other);
+        int compareStart = this.start.compareTo(other.start);
+        return (compareStart != 0) ? compareStart : this.end.compareTo(other.end);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        TokenRangeReplicas that = (TokenRangeReplicas) o;
+
+        return Objects.equals(start, that.start)
+               && Objects.equals(end, that.end)
+               && partitioner == that.partitioner;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(start, end, partitioner);
+    }
+
+    private void validateRangesForComparison(@NotNull TokenRangeReplicas other)
+    {
+        if (this.partitioner != other.partitioner)
+            throw new IllegalStateException("Token ranges being compared do not have the same partitioner");
+    }
+
+    protected boolean contains(TokenRangeReplicas other)

Review Comment:
   These `protected` methods are not meant for extension, since the class has
   no subclasses. Please do not use `protected`.
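
For illustration only, here is a minimal sketch of the alternative. `SimpleRange` is a hypothetical, simplified stand-in and not the PR's `TokenRangeReplicas`; its `contains` logic is an assumption. Dropping the modifier makes the helper package-private, so a test in the same package can still call it without the class advertising an extension point.

```java
import java.math.BigInteger;

// Hypothetical, simplified stand-in for a (start, end] token range.
final class SimpleRange
{
    private final BigInteger start;
    private final BigInteger end;

    SimpleRange(long start, long end)
    {
        this.start = BigInteger.valueOf(start);
        this.end = BigInteger.valueOf(end);
    }

    // Package-private (no modifier): callable from tests in the same package,
    // but not part of any subclass-facing contract.
    // Assumed semantics: this range fully covers the other range.
    boolean contains(SimpleRange other)
    {
        return start.compareTo(other.start) <= 0 && end.compareTo(other.end) >= 0;
    }
}
```

If a helper is only used inside the class itself, `private` would be the tighter choice.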



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  naturalReplicaMappings.entrySet().stream(),
+                                                  pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new BigInteger(entry.getKey().get(0)),
+                                                  new BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  .collect(Collectors.toList());

Review Comment:
   nit: add some indentation to make it easier to read
   
   ```suggestion
           List<TokenRangeReplicas> replicas = Stream.concat(naturalReplicaMappings.entrySet().stream(),
                                                             pendingRangeMappings.entrySet().stream())
                                                     .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas(
                                                         new BigInteger(entry.getKey().get(0)),
                                                         new BigInteger(entry.getKey().get(1)),
                                                         partitioner,
                                                         new HashSet<>(entry.getValue())))
                                                     .flatMap(Collection::stream)
                                                     .collect(Collectors.toList());
   ```



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  
naturalReplicaMappings.entrySet().stream(),
+                                                  
pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new 
BigInteger(entry.getKey().get(0)),
+                                                  new 
BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new 
HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  
.collect(Collectors.toList());
+
+        // Candidate write-replica mappings (merged from natural and pending 
ranges) are normalized
+        // by consolidating overlapping ranges
+        return TokenRangeReplicas.normalize(replicas).stream()
+                                 .map(range -> {
+                                     Map<String, List<String>> replicasByDc =
+                                     replicasByDataCenter(hostToDatacenter, 
range.replicaSet());
+                                     return new 
TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(),
+                                                                               
        range.end().toString(),
+                                                                               
        replicasByDc);
+                                 })
+                                 .collect(Collectors.toList());
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> 
replicasByTokenRange,
+                                  Map<String, String> hostToDatacenter,
+                                  Partitioner partitioner)
+    {
+        return replicasByTokenRange.entrySet().stream()
+                                   .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas(
+                                   new BigInteger(entry.getKey().get(0)),
+                                   new BigInteger(entry.getKey().get(1)),
+                                   partitioner,
+                                   new HashSet<>(entry.getValue())))
+                                   .flatMap(Collection::stream)

Review Comment:
   nit: readability
   ```suggestion
           return replicasByTokenRange.entrySet().stream()
                                      .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas(
                                          new BigInteger(entry.getKey().get(0)),
                                          new BigInteger(entry.getKey().get(1)),
                                          partitioner,
                                          new HashSet<>(entry.getValue())))
                                      .flatMap(Collection::stream)
   ```



##########
adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java:
##########
@@ -0,0 +1,847 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for TokenRangeReplicas
+ */
+public class TokenRangeReplicasTest
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicasTest.class);
+
+    private boolean hasOverlaps(List<TokenRangeReplicas> rangeList)
+    {
+        Collections.sort(rangeList);
+        for (int c = 0, i = 1; i < rangeList.size(); i++)
+        {
+            if (rangeList.get(c++).end().compareTo(rangeList.get(i).start()) > 0) return true;
+        }
+        return false;
+    }
+
+    private boolean checkContains(List<TokenRangeReplicas> resultList, 
TokenRangeReplicas expected)
+    {
+        return resultList.stream()
+                         .map(TokenRangeReplicas::toString)
+                         .anyMatch(r -> r.equals(expected.toString()));
+    }
+
+    // non-overlapping ranges
+    @Test
+    public void simpleTest()
+    {
+        List<TokenRangeReplicas> simpleList = 
createSimpleTokenRangeReplicaList();
+        LOGGER.info("Input:" + simpleList);
+        List<TokenRangeReplicas> rangeList = 
TokenRangeReplicas.normalize(simpleList);
+        LOGGER.info("Result:" + rangeList);
+        assertThat(hasOverlaps(rangeList)).isFalse();
+    }
+
+    // TODO: Validate unwrapping separately

Review Comment:
   Not sure whether this TODO is already addressed by `wrappedMultiOverlapTest`.
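
If a dedicated unwrapping test is still wanted, the behaviour to assert could look roughly like the sketch below. It is a self-contained approximation, not the PR's `unwrapRange`: the ring bounds (signed 64-bit, Murmur3-style) and the handling of a wrap that ends exactly at the minimum token are assumptions, and `UnwrapSketch` is a hypothetical name.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

final class UnwrapSketch
{
    // Assumed ring bounds for illustration (Murmur3-style signed 64-bit tokens).
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    // Returns one or two {start, end} pairs, each representing (start, end].
    static List<BigInteger[]> unwrap(BigInteger start, BigInteger end)
    {
        List<BigInteger[]> result = new ArrayList<>(2);
        if (start.compareTo(end) <= 0)
        {
            // Not wrapped: keep the range as-is.
            result.add(new BigInteger[]{ start, end });
            return result;
        }
        // Wrapped: split at the ring boundary into (start, MAX] and (MIN, end].
        result.add(new BigInteger[]{ start, MAX });
        if (!end.equals(MIN))
        {
            // Assumption: skip the second piece when it would be the empty (MIN, MIN].
            result.add(new BigInteger[]{ MIN, end });
        }
        return result;
    }
}
```

A test would then check that a wrapped input yields two pieces meeting at the ring boundary, and that a non-wrapped input is returned unchanged.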



##########
common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java:
##########
@@ -126,8 +126,15 @@ public synchronized Session localCql()
         return localSession;
     }
 
-    public synchronized void close()
+    public Session close()
     {
+        Session localSession;
+        synchronized (this)
+        {
+            localSession = this.localSession;
+            this.localSession = null;
+        }
+

Review Comment:
   👍 



##########
src/test/integration/org/apache/cassandra/sidecar/routes/MultiDcTokenSupplier.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.distributed.api.TokenSupplier;
+
+/**
+ * Static factory holder that provides a multi-DC token supplier
+ */
+public class MultiDcTokenSupplier
+{
+
+    static TokenSupplier evenlyDistributedTokens(int numNodes, int numDcs, int numTokens)

Review Comment:
   I guess you mean this
   ```suggestion
       static TokenSupplier evenlyDistributedTokens(int numNodesPerDc, int numDcs, int numTokensPerNode)
   ```
   
   Then, what does the result of `numTokensPerNode * numDcs` mean? 



##########
src/test/integration/org/apache/cassandra/sidecar/routes/MultiDcTokenSupplier.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.distributed.api.TokenSupplier;
+
+/**
+ * Static factory holder that provides a multi-DC token supplier
+ */
+public class MultiDcTokenSupplier
+{
+
+    static TokenSupplier evenlyDistributedTokens(int numNodes, int numDcs, int numTokens)
+    {
+        long totalTokens = (long) numNodes * numDcs * numTokens;
+        BigInteger increment = BigInteger.valueOf((Long.MAX_VALUE / totalTokens) * 4);

Review Comment:
   Where is `4` coming from? 
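
To make the question concrete, here is a quick arithmetic check, written as a sketch and independent of how the rest of the method uses `increment` (that part is not visible in this hunk). It assumes signed 64-bit, Murmur3-style tokens, so the ring is 2^64 wide; `IncrementCheck` and its helpers are hypothetical names.

```java
import java.math.BigInteger;

final class IncrementCheck
{
    // Width of a signed 64-bit token ring: 2^64 (assumption: Murmur3-style tokens).
    static final BigInteger RING_WIDTH = BigInteger.ONE.shiftLeft(64);

    // Same formula as in the diff, but evaluated in BigInteger so it cannot overflow.
    static BigInteger increment(long totalTokens, long factor)
    {
        return BigInteger.valueOf(Long.MAX_VALUE)
                         .divide(BigInteger.valueOf(totalTokens))
                         .multiply(BigInteger.valueOf(factor));
    }

    // Distance covered by stepping `increment` once per token.
    static BigInteger span(long totalTokens, long factor)
    {
        return increment(totalTokens, factor).multiply(BigInteger.valueOf(totalTokens));
    }

    // True if the long expression (Long.MAX_VALUE / totalTokens) * factor overflows.
    static boolean overflowsAsLong(long totalTokens, long factor)
    {
        try
        {
            Math.multiplyExact(Long.MAX_VALUE / totalTokens, factor);
            return false;
        }
        catch (ArithmeticException e)
        {
            return true;
        }
    }
}
```

With a factor of 2 the steps cover just under the full ring, while with 4 they cover nearly twice that. Separately, the diff multiplies in `long` before calling `BigInteger.valueOf`, so the product overflows whenever `totalTokens` is less than 4.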



##########
src/test/integration/org/apache/cassandra/sidecar/routes/BaseTokenRangeIntegrationTest.java:
##########
@@ -0,0 +1,795 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.IntegrationTestBase;
+import org.apache.cassandra.sidecar.adapters.base.Partitioner;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.testing.AbstractCassandraTestContext;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.cassandra.utils.Shared;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static 
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static 
org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static 
org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MOVING_NODE_IDX;
+import static 
org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MULTIDC_MOVING_NODE_IDX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test the token range replica mapping endpoint with cassandra container.
+ */
+public class BaseTokenRangeIntegrationTest extends IntegrationTestBase
+{
+
+    protected void validateTokenRanges(TokenRangeReplicasResponse 
mappingsResponse,
+                                       List<Range<BigInteger>> expectedRanges)
+    {
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicaSet = 
mappingsResponse.writeReplicas();
+        List<TokenRangeReplicasResponse.ReplicaInfo> readReplicaSet = 
mappingsResponse.readReplicas();
+        List<Range<BigInteger>> writeRanges = writeReplicaSet.stream()
+                                                             .map(r -> 
Range.openClosed(new BigInteger(r.start()),
+                                                                               
         new BigInteger(r.end())))
+                                                             
.collect(Collectors.toList());
+
+        List<Range<BigInteger>> readRanges = readReplicaSet.stream()
+                                                           .map(r -> 
Range.openClosed(new BigInteger(r.start()),
+                                                                               
       new BigInteger(r.end())))
+                                                           
.collect(Collectors.toList());
+
+
+        assertThat(writeRanges.size()).isEqualTo(writeReplicaSet.size());
+        assertThat(writeRanges).containsExactlyElementsOf(expectedRanges);
+
+        //Sorted and Overlap check
+        validateOrderAndOverlaps(writeRanges);
+        validateOrderAndOverlaps(readRanges);
+    }
+
+    private void validateOrderAndOverlaps(List<Range<BigInteger>> ranges)
+    {
+        for (int r = 0; r < ranges.size() - 1; r++)
+        {
+            assertThat(ranges.get(r).upperEndpoint()).isLessThan(ranges.get(r 
+ 1).upperEndpoint());
+            assertThat(ranges.get(r).intersection(ranges.get(r + 
1)).isEmpty()).isTrue();
+        }
+    }
+
+    protected void validateNodeStates(TokenRangeReplicasResponse 
mappingResponse,
+                                      Set<String> dcReplication,
+                                      Function<Integer, String> statusFunction)
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        int expectedReplicas = (annotation.nodesPerDc() + 
annotation.newNodesPerDc()) * dcReplication.size();
+
+        AbstractCassandraTestContext cassandraTestContext = 
sidecarTestContext.cassandraTestContext();
+        
assertThat(mappingResponse.replicaState().size()).isEqualTo(expectedReplicas);
+        for (int i = 1; i <= cassandraTestContext.cluster().size(); i++)
+        {
+            IInstanceConfig config = 
cassandraTestContext.cluster().get(i).config();
+
+            if (dcReplication.contains(config.localDatacenter()))
+            {
+                String ipAndPort = 
config.broadcastAddress().getAddress().getHostAddress() + ":"
+                                   + config.broadcastAddress().getPort();
+
+                String expectedStatus = statusFunction.apply(i);
+                
assertThat(mappingResponse.replicaState().get(ipAndPort)).isEqualTo(expectedStatus);
+            }
+        }
+    }
+
+    protected UpgradeableCluster getMultiDCCluster(int numNodes,
+                                                   int numDcs,
+                                                   BiConsumer<ClassLoader, 
Integer> initializer,
+                                                   
ConfigurableCassandraTestContext cassandraTestContext)
+    throws IOException
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        TokenSupplier mdcTokenSupplier =
+        MultiDcTokenSupplier.evenlyDistributedTokens(numNodes,
+                                                     numDcs,
+                                                     1);
+
+        int totalNodeCount = (annotation.nodesPerDc() + 
annotation.newNodesPerDc()) * annotation.numDcs();
+        return cassandraTestContext.configureAndStartCluster(
+        builder -> {
+            builder.withInstanceInitializer(initializer);
+            builder.withTokenSupplier(mdcTokenSupplier);
+            builder.withNodeIdTopology(networkTopology(totalNodeCount,
+                                                       (nodeId) -> nodeId % 2 
!= 0 ?
+                                                                   
dcAndRack("datacenter1", "rack1") :
+                                                                   
dcAndRack("datacenter2", "rack2")));
+        });
+    }
+
+    protected List<Range<BigInteger>> generateExpectedRanges()
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        int nodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) 
* annotation.numDcs();
+        return generateExpectedRanges(nodeCount);
+    }
+
+    protected List<Range<BigInteger>> generateExpectedRanges(int nodeCount)
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        TokenSupplier tokenSupplier = (annotation.numDcs() > 1) ?
+                                      
MultiDcTokenSupplier.evenlyDistributedTokens(
+                                      annotation.nodesPerDc() + 
annotation.newNodesPerDc(),
+                                      annotation.numDcs(),
+                                      1) :
+                                      
TokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc() +
+                                                                            
annotation.newNodesPerDc(),
+                                                                            1);
+
+        List<Range<BigInteger>> expectedRanges = new ArrayList<>();
+        BigInteger startToken = Partitioner.Murmur3.minToken;
+        BigInteger endToken = Partitioner.Murmur3.maxToken;
+        int node = 1;
+        BigInteger prevToken = new 
BigInteger(tokenSupplier.tokens(node++).stream().findFirst().get());
+        Range<BigInteger> firstRange = Range.openClosed(startToken, prevToken);
+        expectedRanges.add(firstRange);
+        while (node <= nodeCount)
+        {
+            BigInteger currentToken = new 
BigInteger(tokenSupplier.tokens(node).stream().findFirst().get());
+            expectedRanges.add(Range.openClosed(prevToken, currentToken));
+            prevToken = currentToken;
+            node++;
+        }
+        expectedRanges.add(Range.openClosed(prevToken, endToken));
+        return expectedRanges;
+    }
+
+    protected Set<String> 
instancesFromReplicaSet(List<TokenRangeReplicasResponse.ReplicaInfo> replicas)
+    {
+        return replicas.stream()
+                       .map(r -> r.replicasByDatacenter().values())
+                       .flatMap(Collection::stream)
+                       .flatMap(Collection::stream)
+                       .collect(Collectors.toSet());

Review Comment:
   nit: this saves one line:
   ```suggestion
           return replicas.stream()
                          .flatMap(r -> 
r.replicasByDatacenter().values().stream())
                          .flatMap(Collection::stream)
                          .collect(Collectors.toSet());
   ```
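
   For reference, a minimal standalone sketch of why the two pipelines collect the same set. The class name, method names, and sample data are hypothetical, and plain maps stand in for `ReplicaInfo.replicasByDatacenter()`:

   ```java
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;

   public class FlattenReplicasSketch
   {
       // Shape used in the PR: map to the per-DC lists, then flatten twice
       static Set<String> viaMapThenFlatMap(List<Map<String, List<String>>> replicasByDc)
       {
           return replicasByDc.stream()
                              .map(m -> m.values())
                              .flatMap(Collection::stream)
                              .flatMap(Collection::stream)
                              .collect(Collectors.toSet());
       }

       // Suggested shape: fold the map() into the first flatMap()
       static Set<String> viaFlatMap(List<Map<String, List<String>>> replicasByDc)
       {
           return replicasByDc.stream()
                              .flatMap(m -> m.values().stream())
                              .flatMap(Collection::stream)
                              .collect(Collectors.toSet());
       }

       // Hypothetical sample: two ranges, with one host shared between them
       static List<Map<String, List<String>>> sample()
       {
           Map<String, List<String>> first = new HashMap<>();
           first.put("datacenter1", Arrays.asList("127.0.0.1:7000", "127.0.0.2:7000"));
           first.put("datacenter2", Arrays.asList("127.0.0.3:7000"));
           Map<String, List<String>> second = new HashMap<>();
           second.put("datacenter1", Arrays.asList("127.0.0.2:7000", "127.0.0.4:7000"));
           return Arrays.asList(first, second);
       }
   }
   ```

   Both methods deduplicate the shared host, so the only difference is the extra `map` stage.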



##########
src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.net.UnknownHostException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.datastax.driver.core.Metadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.StorageOperations;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasRequest;
+import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
+
+/**
+ * Handler which provides token range to read and write replica mapping
+ *
+ * <p>This handler provides token range replicas along with the state of the 
replicas. For the purpose
+ * of identifying the state of a newly joining node to replace a dead node 
from a newly joining node,
+ * a new state 'Replacing' has been added.
+ * It is represented by
+ * {@code 
org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicaProvider.StateWithReplacement}
+ */
+@Singleton
+public class TokenRangeReplicaMapHandler extends 
AbstractHandler<TokenRangeReplicasRequest>
+{
+
+    @Inject
+    public TokenRangeReplicaMapHandler(InstanceMetadataFetcher metadataFetcher,
+                                       CassandraInputValidator validator,
+                                       ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, validator);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               TokenRangeReplicasRequest request)
+    {
+        CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+
+        StorageOperations storageOperations = delegate.storageOperations();
+        Metadata metadata = delegate.metadata();
+        if (storageOperations == null || metadata == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+
+        executorPools.service().executeBlocking(promise -> {
+            try
+            {
+                
context.json(storageOperations.tokenRangeReplicas(request.keyspace(), 
metadata.getPartitioner()));
+            }
+            catch (UnknownHostException e)
+            {
+                processFailure(e, context, host, remoteAddress, request);

Review Comment:
   Right. The interface method is declared to throw `UnknownHostException`, but the implementation never throws it; it wraps the exception in a `RuntimeException` instead. Can you remove the `throws` clause from the interface method?
   The `RuntimeException` does not need to be handled separately: if it is thrown, it reaches `onFailure`, which calls `processFailure`.
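
   A rough sketch of the idea, not the actual handler code. `CompletableFuture` stands in here for the Vert.x `executeBlocking` / `onFailure` pair, and `ReplicaLookup`, `resolveDatacenter`, and `handle` are hypothetical names:

   ```java
   import java.net.UnknownHostException;
   import java.util.concurrent.CompletableFuture;

   public class UncheckedFailureSketch
   {
       // Hypothetical interface method with the "throws UnknownHostException" clause removed
       interface ReplicaLookup
       {
           String tokenRangeReplicas(String keyspace);
       }

       // The implementation wraps the checked exception, as the provider's getDatacenter does
       static String resolveDatacenter(String host)
       {
           try
           {
               if (host.isEmpty())
               {
                   throw new UnknownHostException("empty host");
               }
               return "datacenter1";
           }
           catch (UnknownHostException e)
           {
               throw new RuntimeException(e);
           }
       }

       static Throwable rootCause(Throwable t)
       {
           while (t.getCause() != null)
           {
               t = t.getCause();
           }
           return t;
       }

       // No try/catch around the call: any RuntimeException lands in the single failure path
       static String handle(ReplicaLookup lookup, String keyspace)
       {
           return CompletableFuture.supplyAsync(() -> lookup.tokenRangeReplicas(keyspace))
                                   .exceptionally(t -> "failed: " + rootCause(t).getClass().getSimpleName())
                                   .join();
       }
   }
   ```

   The caller needs no `catch (UnknownHostException e)` block, and the original cause is still available to the failure path.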



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);

Review Comment:
   Not an action item, but maybe refactor the code in a follow-up so that `jmxClient` is no longer in scope, which would prevent calling `jmxClient.proxy` directly.
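
   One possible shape for such a follow-up, purely as a sketch: the provider receives a supplier of the proxy it needs instead of the client itself. `StorageOps`, `FakeJmxClient`, and `Provider` are hypothetical stand-ins, not the real sidecar types:

   ```java
   import java.util.function.Supplier;

   public class ScopedProxySketch
   {
       // Hypothetical stand-in for StorageJmxOperations
       interface StorageOps
       {
           String ranges(String keyspace);
       }

       // Hypothetical stand-in for JmxClient; counts how often a proxy is created
       static class FakeJmxClient
       {
           int proxyCalls = 0;

           StorageOps storageProxy()
           {
               proxyCalls++;
               return keyspace -> "ranges-for-" + keyspace;
           }
       }

       // The provider only sees the supplier, so it cannot call proxy(...) on the client
       static class Provider
       {
           private final Supplier<StorageOps> storage;

           Provider(Supplier<StorageOps> storage)
           {
               this.storage = storage;
           }

           String tokenRangeReplicas(String keyspace)
           {
               return storage.get().ranges(keyspace);
           }
       }
   }
   ```

   Wiring would then look like `new Provider(client::storageProxy)`, which keeps proxy creation in one place.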



##########
build.gradle:
##########
@@ -326,6 +326,8 @@ tasks.register("integrationTest", Test) {
     useJUnitPlatform() {
         includeTags "integrationTest"
     }
+// Uncomment below to run unit tests in parallel
+//    maxParallelForks = Runtime.runtime.availableProcessors() * 2

Review Comment:
   Just curious, why is it commented out?



##########
common/build.gradle:
##########
@@ -41,6 +41,8 @@ repositories {
 
 test {
     useJUnitPlatform()
+// Uncomment below to run unit tests in parallel
+//    maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1

Review Comment:
   Another curiosity: why do the `maxParallelForks` calculations differ between the build files?



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  
naturalReplicaMappings.entrySet().stream(),
+                                                  
pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new 
BigInteger(entry.getKey().get(0)),
+                                                  new 
BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new 
HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  
.collect(Collectors.toList());
+
+        // Candidate write-replica mappings (merged from natural and pending 
ranges) are normalized
+        // by consolidating overlapping ranges
+        return TokenRangeReplicas.normalize(replicas).stream()
+                                 .map(range -> {
+                                     Map<String, List<String>> replicasByDc =
+                                     replicasByDataCenter(hostToDatacenter, 
range.replicaSet());
+                                     return new 
TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(),
+                                                                               
        range.end().toString(),
+                                                                               
        replicasByDc);
+                                 })
+                                 .collect(Collectors.toList());
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> 
replicasByTokenRange,
+                                  Map<String, String> hostToDatacenter,
+                                  Partitioner partitioner)
+    {
+        return replicasByTokenRange.entrySet().stream()
+                                   .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                   new BigInteger(entry.getKey().get(0)),
+                                   new BigInteger(entry.getKey().get(1)),
+                                   partitioner,
+                                   new HashSet<>(entry.getValue())))
+                                   .flatMap(Collection::stream)
+                                   .sorted()
+                                   .map(rep -> {
+                                       Map<String, List<String>> replicasByDc =
+                                       replicasByDataCenter(hostToDatacenter, 
rep.replicaSet());
+                                       return new 
TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(),
+                                                                               
          rep.end().toString(),
+                                                                               
          replicasByDc);
+                                   })
+                                   .collect(Collectors.toList());
+    }
+
+    private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet)
+    {
+        EndpointSnitchJmxOperations endpointSnitchInfo = 
jmxClient.proxy(EndpointSnitchJmxOperations.class,
+                                                                         
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(),
+                                                   (String host) -> 
getDatacenter(endpointSnitchInfo, host)));
+    }
+
+    private String getDatacenter(EndpointSnitchJmxOperations 
endpointSnitchInfo, String host)
+    {
+        try
+        {
+            return endpointSnitchInfo.getDatacenter(host);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @NotNull
+    private static Map<String, List<String>> replicasByDataCenter(Map<String, 
String> hostToDatacenter,
+                                                                  
Collection<String> replicas)
+    {
+        return 
replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get));
+    }
+
+    /**
+     * We want to identity a joining node, to replace a dead node, differently 
from a newly joining node. To
+     * do this we analyze gossip info and set 'Replacing' state for node 
replacing a dead node.
+     * {@link StateWithReplacement} is used to set replacing state for a node.
+     *
+     * <p>We are adding this state for token range replica provider endpoint. 
To send out replicas for a
+     * range along with state of replicas including replacing state.
+     */
+    static class StateWithReplacement extends RingProvider.State
+    {
+        private static final String STATE_REPLACING = "Replacing";
+        private final Set<String> joiningNodes;
+        private final GossipInfoResponse gossipInfo;
+
+        StateWithReplacement(List<String> joiningNodes, List<String> 
leavingNodes, List<String> movingNodes,
+                             GossipInfoResponse gossipInfo)
+        {
+            super(joiningNodes, leavingNodes, movingNodes);
+            this.joiningNodes = new HashSet<>(joiningNodes);
+            this.gossipInfo = gossipInfo;
+        }
+
+        /**
+         * This method returns state of a node and accounts for a new 
'Replacing' state if the node is
+         * replacing a dead node. For returning this state, the method checks 
status of the node in gossip
+         * information.
+         *
+         * @param endpoint node information represented usually in form of 
'ip:port'
+         * @return Node status
+         */
+        @Override
+        String of(String endpoint)
+        {
+            if (joiningNodes.contains(endpoint))
+            {
+                GossipInfoResponse.GossipInfo gossipInfoEntry = 
gossipInfo.get(endpoint);
+
+                if (gossipInfoEntry != null)
+                {
+                    LOGGER.info("Found gossipInfoEntry={}", gossipInfoEntry);

Review Comment:
   nit: should we print only the endpoint and status? Those two values should be enough to reason about the runtime behavior here.



##########
common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java:
##########
@@ -259,12 +259,16 @@ private Map<String, Object> buildJmxEnv()
     }
 
     @Override
-    public synchronized void close() throws IOException
+    public void close() throws IOException
     {
-        JMXConnector connector = jmxConnector;
-        if (connector != null)
+        JMXConnector connector;
+        synchronized (this)
         {
+            connector = jmxConnector;
             jmxConnector = null;

Review Comment:
   Reset `connected` to false too?
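
   Something along these lines, as a simplified sketch rather than the real `JmxClient` (the class name and fields are stand-ins): swap the connector and reset the flag inside the same synchronized block, then close outside the lock.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.io.UncheckedIOException;

   public class CloseSketch implements Closeable
   {
       private Closeable connector;
       private boolean connected;

       CloseSketch(Closeable connector)
       {
           this.connector = connector;
           this.connected = true;
       }

       synchronized boolean isConnected()
       {
           return connected;
       }

       @Override
       public void close() throws IOException
       {
           Closeable toClose;
           synchronized (this)
           {
               // Take ownership of the connector and reset both pieces of state together
               toClose = connector;
               connector = null;
               connected = false;
           }
           // Close outside the lock so a slow close does not block other callers
           if (toClose != null)
           {
               toClose.close();
           }
       }

       // Usage: closing twice closes the underlying connector once
       static String demo()
       {
           int[] closes = {0};
           CloseSketch client = new CloseSketch(() -> closes[0]++);
           try
           {
               client.close();
               client.close();
           }
           catch (IOException e)
           {
               throw new UncheckedIOException(e);
           }
           return "closes=" + closes[0] + ", connected=" + client.isConnected();
       }
   }
   ```

   Without the reset, a caller checking the flag after `close()` could still see the client as connected even though the connector is gone.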



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  
naturalReplicaMappings.entrySet().stream(),
+                                                  
pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new 
BigInteger(entry.getKey().get(0)),
+                                                  new 
BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new 
HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  
.collect(Collectors.toList());
+
+        // Candidate write-replica mappings (merged from natural and pending 
ranges) are normalized
+        // by consolidating overlapping ranges
+        return TokenRangeReplicas.normalize(replicas).stream()
+                                 .map(range -> {
+                                     Map<String, List<String>> replicasByDc =
+                                     replicasByDataCenter(hostToDatacenter, 
range.replicaSet());
+                                     return new 
TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(),
+                                                                               
        range.end().toString(),
+                                                                               
        replicasByDc);
+                                 })
+                                 .collect(Collectors.toList());
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> 
replicasByTokenRange,
+                                  Map<String, String> hostToDatacenter,
+                                  Partitioner partitioner)
+    {
+        return replicasByTokenRange.entrySet().stream()
+                                   .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                   new BigInteger(entry.getKey().get(0)),
+                                   new BigInteger(entry.getKey().get(1)),
+                                   partitioner,
+                                   new HashSet<>(entry.getValue())))
+                                   .flatMap(Collection::stream)
+                                   .sorted()
+                                   .map(rep -> {
+                                       Map<String, List<String>> replicasByDc =
+                                       replicasByDataCenter(hostToDatacenter, 
rep.replicaSet());
+                                       return new 
TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(),
+                                                                               
          rep.end().toString(),
+                                                                               
          replicasByDc);
+                                   })
+                                   .collect(Collectors.toList());
+    }
+
+    private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet)
+    {
+        EndpointSnitchJmxOperations endpointSnitchInfo = 
jmxClient.proxy(EndpointSnitchJmxOperations.class,
+                                                                         
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(),
+                                                   (String host) -> 
getDatacenter(endpointSnitchInfo, host)));
+    }
+
+    private String getDatacenter(EndpointSnitchJmxOperations 
endpointSnitchInfo, String host)
+    {
+        try
+        {
+            return endpointSnitchInfo.getDatacenter(host);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @NotNull
+    private static Map<String, List<String>> replicasByDataCenter(Map<String, 
String> hostToDatacenter,
+                                                                  
Collection<String> replicas)
+    {
+        return 
replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get));
+    }
+
+    /**
+     * We want to identity a joining node, to replace a dead node, differently 
from a newly joining node. To
+     * do this we analyze gossip info and set 'Replacing' state for node 
replacing a dead node.
+     * {@link StateWithReplacement} is used to set replacing state for a node.
+     *
+     * <p>We are adding this state for token range replica provider endpoint. 
To send out replicas for a
+     * range along with state of replicas including replacing state.
+     */
+    static class StateWithReplacement extends RingProvider.State
+    {
+        private static final String STATE_REPLACING = "Replacing";

Review Comment:
   How about creating an enum for the state? It helps in two ways: strong 
typing and exhaustive matching.
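
   A minimal sketch of what such an enum could look like. `NodeState`, its
   method names, and every display name except "Replacing" (which appears in
   the PR) are assumptions for illustration, not the existing
   `RingProvider.State` API.

```java
import java.util.Set;

// Hypothetical enum for illustration; only "Replacing" is taken from the PR,
// the other display names are assumed.
enum NodeState
{
    NORMAL("Normal"),
    JOINING("Joining"),
    LEAVING("Leaving"),
    MOVING("Moving"),
    REPLACING("Replacing");

    private final String displayName;

    NodeState(String displayName)
    {
        this.displayName = displayName;
    }

    // String form that would be placed into the response map
    String displayName()
    {
        return displayName;
    }

    // Resolves the state of an endpoint. REPLACING is checked first because,
    // per the Javadoc in the PR, a replacing node is also a joining node.
    static NodeState of(String endpoint, Set<String> joining, Set<String> leaving,
                        Set<String> moving, Set<String> replacing)
    {
        if (replacing.contains(endpoint)) return REPLACING;
        if (joining.contains(endpoint)) return JOINING;
        if (leaving.contains(endpoint)) return LEAVING;
        if (moving.contains(endpoint)) return MOVING;
        return NORMAL;
    }

    // Switching on the enum lets IDE inspections flag unhandled constants;
    // the default branch fails fast if a new constant is added and missed here.
    boolean isTransient()
    {
        switch (this)
        {
            case NORMAL:
                return false;
            case JOINING:
            case LEAVING:
            case MOVING:
            case REPLACING:
                return true;
            default:
                throw new IllegalStateException("Unhandled state: " + this);
        }
    }
}
```

   The response could keep using strings on the wire by calling
   `displayName()` at serialization time, so the typed value stays internal.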



##########
common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java:
##########
@@ -259,12 +259,16 @@ private Map<String, Object> buildJmxEnv()
     }
 
     @Override
-    public synchronized void close() throws IOException
+    public void close() throws IOException
     {
-        JMXConnector connector = jmxConnector;
-        if (connector != null)
+        JMXConnector connector;
+        synchronized (this)

Review Comment:
   Can you make `connect()` (where `jmxConnector` is updated) synchronized? Its 
only call site is already synchronized, so synchronizing `connect()` won't 
hurt performance, and it is more error-proof.
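
   A minimal sketch of the pattern, assuming a simplified stand-in class.
   `GuardedConnector` and its `Supplier<Closeable>` factory are hypothetical
   and only mirror the shape of `JmxClient`: the field is written under the
   lock in `connect()`, and `close()` swaps it out under the same lock before
   closing outside of it.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

// Hypothetical stand-in for a client that lazily opens a connection
final class GuardedConnector implements Closeable
{
    private final Supplier<Closeable> factory;
    private Closeable connector; // guarded by "this"

    GuardedConnector(Supplier<Closeable> factory)
    {
        this.factory = factory;
    }

    // Synchronized even if the only caller already holds the lock:
    // intrinsic locks are re-entrant, so the extra acquisition is cheap,
    // and a future unsynchronized caller cannot race with close().
    synchronized Closeable connect()
    {
        if (connector == null)
        {
            connector = factory.get();
        }
        return connector;
    }

    @Override
    public void close() throws IOException
    {
        Closeable toClose;
        synchronized (this)
        {
            // Take ownership of the current connector and clear the field
            toClose = connector;
            connector = null;
        }
        // Close outside the lock so a slow close does not block other threads
        if (toClose != null)
        {
            toClose.close();
        }
    }
}
```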



##########
common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java:
##########
@@ -126,8 +126,15 @@ public synchronized Session localCql()
         return localSession;
     }
 
-    public synchronized void close()
+    public Session close()
     {
+        Session localSession;
+        synchronized (this)
+        {
+            localSession = this.localSession;
+            this.localSession = null;
+        }
+

Review Comment:
   👍 



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  
naturalReplicaMappings.entrySet().stream(),
+                                                  
pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new 
BigInteger(entry.getKey().get(0)),
+                                                  new 
BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new 
HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  
.collect(Collectors.toList());
+
+        // Candidate write-replica mappings (merged from natural and pending 
ranges) are normalized
+        // by consolidating overlapping ranges
+        return TokenRangeReplicas.normalize(replicas).stream()
+                                 .map(range -> {
+                                     Map<String, List<String>> replicasByDc =
+                                     replicasByDataCenter(hostToDatacenter, 
range.replicaSet());
+                                     return new 
TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(),
+                                                                               
        range.end().toString(),
+                                                                               
        replicasByDc);
+                                 })
+                                 .collect(Collectors.toList());
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> 
replicasByTokenRange,
+                                  Map<String, String> hostToDatacenter,
+                                  Partitioner partitioner)
+    {
+        return replicasByTokenRange.entrySet().stream()
+                                   .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                   new BigInteger(entry.getKey().get(0)),
+                                   new BigInteger(entry.getKey().get(1)),
+                                   partitioner,
+                                   new HashSet<>(entry.getValue())))
+                                   .flatMap(Collection::stream)
+                                   .sorted()
+                                   .map(rep -> {
+                                       Map<String, List<String>> replicasByDc =
+                                       replicasByDataCenter(hostToDatacenter, 
rep.replicaSet());
+                                       return new 
TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(),
+                                                                               
          rep.end().toString(),
+                                                                               
          replicasByDc);
+                                   })
+                                   .collect(Collectors.toList());
+    }
+
+    private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet)
+    {
+        EndpointSnitchJmxOperations endpointSnitchInfo = 
jmxClient.proxy(EndpointSnitchJmxOperations.class,
+                                                                         
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(),
+                                                   (String host) -> 
getDatacenter(endpointSnitchInfo, host)));
+    }
+
+    private String getDatacenter(EndpointSnitchJmxOperations 
endpointSnitchInfo, String host)
+    {
+        try
+        {
+            return endpointSnitchInfo.getDatacenter(host);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @NotNull
+    private static Map<String, List<String>> replicasByDataCenter(Map<String, 
String> hostToDatacenter,
+                                                                  
Collection<String> replicas)
+    {
+        return 
replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get));
+    }
+
+    /**
+     * We want to identity a joining node, to replace a dead node, differently 
from a newly joining node. To
+     * do this we analyze gossip info and set 'Replacing' state for node 
replacing a dead node.
+     * {@link StateWithReplacement} is used to set replacing state for a node.
+     *
+     * <p>We are adding this state for token range replica provider endpoint. 
To send out replicas for a
+     * range along with state of replicas including replacing state.
+     */
+    static class StateWithReplacement extends RingProvider.State
+    {
+        private static final String STATE_REPLACING = "Replacing";
+        private final Set<String> joiningNodes;

Review Comment:
   Why does it declare another copy of `joiningNodes`? It is already available 
from the base class (but you need to make it protected or add a getter).
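
   A rough sketch of the getter variant. `BaseState` and
   `ReplacementAwareState` are simplified, hypothetical stand-ins for
   `RingProvider.State` and `StateWithReplacement`; the constructor shapes and
   the "Joining"/"Normal" strings are assumptions.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the base class; it owns the only copy of joiningNodes
class BaseState
{
    private final Set<String> joiningNodes;

    BaseState(List<String> joiningNodes)
    {
        this.joiningNodes = new HashSet<>(joiningNodes);
    }

    // Read-only accessor so subclasses reuse the field instead of redeclaring it
    protected Set<String> joiningNodes()
    {
        return Collections.unmodifiableSet(joiningNodes);
    }

    String of(String endpoint)
    {
        return joiningNodes.contains(endpoint) ? "Joining" : "Normal";
    }
}

// Subclass adds only the state it actually introduces
class ReplacementAwareState extends BaseState
{
    private final Set<String> replacementNodes;

    ReplacementAwareState(List<String> joiningNodes, Set<String> replacementNodes)
    {
        super(joiningNodes);
        this.replacementNodes = replacementNodes;
    }

    @Override
    String of(String endpoint)
    {
        // A joining node that is also a replacement gets the distinct state
        if (joiningNodes().contains(endpoint) && replacementNodes.contains(endpoint))
        {
            return "Replacing";
        }
        return super.of(endpoint);
    }
}
```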



##########
src/test/integration/org/apache/cassandra/sidecar/routes/BaseTokenRangeIntegrationTest.java:
##########
@@ -0,0 +1,795 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.IntegrationTestBase;
+import org.apache.cassandra.sidecar.adapters.base.Partitioner;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.testing.AbstractCassandraTestContext;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.cassandra.utils.Shared;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static 
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static 
org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static 
org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MOVING_NODE_IDX;
+import static 
org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MULTIDC_MOVING_NODE_IDX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test the token range replica mapping endpoint with cassandra container.
+ */
+public class BaseTokenRangeIntegrationTest extends IntegrationTestBase
+{
+
+    protected void validateTokenRanges(TokenRangeReplicasResponse 
mappingsResponse,
+                                       List<Range<BigInteger>> expectedRanges)
+    {
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicaSet = 
mappingsResponse.writeReplicas();
+        List<TokenRangeReplicasResponse.ReplicaInfo> readReplicaSet = 
mappingsResponse.readReplicas();
+        List<Range<BigInteger>> writeRanges = writeReplicaSet.stream()
+                                                             .map(r -> 
Range.openClosed(new BigInteger(r.start()),
+                                                                               
         new BigInteger(r.end())))
+                                                             
.collect(Collectors.toList());
+
+        List<Range<BigInteger>> readRanges = readReplicaSet.stream()
+                                                           .map(r -> 
Range.openClosed(new BigInteger(r.start()),
+                                                                               
       new BigInteger(r.end())))
+                                                           
.collect(Collectors.toList());
+
+
+        assertThat(writeRanges.size()).isEqualTo(writeReplicaSet.size());
+        assertThat(writeRanges).containsExactlyElementsOf(expectedRanges);
+
+        //Sorted and Overlap check
+        validateOrderAndOverlaps(writeRanges);
+        validateOrderAndOverlaps(readRanges);
+    }
+
+    private void validateOrderAndOverlaps(List<Range<BigInteger>> ranges)
+    {
+        for (int r = 0; r < ranges.size() - 1; r++)
+        {
+            assertThat(ranges.get(r).upperEndpoint()).isLessThan(ranges.get(r 
+ 1).upperEndpoint());
+            assertThat(ranges.get(r).intersection(ranges.get(r + 
1)).isEmpty()).isTrue();
+        }
+    }
+
+    protected void validateNodeStates(TokenRangeReplicasResponse 
mappingResponse,
+                                      Set<String> dcReplication,
+                                      Function<Integer, String> statusFunction)
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        int expectedReplicas = (annotation.nodesPerDc() + 
annotation.newNodesPerDc()) * dcReplication.size();
+
+        AbstractCassandraTestContext cassandraTestContext = 
sidecarTestContext.cassandraTestContext();
+        
assertThat(mappingResponse.replicaState().size()).isEqualTo(expectedReplicas);
+        for (int i = 1; i <= cassandraTestContext.cluster().size(); i++)
+        {
+            IInstanceConfig config = 
cassandraTestContext.cluster().get(i).config();
+
+            if (dcReplication.contains(config.localDatacenter()))
+            {
+                String ipAndPort = 
config.broadcastAddress().getAddress().getHostAddress() + ":"
+                                   + config.broadcastAddress().getPort();
+
+                String expectedStatus = statusFunction.apply(i);
+                
assertThat(mappingResponse.replicaState().get(ipAndPort)).isEqualTo(expectedStatus);
+            }
+        }
+    }
+
+    protected UpgradeableCluster getMultiDCCluster(int numNodes,
+                                                   int numDcs,
+                                                   BiConsumer<ClassLoader, 
Integer> initializer,
+                                                   
ConfigurableCassandraTestContext cassandraTestContext)
+    throws IOException
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        TokenSupplier mdcTokenSupplier =
+        MultiDcTokenSupplier.evenlyDistributedTokens(numNodes,
+                                                     numDcs,
+                                                     1);
+
+        int totalNodeCount = (annotation.nodesPerDc() + 
annotation.newNodesPerDc()) * annotation.numDcs();
+        return cassandraTestContext.configureAndStartCluster(
+        builder -> {
+            builder.withInstanceInitializer(initializer);
+            builder.withTokenSupplier(mdcTokenSupplier);
+            builder.withNodeIdTopology(networkTopology(totalNodeCount,
+                                                       (nodeId) -> nodeId % 2 
!= 0 ?
+                                                                   
dcAndRack("datacenter1", "rack1") :
+                                                                   
dcAndRack("datacenter2", "rack2")));
+        });
+    }
+
+    protected List<Range<BigInteger>> generateExpectedRanges()
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        int nodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) 
* annotation.numDcs();
+        return generateExpectedRanges(nodeCount);
+    }
+
+    protected List<Range<BigInteger>> generateExpectedRanges(int nodeCount)
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        TokenSupplier tokenSupplier = (annotation.numDcs() > 1) ?
+                                      
MultiDcTokenSupplier.evenlyDistributedTokens(
+                                      annotation.nodesPerDc() + 
annotation.newNodesPerDc(),
+                                      annotation.numDcs(),
+                                      1) :
+                                      
TokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc() +
+                                                                            
annotation.newNodesPerDc(),
+                                                                            1);
+
+        List<Range<BigInteger>> expectedRanges = new ArrayList<>();
+        BigInteger startToken = Partitioner.Murmur3.minToken;
+        BigInteger endToken = Partitioner.Murmur3.maxToken;
+        int node = 1;
+        BigInteger prevToken = new 
BigInteger(tokenSupplier.tokens(node++).stream().findFirst().get());
+        Range<BigInteger> firstRange = Range.openClosed(startToken, prevToken);
+        expectedRanges.add(firstRange);
+        while (node <= nodeCount)
+        {
+            BigInteger currentToken = new 
BigInteger(tokenSupplier.tokens(node).stream().findFirst().get());
+            expectedRanges.add(Range.openClosed(prevToken, currentToken));
+            prevToken = currentToken;
+            node++;
+        }
+        expectedRanges.add(Range.openClosed(prevToken, endToken));
+        return expectedRanges;
+    }
+
+    protected Set<String> 
instancesFromReplicaSet(List<TokenRangeReplicasResponse.ReplicaInfo> replicas)
+    {
+        return replicas.stream()
+                       .map(r -> r.replicasByDatacenter().values())
+                       .flatMap(Collection::stream)
+                       .flatMap(Collection::stream)
+                       .collect(Collectors.toSet());
+    }
+
+    protected void 
validateWriteReplicaMappings(List<TokenRangeReplicasResponse.ReplicaInfo> 
writeReplicas,
+                                              Map<String, 
Map<Range<BigInteger>, List<String>>> expectedRangeMapping)
+    {
+        CassandraIntegrationTest annotation = 
sidecarTestContext.cassandraTestContext().annotation;
+        
assertThat(writeReplicas).hasSize(expectedRangeMapping.get("datacenter1").size());
+        for (TokenRangeReplicasResponse.ReplicaInfo r: writeReplicas)
+        {
+            Range<BigInteger> range = 
Range.openClosed(BigInteger.valueOf(Long.parseLong(r.start())),
+                                                       
BigInteger.valueOf(Long.parseLong(r.end())));
+            assertThat(expectedRangeMapping).containsKey("datacenter1");
+            
assertThat(expectedRangeMapping.get("datacenter1")).containsKey(range);
+            // Replicaset for the same range match expected
+            assertThat(r.replicasByDatacenter().get("datacenter1"))
+            
.containsExactlyInAnyOrderElementsOf(expectedRangeMapping.get("datacenter1").get(range));
+
+            if (annotation.numDcs() > 1)
+            {
+                assertThat(expectedRangeMapping).containsKey("datacenter2");
+                
assertThat(expectedRangeMapping.get("datacenter2")).containsKey(range);
+                assertThat(r.replicasByDatacenter().get("datacenter2"))
+                
.containsExactlyInAnyOrderElementsOf(expectedRangeMapping.get("datacenter2").get(range));
+            }
+        }
+    }
+
+    void retrieveMappingWithKeyspace(VertxTestContext context, String keyspace,
+                                     Handler<HttpResponse<Buffer>> verifier) 
throws Exception
+    {
+        String testRoute = "/api/v1/keyspaces/" + keyspace + 
"/token-range-replicas";
+        testWithClient(context, client -> {
+            client.get(server.actualPort(), "127.0.0.1", testRoute)
+                  .send(context.succeeding(verifier));
+        });
+    }
+
+    void assertMappingResponseOK(TokenRangeReplicasResponse mappingResponse)
+    {
+        assertMappingResponseOK(mappingResponse, 1, 
Collections.singleton("datacenter1"));
+    }
+
+    void assertMappingResponseOK(TokenRangeReplicasResponse mappingResponse,
+                                 int replicationFactor,
+                                 Set<String> dcReplication)
+    {
+        assertThat(mappingResponse).isNotNull();
+        assertThat(mappingResponse.readReplicas()).isNotNull();
+        assertThat(mappingResponse.writeReplicas()).isNotNull();
+        TokenRangeReplicasResponse.ReplicaInfo readReplica = 
mappingResponse.readReplicas().get(0);
+        
assertThat(readReplica.replicasByDatacenter()).isNotNull().hasSize(dcReplication.size());
+        TokenRangeReplicasResponse.ReplicaInfo writeReplica = 
mappingResponse.writeReplicas().get(0);
+        
assertThat(writeReplica.replicasByDatacenter()).isNotNull().hasSize(dcReplication.size());
+
+        for (String dcName : dcReplication)
+        {
+            
assertThat(readReplica.replicasByDatacenter().keySet()).isNotEmpty().contains(dcName);
+            
assertThat(readReplica.replicasByDatacenter().get(dcName)).isNotNull().hasSize(replicationFactor);
+
+            
assertThat(writeReplica.replicasByDatacenter().keySet()).isNotEmpty().contains(dcName);
+            
assertThat(writeReplica.replicasByDatacenter().get(dcName)).isNotNull();
+            assertThat(writeReplica.replicasByDatacenter().get(dcName).size())
+            .isGreaterThanOrEqualTo(replicationFactor);
+        }
+    }
+
+    /**
+     * ByteBuddy helper for a single joining node
+     */
+    @Shared
+    public static class BBHelperSingleJoiningNode
+    {
+        public static final CountDownLatch TRANSIENT_STATE_START = new 
CountDownLatch(1);
+        public static final CountDownLatch TRANSIENT_STATE_END = new 
CountDownLatch(1);
+
+        public static void install(ClassLoader cl, Integer nodeNumber)
+        {
+            // Test case involves 3 node cluster with 1 joining node
+            // We intercept the bootstrap of the leaving node (4) to validate 
token ranges
+            if (nodeNumber == 6)
+            {
+                TypePool typePool = TypePool.Default.of(cl);
+                TypeDescription description = 
typePool.describe("org.apache.cassandra.service.StorageService")
+                                                      .resolve();
+                new ByteBuddy().rebase(description, 
ClassFileLocator.ForClassLoader.of(cl))
+                               
.method(named("bootstrap").and(takesArguments(2)))
+                               
.intercept(MethodDelegation.to(BBHelperSingleJoiningNode.class))
+                               // Defer class loading until all dependencies 
are loaded
+                               .make(TypeResolutionStrategy.Lazy.INSTANCE, 
typePool)
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        public static boolean bootstrap(Collection<?> tokens,
+                                        long bootstrapTimeoutMillis,
+                                        @SuperCall Callable<Boolean> orig) 
throws Exception
+        {
+            boolean result = orig.call();
+            // trigger bootstrap start and wait until bootstrap is ready from 
test
+            TRANSIENT_STATE_START.countDown();
+            Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END);
+            return result;
+        }
+    }
+
+    /**
+     * ByteBuddy helper for multiple joining nodes
+     */
+    @Shared
+    public static class BBHelperMultipleJoiningNodes
+    {
+        public static final CountDownLatch TRANSIENT_STATE_START = new 
CountDownLatch(2);
+        public static final CountDownLatch TRANSIENT_STATE_END = new 
CountDownLatch(2);
+
+        public static void install(ClassLoader cl, Integer nodeNumber)
+        {
+            // Test case involves 3 node cluster with a 2 joining nodes
+            // We intercept the joining of nodes (4, 5) to validate token 
ranges
+            if (nodeNumber > 3)
+            {
+                TypePool typePool = TypePool.Default.of(cl);
+                TypeDescription description = 
typePool.describe("org.apache.cassandra.service.StorageService")
+                                                      .resolve();
+                new ByteBuddy().rebase(description, 
ClassFileLocator.ForClassLoader.of(cl))
+                               
.method(named("bootstrap").and(takesArguments(2)))
+                               
.intercept(MethodDelegation.to(BBHelperMultipleJoiningNodes.class))
+                               // Defer class loading until all dependencies 
are loaded
+                               .make(TypeResolutionStrategy.Lazy.INSTANCE, 
typePool)
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        public static boolean bootstrap(Collection<?> tokens,
+                                        long bootstrapTimeoutMillis,
+                                        @SuperCall Callable<Boolean> orig) 
throws Exception
+        {
+            boolean result = orig.call();
+            // trigger bootstrap start and wait until bootstrap is ready from 
test
+            TRANSIENT_STATE_START.countDown();
+            Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END);
+            return result;
+        }
+    }
+
+    /**
+     * ByteBuddy helper for doubling cluster size
+     */
+    @Shared
+    public static class BBHelperDoubleClusterSize
+    {
+        public static final CountDownLatch TRANSIENT_STATE_START = new 
CountDownLatch(5);
+        public static final CountDownLatch TRANSIENT_STATE_END = new 
CountDownLatch(5);
+
+        public static void install(ClassLoader cl, Integer nodeNumber)
+        {
+            // Test case involves 5 node cluster doubling in size
+            // We intercept the bootstrap of the new nodes (6-10) to validate 
token ranges
+            if (nodeNumber > 5)
+            {
+                TypePool typePool = TypePool.Default.of(cl);
+                TypeDescription description = 
typePool.describe("org.apache.cassandra.service.StorageService")
+                                                      .resolve();
+                new ByteBuddy().rebase(description, 
ClassFileLocator.ForClassLoader.of(cl))
+                               
.method(named("bootstrap").and(takesArguments(2)))
+                               
.intercept(MethodDelegation.to(BBHelperDoubleClusterSize.class))
+                               // Defer class loading until all dependencies 
are loaded
+                               .make(TypeResolutionStrategy.Lazy.INSTANCE, 
typePool)
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        public static boolean bootstrap(Collection<?> tokens,
+                                        long bootstrapTimeoutMillis,
+                                        @SuperCall Callable<Boolean> orig) 
throws Exception
+        {
+            boolean result = orig.call();
+            // trigger bootstrap start and wait until bootstrap is ready from 
test
+            TRANSIENT_STATE_START.countDown();
+            Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END);
+            return result;
+        }
+    }
+
+    /**
+     * ByteBuddy Helper for a single leaving node
+     */
+    @Shared
+    public static class BBHelperSingleLeavingNode
+    {
+        public static final CountDownLatch TRANSIENT_STATE_START = new 
CountDownLatch(1);
+        public static final CountDownLatch TRANSIENT_STATE_END = new 
CountDownLatch(1);
+
+        public static void install(ClassLoader cl, Integer nodeNumber)
+        {
+            // Test case involves 5 node cluster with 1 leaving node
+            // We intercept the shutdown of the leaving node (5) to validate 
token ranges
+            if (nodeNumber == 5)
+            {
+                TypePool typePool = TypePool.Default.of(cl);
+                TypeDescription description = 
typePool.describe("org.apache.cassandra.service.StorageService")
+                                                      .resolve();
+                new ByteBuddy().rebase(description, 
ClassFileLocator.ForClassLoader.of(cl))
+                               .method(named("unbootstrap"))
+                               
.intercept(MethodDelegation.to(BBHelperSingleLeavingNode.class))
+                               // Defer class loading until all dependencies 
are loaded
+                               .make(TypeResolutionStrategy.Lazy.INSTANCE, 
typePool)
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        @SuppressWarnings("unused")
+        public static void unbootstrap(@SuperCall Callable<?> orig) throws 
Exception
+        {
+            TRANSIENT_STATE_START.countDown();
+            Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END);
+            orig.call();
+        }
+    }
+
+    /**
+     * ByteBuddy helper for multiple leaving nodes
+     */
+    @Shared
+    public static class BBHelperMultipleLeavingNodes

Review Comment:
   Each BBHelper class is used by a single test class, and its 
implementation (`install`) depends heavily on that test class, e.g. the 
`nodeNumber` assumption. That makes it hard to share the implementation with 
other test classes. How about moving each helper into the test class where it 
is actually used? 
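
A minimal sketch of the suggested layout, not the PR's actual code: the helper is nested inside the one test class that uses it, so the node-number assumption sits next to the cluster layout it depends on. The class name `SingleJoiningNodeTestSketch`, the constants `INITIAL_NODES` and `JOINING_NODE`, and the method `shouldIntercept` are hypothetical, and the ByteBuddy rebase is left out.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical test class: the helper is nested next to the cluster layout it assumes.
class SingleJoiningNodeTestSketch
{
    // Assumed layout, for illustration only: 5 existing nodes plus 1 joining node.
    static final int INITIAL_NODES = 5;
    static final int JOINING_NODE = INITIAL_NODES + 1;

    static class BBHelper
    {
        static final CountDownLatch TRANSIENT_STATE_START = new CountDownLatch(1);
        static final CountDownLatch TRANSIENT_STATE_END = new CountDownLatch(1);

        // Stand-in for the nodeNumber check inside install(ClassLoader, Integer);
        // the ByteBuddy rebase itself is omitted from this sketch.
        static boolean shouldIntercept(int nodeNumber)
        {
            return nodeNumber == JOINING_NODE;
        }
    }
}
```

With this layout, changing the cluster size of the test only requires touching one file, and the comment describing which node is intercepted cannot drift away from the test that defines the cluster.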



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+
+/**
+ * Represents types of Partitioners supported and the corresponding starting token values
+ */
+public enum Partitioner
+{
+    Murmur3(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE)),
+    Random(BigInteger.ONE.negate(), BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE));

Review Comment:
   Seconding this. The token value range of the random partitioner should be 
the same as in Cassandra.
   
   Besides that, can you add a brief description to clarify the range's ends? 
The minimum token is exclusive, and the maximum token is inclusive. In math 
notation, it is `(MIN, MAX]`.
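
A rough sketch of how the documented range could look, under the assumption that Cassandra's `RandomPartitioner` uses `-1` as its minimum token and `2^127` as its maximum, and `Murmur3Partitioner` uses `Long.MIN_VALUE` and `Long.MAX_VALUE`. The enum name `PartitionerRange` and the `contains` helper are hypothetical and exist only to make the `(MIN, MAX]` semantics explicit.

```java
import java.math.BigInteger;

/**
 * Sketch of the supported partitioners and their token ranges.
 * Each range is (minToken, maxToken]: the minimum token is exclusive
 * and the maximum token is inclusive.
 */
enum PartitionerRange
{
    // Murmur3Partitioner: (Long.MIN_VALUE, Long.MAX_VALUE]
    MURMUR3(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE)),
    // RandomPartitioner: (-1, 2^127], assumed to mirror Cassandra's MINIMUM and MAXIMUM
    RANDOM(BigInteger.ONE.negate(), BigInteger.valueOf(2).pow(127));

    final BigInteger minToken;
    final BigInteger maxToken;

    PartitionerRange(BigInteger minToken, BigInteger maxToken)
    {
        this.minToken = minToken;
        this.maxToken = maxToken;
    }

    /** Hypothetical helper: true when minToken < token <= maxToken. */
    boolean contains(BigInteger token)
    {
        return token.compareTo(minToken) > 0 && token.compareTo(maxToken) <= 0;
    }
}
```

Under these assumptions, `PartitionerRange.RANDOM.contains(BigInteger.ONE.negate())` is false because the minimum is exclusive, while `PartitionerRange.RANDOM.contains(BigInteger.valueOf(2).pow(127))` is true because the maximum is inclusive.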



##########
src/test/resources/logback-test.xml:
##########
@@ -20,7 +20,7 @@
 
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
     <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-      <level>DEBUG</level>
+      <level>INFO</level>

Review Comment:
   I think `DEBUG` is more suitable for tests. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

