ifesdjeen commented on code in PR #4244: URL: https://github.com/apache/cassandra/pull/4244#discussion_r2201737063
########## src/java/org/apache/cassandra/service/accord/WatermarkCollector.java: ########## @@ -139,32 +149,49 @@ static void fetchAndReportWatermarksAsync(AccordConfigurationService configServi Snapshot snapshot = m.payload; long minEpoch = configService.minEpoch(); - for (Map.Entry<Range, Long> e : snapshot.closed.entrySet()) - { - Ranges r = Ranges.of(e.getKey()); - configService.receiveClosed(r, e.getValue()); - } - for (Map.Entry<Range, Long> e : snapshot.retired.entrySet()) - { - Ranges r = Ranges.of(e.getKey()); - configService.receiveRetired(r, e.getValue()); - } - for (Map.Entry<Integer, Long> e : snapshot.synced.entrySet()) + snapshot.retired.sort(sortByEpochThenRange); Review Comment: This one will be sorted in forEachEpoch just below ########## src/java/org/apache/cassandra/service/accord/WatermarkCollector.java: ########## @@ -139,32 +149,49 @@ static void fetchAndReportWatermarksAsync(AccordConfigurationService configServi Snapshot snapshot = m.payload; long minEpoch = configService.minEpoch(); - for (Map.Entry<Range, Long> e : snapshot.closed.entrySet()) - { - Ranges r = Ranges.of(e.getKey()); - configService.receiveClosed(r, e.getValue()); - } - for (Map.Entry<Range, Long> e : snapshot.retired.entrySet()) - { - Ranges r = Ranges.of(e.getKey()); - configService.receiveRetired(r, e.getValue()); - } - for (Map.Entry<Integer, Long> e : snapshot.synced.entrySet()) + snapshot.retired.sort(sortByEpochThenRange); + + forEachEpoch(configService::receiveClosed, snapshot.closed); + forEachEpoch(configService::receiveRetired, snapshot.retired); + for (Map.Entry<Long, Long> e : snapshot.synced.entrySet()) { - Node.Id node = new Node.Id(e.getKey()); + Node.Id node = new Node.Id(Ints.saturatedCast(e.getKey())); for (long epoch = minEpoch; epoch <= e.getValue(); epoch++) configService.receiveRemoteSyncComplete(node, epoch); } }); } + private static void forEachEpoch(BiConsumer<Ranges, Long> forEachEpoch, List<Map.Entry<Range, Long>> rangesAndEpochs) + { + if (rangesAndEpochs.isEmpty()) + return; + + rangesAndEpochs.sort(sortByEpochThenRange); + long collectingEpoch = rangesAndEpochs.get(0).getValue(); + List<Range> ranges = new ArrayList<>(); + for (Map.Entry<Range, Long> e : rangesAndEpochs) + { + Range range = e.getKey(); + long epoch = e.getValue(); + if (epoch != collectingEpoch) + { + forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), collectingEpoch); + collectingEpoch = epoch; + ranges.clear(); + } Review Comment: Discussed on Slack, should we do ``` if (epoch != collectingEpoch) { forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), collectingEpoch); collectingEpoch = epoch; ranges.clear(); } ranges.add(range); ``` ########## src/java/org/apache/cassandra/service/accord/WatermarkCollector.java: ########## @@ -81,16 +89,15 @@ public class WatermarkCollector implements ConfigurationService.Listener @Override public void onRemoteSyncComplete(Node.Id node, long epoch) { - synced.compute(node.id, (k, prev) -> prev == null ? epoch : Long.max(prev, epoch)); + synced.compute(node.id, (k, prev) -> prev == -1 ? epoch : Long.max(prev, epoch)); Review Comment: only realizing it now, but we may want to have a synchronized block around this one, or? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org