aweisberg commented on code in PR #3842: URL: https://github.com/apache/cassandra/pull/3842#discussion_r1932793214
########## src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java: ########## @@ -407,7 +407,11 @@ void maybeReportMetadata(ClusterMetadata metadata) long epoch = metadata.epoch.getEpoch(); synchronized (epochs) { - if (epochs.maxEpoch() == 0) + long maxEpoch = epochs.maxEpoch(); + if (maxEpoch >= epoch) Review Comment: Does Accord knowing about the epoch guarantee that TCM has already loaded it? We don't want to skip the TCM loading step by not indirectly calling `fetchTopologyInternal` or something to ensure TCM loaded it. ########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -410,48 +413,67 @@ public synchronized void startup() } /** - * Queries peers to discover min epoch + * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs */ - private long fetchMinEpoch() + private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException { - ClusterMetadata metadata = ClusterMetadata.current(); - Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>(); - for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) + if (configService.maxEpoch() >= metadata.epoch.getEpoch()) { - List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList()); - if (tables.isEmpty()) - continue; - DataPlacement current = metadata.placements.get(keyspace.params.replication); - DataPlacement settled = metadata.writePlacementAllSettled(keyspace); - Sets.SetView<InetAddressAndPort> alive = Sets.intersection(settled.writes.byEndpoint().keySet(), current.writes.byEndpoint().keySet()); - InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); - settled.writes.forEach((range, group) -> { - if (group.endpoints().contains(self)) - { - for (InetAddressAndPort peer : group.endpoints()) - { - if (peer.equals(self) || !alive.contains(peer)) continue; - 
for (TableMetadata table : tables) - peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id)); - } - } - }); + logger.info("Accord epoch {} matches TCM. All topologies are known locally", metadata.epoch); + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); } + + Set<InetAddressAndPort> peers = new HashSet<>(); + peers.addAll(metadata.directory.allAddresses()); + peers.remove(FBUtilities.getBroadcastAddressAndPort()); + + // No peers: single node cluster or first node to boot if (peers.isEmpty()) - return -1; + return Collections.singletonList(AccordTopology.createAccordTopology(metadata));; + + // Bootstrap, fetch min epoch + if (minEpoch == 0) + { + long fetched = findMinEpoch(SharedContext.Global.instance, peers); + // No other node has advanced epoch just yet + if (fetched == 0) + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); + + minEpoch = fetched; + } + + long maxEpoch = metadata.epoch.getEpoch(); + + // If we are behind minEpoch, catch up to at least minEpoch + if (metadata.epoch.getEpoch() < minEpoch) + { + minEpoch = metadata.epoch.getEpoch(); + maxEpoch = minEpoch; + } + + List<Future<Topology>> futures = new ArrayList<>(); + logger.info("Discovered min epoch of {}. 
Proceeding to fetch epochs up to {}.", minEpoch, maxEpoch); + + for (long epoch = minEpoch; epoch <= maxEpoch; epoch++) + futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch)); + + FBUtilities.waitOnFutures(futures); + List<Topology> topologies = new ArrayList<>(futures.size()); + for (Future<Topology> future : futures) + topologies.add(future.get()); - Long minEpoch = findMinEpoch(SharedContext.Global.instance, peers); - if (minEpoch == null) - return -1; - return minEpoch; + return topologies; } @VisibleForTesting - static Long findMinEpoch(SharedContext context, Map<InetAddressAndPort, Set<TokenRange>> peers) + static long findMinEpoch(SharedContext context, Set<InetAddressAndPort> peers) { try { - return FetchMinEpoch.fetch(context, peers).get(); + Long result = FetchMinEpoch.fetch(context, peers).get(); + if (result == null) + return 0L; + return result.longValue(); Review Comment: Won't this unbox automatically? ########## src/java/org/apache/cassandra/service/accord/FetchTopology.java: ########## @@ -123,18 +116,20 @@ public Response(long epoch, Topology topology) long epoch = message.payload.epoch; Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch); if (topology == null) - MessagingService.instance().respond(Response.UNKNOWN, message); + MessagingService.instance().respond(Response.unkonwn(epoch), message); else MessagingService.instance().respond(new Response(epoch, topology), message); }; + private static final Logger logger = LoggerFactory.getLogger(FetchTopology.class); Review Comment: `logger` goes at the top? Also unused. 
########## src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java: ########## @@ -145,21 +108,22 @@ public static Future<Long> fetch(SharedContext context, Map<InetAddressAndPort, } @VisibleForTesting - static Future<Long> fetch(SharedContext context, InetAddressAndPort to, Set<TokenRange> value) + static Future<Long> fetch(SharedContext context, InetAddressAndPort to) { - FetchMinEpoch req = new FetchMinEpoch(value); - return context.messaging().<FetchMinEpoch, FetchMinEpoch.Response>sendWithRetries(Backoff.NO_OP.INSTANCE, - MessageDelivery.ImmediateRetryScheduler.instance, - Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req, + Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().minEpochSyncRetry); + return context.messaging().<FetchMinEpoch, FetchMinEpoch.Response>sendWithRetries(backoff, + context.optionalTasks()::schedule, + Verb.ACCORD_FETCH_MIN_EPOCH_REQ, + FetchMinEpoch.instance, Iterators.cycle(to), - RetryPredicate.times(DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts.value), + RetryPredicate.ALWAYS_RETRY, Review Comment: Does this mean that fetching the min epoch requires all nodes to be up in order to complete? I'm looking at how the result is accumulated by the caller of `fetch`: it combines all the futures and can't complete until every one of them completes. Doesn't that mean any down node would stop this from working?
########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -377,24 +371,33 @@ public synchronized void startup() node.commandStores().restoreShardStateUnsafe(topology -> configService.reportTopology(topology, true, true)); configService.start(); - long minEpoch = fetchMinEpoch(); - if (minEpoch >= 0) + try { - for (long epoch = minEpoch; epoch <= metadata.epoch.getEpoch(); epoch++) - node.configService().fetchTopologyForEpoch(epoch); + // Fetch topologies up to current + List<Topology> topologies = fetchTopologies(0, metadata); Review Comment: Maybe make 0 a constant indicating that it is actually supposed to find the `minEpoch` to fetch? ########## src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java: ########## @@ -420,14 +424,17 @@ void maybeReportMetadata(ClusterMetadata metadata) @Override protected void fetchTopologyInternal(long epoch) { + if (ClusterMetadata.current().epoch.getEpoch() < epoch) + ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch)); + try { Set<InetAddressAndPort> peers = new HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints()); peers.remove(FBUtilities.getBroadcastAddressAndPort()); if (peers.isEmpty()) return; Topology topology; - while ((topology =FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null) {} + while ((topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null) {} Review Comment: Why do we need to run `FetchTopology` here if we would indirectly get the topology from `ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch))`? ########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -410,48 +413,67 @@ public synchronized void startup() } /** - * Queries peers to discover min epoch + * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs */ - private long fetchMinEpoch() + private
List<Topology> fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException { - ClusterMetadata metadata = ClusterMetadata.current(); - Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>(); - for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) + if (configService.maxEpoch() >= metadata.epoch.getEpoch()) { - List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList()); - if (tables.isEmpty()) - continue; - DataPlacement current = metadata.placements.get(keyspace.params.replication); - DataPlacement settled = metadata.writePlacementAllSettled(keyspace); - Sets.SetView<InetAddressAndPort> alive = Sets.intersection(settled.writes.byEndpoint().keySet(), current.writes.byEndpoint().keySet()); - InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); - settled.writes.forEach((range, group) -> { - if (group.endpoints().contains(self)) - { - for (InetAddressAndPort peer : group.endpoints()) - { - if (peer.equals(self) || !alive.contains(peer)) continue; - for (TableMetadata table : tables) - peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id)); - } - } - }); + logger.info("Accord epoch {} matches TCM. All topologies are known locally", metadata.epoch); Review Comment: I don't know why this is true so I am just going to have to take it on faith. 
########## src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java: ########## @@ -420,14 +424,17 @@ void maybeReportMetadata(ClusterMetadata metadata) @Override protected void fetchTopologyInternal(long epoch) { + if (ClusterMetadata.current().epoch.getEpoch() < epoch) Review Comment: I had to put this on the migration stage on my branch. ########## test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropKeyspaceTest.java: ########## @@ -35,8 +34,7 @@ public void dropKeyspace() throws IOException int steps = 5; try (Cluster cluster = Cluster.build(3) .withoutVNodes() - .withConfig(c -> c.with(Feature.values()) Review Comment: When I merged, I set it explicitly to `GOSSIP, NETWORK, NATIVE_PROTOCOL`, but we should probably go back to `Feature.values()` if the tests run fine now that we know that `BLANK_GOSSIP` is ignored. ########## src/java/org/apache/cassandra/service/accord/FetchTopology.java: ########## @@ -123,18 +116,20 @@ public Response(long epoch, Topology topology) long epoch = message.payload.epoch; Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch); if (topology == null) - MessagingService.instance().respond(Response.UNKNOWN, message); + MessagingService.instance().respond(Response.unkonwn(epoch), message); Review Comment: Is this for test code that can catch the actual exception?
########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -410,48 +413,67 @@ public synchronized void startup() } /** - * Queries peers to discover min epoch + * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs */ - private long fetchMinEpoch() + private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException { - ClusterMetadata metadata = ClusterMetadata.current(); - Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>(); - for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) + if (configService.maxEpoch() >= metadata.epoch.getEpoch()) { - List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList()); - if (tables.isEmpty()) - continue; - DataPlacement current = metadata.placements.get(keyspace.params.replication); - DataPlacement settled = metadata.writePlacementAllSettled(keyspace); - Sets.SetView<InetAddressAndPort> alive = Sets.intersection(settled.writes.byEndpoint().keySet(), current.writes.byEndpoint().keySet()); - InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); - settled.writes.forEach((range, group) -> { - if (group.endpoints().contains(self)) - { - for (InetAddressAndPort peer : group.endpoints()) - { - if (peer.equals(self) || !alive.contains(peer)) continue; - for (TableMetadata table : tables) - peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id)); - } - } - }); + logger.info("Accord epoch {} matches TCM. 
All topologies are known locally", metadata.epoch); + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); } + + Set<InetAddressAndPort> peers = new HashSet<>(); + peers.addAll(metadata.directory.allAddresses()); + peers.remove(FBUtilities.getBroadcastAddressAndPort()); + + // No peers: single node cluster or first node to boot if (peers.isEmpty()) - return -1; + return Collections.singletonList(AccordTopology.createAccordTopology(metadata));; + + // Bootstrap, fetch min epoch + if (minEpoch == 0) Review Comment: OK, that makes 0 sneaky. I guess 0 is the empty epoch so we never need to fetch it? It might be better/clearer to use a boxed Long and null or a constant. ########## test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java: ########## @@ -271,21 +223,21 @@ private static MessageDelivery.FailedResponseException getFailedResponseExceptio } catch (ExecutionException e) { - if (e.getCause() instanceof MessageDelivery.FailedResponseException) + if (e.getCause() instanceof MessageDelivery.MaxRetriesException) { - exception = (MessageDelivery.FailedResponseException) e.getCause(); + maxRetries = (MessageDelivery.MaxRetriesException) e.getCause(); } else { throw e; } } - return exception; + return maxRetries; } - private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future<Long> f) throws InterruptedException, ExecutionException + private static MessageDelivery.FailedResponseException getFailedResponseException(Future<Long> f) throws InterruptedException, ExecutionException Review Comment: This method is unused ########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -410,48 +413,67 @@ public synchronized void startup() } /** - * Queries peers to discover min epoch + * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs */ - private long fetchMinEpoch() + private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata 
metadata) throws ExecutionException, InterruptedException { - ClusterMetadata metadata = ClusterMetadata.current(); - Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>(); - for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) + if (configService.maxEpoch() >= metadata.epoch.getEpoch()) { - List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList()); - if (tables.isEmpty()) - continue; - DataPlacement current = metadata.placements.get(keyspace.params.replication); - DataPlacement settled = metadata.writePlacementAllSettled(keyspace); - Sets.SetView<InetAddressAndPort> alive = Sets.intersection(settled.writes.byEndpoint().keySet(), current.writes.byEndpoint().keySet()); - InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); - settled.writes.forEach((range, group) -> { - if (group.endpoints().contains(self)) - { - for (InetAddressAndPort peer : group.endpoints()) - { - if (peer.equals(self) || !alive.contains(peer)) continue; - for (TableMetadata table : tables) - peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id)); - } - } - }); + logger.info("Accord epoch {} matches TCM. 
All topologies are known locally", metadata.epoch); + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); } + + Set<InetAddressAndPort> peers = new HashSet<>(); + peers.addAll(metadata.directory.allAddresses()); + peers.remove(FBUtilities.getBroadcastAddressAndPort()); + + // No peers: single node cluster or first node to boot if (peers.isEmpty()) - return -1; + return Collections.singletonList(AccordTopology.createAccordTopology(metadata));; + + // Bootstrap, fetch min epoch + if (minEpoch == 0) + { + long fetched = findMinEpoch(SharedContext.Global.instance, peers); + // No other node has advanced epoch just yet + if (fetched == 0) + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); + + minEpoch = fetched; + } + + long maxEpoch = metadata.epoch.getEpoch(); + + // If we are behind minEpoch, catch up to at least minEpoch + if (metadata.epoch.getEpoch() < minEpoch) + { + minEpoch = metadata.epoch.getEpoch(); + maxEpoch = minEpoch; + } + + List<Future<Topology>> futures = new ArrayList<>(); + logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up to {}.", minEpoch, maxEpoch); + + for (long epoch = minEpoch; epoch <= maxEpoch; epoch++) + futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch)); Review Comment: This is going to be kind of a flood if you have to collect a lot of these. Maybe not an issue if it's just one node. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org