smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1879347536
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +251,96 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+        setupRecordPruning(offsets);
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp, offsets)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning(offsets);
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not being loaded
+                // or from a shard re-election; logging them would cause unnecessary noise.
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    // Should not reschedule -> unknown exception.
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                // Should reschedule -> could be transient.
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+                // Guard and optimization.
+                if (off == Long.MAX_VALUE || off <= 0) {
+                    log.warn("Last redundant offset value {} not suitable to make delete call for {}.", off, tp);
+                    // Should reschedule -> next lookup could yield a valid value.
+                    fut.complete(null);
+                    return;
+                }
+
+                if (offsets.containsKey(tp) && Objects.equals(offsets.get(tp), off)) {
+                    log.debug("{} already pruned at offset {}.", tp, off);
+                    fut.complete(null);
+                    return;
+                }
+
+                log.info("Pruning records in {} till offset {}.", tp, off);
+
+                writer.deleteRecords(tp, off)
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            // Should not reschedule -> problems while deleting.
+                            log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp);
+                            fut.completeExceptionally(exp);
+                            return;
+                        }
+                        // Should reschedule -> successful delete.
+                        fut.complete(null);
+                        // Update the offsets map so that we do not
+                        // issue repeated deletes for the same offset.
+                        offsets.put(tp, off);

Review Comment:
   I don't foresee any consistency issues with that. At most a repeated delete call might be made, which is acceptable since these calls are very infrequent (on the order of minutes). Regardless of which broker is the leader, the partition offsets will not change.
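   To make the idempotency argument concrete, here is a minimal, self-contained sketch of why a repeated delete call is harmless. This is not Kafka code: PartitionLogModel, deleteRecordsUpTo, and PruneIdempotencyDemo are hypothetical names that only model the assumed semantics of delete-records (the log start offset can only move forward), and the lastPruned map stands in for the per-node offsets cache in the PR.

       import java.util.Map;
       import java.util.concurrent.ConcurrentHashMap;

       // Hypothetical model of a partition's log start offset, used only to
       // illustrate why repeated prune (delete-records) calls are safe.
       final class PartitionLogModel {
           private long logStartOffset = 0L;

           // Models delete-records semantics under the assumption that the
           // log start offset only ever advances, never moves backward.
           synchronized long deleteRecordsUpTo(long offset) {
               if (offset > logStartOffset) {
                   logStartOffset = offset;
               }
               return logStartOffset;
           }
       }

       public class PruneIdempotencyDemo {
           public static void main(String[] args) {
               PartitionLogModel partition = new PartitionLogModel();
               // Per-node cache of the last pruned offset, analogous to the
               // offsets map in the PR; it is not shared across brokers.
               Map<String, Long> lastPruned = new ConcurrentHashMap<>();

               // First prune cycle: node A prunes up to offset 100.
               partition.deleteRecordsUpTo(100L);
               lastPruned.put("tp-0", 100L);

               // Leadership moves to node B, whose cache is empty, so it issues
               // the same delete again. The call is a no-op: offset stays 100.
               long after = partition.deleteRecordsUpTo(100L);
               System.out.println("Log start offset after repeated delete: " + after); // 100
           }
       }

   Under these assumptions, the offsets map only suppresses deletes that would be no-ops anyway, so losing it on a leader change costs at most one redundant call.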