JeetKunDoug commented on code in PR #58:
URL: https://github.com/apache/cassandra-sidecar/pull/58#discussion_r1306067825


##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)

Review Comment:
   The same applies (make things protected) to any members that need to be 
accessible by the derived class. You could instead provide a protected getter if 
you feel that's necessary, but I think it's mostly the `private final JmxClient jmxClient`, 
which would be fine to make protected.



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  
naturalReplicaMappings.entrySet().stream(),
+                                                  
pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new 
BigInteger(entry.getKey().get(0)),
+                                                  new 
BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new 
HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  
.collect(Collectors.toList());
+
+        // Candidate write-replica mappings (merged from natural and pending 
ranges) are normalized
+        // by consolidating overlapping ranges
+        return TokenRangeReplicas.normalize(replicas).stream()
+                                 .map(range -> {
+                                     Map<String, List<String>> replicasByDc =
+                                     replicasByDataCenter(hostToDatacenter, 
range.replicaSet());
+                                     return new 
TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(),
+                                                                               
        range.end().toString(),
+                                                                               
        replicasByDc);
+                                 })
+                                 .collect(Collectors.toList());
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> 
replicasByTokenRange,
+                                  Map<String, String> hostToDatacenter,
+                                  Partitioner partitioner)
+    {
+        return replicasByTokenRange.entrySet().stream()
+                                   .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                   new BigInteger(entry.getKey().get(0)),
+                                   new BigInteger(entry.getKey().get(1)),
+                                   partitioner,
+                                   new HashSet<>(entry.getValue())))
+                                   .flatMap(Collection::stream)
+                                   .sorted()
+                                   .map(rep -> {
+                                       Map<String, List<String>> replicasByDc =
+                                       replicasByDataCenter(hostToDatacenter, 
rep.replicaSet());
+                                       return new 
TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(),
+                                                                               
          rep.end().toString(),
+                                                                               
          replicasByDc);
+                                   })
+                                   .collect(Collectors.toList());
+    }
+
+    private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet)

Review Comment:
   This method is misnamed - it doesn't group anything; it just maps each host 
