yifan-c commented on code in PR #58: URL: https://github.com/apache/cassandra-sidecar/pull/58#discussion_r1306204877
########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java: ########## @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.jetbrains.annotations.NotNull; + + +/** + * Representation of a token range (exclusive start and inclusive end - (start, end]) and the + * corresponding mapping to replica-set hosts. Static factory ensures that ranges are always unwrapped. + * Note: Range comparisons are used for ordering of ranges. eg. A.compareTo(B) <= 0 implies that + * range A occurs before range B, not their sizes. 
+ */ +public class TokenRangeReplicas implements Comparable<TokenRangeReplicas> +{ + private final BigInteger start; + private final BigInteger end; + + private final Partitioner partitioner; + + private final Set<String> replicaSet; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicas.class); + + private TokenRangeReplicas(BigInteger start, BigInteger end, Partitioner partitioner, Set<String> replicaSet) + { + this.start = start; + this.end = end; + this.partitioner = partitioner; + this.replicaSet = replicaSet; + } + + public static List<TokenRangeReplicas> generateTokenRangeReplicas(BigInteger start, + BigInteger end, + Partitioner partitioner, + Set<String> replicaSet) + { + if (start.compareTo(end) > 0) + { + return unwrapRange(start, end, partitioner, replicaSet); + } + + return Collections.singletonList(new TokenRangeReplicas(start, end, partitioner, replicaSet)); + } + + + public BigInteger start() + { + return start; + } + + public BigInteger end() + { + return end; + } + + public Set<String> replicaSet() + { + return replicaSet; + } + + /** + * {@inheritDoc} + */ + @Override + public int compareTo(@NotNull TokenRangeReplicas other) + { + validateRangesForComparison(other); + int compareStart = this.start.compareTo(other.start); + return (compareStart != 0) ? 
compareStart : this.end.compareTo(other.end); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + TokenRangeReplicas that = (TokenRangeReplicas) o; + + return Objects.equals(start, that.start) + && Objects.equals(end, that.end) + && partitioner == that.partitioner; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() + { + return Objects.hash(start, end, partitioner); + } + + private void validateRangesForComparison(@NotNull TokenRangeReplicas other) + { + if (this.partitioner != other.partitioner) + throw new IllegalStateException("Token ranges being compared do not have the same partitioner"); + } + + protected boolean contains(TokenRangeReplicas other) Review Comment: The `protected` methods are not for extension, since there is no subclass. Please do not use `protected`. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in 
the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + 
Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); Review Comment: nit: add some indentation to make it easier to read ```suggestion List<TokenRangeReplicas> replicas = Stream.concat(naturalReplicaMappings.entrySet().stream(), pendingRangeMappings.entrySet().stream()) .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( new BigInteger(entry.getKey().get(0)), new BigInteger(entry.getKey().get(1)), partitioner, new HashSet<>(entry.getValue()))) .flatMap(Collection::stream) .collect(Collectors.toList()); ``` ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, 
List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + // Candidate write-replica mappings (merged from natural and pending ranges) are normalized + // by consolidating overlapping ranges + return TokenRangeReplicas.normalize(replicas).stream() + .map(range -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, range.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(), + range.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> replicasByTokenRange, + Map<String, String> hostToDatacenter, + Partitioner partitioner) + { + return replicasByTokenRange.entrySet().stream() + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) Review Comment: nit: readability ```suggestion return replicasByTokenRange.entrySet().stream() .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( new BigInteger(entry.getKey().get(0)), new BigInteger(entry.getKey().get(1)), 
partitioner, new HashSet<>(entry.getValue()))) .flatMap(Collection::stream) ``` ########## adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java: ########## @@ -0,0 +1,847 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for TokenRangeReplicas + */ +public class TokenRangeReplicasTest +{ + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicasTest.class); + + private boolean hasOverlaps(List<TokenRangeReplicas> rangeList) + { + Collections.sort(rangeList); + for (int c = 0, i = 1; i < rangeList.size(); i++) + { + if (rangeList.get(c++).end().compareTo(rangeList.get(i).start()) > 0) return true; + } + return false; + } + + private boolean checkContains(List<TokenRangeReplicas> resultList, TokenRangeReplicas expected) + { + return resultList.stream() + .map(TokenRangeReplicas::toString) + .anyMatch(r -> r.equals(expected.toString())); + } + + // non-overlapping ranges + @Test + public void simpleTest() + { + List<TokenRangeReplicas> simpleList = createSimpleTokenRangeReplicaList(); + LOGGER.info("Input:" + simpleList); + List<TokenRangeReplicas> rangeList = TokenRangeReplicas.normalize(simpleList); + LOGGER.info("Result:" + rangeList); + assertThat(hasOverlaps(rangeList)).isFalse(); + } + + // TODO: Validate unwrapping separately Review Comment: Not sure if it is addressed by `wrappedMultiOverlapTest` already. 
########## common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java: ########## @@ -126,8 +126,15 @@ public synchronized Session localCql() return localSession; } - public synchronized void close() + public Session close() { + Session localSession; + synchronized (this) + { + localSession = this.localSession; + this.localSession = null; + } + Review Comment: 👍 ########## src/test/integration/org/apache/cassandra/sidecar/routes/MultiDcTokenSupplier.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.distributed.api.TokenSupplier; + +/** + * Static factory holder that provides a multi-DC token supplier + */ +public class MultiDcTokenSupplier +{ + + static TokenSupplier evenlyDistributedTokens(int numNodes, int numDcs, int numTokens) Review Comment: I guess you mean this ```suggestion static TokenSupplier evenlyDistributedTokens(int numNodesPerDc, int numDcs, int numTokensPerNode) ``` Then, what does the result of `numTokensPerNode * numDcs` mean? 
########## src/test/integration/org/apache/cassandra/sidecar/routes/MultiDcTokenSupplier.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.distributed.api.TokenSupplier; + +/** + * Static factory holder that provides a multi-DC token supplier + */ +public class MultiDcTokenSupplier +{ + + static TokenSupplier evenlyDistributedTokens(int numNodes, int numDcs, int numTokens) + { + long totalTokens = (long) numNodes * numDcs * numTokens; + BigInteger increment = BigInteger.valueOf((Long.MAX_VALUE / totalTokens) * 4); Review Comment: Where is `4` coming from? ########## src/test/integration/org/apache/cassandra/sidecar/routes/BaseTokenRangeIntegrationTest.java: ########## @@ -0,0 +1,795 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.Range; +import com.google.common.util.concurrent.Uninterruptibles; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.TokenSupplier; +import 
org.apache.cassandra.sidecar.IntegrationTestBase; +import org.apache.cassandra.sidecar.adapters.base.Partitioner; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.testing.AbstractCassandraTestContext; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; +import org.apache.cassandra.utils.Shared; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; +import static org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MOVING_NODE_IDX; +import static org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MULTIDC_MOVING_NODE_IDX; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test the token range replica mapping endpoint with cassandra container. 
+ */ +public class BaseTokenRangeIntegrationTest extends IntegrationTestBase +{ + + protected void validateTokenRanges(TokenRangeReplicasResponse mappingsResponse, + List<Range<BigInteger>> expectedRanges) + { + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicaSet = mappingsResponse.writeReplicas(); + List<TokenRangeReplicasResponse.ReplicaInfo> readReplicaSet = mappingsResponse.readReplicas(); + List<Range<BigInteger>> writeRanges = writeReplicaSet.stream() + .map(r -> Range.openClosed(new BigInteger(r.start()), + new BigInteger(r.end()))) + .collect(Collectors.toList()); + + List<Range<BigInteger>> readRanges = readReplicaSet.stream() + .map(r -> Range.openClosed(new BigInteger(r.start()), + new BigInteger(r.end()))) + .collect(Collectors.toList()); + + + assertThat(writeRanges.size()).isEqualTo(writeReplicaSet.size()); + assertThat(writeRanges).containsExactlyElementsOf(expectedRanges); + + //Sorted and Overlap check + validateOrderAndOverlaps(writeRanges); + validateOrderAndOverlaps(readRanges); + } + + private void validateOrderAndOverlaps(List<Range<BigInteger>> ranges) + { + for (int r = 0; r < ranges.size() - 1; r++) + { + assertThat(ranges.get(r).upperEndpoint()).isLessThan(ranges.get(r + 1).upperEndpoint()); + assertThat(ranges.get(r).intersection(ranges.get(r + 1)).isEmpty()).isTrue(); + } + } + + protected void validateNodeStates(TokenRangeReplicasResponse mappingResponse, + Set<String> dcReplication, + Function<Integer, String> statusFunction) + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + int expectedReplicas = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * dcReplication.size(); + + AbstractCassandraTestContext cassandraTestContext = sidecarTestContext.cassandraTestContext(); + assertThat(mappingResponse.replicaState().size()).isEqualTo(expectedReplicas); + for (int i = 1; i <= cassandraTestContext.cluster().size(); i++) + { + IInstanceConfig config = 
cassandraTestContext.cluster().get(i).config(); + + if (dcReplication.contains(config.localDatacenter())) + { + String ipAndPort = config.broadcastAddress().getAddress().getHostAddress() + ":" + + config.broadcastAddress().getPort(); + + String expectedStatus = statusFunction.apply(i); + assertThat(mappingResponse.replicaState().get(ipAndPort)).isEqualTo(expectedStatus); + } + } + } + + protected UpgradeableCluster getMultiDCCluster(int numNodes, + int numDcs, + BiConsumer<ClassLoader, Integer> initializer, + ConfigurableCassandraTestContext cassandraTestContext) + throws IOException + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + TokenSupplier mdcTokenSupplier = + MultiDcTokenSupplier.evenlyDistributedTokens(numNodes, + numDcs, + 1); + + int totalNodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * annotation.numDcs(); + return cassandraTestContext.configureAndStartCluster( + builder -> { + builder.withInstanceInitializer(initializer); + builder.withTokenSupplier(mdcTokenSupplier); + builder.withNodeIdTopology(networkTopology(totalNodeCount, + (nodeId) -> nodeId % 2 != 0 ? + dcAndRack("datacenter1", "rack1") : + dcAndRack("datacenter2", "rack2"))); + }); + } + + protected List<Range<BigInteger>> generateExpectedRanges() + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + int nodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * annotation.numDcs(); + return generateExpectedRanges(nodeCount); + } + + protected List<Range<BigInteger>> generateExpectedRanges(int nodeCount) + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + TokenSupplier tokenSupplier = (annotation.numDcs() > 1) ? 
+ MultiDcTokenSupplier.evenlyDistributedTokens( + annotation.nodesPerDc() + annotation.newNodesPerDc(), + annotation.numDcs(), + 1) : + TokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc() + + annotation.newNodesPerDc(), + 1); + + List<Range<BigInteger>> expectedRanges = new ArrayList<>(); + BigInteger startToken = Partitioner.Murmur3.minToken; + BigInteger endToken = Partitioner.Murmur3.maxToken; + int node = 1; + BigInteger prevToken = new BigInteger(tokenSupplier.tokens(node++).stream().findFirst().get()); + Range<BigInteger> firstRange = Range.openClosed(startToken, prevToken); + expectedRanges.add(firstRange); + while (node <= nodeCount) + { + BigInteger currentToken = new BigInteger(tokenSupplier.tokens(node).stream().findFirst().get()); + expectedRanges.add(Range.openClosed(prevToken, currentToken)); + prevToken = currentToken; + node++; + } + expectedRanges.add(Range.openClosed(prevToken, endToken)); + return expectedRanges; + } + + protected Set<String> instancesFromReplicaSet(List<TokenRangeReplicasResponse.ReplicaInfo> replicas) + { + return replicas.stream() + .map(r -> r.replicasByDatacenter().values()) + .flatMap(Collection::stream) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); Review Comment: nit: one less line if you do this ```suggestion return replicas.stream() .flatMap(r -> r.replicasByDatacenter().values().stream()) .flatMap(Collection::stream) .collect(Collectors.toSet()); ``` ########## src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.net.UnknownHostException; + +import org.apache.commons.lang3.StringUtils; + +import com.datastax.driver.core.Metadata; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.HttpException; +import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.StorageOperations; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasRequest; +import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; + +/** + * Handler which provides token range to read and write replica mapping + * + * <p>This handler provides token range replicas along with the state of the replicas. For the purpose + * of identifying the state of a newly joining node to replace a dead node from a newly joining node, + * a new state 'Replacing' has been added. 
+ * It is represented by + * {@code org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicaProvider.StateWithReplacement} + */ +@Singleton +public class TokenRangeReplicaMapHandler extends AbstractHandler<TokenRangeReplicasRequest> +{ + + @Inject + public TokenRangeReplicaMapHandler(InstanceMetadataFetcher metadataFetcher, + CassandraInputValidator validator, + ExecutorPools executorPools) + { + super(metadataFetcher, executorPools, validator); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + TokenRangeReplicasRequest request) + { + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + + StorageOperations storageOperations = delegate.storageOperations(); + Metadata metadata = delegate.metadata(); + if (storageOperations == null || metadata == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } + + executorPools.service().executeBlocking(promise -> { + try + { + context.json(storageOperations.tokenRangeReplicas(request.keyspace(), metadata.getPartitioner())); + } + catch (UnknownHostException e) + { + processFailure(e, context, host, remoteAddress, request); Review Comment: right. The method in the interface is declared to throw `UnknownHostException`, but the implementation does not. It is wrapped in `RuntimeException`. Can you update the interface method to remove the throws? The `RuntimeException` does not need to be handled separately, it reaches to `onFailure`, which calls `processFailure`, if throws. ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + 
this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); Review Comment: Not an AI, but maybe refactor the code in a follow-up, so that `jmxClient` is not available in the scope to prevent calling `jmxClient.proxy` directly. ########## build.gradle: ########## @@ -326,6 +326,8 @@ tasks.register("integrationTest", Test) { useJUnitPlatform() { includeTags "integrationTest" } +// Uncomment below to run unit tests in parallel +// maxParallelForks = Runtime.runtime.availableProcessors() * 2 Review Comment: Just curious, why it is commented out? ########## common/build.gradle: ########## @@ -41,6 +41,8 @@ repositories { test { useJUnitPlatform() +// Uncomment below to run unit tests in parallel +// maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 Review Comment: Another curiosity, why the `maxParallelForks` calculations are different in different build files? ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, 
List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + // Candidate write-replica mappings (merged from natural and pending ranges) are normalized + // by consolidating overlapping ranges + return TokenRangeReplicas.normalize(replicas).stream() + .map(range -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, range.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(), + range.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> replicasByTokenRange, + Map<String, String> hostToDatacenter, + Partitioner partitioner) + { + return replicasByTokenRange.entrySet().stream() + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .sorted() + .map(rep -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, rep.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(), + rep.end().toString(), + 
replicasByDc); + }) + .collect(Collectors.toList()); + } + + private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet) + { + EndpointSnitchJmxOperations endpointSnitchInfo = jmxClient.proxy(EndpointSnitchJmxOperations.class, + ENDPOINT_SNITCH_INFO_OBJ_NAME); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), + (String host) -> getDatacenter(endpointSnitchInfo, host))); + } + + private String getDatacenter(EndpointSnitchJmxOperations endpointSnitchInfo, String host) + { + try + { + return endpointSnitchInfo.getDatacenter(host); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + @NotNull + private static Map<String, List<String>> replicasByDataCenter(Map<String, String> hostToDatacenter, + Collection<String> replicas) + { + return replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get)); + } + + /** + * We want to identity a joining node, to replace a dead node, differently from a newly joining node. To + * do this we analyze gossip info and set 'Replacing' state for node replacing a dead node. + * {@link StateWithReplacement} is used to set replacing state for a node. + * + * <p>We are adding this state for token range replica provider endpoint. To send out replicas for a + * range along with state of replicas including replacing state. + */ + static class StateWithReplacement extends RingProvider.State + { + private static final String STATE_REPLACING = "Replacing"; + private final Set<String> joiningNodes; + private final GossipInfoResponse gossipInfo; + + StateWithReplacement(List<String> joiningNodes, List<String> leavingNodes, List<String> movingNodes, + GossipInfoResponse gossipInfo) + { + super(joiningNodes, leavingNodes, movingNodes); + this.joiningNodes = new HashSet<>(joiningNodes); + this.gossipInfo = gossipInfo; + } + + /** + * This method returns state of a node and accounts for a new 'Replacing' state if the node is + * replacing a dead node. 
For returning this state, the method checks status of the node in gossip + * information. + * + * @param endpoint node information represented usually in form of 'ip:port' + * @return Node status + */ + @Override + String of(String endpoint) + { + if (joiningNodes.contains(endpoint)) + { + GossipInfoResponse.GossipInfo gossipInfoEntry = gossipInfo.get(endpoint); + + if (gossipInfoEntry != null) + { + LOGGER.info("Found gossipInfoEntry={}", gossipInfoEntry); Review Comment: nit: should we only print the endpoint and status? Those 2 values should be enough to reason about the runtime here. ########## common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java: ########## @@ -259,12 +259,16 @@ private Map<String, Object> buildJmxEnv() } @Override - public synchronized void close() throws IOException + public void close() throws IOException { - JMXConnector connector = jmxConnector; - if (connector != null) + JMXConnector connector; + synchronized (this) { + connector = jmxConnector; jmxConnector = null; Review Comment: Reset `connected` to false too? ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = 
storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, 
List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + // Candidate write-replica mappings (merged from natural and pending ranges) are normalized + // by consolidating overlapping ranges + return TokenRangeReplicas.normalize(replicas).stream() + .map(range -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, range.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(), + range.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> replicasByTokenRange, + Map<String, String> hostToDatacenter, + Partitioner partitioner) + { + return replicasByTokenRange.entrySet().stream() + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .sorted() + .map(rep -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, rep.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(), + rep.end().toString(), + 
replicasByDc); + }) + .collect(Collectors.toList()); + } + + private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet) + { + EndpointSnitchJmxOperations endpointSnitchInfo = jmxClient.proxy(EndpointSnitchJmxOperations.class, + ENDPOINT_SNITCH_INFO_OBJ_NAME); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), + (String host) -> getDatacenter(endpointSnitchInfo, host))); + } + + private String getDatacenter(EndpointSnitchJmxOperations endpointSnitchInfo, String host) + { + try + { + return endpointSnitchInfo.getDatacenter(host); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + @NotNull + private static Map<String, List<String>> replicasByDataCenter(Map<String, String> hostToDatacenter, + Collection<String> replicas) + { + return replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get)); + } + + /** + * We want to identity a joining node, to replace a dead node, differently from a newly joining node. To + * do this we analyze gossip info and set 'Replacing' state for node replacing a dead node. + * {@link StateWithReplacement} is used to set replacing state for a node. + * + * <p>We are adding this state for token range replica provider endpoint. To send out replicas for a + * range along with state of replicas including replacing state. + */ + static class StateWithReplacement extends RingProvider.State + { + private static final String STATE_REPLACING = "Replacing"; Review Comment: How about creating an enum for the state? It helps in 2 ways: strong typing and comprehensive matching.
########## common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java: ########## @@ -259,12 +259,16 @@ private Map<String, Object> buildJmxEnv() } @Override - public synchronized void close() throws IOException + public void close() throws IOException { - JMXConnector connector = jmxConnector; - if (connector != null) + JMXConnector connector; + synchronized (this) Review Comment: Can you make `connect()` (where `jmxConnector` is updated) synchronized? It is synchronized in its only callsite. It won't hurt performance when synchronizing `connect()`, but it is more error-proof. ########## common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java: ########## @@ -126,8 +126,15 @@ public synchronized Session localCql() return localSession; } - public synchronized void close() + public Session close() { + Session localSession; + synchronized (this) + { + localSession = this.localSession; + this.localSession = null; + } + Review Comment: 👍 ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.JmxClient; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; + +/** + * Aggregates the replica-set by token range + */ +public class TokenRangeReplicaProvider +{ + private final JmxClient jmxClient; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); + + public TokenRangeReplicaProvider(JmxClient jmxClient) + { + this.jmxClient = jmxClient; + } + + public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) + { + Objects.requireNonNull(keyspace, "keyspace must be non-null"); + + StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + + // Retrieve map of primary token ranges to endpoints that describe the ring topology + Map<List<String>, List<String>> rangeToEndpointMappings = storage.getRangeToEndpointWithPortMap(keyspace); + // Pending ranges include bootstrap tokens and leaving endpoints as represented in 
the Cassandra TokenMetadata + Map<List<String>, List<String>> pendingRangeMappings = storage.getPendingRangeToEndpointWithPortMap(keyspace); + + Set<String> replicaSet = Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream), + pendingRangeMappings.values().stream().flatMap(List::stream)) + .collect(Collectors.toSet()); + + Map<String, String> hostToDatacenter = groupHostsByDatacenter(replicaSet); + + // Retrieve map of all token ranges (pending & primary) to endpoints + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas = + writeReplicasFromPendingRanges(rangeToEndpointMappings, + pendingRangeMappings, + hostToDatacenter, + partitioner, + keyspace); + + Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet, storage); + + return new TokenRangeReplicasResponse( + replicaToStateMap, + writeReplicas, + mappingsToUnwrappedReplicaSet(rangeToEndpointMappings, hostToDatacenter, partitioner)); + } + + private Map<String, String> replicaToStateMap(Set<String> replicaSet, StorageJmxOperations storage) + { + List<String> joiningNodes = storage.getJoiningNodesWithPort(); + List<String> leavingNodes = storage.getLeavingNodesWithPort(); + List<String> movingNodes = storage.getMovingNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); + GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); + + StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), state::of)); + } + + private String getRawGossipInfo() + { + return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME) + .getAllEndpointStatesWithPort(); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + writeReplicasFromPendingRanges(Map<List<String>, List<String>> naturalReplicaMappings, + Map<List<String>, List<String>> pendingRangeMappings, + Map<String, String> hostToDatacenter, + 
Partitioner partitioner, + String keyspace) + { + LOGGER.debug("Pending token ranges for keyspace={}, pendingRangeMappings={}", keyspace, pendingRangeMappings); + // Merge natural and pending range replicas to generate candidates for write-replicas + List<TokenRangeReplicas> replicas = Stream.concat( + naturalReplicaMappings.entrySet().stream(), + pendingRangeMappings.entrySet().stream()) + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + // Candidate write-replica mappings (merged from natural and pending ranges) are normalized + // by consolidating overlapping ranges + return TokenRangeReplicas.normalize(replicas).stream() + .map(range -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, range.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(), + range.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private List<TokenRangeReplicasResponse.ReplicaInfo> + mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>> replicasByTokenRange, + Map<String, String> hostToDatacenter, + Partitioner partitioner) + { + return replicasByTokenRange.entrySet().stream() + .map(entry -> TokenRangeReplicas.generateTokenRangeReplicas( + new BigInteger(entry.getKey().get(0)), + new BigInteger(entry.getKey().get(1)), + partitioner, + new HashSet<>(entry.getValue()))) + .flatMap(Collection::stream) + .sorted() + .map(rep -> { + Map<String, List<String>> replicasByDc = + replicasByDataCenter(hostToDatacenter, rep.replicaSet()); + return new TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(), + rep.end().toString(), + replicasByDc); + }) + .collect(Collectors.toList()); + } + + private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet) + { + 
EndpointSnitchJmxOperations endpointSnitchInfo = jmxClient.proxy(EndpointSnitchJmxOperations.class, + ENDPOINT_SNITCH_INFO_OBJ_NAME); + + return replicaSet.stream() + .collect(Collectors.toMap(Function.identity(), + (String host) -> getDatacenter(endpointSnitchInfo, host))); + } + + private String getDatacenter(EndpointSnitchJmxOperations endpointSnitchInfo, String host) + { + try + { + return endpointSnitchInfo.getDatacenter(host); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + @NotNull + private static Map<String, List<String>> replicasByDataCenter(Map<String, String> hostToDatacenter, + Collection<String> replicas) + { + return replicas.stream().collect(Collectors.groupingBy(hostToDatacenter::get)); + } + + /** + * We want to identity a joining node, to replace a dead node, differently from a newly joining node. To + * do this we analyze gossip info and set 'Replacing' state for node replacing a dead node. + * {@link StateWithReplacement} is used to set replacing state for a node. + * + * <p>We are adding this state for token range replica provider endpoint. To send out replicas for a + * range along with state of replicas including replacing state. + */ + static class StateWithReplacement extends RingProvider.State + { + private static final String STATE_REPLACING = "Replacing"; + private final Set<String> joiningNodes; Review Comment: Why does it declare another copy of `joiningNodes`? It is available from the base class (but need to make it protected or add getter). ########## src/test/integration/org/apache/cassandra/sidecar/routes/BaseTokenRangeIntegrationTest.java: ########## @@ -0,0 +1,795 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.Range; +import com.google.common.util.concurrent.Uninterruptibles; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.TokenSupplier; +import 
org.apache.cassandra.sidecar.IntegrationTestBase; +import org.apache.cassandra.sidecar.adapters.base.Partitioner; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.testing.AbstractCassandraTestContext; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; +import org.apache.cassandra.utils.Shared; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; +import static org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MOVING_NODE_IDX; +import static org.apache.cassandra.sidecar.routes.TokenRangeIntegrationMovingTest.MULTIDC_MOVING_NODE_IDX; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test the token range replica mapping endpoint with cassandra container. 
+ */ +public class BaseTokenRangeIntegrationTest extends IntegrationTestBase +{ + + protected void validateTokenRanges(TokenRangeReplicasResponse mappingsResponse, + List<Range<BigInteger>> expectedRanges) + { + List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicaSet = mappingsResponse.writeReplicas(); + List<TokenRangeReplicasResponse.ReplicaInfo> readReplicaSet = mappingsResponse.readReplicas(); + List<Range<BigInteger>> writeRanges = writeReplicaSet.stream() + .map(r -> Range.openClosed(new BigInteger(r.start()), + new BigInteger(r.end()))) + .collect(Collectors.toList()); + + List<Range<BigInteger>> readRanges = readReplicaSet.stream() + .map(r -> Range.openClosed(new BigInteger(r.start()), + new BigInteger(r.end()))) + .collect(Collectors.toList()); + + + assertThat(writeRanges.size()).isEqualTo(writeReplicaSet.size()); + assertThat(writeRanges).containsExactlyElementsOf(expectedRanges); + + //Sorted and Overlap check + validateOrderAndOverlaps(writeRanges); + validateOrderAndOverlaps(readRanges); + } + + private void validateOrderAndOverlaps(List<Range<BigInteger>> ranges) + { + for (int r = 0; r < ranges.size() - 1; r++) + { + assertThat(ranges.get(r).upperEndpoint()).isLessThan(ranges.get(r + 1).upperEndpoint()); + assertThat(ranges.get(r).intersection(ranges.get(r + 1)).isEmpty()).isTrue(); + } + } + + protected void validateNodeStates(TokenRangeReplicasResponse mappingResponse, + Set<String> dcReplication, + Function<Integer, String> statusFunction) + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + int expectedReplicas = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * dcReplication.size(); + + AbstractCassandraTestContext cassandraTestContext = sidecarTestContext.cassandraTestContext(); + assertThat(mappingResponse.replicaState().size()).isEqualTo(expectedReplicas); + for (int i = 1; i <= cassandraTestContext.cluster().size(); i++) + { + IInstanceConfig config = 
cassandraTestContext.cluster().get(i).config(); + + if (dcReplication.contains(config.localDatacenter())) + { + String ipAndPort = config.broadcastAddress().getAddress().getHostAddress() + ":" + + config.broadcastAddress().getPort(); + + String expectedStatus = statusFunction.apply(i); + assertThat(mappingResponse.replicaState().get(ipAndPort)).isEqualTo(expectedStatus); + } + } + } + + protected UpgradeableCluster getMultiDCCluster(int numNodes, + int numDcs, + BiConsumer<ClassLoader, Integer> initializer, + ConfigurableCassandraTestContext cassandraTestContext) + throws IOException + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + TokenSupplier mdcTokenSupplier = + MultiDcTokenSupplier.evenlyDistributedTokens(numNodes, + numDcs, + 1); + + int totalNodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * annotation.numDcs(); + return cassandraTestContext.configureAndStartCluster( + builder -> { + builder.withInstanceInitializer(initializer); + builder.withTokenSupplier(mdcTokenSupplier); + builder.withNodeIdTopology(networkTopology(totalNodeCount, + (nodeId) -> nodeId % 2 != 0 ? + dcAndRack("datacenter1", "rack1") : + dcAndRack("datacenter2", "rack2"))); + }); + } + + protected List<Range<BigInteger>> generateExpectedRanges() + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + int nodeCount = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * annotation.numDcs(); + return generateExpectedRanges(nodeCount); + } + + protected List<Range<BigInteger>> generateExpectedRanges(int nodeCount) + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + TokenSupplier tokenSupplier = (annotation.numDcs() > 1) ? 
+ MultiDcTokenSupplier.evenlyDistributedTokens( + annotation.nodesPerDc() + annotation.newNodesPerDc(), + annotation.numDcs(), + 1) : + TokenSupplier.evenlyDistributedTokens(annotation.nodesPerDc() + + annotation.newNodesPerDc(), + 1); + + List<Range<BigInteger>> expectedRanges = new ArrayList<>(); + BigInteger startToken = Partitioner.Murmur3.minToken; + BigInteger endToken = Partitioner.Murmur3.maxToken; + int node = 1; + BigInteger prevToken = new BigInteger(tokenSupplier.tokens(node++).stream().findFirst().get()); + Range<BigInteger> firstRange = Range.openClosed(startToken, prevToken); + expectedRanges.add(firstRange); + while (node <= nodeCount) + { + BigInteger currentToken = new BigInteger(tokenSupplier.tokens(node).stream().findFirst().get()); + expectedRanges.add(Range.openClosed(prevToken, currentToken)); + prevToken = currentToken; + node++; + } + expectedRanges.add(Range.openClosed(prevToken, endToken)); + return expectedRanges; + } + + protected Set<String> instancesFromReplicaSet(List<TokenRangeReplicasResponse.ReplicaInfo> replicas) + { + return replicas.stream() + .map(r -> r.replicasByDatacenter().values()) + .flatMap(Collection::stream) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + protected void validateWriteReplicaMappings(List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas, + Map<String, Map<Range<BigInteger>, List<String>>> expectedRangeMapping) + { + CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; + assertThat(writeReplicas).hasSize(expectedRangeMapping.get("datacenter1").size()); + for (TokenRangeReplicasResponse.ReplicaInfo r: writeReplicas) + { + Range<BigInteger> range = Range.openClosed(BigInteger.valueOf(Long.parseLong(r.start())), + BigInteger.valueOf(Long.parseLong(r.end()))); + assertThat(expectedRangeMapping).containsKey("datacenter1"); + assertThat(expectedRangeMapping.get("datacenter1")).containsKey(range); + // Replicaset for the same range match 
expected + assertThat(r.replicasByDatacenter().get("datacenter1")) + .containsExactlyInAnyOrderElementsOf(expectedRangeMapping.get("datacenter1").get(range)); + + if (annotation.numDcs() > 1) + { + assertThat(expectedRangeMapping).containsKey("datacenter2"); + assertThat(expectedRangeMapping.get("datacenter2")).containsKey(range); + assertThat(r.replicasByDatacenter().get("datacenter2")) + .containsExactlyInAnyOrderElementsOf(expectedRangeMapping.get("datacenter2").get(range)); + } + } + } + + void retrieveMappingWithKeyspace(VertxTestContext context, String keyspace, + Handler<HttpResponse<Buffer>> verifier) throws Exception + { + String testRoute = "/api/v1/keyspaces/" + keyspace + "/token-range-replicas"; + testWithClient(context, client -> { + client.get(server.actualPort(), "127.0.0.1", testRoute) + .send(context.succeeding(verifier)); + }); + } + + void assertMappingResponseOK(TokenRangeReplicasResponse mappingResponse) + { + assertMappingResponseOK(mappingResponse, 1, Collections.singleton("datacenter1")); + } + + void assertMappingResponseOK(TokenRangeReplicasResponse mappingResponse, + int replicationFactor, + Set<String> dcReplication) + { + assertThat(mappingResponse).isNotNull(); + assertThat(mappingResponse.readReplicas()).isNotNull(); + assertThat(mappingResponse.writeReplicas()).isNotNull(); + TokenRangeReplicasResponse.ReplicaInfo readReplica = mappingResponse.readReplicas().get(0); + assertThat(readReplica.replicasByDatacenter()).isNotNull().hasSize(dcReplication.size()); + TokenRangeReplicasResponse.ReplicaInfo writeReplica = mappingResponse.writeReplicas().get(0); + assertThat(writeReplica.replicasByDatacenter()).isNotNull().hasSize(dcReplication.size()); + + for (String dcName : dcReplication) + { + assertThat(readReplica.replicasByDatacenter().keySet()).isNotEmpty().contains(dcName); + assertThat(readReplica.replicasByDatacenter().get(dcName)).isNotNull().hasSize(replicationFactor); + + 
assertThat(writeReplica.replicasByDatacenter().keySet()).isNotEmpty().contains(dcName); + assertThat(writeReplica.replicasByDatacenter().get(dcName)).isNotNull(); + assertThat(writeReplica.replicasByDatacenter().get(dcName).size()) + .isGreaterThanOrEqualTo(replicationFactor); + } + } + + /** + * ByteBuddy helper for a single joining node + */ + @Shared + public static class BBHelperSingleJoiningNode + { + public static final CountDownLatch TRANSIENT_STATE_START = new CountDownLatch(1); + public static final CountDownLatch TRANSIENT_STATE_END = new CountDownLatch(1); + + public static void install(ClassLoader cl, Integer nodeNumber) + { + // Test case involves 3 node cluster with 1 joining node + // We intercept the bootstrap of the leaving node (4) to validate token ranges + if (nodeNumber == 6) + { + TypePool typePool = TypePool.Default.of(cl); + TypeDescription description = typePool.describe("org.apache.cassandra.service.StorageService") + .resolve(); + new ByteBuddy().rebase(description, ClassFileLocator.ForClassLoader.of(cl)) + .method(named("bootstrap").and(takesArguments(2))) + .intercept(MethodDelegation.to(BBHelperSingleJoiningNode.class)) + // Defer class loading until all dependencies are loaded + .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool) + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + public static boolean bootstrap(Collection<?> tokens, + long bootstrapTimeoutMillis, + @SuperCall Callable<Boolean> orig) throws Exception + { + boolean result = orig.call(); + // trigger bootstrap start and wait until bootstrap is ready from test + TRANSIENT_STATE_START.countDown(); + Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END); + return result; + } + } + + /** + * ByteBuddy helper for multiple joining nodes + */ + @Shared + public static class BBHelperMultipleJoiningNodes + { + public static final CountDownLatch TRANSIENT_STATE_START = new CountDownLatch(2); + public static final CountDownLatch TRANSIENT_STATE_END = new 
CountDownLatch(2); + + public static void install(ClassLoader cl, Integer nodeNumber) + { + // Test case involves 3 node cluster with a 2 joining nodes + // We intercept the joining of nodes (4, 5) to validate token ranges + if (nodeNumber > 3) + { + TypePool typePool = TypePool.Default.of(cl); + TypeDescription description = typePool.describe("org.apache.cassandra.service.StorageService") + .resolve(); + new ByteBuddy().rebase(description, ClassFileLocator.ForClassLoader.of(cl)) + .method(named("bootstrap").and(takesArguments(2))) + .intercept(MethodDelegation.to(BBHelperMultipleJoiningNodes.class)) + // Defer class loading until all dependencies are loaded + .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool) + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + public static boolean bootstrap(Collection<?> tokens, + long bootstrapTimeoutMillis, + @SuperCall Callable<Boolean> orig) throws Exception + { + boolean result = orig.call(); + // trigger bootstrap start and wait until bootstrap is ready from test + TRANSIENT_STATE_START.countDown(); + Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END); + return result; + } + } + + /** + * ByteBuddy helper for doubling cluster size + */ + @Shared + public static class BBHelperDoubleClusterSize + { + public static final CountDownLatch TRANSIENT_STATE_START = new CountDownLatch(5); + public static final CountDownLatch TRANSIENT_STATE_END = new CountDownLatch(5); + + public static void install(ClassLoader cl, Integer nodeNumber) + { + // Test case involves 5 node cluster doubling in size + // We intercept the bootstrap of the new nodes (6-10) to validate token ranges + if (nodeNumber > 5) + { + TypePool typePool = TypePool.Default.of(cl); + TypeDescription description = typePool.describe("org.apache.cassandra.service.StorageService") + .resolve(); + new ByteBuddy().rebase(description, ClassFileLocator.ForClassLoader.of(cl)) + .method(named("bootstrap").and(takesArguments(2))) + 
.intercept(MethodDelegation.to(BBHelperDoubleClusterSize.class)) + // Defer class loading until all dependencies are loaded + .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool) + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + public static boolean bootstrap(Collection<?> tokens, + long bootstrapTimeoutMillis, + @SuperCall Callable<Boolean> orig) throws Exception + { + boolean result = orig.call(); + // trigger bootstrap start and wait until bootstrap is ready from test + TRANSIENT_STATE_START.countDown(); + Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END); + return result; + } + } + + /** + * ByteBuddy Helper for a single leaving node + */ + @Shared + public static class BBHelperSingleLeavingNode + { + public static final CountDownLatch TRANSIENT_STATE_START = new CountDownLatch(1); + public static final CountDownLatch TRANSIENT_STATE_END = new CountDownLatch(1); + + public static void install(ClassLoader cl, Integer nodeNumber) + { + // Test case involves 5 node cluster with 1 leaving node + // We intercept the shutdown of the leaving node (5) to validate token ranges + if (nodeNumber == 5) + { + TypePool typePool = TypePool.Default.of(cl); + TypeDescription description = typePool.describe("org.apache.cassandra.service.StorageService") + .resolve(); + new ByteBuddy().rebase(description, ClassFileLocator.ForClassLoader.of(cl)) + .method(named("unbootstrap")) + .intercept(MethodDelegation.to(BBHelperSingleLeavingNode.class)) + // Defer class loading until all dependencies are loaded + .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool) + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + @SuppressWarnings("unused") + public static void unbootstrap(@SuperCall Callable<?> orig) throws Exception + { + TRANSIENT_STATE_START.countDown(); + Uninterruptibles.awaitUninterruptibly(TRANSIENT_STATE_END); + orig.call(); + } + } + + /** + * ByteBuddy helper for multiple leaving nodes + */ + @Shared + public static class 
BBHelperMultipleLeavingNodes Review Comment: Each BBHelper class is used in a single test class, and their implementations (install) depend heavily on the test class, e.g. nodeNumber assumption. Therefore, it makes it hard to share the implementation with other test classes. How about moving it to the test class where it is actually used? ########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; + +/** + * Represents types of Partitioners supported and the corresponding starting token values + */ +public enum Partitioner +{ + Murmur3(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE)), + Random(BigInteger.ONE.negate(), BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE)); Review Comment: Second to this. The value range of random partition token should be the same as in Cassandra. Besides that, can you add a brief description to clarify the range's ends. The minimum token is not inclusive, and the maximum token is. 
In math notation, it is `(MIN, MAX]` ########## src/test/resources/logback-test.xml: ########## @@ -20,7 +20,7 @@ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> - <level>DEBUG</level> + <level>INFO</level> Review Comment: I think `DEBUG` is more suitable for tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

