dcapwell commented on code in PR #3842:
URL: https://github.com/apache/cassandra/pull/3842#discussion_r1934243353


##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
     }
 
     /**
-     * Queries peers to discover min epoch
+     * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs
      */
-    private long fetchMinEpoch()
+    private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+        if (configService.maxEpoch() >= metadata.epoch.getEpoch())
         {
-            List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
-            if (tables.isEmpty())
-                continue;
-            DataPlacement current = metadata.placements.get(keyspace.params.replication);
-            DataPlacement settled = metadata.writePlacementAllSettled(keyspace);
-            Sets.SetView<InetAddressAndPort> alive = Sets.intersection(settled.writes.byEndpoint().keySet(), current.writes.byEndpoint().keySet());
-            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-            settled.writes.forEach((range, group) -> {
-                if (group.endpoints().contains(self))
-                {
-                    for (InetAddressAndPort peer : group.endpoints())
-                    {
-                        if (peer.equals(self) || !alive.contains(peer)) continue;
-                        for (TableMetadata table : tables)
-                            peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id));
-                    }
-                }
-            });
+            logger.info("Accord epoch {} matches TCM. All topologies are known locally", metadata.epoch);
+            return Collections.singletonList(AccordTopology.createAccordTopology(metadata));
         }
+
+        Set<InetAddressAndPort> peers = new HashSet<>();
+        peers.addAll(metadata.directory.allAddresses());
+        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+        // No peers: single node cluster or first node to boot
         if (peers.isEmpty())
-            return -1;
+            return Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
+
+        // Bootstrap, fetch min epoch
+        if (minEpoch == 0)
+        {
+            long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+            // No other node has advanced epoch just yet
+            if (fetched == 0)
+                return Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+            minEpoch = fetched;
+        }
+
+        long maxEpoch = metadata.epoch.getEpoch();
+
+        // If we are behind minEpoch, catch up to at least minEpoch
+        if (metadata.epoch.getEpoch() < minEpoch)
+        {
+            minEpoch = metadata.epoch.getEpoch();
+            maxEpoch = minEpoch;
+        }
+
+        List<Future<Topology>> futures = new ArrayList<>();
+        logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up to {}.", minEpoch, maxEpoch);
+
+        for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+            futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch));

Review Comment:
   Is there a reason not to make this a batch? If you know `N` you also know
`N - 1`, so rather than issuing many concurrent fetches we could ask for the
full batch in one request.
   
   Thinking about the current bootstrap case: you can start up with 50 epochs
just in benchmarks, so we would have 50 concurrent fetches, and they would all
go to the same node at the same time.
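
To make the suggestion concrete, here is a minimal sketch of what a batched fetch could look like. It is not the PR's code: `BatchedEpochFetch`, `EpochRange`, `TopologySource`, and `fetchRange` are hypothetical names introduced only for illustration, and the sketch assumes a peer can answer a request for a contiguous epoch range. The `batchSize` cap is also an assumption, there to bound response size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these types exist in Cassandra or Accord.
final class BatchedEpochFetch
{
    /** Closed epoch range [min, max], both ends inclusive. */
    public static final class EpochRange
    {
        public final long min, max;

        public EpochRange(long min, long max)
        {
            this.min = min;
            this.max = max;
        }
    }

    /** Assumed peer-facing API: one request returns every topology in [minEpoch, maxEpoch]. */
    public interface TopologySource<T>
    {
        List<T> fetchRange(long minEpoch, long maxEpoch);
    }

    /** Splits [minEpoch, maxEpoch] into consecutive ranges of at most batchSize epochs. */
    public static List<EpochRange> batches(long minEpoch, long maxEpoch, int batchSize)
    {
        if (batchSize <= 0)
            throw new IllegalArgumentException("batchSize must be positive");

        List<EpochRange> ranges = new ArrayList<>();
        for (long start = minEpoch; start <= maxEpoch; start += batchSize)
            ranges.add(new EpochRange(start, Math.min(maxEpoch, start + batchSize - 1)));
        return ranges;
    }

    /** Issues one request per range instead of one request per epoch. */
    public static <T> List<T> fetchAll(TopologySource<T> source, long minEpoch, long maxEpoch, int batchSize)
    {
        List<T> topologies = new ArrayList<>();
        for (EpochRange range : batches(minEpoch, maxEpoch, batchSize))
            topologies.addAll(source.fetchRange(range.min, range.max));
        return topologies;
    }
}
```

With 50 epochs and a `batchSize` of 50 or more, this issues a single request rather than 50 concurrent ones. A smaller `batchSize` trades a few extra round trips for smaller responses.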



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

