yifan-c commented on code in PR #171:
URL: https://github.com/apache/cassandra-sidecar/pull/171#discussion_r1955389788
##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java:
##########
@@ -104,6 +153,86 @@ public TokenRangeReplicasResponse
cachedReplicaByTokenRange(RestoreJob restoreJo
return replicaByTokenRangePerKeyspace.forRestoreJob(restoreJob);
}
+ /**
+ * Returns the token ranges of the local Cassandra instances for the given keyspace,
+ * computed from either a freshly fetched topology view or the cached one.
+ * <p>When {@code forceRefresh} is true, the topology is fetched and loaded into the cache
+ * inside a block synchronized on this instance, as there is potential
+ * contention from {@link #execute(Promise)}. Otherwise, the cached topology of the
+ * keyspace is used and no fetch is performed.
+ *
+ * @param keyspace keyspace to determine replication
+ * @param forceRefresh whether to forcibly refresh the topology view instead of reading the cached value
+ * @return token ranges of the local Cassandra instances, or an empty map
+ * if nothing is found
+ */
+ @Override
+ public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace,
boolean forceRefresh)
+ {
+ TokenRangeReplicasResponse topology;
+ if (forceRefresh) // fetch the latest topology and load into cache
+ {
+ synchronized (this)
+ {
+ topology = prepareAndFetch((storageOperations, nodeSettings)
-> {
+ String partitioner = nodeSettings.partitioner();
+ return replicaByTokenRangePerKeyspace.loadOne(keyspace, k
-> storageOperations.tokenRangeReplicas(new Name(keyspace), partitioner));
+ });
+ }
+ }
+ else // get the cached value
+ {
+ topology =
replicaByTokenRangePerKeyspace.topologyOfKeyspace(keyspace);
+ }
+
+ return calculateLocalTokenRanges(metadataFetcher, topology);
+ }
+
+ // todo: refactor to a utility class _when_ refactoring
TokenRangeReplicasResponse data structure (separate out server and http data
representations)
+ @NotNull
+ public static Map<Integer, Set<TokenRange>>
calculateLocalTokenRanges(InstanceMetadataFetcher metadataFetcher,
TokenRangeReplicasResponse topology)
+ {
+ if (topology == null)
+ {
+ return Collections.emptyMap();
+ }
+
+ // todo: this assumes one C* node per IP address
+ Map<String, Integer> allNodes =
topology.replicaMetadata().values().stream()
+
.collect(Collectors.toMap(TokenRangeReplicasResponse.ReplicaMetadata::address,
+
TokenRangeReplicasResponse.ReplicaMetadata::port));
+
+ List<InstanceMetadata> localNodes =
metadataFetcher.allLocalInstances();
+ Map<String, InstanceMetadata> localEndpointsToMetadata = new
HashMap<>(localNodes.size());
+ for (InstanceMetadata instanceMetadata : localNodes)
+ {
+ populateEndpointToMetadata(instanceMetadata, allNodes,
localEndpointsToMetadata);
+ }
+
+ Map<Integer, Set<TokenRange>> localTokenRanges = new
HashMap<>(localEndpointsToMetadata.size());
+ for (TokenRangeReplicasResponse.ReplicaInfo ri :
topology.writeReplicas())
+ {
+ TokenRange range = new TokenRange(Token.from(ri.start()),
Token.from(ri.end()));
+ for (List<String> instanceOfDc :
ri.replicasByDatacenter().values())
+ {
+ for (String instanceEndpoint : instanceOfDc)
+ {
+ // skip the non-local nodes
+ if
(!localEndpointsToMetadata.containsKey(instanceEndpoint))
+ {
+ continue;
+ }
+
+ InstanceMetadata instanceMetadata =
localEndpointsToMetadata.get(instanceEndpoint);
+ localTokenRanges.compute(instanceMetadata.id(), (key,
value) -> {
+ if (value == null)
+ {
+ value = new HashSet<>();
+ }
+ value.add(range);
+ return value;
+ });
Review Comment:
Agreed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]