to its datacenter. Maybe rename it to `buildHostToDatacenterMapping`?



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  
naturalReplicaMappings.entrySet().stream(),
+                                                  
pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new 
BigInteger(entry.getKey().get(0)),
+                                                  new 
BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new 
HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  
.collect(Collectors.toList());
+
+        // Candidate write-replica mappings (merged from natural and pending 
ranges) are normalized
+        // by consolidating overlapping ranges
+        return TokenRangeReplicas.normalize(replicas).stream()
+                                 .map(range -> {
+                                     Map<String, List<String>> replicasByDc =
+                                     replicasByDataCenter(hostToDatacenter, 
range.replicaSet());
+                                     return new 
TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(),
+                                                                               
        range.end().toString(),
+                                                                               
        replicasByDc);
+                                 })
+                                 .collect(Collectors.toList());
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> 
replicasByTokenRange,
+                                  Map<String, String> hostToDatacenter,
+                                  Partitioner partitioner)
+    {
+        return replicasByTokenRange.entrySet().stream()
+                                   .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                   new BigInteger(entry.getKey().get(0)),
+                                   new BigInteger(entry.getKey().get(1)),
+                                   partitioner,
+                                   new HashSet<>(entry.getValue())))
+                                   .flatMap(Collection::stream)
+                                   .sorted()
+                                   .map(rep -> {
+                                       Map<String, List<String>> replicasByDc =
+                                       replicasByDataCenter(hostToDatacenter, 
rep.replicaSet());
+                                       return new 
TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(),
+                                                                               
          rep.end().toString(),
+                                                                               
          replicasByDc);
+                                   })
+                                   .collect(Collectors.toList());
+    }
+
+    private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet)
+    {
+        EndpointSnitchJmxOperations endpointSnitchInfo = 
jmxClient.proxy(EndpointSnitchJmxOperations.class,
+                                                                         
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(),
+                                                   (String host) -> 
getDatacenter(endpointSnitchInfo, host)));
+    }
+
+    private String getDatacenter(EndpointSnitchJmxOperations 
endpointSnitchInfo, String host)
+    {
+        try
+        {
+            return endpointSnitchInfo.getDatacenter(host);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @NotNull
+    private static Map<String, List<String>> replicasByDataCenter(Map<String, 
String> hostToDatacenter,
+                                                                  
Collection<String> replicas)
+    {
+        return 
replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get));
+    }
+
+    /**
+     * We want to identity a joining node, to replace a dead node, differently 
from a newly joining node. To
+     * do this we analyze gossip info and set 'Replacing' state for node 
replacing a dead node.
+     * {@link StateWithReplacement} is used to set replacing state for a node.
+     *
+     * <p>We are adding this state for token range replica provider endpoint. 
To send out replicas for a
+     * range along with state of replicas including replacing state.
+     */
+    static class StateWithReplacement extends RingProvider.State
+    {
+        private static final String STATE_REPLACING = "Replacing";
+        private final Set<String> joiningNodes;
+        private final GossipInfoResponse gossipInfo;
+
+        StateWithReplacement(List<String> joiningNodes, List<String> 
leavingNodes, List<String> movingNodes,
+                             GossipInfoResponse gossipInfo)
+        {
+            super(joiningNodes, leavingNodes, movingNodes);
+            this.joiningNodes = new HashSet<>(joiningNodes);
+            this.gossipInfo = gossipInfo;
+        }
+
+        /**
+         * This method returns state of a node and accounts for a new 
'Replacing' state if the node is
+         * replacing a dead node. For returning this state, the method checks 
status of the node in gossip
+         * information.
+         *
+         * @param endpoint node information represented usually in form of 
'ip:port'
+         * @return Node status
+         */
+        @Override
+        String of(String endpoint)
+        {
+            if (joiningNodes.contains(endpoint))
+            {
+                GossipInfoResponse.GossipInfo gossipInfoEntry = 
gossipInfo.get(endpoint);
+
+                if (gossipInfoEntry != null)
+                {
+                    LOGGER.info("Found gossipInfoEntry={}", gossipInfoEntry);

Review Comment:
   NIT: should this be logged at debug level rather than info?



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);
+        // Pending ranges include bootstrap tokens and leaving endpoints as 
represented in the Cassandra TokenMetadata
+        Map<List<String>, List<String>> pendingRangeMappings = 
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+        Set<String> replicaSet = 
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+                                               
pendingRangeMappings.values().stream().flatMap(List::stream))
+                                       .collect(Collectors.toSet());
+
+        Map<String, String> hostToDatacenter = 
groupHostsByDatacenter(replicaSet);
+
+        // Retrieve map of all token ranges (pending & primary) to endpoints
+        List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+        writeReplicasFromPendingRanges(rangeToEndpointMappings,
+                                       pendingRangeMappings,
+                                       hostToDatacenter,
+                                       partitioner,
+                                       keyspace);
+
+        Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, 
storage);
+
+        return new TokenRangeReplicasResponse(
+        replicaToStateMap,
+        writeReplicas,
+        mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, 
hostToDatacenter, partitioner));
+    }
+
+    private Map<String, String> replicaToStateMap(Set<String> replicaSet, 
StorageJmxOperations storage)
+    {
+        List<String> joiningNodes = storage.getJoiningNodesWithPort();
+        List<String> leavingNodes = storage.getLeavingNodesWithPort();
+        List<String> movingNodes = storage.getMovingNodesWithPort();
+
+        String rawGossipInfo = getRawGossipInfo();
+        GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+        StateWithReplacement state = new StateWithReplacement(joiningNodes, 
leavingNodes, movingNodes, gossipInfo);
+
+        return replicaSet.stream()
+                         .collect(Collectors.toMap(Function.identity(), 
state::of));
+    }
+
+    private String getRawGossipInfo()
+    {
+        return jmxClient.proxy(ClusterMembershipJmxOperations.class, 
FAILURE_DETECTOR_OBJ_NAME)
+                        .getAllEndpointStatesWithPort();
+    }
+
+    private List<TokenRangeReplicasResponse.ReplicaInfo>
+    writeReplicasFromPendingRanges(Map<List<String>, List<String>> 
naturalReplicaMappings,
+                                   Map<List<String>, List<String>> 
pendingRangeMappings,
+                                   Map<String, String> hostToDatacenter,
+                                   Partitioner partitioner,
+                                   String keyspace)
+    {
+        LOGGER.debug("Pending token ranges for keyspace={}, 
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+        // Merge natural and pending range replicas to generate candidates for 
write-replicas
+        List<TokenRangeReplicas> replicas = Stream.concat(
+                                                  
naturalReplicaMappings.entrySet().stream(),
+                                                  
pendingRangeMappings.entrySet().stream())
+                                                  .map(entry -> 
TokenRangeReplicas.generateTokenRangeReplicas(
+                                                  new 
BigInteger(entry.getKey().get(0)),
+                                                  new 
BigInteger(entry.getKey().get(1)),
+                                                  partitioner,
+                                                  new 
HashSet<>(entry.getValue())))
+                                                  .flatMap(Collection::stream)
+                                                  
.collect(Collectors.toList());

Review Comment:
   The indentation/formatting here is horrible to follow. I realize this is 
the default the project uses, but is there a way we can improve it? It's 
really quite difficult to tell what's at which level of the concat/map/etc. here...
   Alternatively, break this down into smaller chunks and assign each to an 
intermediate variable with a good name.



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+    private final JmxClient jmxClient;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+    public TokenRangeReplicaProvider(JmxClient jmxClient)
+    {
+        this.jmxClient = jmxClient;
+    }
+
+    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, 
Partitioner partitioner)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+        StorageJmxOperations storage = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+        // Retrieve map of primary token ranges to endpoints that describe the 
ring topology
+        Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace);

