yifan-c commented on code in PR #171:
URL: https://github.com/apache/cassandra-sidecar/pull/171#discussion_r1955389788


##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java:
##########
@@ -104,6 +153,86 @@ public TokenRangeReplicasResponse 
cachedReplicaByTokenRange(RestoreJob restoreJo
         return replicaByTokenRangePerKeyspace.forRestoreJob(restoreJob);
     }
 
+    /**
+     * Fetch the latest topology view
+     * <p>It is synchronized when force refreshing as there is potential 
contention from {@link #execute(Promise)}
+     *
+     * @param keyspace keyspace to determine replication
+     * @param forceRefresh whether to forcibly refresh the topology view
+     * @return token ranges of the local Cassandra instances or an empty map 
if nothing is found
+     */
+    @Override
+    public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace, 
boolean forceRefresh)
+    {
+        TokenRangeReplicasResponse topology;
+        if (forceRefresh) // fetch the latest topology and load into cache
+        {
+            synchronized (this)
+            {
+                topology = prepareAndFetch((storageOperations, nodeSettings) 
-> {
+                    String partitioner = nodeSettings.partitioner();
+                    return replicaByTokenRangePerKeyspace.loadOne(keyspace, k 
-> storageOperations.tokenRangeReplicas(new Name(keyspace), partitioner));
+                });
+            }
+        }
+        else // get the cached value
+        {
+            topology = 
replicaByTokenRangePerKeyspace.topologyOfKeyspace(keyspace);
+        }
+
+        return calculateLocalTokenRanges(metadataFetcher, topology);
+    }
+
+    // todo: refactor to a utility class _when_ refactoring 
TokenRangeReplicasResponse data structure (separate out server and http data 
representations)
+    @NotNull
+    public static Map<Integer, Set<TokenRange>> 
calculateLocalTokenRanges(InstanceMetadataFetcher metadataFetcher, 
TokenRangeReplicasResponse topology)
+    {
+        if (topology == null)
+        {
+            return Collections.emptyMap();
+        }
+
+        // todo: this assumes one C* node per IP address
+        Map<String, Integer> allNodes = 
topology.replicaMetadata().values().stream()
+                                                
.collect(Collectors.toMap(TokenRangeReplicasResponse.ReplicaMetadata::address,
+                                                                          
TokenRangeReplicasResponse.ReplicaMetadata::port));
+
+        List<InstanceMetadata> localNodes = 
metadataFetcher.allLocalInstances();
+        Map<String, InstanceMetadata> localEndpointsToMetadata = new 
HashMap<>(localNodes.size());
+        for (InstanceMetadata instanceMetadata : localNodes)
+        {
+            populateEndpointToMetadata(instanceMetadata, allNodes, 
localEndpointsToMetadata);
+        }
+
+        Map<Integer, Set<TokenRange>> localTokenRanges = new 
HashMap<>(localEndpointsToMetadata.size());
+        for (TokenRangeReplicasResponse.ReplicaInfo ri : 
topology.writeReplicas())
+        {
+            TokenRange range = new TokenRange(Token.from(ri.start()), 
Token.from(ri.end()));
+            for (List<String> instanceOfDc : 
ri.replicasByDatacenter().values())
+            {
+                for (String instanceEndpoint : instanceOfDc)
+                {
+                    // skip the non-local nodes
+                    if 
(!localEndpointsToMetadata.containsKey(instanceEndpoint))
+                    {
+                        continue;
+                    }
+
+                    InstanceMetadata instanceMetadata = 
localEndpointsToMetadata.get(instanceEndpoint);
+                    localTokenRanges.compute(instanceMetadata.id(), (key, 
value) -> {
+                        if (value == null)
+                        {
+                            value = new HashSet<>();
+                        }
+                        value.add(range);
+                        return value;
+                    });

Review Comment:
   Agreed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to