aweisberg commented on code in PR #3842:
URL: https://github.com/apache/cassandra/pull/3842#discussion_r1932793214


##########
src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java:
##########
@@ -407,7 +407,11 @@ void maybeReportMetadata(ClusterMetadata metadata)
         long epoch = metadata.epoch.getEpoch();
         synchronized (epochs)
         {
-            if (epochs.maxEpoch() == 0)
+            long maxEpoch = epochs.maxEpoch();
+            if (maxEpoch >= epoch)

Review Comment:
   Does Accord knowing about the epoch guarantee that TCM has already loaded 
it? We don't want to skip the TCM loading step by not indirectly calling 
`fetchTopologyInternal` or something to ensure TCM loaded it.



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
     }
 
     /**
-     * Queries peers to discover min epoch
+     * Queries peers to discover min epoch, and then fetches all topologies 
between min and current epochs
      */
-    private long fetchMinEpoch()
+    private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata 
metadata) throws ExecutionException, InterruptedException
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+        if (configService.maxEpoch() >= metadata.epoch.getEpoch())
         {
-            List<TableMetadata> tables = 
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
-            if (tables.isEmpty())
-                continue;
-            DataPlacement current = 
metadata.placements.get(keyspace.params.replication);
-            DataPlacement settled = 
metadata.writePlacementAllSettled(keyspace);
-            Sets.SetView<InetAddressAndPort> alive = 
Sets.intersection(settled.writes.byEndpoint().keySet(), 
current.writes.byEndpoint().keySet());
-            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-            settled.writes.forEach((range, group) -> {
-                if (group.endpoints().contains(self))
-                {
-                    for (InetAddressAndPort peer : group.endpoints())
-                    {
-                        if (peer.equals(self) || !alive.contains(peer)) 
continue;
-                        for (TableMetadata table : tables)
-                            peers.computeIfAbsent(peer, i -> new 
HashSet<>()).add(AccordTopology.fullRange(table.id));
-                    }
-                }
-            });
+            logger.info("Accord epoch {} matches TCM. All topologies are known 
locally", metadata.epoch);
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
         }
+
+        Set<InetAddressAndPort> peers = new HashSet<>();
+        peers.addAll(metadata.directory.allAddresses());
+        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+        // No peers: single node cluster or first node to boot
         if (peers.isEmpty())
-            return -1;
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
+
+        // Bootstrap, fetch min epoch
+        if (minEpoch == 0)
+        {
+            long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+            // No other node has advanced epoch just yet
+            if (fetched == 0)
+                return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+            minEpoch = fetched;
+        }
+
+        long maxEpoch = metadata.epoch.getEpoch();
+
+        // If we are behind minEpoch, catch up to at least minEpoch
+        if (metadata.epoch.getEpoch() < minEpoch)
+        {
+            minEpoch = metadata.epoch.getEpoch();
+            maxEpoch = minEpoch;
+        }
+
+        List<Future<Topology>> futures = new ArrayList<>();
+        logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up 
to {}.", minEpoch, maxEpoch);
+
+        for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+            futures.add(FetchTopology.fetch(SharedContext.Global.instance, 
peers, epoch));
+
+        FBUtilities.waitOnFutures(futures);
+        List<Topology> topologies = new ArrayList<>(futures.size());
+        for (Future<Topology> future : futures)
+            topologies.add(future.get());
 
-        Long minEpoch = findMinEpoch(SharedContext.Global.instance, peers);
-        if (minEpoch == null)
-            return -1;
-        return minEpoch;
+        return topologies;
     }
 
     @VisibleForTesting
