frankgh commented on code in PR #58:
URL: https://github.com/apache/cassandra-sidecar/pull/58#discussion_r1300509025
##########
README.md:
##########
@@ -49,6 +49,19 @@ The build script supports two parameters:
Remove any versions you may not want to test with. We recommend at least the
latest (released) 4.X series and `trunk`.
See Testing for more details on how to choose which Cassandra versions to use
while testing.
+For multi-node in-jvm dtests, network aliases will need to be setup for each Cassandra node. The tests assume each node's
+ip address is 127.0.0.x, where x is the node id.
+
+For example if you populated your cluster with 3 nodes, create interfaces for 127.0.0.2 and 127.0.0.3 (the first node of course uses 127.0.0.1).
+
+### macOS network aliases
+To get up and running, create a temporary alias for every node except the first:
+
+```
+sudo ifconfig lo0 alias 127.0.0.2
Review Comment:
I think the existing tests use up to 10 nodes; should we just tell devs to
do this:
```suggestion
for i in {2..10}; do sudo ifconfig lo0 alias "127.0.0.${i}"; done
```
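For reference, here is a minimal Java sketch of the address convention the README text describes (node id `x` maps to `127.0.0.x`, and the first node needs no alias). The class and method names are hypothetical and not part of the PR; the sketch only builds the command strings and does not run them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of the PR.
class LoopbackAliases
{
    // Convention from the README diff: node id x uses 127.0.0.x
    static String addressFor(int nodeId)
    {
        return "127.0.0." + nodeId;
    }

    // Builds one alias command per node, skipping node 1 (127.0.0.1 already exists)
    static List<String> aliasCommands(int nodeCount)
    {
        List<String> commands = new ArrayList<>();
        for (int id = 2; id <= nodeCount; id++)
        {
            commands.add("sudo ifconfig lo0 alias " + addressFor(id));
        }
        return commands;
    }

    public static void main(String[] args)
    {
        aliasCommands(10).forEach(System.out::println);
    }
}
```

With `nodeCount` set to 10 this yields the same nine commands as the suggested shell loop.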
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+
+/**
+ * Represents types of Partitioners supported and the corresponding starting token values
+ */
+public enum Partitioner
+{
+ Murmur3(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE)),
+ Random(BigInteger.ONE.negate(), BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE));
Review Comment:
Looking at the Cassandra code, the maximum for Murmur3 is `Long.MAX_VALUE`
(https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java#L51).
For Random, however, the maximum in Cassandra is `2^127`
(https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java#L53),
while here we declare it as `(2^127) - 1`. Is there a reason we subtract one for
Random here?
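To make the difference concrete, here is a small sketch comparing the two values with `BigInteger`. The class and the `inRange` helper are hypothetical; the sketch assumes the declared maximum is treated as an inclusive upper bound, which the diff does not confirm.

```java
import java.math.BigInteger;

// Hypothetical demo class, not part of the PR.
class RandomPartitionerBounds
{
    // 2^127, the value this comment cites as the RandomPartitioner maximum in Cassandra
    static final BigInteger TWO_POW_127 = BigInteger.valueOf(2).pow(127);

    // (2^127) - 1, the value declared for Random in the Partitioner enum under review
    static final BigInteger DECLARED_MAX = TWO_POW_127.subtract(BigInteger.ONE);

    // True if token lies in the closed interval [min, max] (inclusive bounds are an assumption)
    static boolean inRange(BigInteger token, BigInteger min, BigInteger max)
    {
        return token.compareTo(min) >= 0 && token.compareTo(max) <= 0;
    }

    public static void main(String[] args)
    {
        BigInteger min = BigInteger.ONE.negate();
        System.out.println("2^127       = " + TWO_POW_127);
        System.out.println("(2^127) - 1 = " + DECLARED_MAX);
        System.out.println("2^127 within declared bounds: " + inRange(TWO_POW_127, min, DECLARED_MAX));
    }
}
```

If the bound is inclusive, a token equal to `2^127` would fall outside the declared range, which is why the off-by-one is worth confirming.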
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+ private final JmxClient jmxClient;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+ public TokenRangeReplicaProvider(JmxClient jmxClient)
+ {
+ this.jmxClient = jmxClient;
+ }
+
+ public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace,
Partitioner partitioner)
+ {
+ Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+ StorageJmxOperations storage = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
Review Comment:
See `RingProvider`: we need to provide an initialization method in case
an extension class wishes to reuse this code with a custom `StorageJmxOperations`
interface (e.g. a Cassandra 3.11 implementation).
```suggestion
StorageJmxOperations storage = initializeStorageOps();
```
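A rough sketch of the pattern I have in mind, using stand-in types so it compiles on its own. `StorageOps`, `BaseProvider` and `LegacyProvider` are hypothetical names; the real code would return the JMX proxy from `jmxClient.proxy(...)` inside the initialization method.

```java
// Hypothetical stand-in for StorageJmxOperations; not the real interface.
interface StorageOps
{
    String describe();
}

// Stand-in for the provider in this PR
class BaseProvider
{
    // Extension point: protected so subclasses can supply a version-specific proxy
    protected StorageOps initializeStorageOps()
    {
        return () -> "base storage operations";
    }

    public String tokenRangeReplicas()
    {
        // The shared logic obtains the proxy only through the overridable method
        StorageOps storage = initializeStorageOps();
        return "using " + storage.describe();
    }
}

// An extension (for example, for an older Cassandra version) overrides only the initialization
class LegacyProvider extends BaseProvider
{
    @Override
    protected StorageOps initializeStorageOps()
    {
        return () -> "legacy storage operations";
    }
}

class ProviderDemo
{
    public static void main(String[] args)
    {
        System.out.println(new BaseProvider().tokenRangeReplicas());
        System.out.println(new LegacyProvider().tokenRangeReplicas());
    }
}
```

The subclass reuses `tokenRangeReplicas()` unchanged and only swaps the proxy it operates on.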
##########
src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java:
##########
@@ -0,0 +1,114 @@
+package org.apache.cassandra.sidecar.routes;
+
+import java.net.UnknownHostException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.datastax.driver.core.Metadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.StorageOperations;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasRequest;
+import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
+
+/**
+ * Handler which provides token range to read and write replica mapping
+ *
+ * <p>This handler provides token range replicas along with the state of the
replicas. For the purpose
+ * of identifying the state of a newly joining node to replace a dead node
from a newly joining node,
+ * a new state 'Replacing' has been added.
+ * It is represented by
+ * {@code
org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicaProvider.StateWithReplacement}
+ */
+@Singleton
+public class TokenRangeReplicaMapHandler extends
AbstractHandler<TokenRangeReplicasRequest>
+{
+
+ @Inject
+ public TokenRangeReplicaMapHandler(InstanceMetadataFetcher metadataFetcher,
+ CassandraInputValidator validator,
+ ExecutorPools executorPools)
+ {
+ super(metadataFetcher, executorPools, validator);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ String host,
+ SocketAddress remoteAddress,
+ TokenRangeReplicasRequest request)
+ {
+ CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+
+ StorageOperations storageOperations = delegate.storageOperations();
+ Metadata metadata = delegate.metadata();
+ if (storageOperations == null || metadata == null)
+ {
+ context.fail(cassandraServiceUnavailable());
+ return;
+ }
+
+ executorPools.service().executeBlocking(promise -> {
+ try
+ {
+ context.json(storageOperations.tokenRangeReplicas(request.keyspace(), metadata.getPartitioner()));
+ }
+ catch (UnknownHostException e)
+ {
+ processFailure(e, context, host, remoteAddress, request);
+ }
+ }).onFailure(cause -> processFailure(cause, context, host,
remoteAddress, request));
+ }
+
+ @Override
+ protected TokenRangeReplicasRequest extractParamsOrThrow(RoutingContext
context)
+ {
+ return new TokenRangeReplicasRequest(keyspace(context, true));
+ }
+
+ @Override
+ protected void processFailure(Throwable cause, RoutingContext context,
String host, SocketAddress remoteAddress,
+ TokenRangeReplicasRequest request)
+ {
+ if (cause instanceof AssertionError &&
+ StringUtils.contains(cause.getMessage(), "Unknown keyspace"))
+ {
+ context.fail(new HttpException(HttpResponseStatus.NOT_FOUND.code(), cause.getMessage()));
Review Comment:
Nit: we already have helper methods for this:
```suggestion
context.fail(HttpExceptions.wrapHttpException(HttpResponseStatus.NOT_FOUND,
cause.getMessage()));
```
##########
src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java:
##########
@@ -0,0 +1,114 @@
+package org.apache.cassandra.sidecar.routes;
+
+import java.net.UnknownHostException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.datastax.driver.core.Metadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.StorageOperations;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasRequest;
+import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
+
+/**
+ * Handler which provides token range to read and write replica mapping
+ *
+ * <p>This handler provides token range replicas along with the state of the
replicas. For the purpose
+ * of identifying the state of a newly joining node to replace a dead node
from a newly joining node,
+ * a new state 'Replacing' has been added.
+ * It is represented by
+ * {@code
org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicaProvider.StateWithReplacement}
+ */
+@Singleton
+public class TokenRangeReplicaMapHandler extends
AbstractHandler<TokenRangeReplicasRequest>
+{
+
+ @Inject
+ public TokenRangeReplicaMapHandler(InstanceMetadataFetcher metadataFetcher,
+ CassandraInputValidator validator,
+ ExecutorPools executorPools)
+ {
+ super(metadataFetcher, executorPools, validator);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ String host,
+ SocketAddress remoteAddress,
+ TokenRangeReplicasRequest request)
+ {
+ CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+
+ StorageOperations storageOperations = delegate.storageOperations();
+ Metadata metadata = delegate.metadata();
+ if (storageOperations == null || metadata == null)
+ {
+ context.fail(cassandraServiceUnavailable());
+ return;
+ }
+
+ executorPools.service().executeBlocking(promise -> {
+ try
+ {
+ context.json(storageOperations.tokenRangeReplicas(request.keyspace(), metadata.getPartitioner()));
+ }
+ catch (UnknownHostException e)
+ {
+ processFailure(e, context, host, remoteAddress, request);
Review Comment:
It looks like `UnknownHostException` is never thrown here because we wrap it
in a `RuntimeException`. Should we catch that instead?
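A self-contained sketch of what I mean: once the checked exception is wrapped, the caller only sees the `RuntimeException` and has to inspect its cause. The `resolve` and `handle` methods are hypothetical stand-ins for the provider call and the handler; they are not the PR's code.

```java
import java.net.UnknownHostException;

// Hypothetical demo class, not part of the PR.
class WrappedExceptionDemo
{
    // Mimics a callee that wraps the checked exception before it reaches the caller
    static void resolve(String host)
    {
        try
        {
            throw new UnknownHostException(host);
        }
        catch (UnknownHostException e)
        {
            throw new RuntimeException(e);
        }
    }

    // The caller catches the wrapper and unwraps the cause to decide how to respond
    static String handle(String host)
    {
        try
        {
            resolve(host);
            return "ok";
        }
        catch (RuntimeException e)
        {
            if (e.getCause() instanceof UnknownHostException)
            {
                return "unknown host: " + e.getCause().getMessage();
            }
            throw e;
        }
    }

    public static void main(String[] args)
    {
        System.out.println(handle("nohost"));
    }
}
```

Alternatively, `processFailure` could unwrap the cause itself, so the handler would not need a dedicated catch block.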
##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -162,6 +163,23 @@ public CompletableFuture<TimeSkewResponse> timeSkew(List<?
extends SidecarInstan
.build());
}
+ /**
+ * Executes the token-range replicas request using the default retry
policy and configured selection policy
+ *
+ * @param instances the list of Sidecar instances to try for this request
+ * @param keyspace the keyspace in Cassandra
+ * @return a completable future of the token-range replicas
+ */
+ public CompletableFuture<TokenRangeReplicasResponse>
tokenRangeReplicas(List<? extends SidecarInstance> instances,
+
String keyspace)
+ {
+ SidecarInstancesProvider instancesProvider = new
SimpleSidecarInstancesProvider(instances);
+ InstanceSelectionPolicy instanceSelectionPolicy = new
RandomInstanceSelectionPolicy(instancesProvider);
+ return executeRequestAsync(requestBuilder()
+ .instanceSelectionPolicy(instanceSelectionPolicy)
+ .tokenRangeReplicasRequest(keyspace).build());
Review Comment:
minor nit
```suggestion
.tokenRangeReplicasRequest(keyspace)
.build());
```
##########
common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java:
##########
@@ -240,21 +240,20 @@ public boolean isUp()
public void close()
{
- Session activeSession = cqlSessionProvider.localCql();
+ nodeSettings = null;
+ Session activeSession = cqlSessionProvider.close();
if (activeSession != null)
{
maybeUnregisterHostListener(activeSession);
- cqlSessionProvider.close();
}
- nodeSettings = null;
try
{
jmxClient.close();
}
catch (IOException e)
{
// Can't throw unchecked exceptions here, so wrap and rethrow
- throw new RuntimeException(e);
+ LOGGER.warn("Unable to close JMX client", e);
Review Comment:
Please update the comment above, since we no longer rethrow the exception.
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+ private final JmxClient jmxClient;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+ public TokenRangeReplicaProvider(JmxClient jmxClient)
+ {
+ this.jmxClient = jmxClient;
+ }
+
+ public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace,
Partitioner partitioner)
+ {
+ Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+ StorageJmxOperations storage =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+ // Retrieve map of primary token ranges to endpoints that describe the
ring topology
+ Map<List<String>, List<String>> rangeToEndpointMappings =
storage.getRangeToEndpointWithPortMap(keyspace);
+ // Pending ranges include bootstrap tokens and leaving endpoints as
represented in the Cassandra TokenMetadata
+ Map<List<String>, List<String>> pendingRangeMappings =
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+ Set<String> replicaSet =
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+
pendingRangeMappings.values().stream().flatMap(List::stream))
+ .collect(Collectors.toSet());
+
+ Map<String, String> hostToDatacenter =
groupHostsByDatacenter(replicaSet);
+
+ // Retrieve map of all token ranges (pending & primary) to endpoints
+ List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+ writeReplicasFromPendingRanges(rangeToEndpointMappings,
+ pendingRangeMappings,
+ hostToDatacenter,
+ partitioner,
+ keyspace);
+
+ Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet,
storage);
+
+ return new TokenRangeReplicasResponse(
+ replicaToStateMap,
+ writeReplicas,
+ mappingsToUnwrappedReplicaSet(rangeToEndpointMappings,
hostToDatacenter, partitioner));
+ }
+
+ private Map<String, String> replicaToStateMap(Set<String> replicaSet,
StorageJmxOperations storage)
+ {
+ List<String> joiningNodes = storage.getJoiningNodesWithPort();
+ List<String> leavingNodes = storage.getLeavingNodesWithPort();
+ List<String> movingNodes = storage.getMovingNodesWithPort();
+
+ String rawGossipInfo = getRawGossipInfo();
+ GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+ StateWithReplacement state = new StateWithReplacement(joiningNodes,
leavingNodes, movingNodes, gossipInfo);
+
+ return replicaSet.stream()
+ .collect(Collectors.toMap(Function.identity(),
state::of));
+ }
+
+ private String getRawGossipInfo()
+ {
+ return jmxClient.proxy(ClusterMembershipJmxOperations.class,
FAILURE_DETECTOR_OBJ_NAME)
+ .getAllEndpointStatesWithPort();
+ }
+
+ private List<TokenRangeReplicasResponse.ReplicaInfo>
+ writeReplicasFromPendingRanges(Map<List<String>, List<String>>
naturalReplicaMappings,
+ Map<List<String>, List<String>>
pendingRangeMappings,
+ Map<String, String> hostToDatacenter,
+ Partitioner partitioner,
+ String keyspace)
+ {
+ LOGGER.debug("Pending token ranges for keyspace={},
pendingRangeMappings={}", keyspace, pendingRangeMappings);
+ // Merge natural and pending range replicas to generate candidates for
write-replicas
+ List<TokenRangeReplicas> replicas = Stream.concat(
+
naturalReplicaMappings.entrySet().stream(),
+
pendingRangeMappings.entrySet().stream())
+ .map(entry ->
TokenRangeReplicas.generateTokenRangeReplicas(
+ new
BigInteger(entry.getKey().get(0)),
+ new
BigInteger(entry.getKey().get(1)),
+ partitioner,
+ new
HashSet<>(entry.getValue())))
+ .flatMap(Collection::stream)
+
.collect(Collectors.toList());
+
+ // Candidate write-replica mappings (merged from natural and pending
ranges) are normalized
+ // by consolidating overlapping ranges
+ return TokenRangeReplicas.normalize(replicas).stream()
+ .map(range -> {
+ Map<String, List<String>> replicasByDc =
+ replicasByDataCenter(hostToDatacenter,
range.replicaSet());
+ return new
TokenRangeReplicasResponse.ReplicaInfo(range.start().toString(),
+
range.end().toString(),
+
replicasByDc);
+ })
+ .collect(Collectors.toList());
+ }
+
+ private List<TokenRangeReplicasResponse.ReplicaInfo>
+ mappingsToUnwrappedReplicaSet(Map<List<String>, List<String>>
replicasByTokenRange,
+ Map<String, String> hostToDatacenter,
+ Partitioner partitioner)
+ {
+ return replicasByTokenRange.entrySet().stream()
+ .map(entry ->
TokenRangeReplicas.generateTokenRangeReplicas(
+ new BigInteger(entry.getKey().get(0)),
+ new BigInteger(entry.getKey().get(1)),
+ partitioner,
+ new HashSet<>(entry.getValue())))
+ .flatMap(Collection::stream)
+ .sorted()
+ .map(rep -> {
+ Map<String, List<String>> replicasByDc =
+ replicasByDataCenter(hostToDatacenter,
rep.replicaSet());
+ return new
TokenRangeReplicasResponse.ReplicaInfo(rep.start().toString(),
+
rep.end().toString(),
+
replicasByDc);
+ })
+ .collect(Collectors.toList());
+ }
+
+ private Map<String, String> groupHostsByDatacenter(Set<String> replicaSet)
+ {
+ EndpointSnitchJmxOperations endpointSnitchInfo = jmxClient.proxy(EndpointSnitchJmxOperations.class,
+ ENDPOINT_SNITCH_INFO_OBJ_NAME);
Review Comment:
Similar comment to the one on the `StorageJmxOperations` proxy: use an initialization method here as well.
```suggestion
EndpointSnitchJmxOperations endpointSnitchInfo = initializeEndpointProxy();
```
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java:
##########
@@ -0,0 +1,249 @@
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
+import org.jetbrains.annotations.NotNull;
+
+import static
org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the replica-set by token range
+ */
+public class TokenRangeReplicaProvider
+{
+ private final JmxClient jmxClient;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TokenRangeReplicaProvider.class);
+
+ public TokenRangeReplicaProvider(JmxClient jmxClient)
+ {
+ this.jmxClient = jmxClient;
+ }
+
+ public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace,
Partitioner partitioner)
+ {
+ Objects.requireNonNull(keyspace, "keyspace must be non-null");
+
+ StorageJmxOperations storage =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+
+ // Retrieve map of primary token ranges to endpoints that describe the
ring topology
+ Map<List<String>, List<String>> rangeToEndpointMappings =
storage.getRangeToEndpointWithPortMap(keyspace);
+ // Pending ranges include bootstrap tokens and leaving endpoints as
represented in the Cassandra TokenMetadata
+ Map<List<String>, List<String>> pendingRangeMappings =
storage.getPendingRangeToEndpointWithPortMap(keyspace);
+
+ Set<String> replicaSet =
Stream.concat(rangeToEndpointMappings.values().stream().flatMap(List::stream),
+
pendingRangeMappings.values().stream().flatMap(List::stream))
+ .collect(Collectors.toSet());
+
+ Map<String, String> hostToDatacenter =
groupHostsByDatacenter(replicaSet);
+
+ // Retrieve map of all token ranges (pending & primary) to endpoints
+ List<TokenRangeReplicasResponse.ReplicaInfo> writeReplicas =
+ writeReplicasFromPendingRanges(rangeToEndpointMappings,
+ pendingRangeMappings,
+ hostToDatacenter,
+ partitioner,
+ keyspace);
+
+ Map<String, String> replicaToStateMap = replicaToStateMap(replicaSet,
storage);
+
+ return new TokenRangeReplicasResponse(
+ replicaToStateMap,
+ writeReplicas,
+ mappingsToUnwrappedReplicaSet(rangeToEndpointMappings,
hostToDatacenter, partitioner));
+ }
+
+ private Map<String, String> replicaToStateMap(Set<String> replicaSet,
StorageJmxOperations storage)
+ {
+ List<String> joiningNodes = storage.getJoiningNodesWithPort();
+ List<String> leavingNodes = storage.getLeavingNodesWithPort();
+ List<String> movingNodes = storage.getMovingNodesWithPort();
+
+ String rawGossipInfo = getRawGossipInfo();
+ GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo);
+
+ StateWithReplacement state = new StateWithReplacement(joiningNodes,
leavingNodes, movingNodes, gossipInfo);
+
+ return replicaSet.stream()
+ .collect(Collectors.toMap(Function.identity(),
state::of));
+ }
+
+ private String getRawGossipInfo()
+ {
+ return jmxClient.proxy(ClusterMembershipJmxOperations.class, FAILURE_DETECTOR_OBJ_NAME)
Review Comment:
Similar comment to the above. Also, make sure the initialization methods are
`protected` so they can be overridden.
```suggestion
return initializeClusterMembershipOps()
```
##########
common/src/main/java/org/apache/cassandra/sidecar/common/data/TokenRangeReplicasResponse.java:
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Class response for the {@link TokenRangeReplicasRequest}
+ */
+public class TokenRangeReplicasResponse
+{
+ private final Map<String, String> replicaState;
+ private final List<ReplicaInfo> writeReplicas;
+ private final List<ReplicaInfo> readReplicas;
+
+ /**
+ * Constructs token range replicas response object with given params.
+ *
+ * @param replicaState mapping replica to it's state information
+ * @param writeReplicas list of write replicas {@link ReplicaInfo}
instances breakdown by token range
+ * @param readReplicas list of read replica {@link ReplicaInfo} instances
breakdown by token range
+ */
+ public TokenRangeReplicasResponse(@JsonProperty("replicaState")
Map<String, String> replicaState,
+ @JsonProperty("writeReplicas")
List<ReplicaInfo> writeReplicas,
+ @JsonProperty("readReplicas")
List<ReplicaInfo> readReplicas)
+ {
+ this.replicaState = replicaState;
+ this.writeReplicas = writeReplicas;
+ this.readReplicas = readReplicas;
+ }
+
+ /**
+ * @return returns replica to it's state information mapping
+ */
+ @JsonProperty("replicaState")
+ public Map<String, String> replicaState()
+ {
+ return replicaState;
+ }
+
+ /**
+ * @return returns the {@link ReplicaInfo} instances representing write
replicas for each token range
+ */
+ @JsonProperty("writeReplicas")
+ public List<ReplicaInfo> writeReplicas()
+ {
+ return writeReplicas;
+ }
+
+ /**
+ * @return returns the {@link ReplicaInfo} instances representing read replicas for each token range
Review Comment:
```suggestion
* @return the {@link ReplicaInfo} instances representing read replicas for each token range
```
##########
common/src/main/java/org/apache/cassandra/sidecar/common/data/GossipInfoResponse.java:
##########
@@ -53,6 +53,33 @@
*/
 public class GossipInfoResponse extends HashMap<String, GossipInfoResponse.GossipInfo>
 {
+ /**
+ * Overrides the {@link #get(Object)} method. The gossip info keys usually start with the format
+ * {@code /ip:port}. Some clients may be unaware of the preceding {@code slash}, and lookups can
Review Comment:
This bug shows up in the analytics library, as seen here:
https://github.com/apache/cassandra-analytics/blob/trunk/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java#L523
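For readers without the full diff, a minimal sketch of a slash-tolerant lookup follows. The class name and value type are hypothetical, and the fallback logic is my assumption about the intent described in the Javadoc; the actual override in this PR is not shown in the hunk above.

```java
import java.util.HashMap;

// Hypothetical illustration of a map whose keys are stored as "/ip:port"
class SlashTolerantMap extends HashMap<String, String>
{
    @Override
    public String get(Object key)
    {
        // Try the key exactly as given first
        String value = super.get(key);
        if (value != null || !(key instanceof String))
        {
            return value;
        }
        String text = (String) key;
        // Retry with a leading slash only when the caller omitted it
        return text.startsWith("/") ? null : super.get("/" + text);
    }

    public static void main(String[] args)
    {
        SlashTolerantMap gossip = new SlashTolerantMap();
        gossip.put("/127.0.0.1:7000", "NORMAL");
        System.out.println(gossip.get("127.0.0.1:7000"));
        System.out.println(gossip.get("/127.0.0.1:7000"));
    }
}
```

A client that looks up `ip:port` without the slash then finds the same entry as one that includes it.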
##########
common/src/main/java/org/apache/cassandra/sidecar/common/data/TokenRangeReplicasResponse.java:
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Class response for the {@link TokenRangeReplicasRequest}
+ */
+public class TokenRangeReplicasResponse
+{
+ private final Map<String, String> replicaState;
+ private final List<ReplicaInfo> writeReplicas;
+ private final List<ReplicaInfo> readReplicas;
+
+ /**
+ * Constructs token range replicas response object with given params.
+ *
+ * @param replicaState mapping replica to it's state information
+ * @param writeReplicas list of write replicas {@link ReplicaInfo}
instances breakdown by token range
+ * @param readReplicas list of read replica {@link ReplicaInfo} instances
breakdown by token range
+ */
+ public TokenRangeReplicasResponse(@JsonProperty("replicaState")
Map<String, String> replicaState,
+ @JsonProperty("writeReplicas")
List<ReplicaInfo> writeReplicas,
+ @JsonProperty("readReplicas")
List<ReplicaInfo> readReplicas)
+ {
+ this.replicaState = replicaState;
+ this.writeReplicas = writeReplicas;
+ this.readReplicas = readReplicas;
+ }
+
+ /**
+ * @return returns replica to it's state information mapping
Review Comment:
```suggestion
* @return the replica to state information mapping
```
##########
common/src/main/java/org/apache/cassandra/sidecar/common/data/TokenRangeReplicasResponse.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Class response for the {@link TokenRangeReplicasRequest}
+ */
+public class TokenRangeReplicasResponse
+{
+ private final Map<String, String> replicaState;
+ private final List<ReplicaInfo> writeReplicas;
+ private final List<ReplicaInfo> readReplicas;
+
+ /**
+ * Constructs token range replicas response object with given params.
+ *
+ * @param replicaState mapping replica to it's state information
+ * @param writeReplicas list of write replicas {@link ReplicaInfo}
instances breakdown by token range
+ * @param readReplicas list of read replica {@link ReplicaInfo} instances
breakdown by token range
+ */
+ public TokenRangeReplicasResponse(@JsonProperty("replicaState")
Map<String, String> replicaState,
+ @JsonProperty("writeReplicas")
List<ReplicaInfo> writeReplicas,
+ @JsonProperty("readReplicas")
List<ReplicaInfo> readReplicas)
+ {
+ this.replicaState = replicaState;
+ this.writeReplicas = writeReplicas;
+ this.readReplicas = readReplicas;
+ }
+
+ /**
+ * @return returns replica to it's state information mapping
+ */
+ @JsonProperty("replicaState")
+ public Map<String, String> replicaState()
+ {
+ return replicaState;
+ }
+
+ /**
+ * @return returns the {@link ReplicaInfo} instances representing write replicas for each token range
Review Comment:
```suggestion
 * @return the {@link ReplicaInfo} instances representing write replicas for each token range
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]