smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1879347536


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +251,96 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+        setupRecordPruning(offsets);
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp, offsets)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning(offsets);
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging.
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    // Should not reschedule -> unknown exception.
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                // Should reschedule -> could be transient.
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+                // Guard and optimization.
+                if (off == Long.MAX_VALUE || off <= 0) {
+                    log.warn("Last redundant offset value {} not suitable to make delete call for {}.", off, tp);
+                    // Should reschedule -> next lookup could yield valid value.
+                    fut.complete(null);
+                    return;
+                }
+
+                if (offsets.containsKey(tp) && Objects.equals(offsets.get(tp), off)) {
+                    log.debug("{} already pruned at offset {}", tp, off);
+                    fut.complete(null);
+                    return;
+                }
+
+                log.info("Pruning records in {} till offset {}.", tp, off);
+
+                writer.deleteRecords(tp, off)
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            // Should not reschedule -> problems while deleting.
+                            log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp);
+                            fut.completeExceptionally(exp);
+                            return;
+                        }
+                        // Should reschedule -> successful delete
+                        fut.complete(null);
+                        // Update offsets map as we do not want to
+                        // issue repeated deletes.
+                        offsets.put(tp, off);

Review Comment:
   I don't foresee any consistency issues with that - at most a repeated delete call might be made, which is acceptable since the frequency of these calls is very low (on the order of minutes).
   Whoever the leader is, the partition offsets will not change.
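   To illustrate the point, here is a minimal, self-contained sketch of that idea (hypothetical `Partition` record and `deleteRecordsUpTo` placeholder, not the real coordinator types or the `writer.deleteRecords` API): the per-node offsets cache only suppresses repeat deletes on the same node, so after a leadership change the new leader may issue at most one extra delete at the same offset, which leaves the partition contents unchanged.
   ```java
   import java.util.Map;
   import java.util.Objects;
   import java.util.concurrent.ConcurrentHashMap;

   // Sketch only: stand-ins for TopicPartition and writer.deleteRecords,
   // used to show the caching guard, not the actual Kafka classes.
   public class PruneGuardSketch {
       record Partition(String topic, int index) { }

       // Per-node cache of the last offset pruned to, analogous to the
       // `offsets` map threaded through setupRecordPruning/performRecordPruning.
       private final Map<Partition, Long> lastPrunedOffsets = new ConcurrentHashMap<>();

       // Returns true if a delete call was issued, false if it was skipped.
       boolean maybePrune(Partition tp, long lastRedundantOffset) {
           // Guard and optimization, mirroring the PR: skip sentinel/empty values.
           if (lastRedundantOffset == Long.MAX_VALUE || lastRedundantOffset <= 0) {
               return false;
           }
           // Skip if this node already pruned to exactly this offset.
           if (Objects.equals(lastPrunedOffsets.get(tp), lastRedundantOffset)) {
               return false;
           }
           // Issue the delete (placeholder for writer.deleteRecords(tp, off)),
           // then remember the offset only after the delete succeeds.
           deleteRecordsUpTo(tp, lastRedundantOffset);
           lastPrunedOffsets.put(tp, lastRedundantOffset);
           return true;
       }

       private void deleteRecordsUpTo(Partition tp, long offset) {
           System.out.printf("deleteRecords(%s, %d)%n", tp, offset);
       }

       public static void main(String[] args) {
           Partition tp = new Partition("__share_group_state", 0);

           PruneGuardSketch nodeA = new PruneGuardSketch();
           nodeA.maybePrune(tp, 100L);   // issues the delete
           nodeA.maybePrune(tp, 100L);   // skipped: offset cached on node A

           // After a leadership change the new leader starts with an empty cache,
           // so at most one redundant delete at the same offset is issued;
           // deleting up to an already-deleted offset is effectively a no-op.
           PruneGuardSketch nodeB = new PruneGuardSketch();
           nodeB.maybePrune(tp, 100L);   // repeated, but harmless
       }
   }
   ```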


