smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1879319264


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +251,96 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+        setupRecordPruning(offsets);
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) 
{
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> 
futures.add(performRecordPruning(tp, offsets)));
+
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic 
prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning(offsets);
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp, 
Map<TopicPartition, Long> offsets) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        runtime.scheduleWriteOperation(

Review Comment:
   No, the write operation used here is for the write consistency offered by 
the method.
   
   The `ShareCoordinatorShard.replay` calls `offsetsManager.updateState` with 
various last written offset values. The `replay` method itself is called when 
other write RPCs produce records. However, it does not mean the offset set in 
replay has been committed.
   
   Now, the coordinator enqueues the write operations in a queue and guarantees 
that when the `scheduleWriteOperation` completes, the records it generated have 
been replicated, even those which were written before it.
   
   This was extensively discussed with the coordinator framework owner @dajac 
and we arrived at this solution.
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to