ifesdjeen commented on code in PR #4244:
URL: https://github.com/apache/cassandra/pull/4244#discussion_r2201737063


##########
src/java/org/apache/cassandra/service/accord/WatermarkCollector.java:
##########
@@ -139,32 +149,49 @@ static void 
fetchAndReportWatermarksAsync(AccordConfigurationService configServi
 
                    Snapshot snapshot = m.payload;
                    long minEpoch = configService.minEpoch();
-                   for (Map.Entry<Range, Long> e : snapshot.closed.entrySet())
-                   {
-                       Ranges r = Ranges.of(e.getKey());
-                       configService.receiveClosed(r, e.getValue());
-                   }
-                   for (Map.Entry<Range, Long> e : snapshot.retired.entrySet())
-                   {
-                       Ranges r = Ranges.of(e.getKey());
-                       configService.receiveRetired(r, e.getValue());
-                   }
-                   for (Map.Entry<Integer, Long> e : 
snapshot.synced.entrySet())
+                   snapshot.retired.sort(sortByEpochThenRange);

Review Comment:
   This one will be sorted in forEachEpoch just below



##########
src/java/org/apache/cassandra/service/accord/WatermarkCollector.java:
##########
@@ -139,32 +149,49 @@ static void 
fetchAndReportWatermarksAsync(AccordConfigurationService configServi
 
                    Snapshot snapshot = m.payload;
                    long minEpoch = configService.minEpoch();
-                   for (Map.Entry<Range, Long> e : snapshot.closed.entrySet())
-                   {
-                       Ranges r = Ranges.of(e.getKey());
-                       configService.receiveClosed(r, e.getValue());
-                   }
-                   for (Map.Entry<Range, Long> e : snapshot.retired.entrySet())
-                   {
-                       Ranges r = Ranges.of(e.getKey());
-                       configService.receiveRetired(r, e.getValue());
-                   }
-                   for (Map.Entry<Integer, Long> e : 
snapshot.synced.entrySet())
+                   snapshot.retired.sort(sortByEpochThenRange);
+
+                   forEachEpoch(configService::receiveClosed, snapshot.closed);
+                   forEachEpoch(configService::receiveRetired, 
snapshot.retired);
+                   for (Map.Entry<Long, Long> e : snapshot.synced.entrySet())
                    {
-                       Node.Id node = new Node.Id(e.getKey());
+                       Node.Id node = new 
Node.Id(Ints.saturatedCast(e.getKey()));
                        for (long epoch = minEpoch; epoch <= e.getValue(); 
epoch++)
                            configService.receiveRemoteSyncComplete(node, 
epoch);
                    }
                });
     }
 
+    private static void forEachEpoch(BiConsumer<Ranges, Long> forEachEpoch, 
List<Map.Entry<Range, Long>> rangesAndEpochs)
+    {
+        if (rangesAndEpochs.isEmpty())
+            return;
+
+        rangesAndEpochs.sort(sortByEpochThenRange);
+        long collectingEpoch = rangesAndEpochs.get(0).getValue();
+        List<Range> ranges = new ArrayList<>();
+        for (Map.Entry<Range, Long> e : rangesAndEpochs)
+        {
+            Range range = e.getKey();
+            long epoch = e.getValue();
+            if (epoch != collectingEpoch)
+            {
+                forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), 
collectingEpoch);
+                collectingEpoch = epoch;
+                ranges.clear();
+            }

Review Comment:
   Discussed on Slack, should we do 
   
   ```
               if (epoch != collectingEpoch)
               {
                   forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), 
collectingEpoch);
                   collectingEpoch = epoch;
                   ranges.clear();
               }
               ranges.add(range);
   ```



##########
src/java/org/apache/cassandra/service/accord/WatermarkCollector.java:
##########
@@ -81,16 +89,15 @@ public class WatermarkCollector implements 
ConfigurationService.Listener
     @Override
     public void onRemoteSyncComplete(Node.Id node, long epoch)
     {
-        synced.compute(node.id, (k, prev) -> prev == null ? epoch : 
Long.max(prev, epoch));
+        synced.compute(node.id, (k, prev) -> prev == -1 ? epoch : 
Long.max(prev, epoch));

Review Comment:
   only realizing it now, but we may want to have a synchronized block around 
this one, or?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to