-    static Long findMinEpoch(SharedContext context, Map<InetAddressAndPort, 
Set<TokenRange>> peers)
+    static long findMinEpoch(SharedContext context, Set<InetAddressAndPort> 
peers)
     {
         try
         {
-            return FetchMinEpoch.fetch(context, peers).get();
+            Long result = FetchMinEpoch.fetch(context, peers).get();
+            if (result == null)
+                return 0L;
+            return result.longValue();

Review Comment:
   Won't this unbox automatically?



##########
src/java/org/apache/cassandra/service/accord/FetchTopology.java:
##########
@@ -123,18 +116,20 @@ public Response(long epoch, Topology topology)
         long epoch = message.payload.epoch;
         Topology topology = 
AccordService.instance().topology().maybeGlobalForEpoch(epoch);
         if (topology == null)
-            MessagingService.instance().respond(Response.UNKNOWN, message);
+            MessagingService.instance().respond(Response.unkonwn(epoch), 
message);
         else
             MessagingService.instance().respond(new Response(epoch, topology), 
message);
     };
 
+    private static final Logger logger = 
LoggerFactory.getLogger(FetchTopology.class);

Review Comment:
   `logger` goes at the top? Also unused.



##########
src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java:
##########
@@ -145,21 +108,22 @@ public static Future<Long> fetch(SharedContext context, 
Map<InetAddressAndPort,
     }
 
     @VisibleForTesting
-    static Future<Long> fetch(SharedContext context, InetAddressAndPort to, 
Set<TokenRange> value)
+    static Future<Long> fetch(SharedContext context, InetAddressAndPort to)
     {
-        FetchMinEpoch req = new FetchMinEpoch(value);
-        return context.messaging().<FetchMinEpoch, 
FetchMinEpoch.Response>sendWithRetries(Backoff.NO_OP.INSTANCE,
-                                                                               
           MessageDelivery.ImmediateRetryScheduler.instance,
-                                                                               
           Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req,
+        Backoff backoff = Backoff.fromConfig(context, 
DatabaseDescriptor.getAccord().minEpochSyncRetry);
+        return context.messaging().<FetchMinEpoch, 
FetchMinEpoch.Response>sendWithRetries(backoff,
+                                                                               
           context.optionalTasks()::schedule,
+                                                                               
           Verb.ACCORD_FETCH_MIN_EPOCH_REQ,
+                                                                               
           FetchMinEpoch.instance,
                                                                                
           Iterators.cycle(to),
-                                                                               
           
RetryPredicate.times(DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts.value),
+                                                                               
           RetryPredicate.ALWAYS_RETRY,

Review Comment:
   Does this mean that fetching the min epoch requires all nodes to be up to 
complete? Just looking at how this is accumulated by the caller of `fetch` 
which combines all the futures and can't complete until every future completes 
which means any down node would stop this from working?



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -377,24 +371,33 @@ public synchronized void startup()
         node.commandStores().restoreShardStateUnsafe(topology -> 
configService.reportTopology(topology, true, true));
         configService.start();
 
-        long minEpoch = fetchMinEpoch();
-        if (minEpoch >= 0)
+        try
         {
-            for (long epoch = minEpoch; epoch <= metadata.epoch.getEpoch(); 
epoch++)
-                node.configService().fetchTopologyForEpoch(epoch);
+            // Fetch topologies up to current
+            List<Topology> topologies = fetchTopologies(0, metadata);

Review Comment:
   Maybe make 0 a constant indicating that it is actually supposed to find the 
`minEpoch` to fetch?



##########
src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java:
##########
@@ -420,14 +424,17 @@ void maybeReportMetadata(ClusterMetadata metadata)
     @Override
     protected void fetchTopologyInternal(long epoch)
     {
+        if (ClusterMetadata.current().epoch.getEpoch() < epoch)
+            
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));
+
         try
         {
             Set<InetAddressAndPort> peers = new 
HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints());
             peers.remove(FBUtilities.getBroadcastAddressAndPort());
             if (peers.isEmpty())
                 return;
             Topology topology;
-            while ((topology 
=FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == 
null) {}
+            while ((topology = 
FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == 
null) {}

Review Comment:
   Why do we need to run `FetchTopology` here if we would indirectly get the 
topology from `          
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));`



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
     }
 
     /**
-     * Queries peers to discover min epoch
+     * Queries peers to discover min epoch, and then fetches all topologies 
between min and current epochs
      */
-    private long fetchMinEpoch()
+    private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata 
metadata) throws ExecutionException, InterruptedException
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+        if (configService.maxEpoch() >= metadata.epoch.getEpoch())
         {
-            List<TableMetadata> tables = 
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
-            if (tables.isEmpty())
-                continue;
-            DataPlacement current = 
metadata.placements.get(keyspace.params.replication);
-            DataPlacement settled = 
metadata.writePlacementAllSettled(keyspace);
-            Sets.SetView<InetAddressAndPort> alive = 
Sets.intersection(settled.writes.byEndpoint().keySet(), 
current.writes.byEndpoint().keySet());
-            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-            settled.writes.forEach((range, group) -> {
-                if (group.endpoints().contains(self))
-                {
-                    for (InetAddressAndPort peer : group.endpoints())
-                    {
-                        if (peer.equals(self) || !alive.contains(peer)) 
continue;
-                        for (TableMetadata table : tables)
-                            peers.computeIfAbsent(peer, i -> new 
HashSet<>()).add(AccordTopology.fullRange(table.id));
-                    }
-                }
-            });
+            logger.info("Accord epoch {} matches TCM. All topologies are known 
locally", metadata.epoch);

Review Comment:
   I don't know why this is true so I am just going to have to take it on faith.



##########
src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java:
##########
@@ -420,14 +424,17 @@ void maybeReportMetadata(ClusterMetadata metadata)
     @Override
     protected void fetchTopologyInternal(long epoch)
     {
+        if (ClusterMetadata.current().epoch.getEpoch() < epoch)

Review Comment:
   Had to put this on the migration stage on my branch



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropKeyspaceTest.java:
##########
@@ -35,8 +34,7 @@ public void dropKeyspace() throws IOException
         int steps = 5;
         try (Cluster cluster = Cluster.build(3)
                                       .withoutVNodes()
-                                      .withConfig(c -> c.with(Feature.values())

Review Comment:
   When I merged I set it explicitly to `GOSSIP, NETWORK, NATIVE_PROTOCOL` but 
we should probably go back to `values` if the tests can run fine now that we 
know that `BLANK_GOSSIP` is ignored.



##########
src/java/org/apache/cassandra/service/accord/FetchTopology.java:
##########
@@ -123,18 +116,20 @@ public Response(long epoch, Topology topology)
         long epoch = message.payload.epoch;
         Topology topology = 
AccordService.instance().topology().maybeGlobalForEpoch(epoch);
         if (topology == null)
-            MessagingService.instance().respond(Response.UNKNOWN, message);
+            MessagingService.instance().respond(Response.unkonwn(epoch), 
message);

Review Comment:
   Is this for test code that can catch the actual exception?



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
     }
 
     /**
-     * Queries peers to discover min epoch
+     * Queries peers to discover min epoch, and then fetches all topologies 
between min and current epochs
      */
-    private long fetchMinEpoch()
+    private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata 
metadata) throws ExecutionException, InterruptedException
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+        if (configService.maxEpoch() >= metadata.epoch.getEpoch())
         {
-            List<TableMetadata> tables = 
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
-            if (tables.isEmpty())
-                continue;
-            DataPlacement current = 
metadata.placements.get(keyspace.params.replication);
-            DataPlacement settled = 
metadata.writePlacementAllSettled(keyspace);
-            Sets.SetView<InetAddressAndPort> alive = 
Sets.intersection(settled.writes.byEndpoint().keySet(), 
current.writes.byEndpoint().keySet());
-            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-            settled.writes.forEach((range, group) -> {
-                if (group.endpoints().contains(self))
-                {
-                    for (InetAddressAndPort peer : group.endpoints())
-                    {
-                        if (peer.equals(self) || !alive.contains(peer)) 
continue;
-                        for (TableMetadata table : tables)
-                            peers.computeIfAbsent(peer, i -> new 
HashSet<>()).add(AccordTopology.fullRange(table.id));
-                    }
-                }
-            });
+            logger.info("Accord epoch {} matches TCM. All topologies are known 
locally", metadata.epoch);
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
         }
+
+        Set<InetAddressAndPort> peers = new HashSet<>();
+        peers.addAll(metadata.directory.allAddresses());
+        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+        // No peers: single node cluster or first node to boot
         if (peers.isEmpty())
-            return -1;
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
+
+        // Bootstrap, fetch min epoch
+        if (minEpoch == 0)

Review Comment:
   OK, that makes 0 sneaky. I guess 0 is the empty epoch so we never need to 
fetch it? It might be better/clearer to use a boxed Long and null or a constant.



##########
test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java:
##########
@@ -271,21 +223,21 @@ private static MessageDelivery.FailedResponseException 
getFailedResponseExceptio
         }
         catch (ExecutionException e)
         {
-            if (e.getCause() instanceof 
MessageDelivery.FailedResponseException)
+            if (e.getCause() instanceof MessageDelivery.MaxRetriesException)
             {
-                exception = (MessageDelivery.FailedResponseException) 
e.getCause();
+                maxRetries = (MessageDelivery.MaxRetriesException) 
e.getCause();
             }
             else
             {
                 throw e;
             }
         }
-        return exception;
+        return maxRetries;
     }
 
-    private static MessageDelivery.MaxRetriesException 
getMaxRetriesException(Future<Long> f) throws InterruptedException, 
ExecutionException
+    private static MessageDelivery.FailedResponseException 
getFailedResponseException(Future<Long> f) throws InterruptedException, 
ExecutionException

Review Comment:
   This method is unused



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
     }
 
     /**
-     * Queries peers to discover min epoch
+     * Queries peers to discover min epoch, and then fetches all topologies 
between min and current epochs
      */
-    private long fetchMinEpoch()
+    private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata 
metadata) throws ExecutionException, InterruptedException
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+        if (configService.maxEpoch() >= metadata.epoch.getEpoch())
         {
-            List<TableMetadata> tables = 
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
-            if (tables.isEmpty())
-                continue;
-            DataPlacement current = 
metadata.placements.get(keyspace.params.replication);
-            DataPlacement settled = 
metadata.writePlacementAllSettled(keyspace);
-            Sets.SetView<InetAddressAndPort> alive = 
Sets.intersection(settled.writes.byEndpoint().keySet(), 
current.writes.byEndpoint().keySet());
-            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-            settled.writes.forEach((range, group) -> {
-                if (group.endpoints().contains(self))
-                {
-                    for (InetAddressAndPort peer : group.endpoints())
-                    {
-                        if (peer.equals(self) || !alive.contains(peer)) 
continue;
-                        for (TableMetadata table : tables)
-                            peers.computeIfAbsent(peer, i -> new 
HashSet<>()).add(AccordTopology.fullRange(table.id));
-                    }
-                }
-            });
+            logger.info("Accord epoch {} matches TCM. All topologies are known 
locally", metadata.epoch);
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
         }
+
+        Set<InetAddressAndPort> peers = new HashSet<>();
+        peers.addAll(metadata.directory.allAddresses());
+        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+        // No peers: single node cluster or first node to boot
         if (peers.isEmpty())
-            return -1;
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
+
+        // Bootstrap, fetch min epoch
+        if (minEpoch == 0)
+        {
+            long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+            // No other node has advanced epoch just yet
+            if (fetched == 0)
+                return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+            minEpoch = fetched;
+        }
+
+        long maxEpoch = metadata.epoch.getEpoch();
+
+        // If we are behind minEpoch, catch up to at least minEpoch
+        if (metadata.epoch.getEpoch() < minEpoch)
+        {
+            minEpoch = metadata.epoch.getEpoch();
+            maxEpoch = minEpoch;
+        }
+
+        List<Future<Topology>> futures = new ArrayList<>();
+        logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up 
to {}.", minEpoch, maxEpoch);
+
+        for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+            futures.add(FetchTopology.fetch(SharedContext.Global.instance, 
peers, epoch));

Review Comment:
   This is going to be kind of a flood if you have to collect a lot of these. 
Maybe not an issue if it's just one node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to