Review Comment:
   General comment about this line of code and most of the rest of this class:
it's _really hard_ to figure out what's what here when everything is a map of
built-in types (lists of strings). I realize that we can't change the return
types of the JMX endpoints, but can we please convert these values into more
meaningful types (Datacenter, Range, ReplicaInfo, Token, some of which I realize
we don't have yet) as soon as possible after we receive them? That would make
all of this logic significantly easier to understand and help prevent bugs in
the future.



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java:
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Representation of a token range (exclusive start and inclusive end - 
(start, end]) and the
+ * corresponding mapping to replica-set hosts. Static factory ensures that 
ranges are always unwrapped.
+ * Note: Range comparisons are used for ordering of ranges. eg. A.compareTo(B) 
<= 0 implies that
+ * range A occurs before range B, not their sizes.
+ */
+public class TokenRangeReplicas implements Comparable<TokenRangeReplicas>
+{
+    private final BigInteger start;
+    private final BigInteger end;
+
+    private final Partitioner partitioner;
+
+    private final Set<String> replicaSet;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicas.class);
+
+    private TokenRangeReplicas(BigInteger start, BigInteger end, Partitioner 
partitioner, Set<String> replicaSet)
+    {
+        this.start = start;
+        this.end = end;
+        this.partitioner = partitioner;
+        this.replicaSet = replicaSet;
+    }
+
+    public static List<TokenRangeReplicas> 
generateTokenRangeReplicas(BigInteger start,
+                                                                      
BigInteger end,
+                                                                      
Partitioner partitioner,
+                                                                      
Set<String> replicaSet)
+    {
+        if (start.compareTo(end) > 0)
+        {
+            return unwrapRange(start, end, partitioner, replicaSet);
+        }
+
+        return Collections.singletonList(new TokenRangeReplicas(start, end, 
partitioner, replicaSet));
+    }
+
+
+    public BigInteger start()
+    {
+        return start;
+    }
+
+    public BigInteger end()
+    {
+        return end;
+    }
+
+    public Set<String> replicaSet()
+    {
+        return replicaSet;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int compareTo(@NotNull TokenRangeReplicas other)
+    {
+        validateRangesForComparison(other);
+        int compareStart = this.start.compareTo(other.start);
+        return (compareStart != 0) ? compareStart : 
this.end.compareTo(other.end);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        TokenRangeReplicas that = (TokenRangeReplicas) o;
+
+        return Objects.equals(start, that.start)
+               && Objects.equals(end, that.end)
+               && partitioner == that.partitioner;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(start, end, partitioner);
+    }
+
+    private void validateRangesForComparison(@NotNull TokenRangeReplicas other)
+    {
+        if (this.partitioner != other.partitioner)
+            throw new IllegalStateException("Token ranges being compared do 
not have the same partitioner");
+    }
+
+    protected boolean contains(TokenRangeReplicas other)
+    {
+        validateRangesForComparison(other);
+        return (other.start.compareTo(this.start) >= 0 && 
other.end.compareTo(this.end) <= 0);
+    }
+
+    /**
+     * For subset ranges, this is used to determine if a range is larger than 
the other by comparing start-end lengths
+     * If both ranges end at the min, we compare starting points to determine 
the result.
+     * When the left range is the only one ending at min, it is always the 
larger one since all subsequent ranges
+     * in the sorted range list have to be smaller.
+     * <p>
+     * This method assumes that the ranges are normalized and unwrapped, i.e.
+     * 'this' comes before 'other' AND there's no wrapping around the min token

Review Comment:
   This assumption should be enforced with a precondition check, or the method
should be written so that it does not depend on whether `this` comes before
`other`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to