smjn commented on code in PR #18014: URL: https://github.com/apache/kafka/pull/18014#discussion_r1879319264
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +251,96 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+        setupRecordPruning(offsets);
         log.info("Startup complete.");
     }
+
+    private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp, offsets)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning(offsets);
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        runtime.scheduleWriteOperation(

Review Comment:
   No, the write operation used here is for the write consistency offered by the method. `ShareCoordinatorShard.replay` calls `offsetsManager.updateState` with various last written offset values. The `replay` method itself is called when other write RPCs produce records. However, that does not mean the offset set in `replay` has been committed. The coordinator enqueues write operations in a queue and guarantees that, when `scheduleWriteOperation` completes, the records it generated have been replicated, including those written before it. The framework, however, gives no consistency guarantees between write and read operations.
   Consider a write op that writes an offset into the offset manager. We only know that this offset has been written, not that it has been replicated. A subsequent read could give us back the same offset, but there is still no guarantee that this offset has been replicated. Only when the next write operation completes do we have a guarantee that the previous offset has been committed. This was extensively discussed with the coordinator framework owner @dajac and we arrived at this solution.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
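The write/read consistency argument in the comment above can be sketched as a small, self-contained toy model. All class and method names here are illustrative stand-ins, not the real Kafka coordinator API: `replay()` marks an offset as written but not yet committed, the read path exposes written state with no replication guarantee, and completing a later write operation commits everything enqueued before it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Toy model of the ordering guarantee discussed above (hypothetical names,
// not the real coordinator framework): writes are applied in queue order,
// and the completion of write N implies all earlier writes are replicated.
public class WriteOrderingSketch {
    private long lastWrittenOffset = -1;     // what replay() has applied locally
    private long lastReplicatedOffset = -1;  // what is known to be committed
    private final Deque<Long> pending = new ArrayDeque<>();

    // replay(): a record was written locally; replication is not yet confirmed.
    public void replay(long offset) {
        lastWrittenOffset = offset;
        pending.add(offset);
    }

    // Read path: returns the last written offset -- it may not be replicated.
    public long readOffset() {
        return lastWrittenOffset;
    }

    // Write path: when the returned future completes, this write AND all
    // writes enqueued before it are guaranteed replicated.
    public CompletableFuture<Void> scheduleWriteOperation(long offset) {
        replay(offset);
        while (!pending.isEmpty()) {
            lastReplicatedOffset = pending.poll();
        }
        return CompletableFuture.completedFuture(null);
    }

    public long replicatedOffset() {
        return lastReplicatedOffset;
    }

    public static void main(String[] args) {
        WriteOrderingSketch shard = new WriteOrderingSketch();
        shard.replay(10L);                   // some earlier RPC wrote offset 10
        if (shard.readOffset() != 10L) throw new AssertionError();
        // A read sees offset 10, yet nothing guarantees 10 is replicated:
        if (shard.replicatedOffset() != -1L) throw new AssertionError();
        // Completing the next write op commits offset 10 as well:
        shard.scheduleWriteOperation(11L).join();
        if (shard.replicatedOffset() != 11L) throw new AssertionError();
        System.out.println("ok");
    }
}
```

This is why the prune job issues a write (rather than a read) to learn a safely committed offset: the read path alone can never distinguish "written" from "replicated."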