dcapwell commented on code in PR #3416:
URL: https://github.com/apache/cassandra/pull/3416#discussion_r1680111572
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -358,6 +464,79 @@ public synchronized void startup()
state = State.STARTED;
}
+ private static boolean isSyncComplete(Ranges ranges)
+ {
+ for (Range range : ranges)
+ {
+ TokenRange tr = (TokenRange) range;
+ if (!tr.isFullRange())
+ return false;
+ }
+ return true;
+ }
+
+ private List<ClusterMetadata> discoverHistoric(Node node, ClusterMetadataService cms, OptionalLong optMaxEpoch)
+ {
+ ClusterMetadata current = cms.metadata();
+ Topology topology = AccordTopology.createAccordTopology(current);
+ Ranges localRanges = topology.rangesForNode(node.id());
+ if (!localRanges.isEmpty()) // already joined, nothing to see here
+ return Collections.emptyList();
+
+ Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
+ for (KeyspaceMetadata keyspace : current.schema.getKeyspaces())
+ {
+ List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
+ if (tables.isEmpty())
+ continue;
+ DataPlacement placement = current.placements.get(keyspace.params.replication);
+ DataPlacement whenSettled = current.writePlacementAllSettled(keyspace);
+ Sets.SetView<InetAddressAndPort> alive = Sets.intersection(whenSettled.writes.byEndpoint().keySet(), placement.writes.byEndpoint().keySet());
+ InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+ whenSettled.writes.forEach((range, group) -> {
+ if (group.endpoints().contains(self))
+ {
+ for (InetAddressAndPort peer : group.endpoints())
+ {
+ if (!alive.contains(peer)) continue;
+ for (TableMetadata table : tables)
+ peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id));
+ }
+ }
+ });
+ }
+ if (peers.isEmpty())
+ return Collections.emptyList();
+
+ Long minEpoch = findMinEpoch(MessagingService.instance(), peers, optMaxEpoch);
+ if (minEpoch == null)
+ return Collections.emptyList();
+ List<ClusterMetadata> history = new ArrayList<>(Math.toIntExact(current.epoch.getEpoch() - minEpoch));
+ for (long epoch = minEpoch; epoch < current.epoch.getEpoch(); epoch++)
+ history.add(cms.loadHistoricEpoch(Epoch.create(epoch)));
Review Comment:
So, my thinking in putting it in `cms` was that `ClusterMetadataService`
figures out how to do it correctly. From Accord's point of view, it doesn't
matter whether an epoch comes from the local log or a remote peer (other than
the fact that the remote path is slower).
I do know that this edge case exists, but I had no clue how to use CMS well
enough to resolve it... =D
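
To make the intent concrete, here is a rough sketch of the shape I have in mind. It is not the real `ClusterMetadataService` code: `Snapshot`, `MetadataService`, and the `remotePeer` callback are hypothetical stand-ins, and only the method name `loadHistoricEpoch` is borrowed from the diff. The caller asks for an epoch, and the service decides whether the local log or a peer serves it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;

// Hypothetical sketch: none of these types are Cassandra's real classes.
public class HistoricEpochSketch
{
    /** Stand-in for a metadata snapshot; records which source served it. */
    public static final class Snapshot
    {
        public final long epoch;
        public final String source;

        public Snapshot(long epoch, String source)
        {
            this.epoch = epoch;
            this.source = source;
        }
    }

    /** Stand-in for the service: callers ask for an epoch and never see the source. */
    public static final class MetadataService
    {
        private final Map<Long, Snapshot> localLog;
        private final LongFunction<Optional<Snapshot>> remotePeer;

        public MetadataService(Map<Long, Snapshot> localLog,
                               LongFunction<Optional<Snapshot>> remotePeer)
        {
            this.localLog = localLog;
            this.remotePeer = remotePeer;
        }

        public Snapshot loadHistoricEpoch(long epoch)
        {
            // Fast path: the epoch is still present in the local log.
            Snapshot local = localLog.get(epoch);
            if (local != null)
                return local;
            // Slow path: the local log does not have it, so ask a peer.
            return remotePeer.apply(epoch)
                             .orElseThrow(() -> new IllegalStateException("epoch " + epoch + " unavailable"));
        }
    }

    public static void main(String[] args)
    {
        Map<Long, Snapshot> log = new HashMap<>();
        log.put(5L, new Snapshot(5, "local"));
        // Assumed peer behaviour for the demo: it can serve epochs 3 and later.
        MetadataService cms = new MetadataService(
            log, e -> e >= 3 ? Optional.of(new Snapshot(e, "remote")) : Optional.empty());

        // Same loop shape as in the diff: the caller never branches on the source.
        for (long epoch = 3; epoch <= 5; epoch++)
            System.out.println(epoch + " <- " + cms.loadHistoricEpoch(epoch).source);
    }
}
```

With that shape, the loop in `discoverHistoric` stays as it is, and handling the edge case becomes a change inside the service rather than in Accord.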
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]