JeetKunDoug commented on code in PR #58: URL: https://github.com/apache/cassandra-sidecar/pull/58#discussion_r1306067825
########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> 
replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) Review Comment: Same (make things protected) goes for any members that need to be accessible by the derived class (or provide a protected getter if you feel that's necessary, but I think it's mostly the `private final JmxClient jmxClient`, which would be OK to be protected). ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in 
the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + 
Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + // Candidate write-replica mappings (merged from natural and pending ranges) are normalized + // by consolidating overlapping ranges + return TokenRangeReplicas.normalize(replicas).stream() + .map(range -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, range.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(), + range.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> replicasByTokenRange, + Map<String, String> hostToDatacenter, + Partitioner partitioner) + { + return replicasByTokenRange.entrySet().stream() + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .sorted() + .map(rep -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, rep.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(), + rep.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet) Review 
Comment: This method is misnamed - it doesn't group anything, it just enriches the host with a datacenter - maybe just `buildHostToDatacenterMapping`? ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in 
the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + 
Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + // Candidate write-replica mappings (merged from natural and pending ranges) are normalized + // by consolidating overlapping ranges + return TokenRangeReplicas.normalize(replicas).stream() + .map(range -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, range.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(), + range.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> replicasByTokenRange, + Map<String, String> hostToDatacenter, + Partitioner partitioner) + { + return replicasByTokenRange.entrySet().stream() + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .sorted() + .map(rep -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, rep.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(), + rep.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet) + { + 
EndpointSnitchJmxOperations endpointSnitchInfo = jmxClient.proxy(EndpointSnitchJmxOperations.class, + ENDPOINT_SNITCH_INFO_OBJ_NAME); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), + (String host) -> getDatacenter(endpointSnitchInfo, host))); + } + + private String getDatacenter(EndpointSnitchJmxOperations endpointSnitchInfo, String host) + { + try + { + return endpointSnitchInfo.getDatacenter(host); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + @NotNull + private static Map<String, List<String>> replicasByDataCenter(Map<String, String> hostToDatacenter, + Collection<String> replicas) + { + return replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get)); + } + + /** + * We want to identity a joining node, to replace a dead node, differently from a newly joining node. To + * do this we analyze gossip info and set 'Replacing' state for node replacing a dead node. + * {@link StateWithReplacement} is used to set replacing state for a node. + * + * <p>We are adding this state for token range replica provider endpoint. To send out replicas for a + * range along with state of replicas including replacing state. + */ + static class StateWithReplacement extends RingProvider.State + { + private static final String STATE_REPLACING = "Replacing"; + private final Set<String> joiningNodes; + private final GossipInfoResponse gossipInfo; + + StateWithReplacement(List<String> joiningNodes, List<String> leavingNodes, List<String> movingNodes, + GossipInfoResponse gossipInfo) + { + super(joiningNodes, leavingNodes, movingNodes); + this.joiningNodes = new HashSet<>(joiningNodes); + this.gossipInfo = gossipInfo; + } + + /** + * This method returns state of a node and accounts for a new 'Replacing' state if the node is + * replacing a dead node. For returning this state, the method checks status of the node in gossip + * information. 
+ * + * @param endpoint node information represented usually in form of 'ip:port' + * @return Node status + */ + @Override + String of(String endpoint) + { + if (joiningNodes.contains(endpoint)) + { + GossipInfoResponse.GossipInfo gossipInfoEntry = gossipInfo.get(endpoint); + + if (gossipInfoEntry != null) + { + LOGGER.info("Found gossipInfoEntry={}", gossipInfoEntry); Review Comment: NIT: Debug? ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in 
the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + 
Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); Review Comment: The indentation/formatting here is horrible to follow - I realize this is the default the project uses, but is there a way we can improve this, as it's really quite difficult to understand what's at what level of the concat/map/etc. here... Alternately, break this down into smaller chunks and assign each to an intermediate variable with a good name. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = storage.getRangeToEndpointWithPortMap(keyspace); Review Comment: General comment about this line of code + most of the rest of this 
class - It's _really hard_ to figure out what's what in this class with everything being maps of native types - I realize that we can't change the return types of the JMX endpoints, but can we please try to transform things as soon as possible after we get them into more meaningful types (Datacenter, Range, ReplicaInfo, Token, some of which I realize we don't have yet)? It would make all of this logic significantly easier to understand, and prevent bugs in the future. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java: ########## @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.jetbrains.annotations.NotNull; + + +/** + * Representation of a token range (exclusive start and inclusive end - (start, end]) and the + * corresponding mapping to replica-set hosts. Static factory ensures that ranges are always unwrapped. + * Note: Range comparisons are used for ordering of ranges. eg. A.compareTo(B) <= 0 implies that + * range A occurs before range B, not their sizes. + */ +public class TokenRangeReplicas implements Comparable<TokenRangeReplicas> +{ + private final BigInteger start; + private final BigInteger end; + + private final Partitioner partitioner; + + private final Set<String> replicaSet; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicas.class); + + private TokenRangeReplicas(BigInteger start, BigInteger end, Partitioner partitioner, Set<String> replicaSet) + { + this.start = start; + this.end = end; + this.partitioner = partitioner; + this.replicaSet = replicaSet; + } + + public static List<TokenRangeReplicas> generateTokenRangeReplicas(BigInteger start, + BigInteger end, + Partitioner partitioner, + Set<String> replicaSet) + { + if (start.compareTo(end) > 0) + { + return unwrapRange(start, end, partitioner, replicaSet); + } + + return Collections.singletonList(new TokenRangeReplicas(start, end, partitioner, replicaSet)); + } + + + public BigInteger start() + { + return start; + } + + public BigInteger end() + { + return end; + } + + public Set<String> replicaSet() + { + 
return replicaSet; + } + + /** + * {@inheritDoc} + */ + @Override + public int compareTo(@NotNull TokenRangeReplicas other) + { + validateRangesForComparison(other); + int compareStart = this.start.compareTo(other.start); + return (compareStart != 0) ? compareStart : this.end.compareTo(other.end); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + TokenRangeReplicas that = (TokenRangeReplicas) o; + + return Objects.equals(start, that.start) + && Objects.equals(end, that.end) + && partitioner == that.partitioner; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() + { + return Objects.hash(start, end, partitioner); + } + + private void validateRangesForComparison(@NotNull TokenRangeReplicas other) + { + if (this.partitioner != other.partitioner) + throw new IllegalStateException("Token ranges being compared do not have the same partitioner"); + } + + protected boolean contains(TokenRangeReplicas other) + { + validateRangesForComparison(other); + return (other.start.compareTo(this.start) >= 0 && other.end.compareTo(this.end) <= 0); + } + + /** + * For subset ranges, this is used to determine if a range is larger than the other by comparing start-end lengths + * If both ranges end at the min, we compare starting points to determine the result. + * When the left range is the only one ending at min, it is always the larger one since all subsequent ranges + * in the sorted range list have to be smaller. + * <p> + * This method assumes that the ranges are normalized and unwrapped, i.e. + * 'this' comes before 'other' AND there's no wrapping around the min token Review Comment: This assumption should be checked with a precondition, or we should write the method so that it does not care whether `this` comes before `other`. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

