dcapwell commented on code in PR #3416:
URL: https://github.com/apache/cassandra/pull/3416#discussion_r1681446182
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -358,6 +464,79 @@ public synchronized void startup()
state = State.STARTED;
}
+ private static boolean isSyncComplete(Ranges ranges)
+ {
+ for (Range range : ranges)
+ {
+ TokenRange tr = (TokenRange) range;
+ if (!tr.isFullRange())
+ return false;
+ }
+ return true;
+ }
+
+ private List<ClusterMetadata> discoverHistoric(Node node, ClusterMetadataService cms, OptionalLong optMaxEpoch)
+ {
+ ClusterMetadata current = cms.metadata();
+ Topology topology = AccordTopology.createAccordTopology(current);
+ Ranges localRanges = topology.rangesForNode(node.id());
+ if (!localRanges.isEmpty()) // already joined, nothing to see here
+ return Collections.emptyList();
+
+ Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
+ for (KeyspaceMetadata keyspace : current.schema.getKeyspaces())
+ {
+ List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
+ if (tables.isEmpty())
+ continue;
+ DataPlacement placement = current.placements.get(keyspace.params.replication);
+ DataPlacement whenSettled = current.writePlacementAllSettled(keyspace);
+ Sets.SetView<InetAddressAndPort> alive = Sets.intersection(whenSettled.writes.byEndpoint().keySet(), placement.writes.byEndpoint().keySet());
+ InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+ whenSettled.writes.forEach((range, group) -> {
+ if (group.endpoints().contains(self))
+ {
+ for (InetAddressAndPort peer : group.endpoints())
+ {
+ if (!alive.contains(peer)) continue;
+ for (TableMetadata table : tables)
+ peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id));
+ }
+ }
+ });
+ }
+ if (peers.isEmpty())
+ return Collections.emptyList();
+
+ Long minEpoch = findMinEpoch(MessagingService.instance(), peers, optMaxEpoch);
+ if (minEpoch == null)
+ return Collections.emptyList();
+ List<ClusterMetadata> history = new ArrayList<>(Math.toIntExact(current.epoch.getEpoch() - minEpoch));
+ for (long epoch = minEpoch; epoch < current.epoch.getEpoch(); epoch++)
+ history.add(cms.loadHistoricEpoch(Epoch.create(epoch)));
Review Comment:
> create a special verb for CMS nodes that would receive the epoch and
> construct CMS for that epoch

In my testing this list can be large (but should hopefully shrink once we
start pouring), so something that says the min epoch would make the most sense
to me (rather than 40 calls, do 1).

One thing I wonder: couldn't we just have the log pull the rest? Or is that
too annoying for things in the past (as it assumes to move forward)?
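
To make the "rather than 40 calls, do 1" point concrete, here is a rough sketch that only counts round trips. Everything in it is hypothetical: `FakeCms`, `loadEpoch`, and `loadEpochsFrom` are illustrative stand-ins, not existing Cassandra or Accord APIs, and strings stand in for `ClusterMetadata`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class HistoricEpochFetch
{
    /** Hypothetical stand-in for a remote CMS node; it only counts requests. */
    static class FakeCms
    {
        final AtomicInteger calls = new AtomicInteger();

        // One request per epoch, mirroring the loop in the diff above.
        String loadEpoch(long epoch)
        {
            calls.incrementAndGet();
            return "metadata@" + epoch;
        }

        // Hypothetical batched request: the caller sends only the min epoch
        // (plus an exclusive upper bound) and gets the whole range back.
        List<String> loadEpochsFrom(long minEpoch, long maxEpochExclusive)
        {
            calls.incrementAndGet();
            List<String> out = new ArrayList<>();
            for (long epoch = minEpoch; epoch < maxEpochExclusive; epoch++)
                out.add("metadata@" + epoch);
            return out;
        }
    }

    // Current shape: N epochs means N requests.
    static List<String> perEpoch(FakeCms cms, long minEpoch, long currentEpoch)
    {
        List<String> history = new ArrayList<>();
        for (long epoch = minEpoch; epoch < currentEpoch; epoch++)
            history.add(cms.loadEpoch(epoch));
        return history;
    }

    // Suggested shape: N epochs, one request.
    static List<String> batched(FakeCms cms, long minEpoch, long currentEpoch)
    {
        return cms.loadEpochsFrom(minEpoch, currentEpoch);
    }

    public static void main(String[] args)
    {
        FakeCms a = new FakeCms();
        FakeCms b = new FakeCms();
        List<String> one = perEpoch(a, 10, 50);
        List<String> two = batched(b, 10, 50);
        System.out.println("same result: " + one.equals(two));
        System.out.println("per-epoch calls: " + a.calls.get() + ", batched calls: " + b.calls.get());
    }
}
```

Both paths return the same history; the only difference is how many requests the joining node has to issue.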
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]