yifan-c commented on code in PR #45:
URL: https://github.com/apache/cassandra-sidecar/pull/45#discussion_r1191781069


##########
cassandra40/build.gradle:
##########
@@ -12,7 +13,34 @@ repositories {
     mavenCentral()
 }
 
+test {
+    useJUnitPlatform()
+}
+
 dependencies {
-    compile project(":common")
-    implementation('org.jetbrains:annotations:23.0.0')
+    implementation(project(":common"))
+    compileOnly('org.jetbrains:annotations:23.0.0')
+    compileOnly('com.datastax.cassandra:cassandra-driver-core:3.11.3')
+    
implementation("com.google.guava:guava:${project.rootProject.guavaVersion}")
+    implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
+
+    testImplementation 
"org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
+    testImplementation 
"org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
+    testImplementation "org.assertj:assertj-core:3.24.2"
+    testRuntimeOnly 
"org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}"
+
+    testImplementation('org.mockito:mockito-core:4.10.0')
+    testImplementation('org.mockito:mockito-inline:4.10.0')
+
+}
+
+publishing {

Review Comment:
   nit: maybe declare it in the `allprojects` block of the root project. 



##########
cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/TokenRangeReplicas.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cassandra40;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Representation of a token range and the corresponding mapping to 
replica-set hosts
+ */
+public class TokenRangeReplicas implements Comparable<TokenRangeReplicas>
+{
+    private final BigInteger start;
+    private final BigInteger end;
+
+    private final Partitioner partitioner;
+
+    private final Set<String> replicaSet;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeReplicas.class);
+
+    public TokenRangeReplicas(BigInteger start, BigInteger end, Partitioner 
partitioner, Set<String> replicaSet)
+    {
+        this.start = start;
+        this.end = end;
+        this.partitioner = partitioner;
+        this.replicaSet = replicaSet;
+    }
+
+
+    public BigInteger start()
+    {
+        return start;
+    }
+
+    public BigInteger end()
+    {
+        return end;
+    }
+
+    public Set<String> replicaSet()
+    {
+        return replicaSet;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int compareTo(@NotNull TokenRangeReplicas other)
+    {
+        if (this.partitioner != other.partitioner)
+            throw new IllegalStateException("Token ranges being compared do 
not have the same partitioner");
+
+        // TODO
+        BigInteger maxValue = this.partitioner.maxToken;
+        if (this.start.compareTo(other.start) == 0)
+        {
+            if (this.end.equals(maxValue)) return 1;
+            else if (other.end.equals(maxValue)) return -1;
+            else return this.end.compareTo(other.end);
+        }
+        else return this.start.compareTo(other.start);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof TokenRangeReplicas))
+        {
+            return false;
+        }
+        TokenRangeReplicas that = (TokenRangeReplicas) o;
+        return (this.start.equals(that.start) && this.end.equals(that.end) && 
this.partitioner == that.partitioner);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(start, end, partitioner);
+    }
+
+    private boolean isWrapAround()
+    {
+        return start.compareTo(end) >= 0;
+    }
+
+    private boolean isSubsetOf(TokenRangeReplicas other)
+    {
+        if (this.partitioner != other.partitioner)
+            throw new IllegalStateException("Token ranges being compared do 
not have the same partitioner");
+
+        BigInteger maxValue = this.partitioner.maxToken;
+        if (this.start.compareTo(other.start) >= 0)
+        {
+            // TODO:
+            if (other.end.equals(maxValue)) return true;
+            if (this.end.equals(maxValue)) return false;
+            if (this.end.compareTo(other.end) <= 0) return true;
+        }
+        return false;
+    }
+
+    /**
+     * For subset ranges, this is used to determine if a range is larger than 
the other by comparing start-end lengths
+     * If both ranges end at the min, we compare starting points to determine 
the result.
+     * When the left range is the only one ending at min, it is always the 
larger one since all subsequent ranges
+     * in the sorted range list have to be smaller.
+     *
+     * @param other the next range in the range list to compare
+     * @return true if "this" range is larger than the other
+     */
+    private boolean isLarger(TokenRangeReplicas other)
+    {
+        if (this.partitioner != other.partitioner)
+            throw new IllegalStateException("Token ranges being compared do 
not have the same partitioner");
+
+        // If both ranges end at min, we compare start of ranges
+        if (this.end.equals(partitioner.maxToken) && 
other.end.equals(partitioner.maxToken))
+        {
+            return this.start.compareTo(other.start) < 0;
+        }
+
+        if (this.end.equals(partitioner.maxToken)) return true;
+        if (other.end.equals(partitioner.maxToken)) return false;
+
+        return 
this.end.subtract(this.start).compareTo(other.end.subtract(other.start)) > 0;
+    }
+
+    /**
+     * Determines intersection if the next range starts before the current 
range ends.
+     * When the current range ending at min, we determine intersection merely 
if the next range starts after the current
+     * since all subsequent ranges have to be subsets.
+     *
+     * @param other the range we are currently processing to check if "this" 
intersects it
+     * @return true if "this" range intersects the other
+     */
+    private boolean intersects(TokenRangeReplicas other)
+    {
+        return (other.end.compareTo(partitioner.maxToken) == 0 && 
this.start.compareTo(other.start) > 0) ||
+               this.start.compareTo(other.end) < 0;
+    }

Review Comment:
   I think this implementation is wrong. You want to compare 
`this.end.compareTo(other.start) < 0;`. Before the comparison, it needs to sort 
the 2 ranges by `start`, and make sure to compare left range with right range.
   Beside that, the first condition that checks with the maxToken of the 
partitioner seems not necessary. It only wants to compare 2 ranges.  
   
   The below test fails 
   
   ```java
       @Test
       void testInterests()
       {
           TokenRangeReplicas range1 = new 
TokenRangeReplicas(BigInteger.valueOf(1), BigInteger.valueOf(10), 
Partitioner.Murmur3, new HashSet<>());
           TokenRangeReplicas range2 = new 
TokenRangeReplicas(BigInteger.valueOf(9), BigInteger.valueOf(12), 
Partitioner.Murmur3, new HashSet<>());
           assertThat(range1.intersects(range2)).isTrue();
           assertThat(range2.intersects(range1)).isTrue();
   
           TokenRangeReplicas range3 = new 
TokenRangeReplicas(BigInteger.valueOf(1), BigInteger.valueOf(10), 
Partitioner.Murmur3, new HashSet<>());
           TokenRangeReplicas range4 = new 
TokenRangeReplicas(BigInteger.valueOf(11), BigInteger.valueOf(20), 
Partitioner.Murmur3, new HashSet<>());
           assertThat(range3.intersects(range4)).isFalse();
           assertThat(range4.intersects(range3)).isFalse();
       }
   ```



##########
cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/RingProvider.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cassandra40;
+
+import java.net.UnknownHostException;
+import java.text.DecimalFormat;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.net.HostAndPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.data.RingEntry;
+import org.apache.cassandra.sidecar.common.data.RingResponse;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.cassandra.sidecar.cassandra40.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.cassandra40.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+
+/**
+ * Aggregates the ring view of cluster
+ */
+public class RingProvider
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RingProvider.class);
+    private static final String UNKNOWN_SHORT = "?";
+    private static final String UNKNOWN = "Unknown";
+    private static final String STATUS_UP = "Up";
+    private static final String STATUS_DOWN = "Down";
+    private static final String STATE_JOINING = "Joining";
+    private static final String STATE_LEAVING = "Leaving";
+    private static final String STATE_MOVING = "Moving";
+    private static final String STATE_NORMAL = "Normal";
+    private static final String DECIMAL_FORMAT = "##0.00%";
+
+    private final JmxClient jmxClient;
+    private final DnsResolver dnsResolver;
+
+    public RingProvider(JmxClient jmxClient, DnsResolver dnsResolver)
+    {
+        this.jmxClient = jmxClient;
+        this.dnsResolver = dnsResolver;
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    public RingResponse ring(@Nullable String keyspace) throws 
UnknownHostException
+    {
+        StorageJmxOperations probe = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        EndpointSnitchJmxOperations epSnitchInfo = 
jmxClient.proxy(EndpointSnitchJmxOperations.class,
+                                                                   
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+        // Collect required data from the probe
+        List<String> liveNodes = probe.getLiveNodesWithPort();
+        List<String> deadNodes = probe.getUnreachableNodesWithPort();
+        Status status = new Status(liveNodes, deadNodes);
+        List<String> joiningNodes = probe.getJoiningNodesWithPort();
+        List<String> leavingNodes = probe.getLeavingNodesWithPort();
+        List<String> movingNodes = probe.getMovingNodesWithPort();
+        State state = new State(joiningNodes, leavingNodes, movingNodes);
+        Map<String, String> loadMap = probe.getLoadMapWithPort();
+        Map<String, String> tokensToEndpoints = 
probe.getTokenToEndpointWithPortMap();
+        Map<String, String> endpointsToHostIds = 
probe.getEndpointWithPortToHostId();
+
+        boolean showEffectiveOwnership = true;
+        // Calculate per-token ownership of the ring
+        Map<String, Float> ownerships;
+        try
+        {
+            ownerships = probe.effectiveOwnershipWithPort(keyspace);
+        }
+        catch (IllegalStateException ex)
+        {
+            ownerships = probe.getOwnershipWithPort();
+            LOGGER.warn("Unable to retrieve effective ownership information 
for keyspace={}", keyspace, ex);
+            showEffectiveOwnership = false;
+        }
+
+        // DecimalFormat is not thread-safe, so we need to create a new 
instance per thread
+        DecimalFormat ownsFormat = new DecimalFormat(DECIMAL_FORMAT);
+        RingResponse response = new RingResponse(tokensToEndpoints.size());
+        for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
+        {
+            String endpoint = entry.getValue();
+            String token = entry.getKey();
+            HostAndPort hap = resolve(endpoint, dnsResolver);
+            Float owns = ownerships.get(endpoint);
+            RingEntry ringEntry = new RingEntry.Builder()
+                                  
.datacenter(epSnitchInfo.getDatacenter(endpoint))
+                                  .rack(queryRack(epSnitchInfo, endpoint))
+                                  .status(status.of(endpoint))
+                                  .state(state.of(endpoint))
+                                  .load(loadMap.getOrDefault(endpoint, 
UNKNOWN_SHORT))
+                                  .owns(formatOwns(showEffectiveOwnership, 
ownsFormat, owns))
+                                  .token(token)
+                                  .address(hap.getHost())
+                                  .port(hap.getPort())
+                                  
.fqdn(dnsResolver.reverseResolve(hap.getHost()))
+                                  
.hostId(endpointsToHostIds.getOrDefault(endpoint, UNKNOWN))
+                                  .build();
+            response.add(ringEntry);
+        }
+
+        return response;
+    }
+
+    /**
+     * Resolves the endpoint to the format "IP_ADDRESS:PORT"
+     *
+     * @return host and port. Port defaults to -1 if not included in endpoint.
+     * @throws UnknownHostException when endpoint cannot be resolved
+     */
+    @SuppressWarnings("UnstableApiUsage")
+    private static HostAndPort resolve(String endpoint, DnsResolver resolver) 
throws UnknownHostException
+    {
+        HostAndPort hap = HostAndPort.fromString(endpoint);
+        String address = resolver.resolve(hap.getHost());
+        return HostAndPort.fromParts(address, hap.getPortOrDefault(-1));
+    }
+
+    private static String formatOwns(boolean showEffectiveOwnership, 
DecimalFormat ownsFormat, Float owns)
+    {
+        if (showEffectiveOwnership && owns != null)
+            return ownsFormat.format(owns);
+        return UNKNOWN_SHORT;
+    }
+
+    /**
+     * Data class to get status of endppints

Review Comment:
   nit
   ```suggestion
        * Data class to get status of endpoints
   ```



##########
cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/StorageJmxOperations.java:
##########
@@ -1,12 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.cassandra.sidecar.cassandra40;
 
+import java.util.List;
 import java.util.Map;
 
 /**
- * An interface that pulls a method from the Cassandra Storage Service Proxy
+ * An interface that pulls methods from the Cassandra Storage Service Proxy
  */
 public interface StorageJmxOperations
 {
+    String STORAGE_SERVICE_OBJ_NAME = 
"org.apache.cassandra.db:type=StorageService";
+
+    /**
+     * Retrieve the list of live nodes in the cluster, where "liveness" is
+     * determined by the failure detector of the node being queried.
+     *
+     * @return set of IP addresses, as Strings
+     */
+    List<String> getLiveNodesWithPort();
+
+    /**
+     * Retrieve the list of unreachable nodes in the cluster, as determined
+     * by this node's failure detector.
+     *
+     * @return set of IP addresses, as Strings
+     */
+    List<String> getUnreachableNodesWithPort();
+
+    /**
+     * Retrieve the list of nodes currently bootstrapping into the ring.
+     *
+     * @return set of IP addresses, as Strings
+     */
+    List<String> getJoiningNodesWithPort();
+
+    /**
+     * Retrieve the list of nodes currently leaving the ring.
+     *
+     * @return set of IP addresses, as Strings
+     */
+    List<String> getLeavingNodesWithPort();
+
+    /**
+     * Retrieve the list of nodes currently moving in the ring.
+     *
+     * @return set of IP addresses, as Strings
+     */
+    List<String> getMovingNodesWithPort();
+
+    /**
+     * Human-readable load value.  Keys are IP addresses.
+     */
+    Map<String, String> getLoadMapWithPort();
+
+    /**
+     * Retrieve a map of tokens to endpoints, including the bootstrapping
+     * ones.
+     *
+     * @return a map of tokens to endpoints in ascending order
+     */
+    Map<String, String> getTokenToEndpointWithPortMap();
+
+    /**
+     * Effective ownership is % of the data each node owns given the keyspace
+     * we calculate the percentage using replication factor.
+     * If Keyspace == null, this method will try to verify if all the keyspaces
+     * in the cluster have the same replication strategies and if yes then we 
will
+     * use the first else a empty Map is returned.

Review Comment:
   nit
   ```suggestion
        * use the first else an empty Map is returned.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to