AndrewJSchofield commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1881037714
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -194,12 +202,17 @@ public ShareCoordinatorService(
         ShareCoordinatorConfig config,
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime,
         ShareCoordinatorMetrics shareCoordinatorMetrics,
-        Time time) {
+        Time time,
+        Timer timer,
+        PartitionWriter writer) {

Review Comment:
   nit: New line please so the arguments and the method body do not run into each other.

##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }

+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");

Review Comment:
   `share-group state topic` is what we use in most places.

##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }

+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning();
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+
+                if (lastPrunedOffsets.containsKey(tp) && Objects.equals(lastPrunedOffsets.get(tp), off)) {

Review Comment:
   I suggest `lastPrunedOffsets.get(tp).longValue() == off`. Using the generic object equality method seems odd for just a pair of longs.
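   For illustration, a minimal standalone sketch contrasting the two comparisons. Only the map name and the two expressions come from the diff; the String key (standing in for TopicPartition) and the values are hypothetical:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetComparisonSketch {
    public static void main(String[] args) {
        // Stand-in for the service's lastPrunedOffsets map; a String key
        // replaces TopicPartition to keep the sketch self-contained.
        Map<String, Long> lastPrunedOffsets = new ConcurrentHashMap<>();
        String tp = "__share_group_state-0";
        lastPrunedOffsets.put(tp, 100L);
        Long off = 100L;

        // As written in the diff: null-safe, but generic object equality.
        boolean current = lastPrunedOffsets.containsKey(tp)
            && Objects.equals(lastPrunedOffsets.get(tp), off);

        // As suggested in the review: unbox and compare primitives. Safe here
        // only because containsKey(tp) guarantees get(tp) returns non-null.
        boolean suggested = lastPrunedOffsets.containsKey(tp)
            && lastPrunedOffsets.get(tp).longValue() == off;

        // Comparing the boxed Longs with == would compare references, which
        // is only reliable inside the Long cache (-128..127); hence the
        // explicit primitive comparison reads more clearly.
        System.out.println(current + " " + suggested); // prints: true true
    }
}
```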
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }

+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);

Review Comment:
   `share-group state topic`

##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }

+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning();
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+
+                if (lastPrunedOffsets.containsKey(tp) && Objects.equals(lastPrunedOffsets.get(tp), off)) {
+                    log.debug("{} already pruned at offset {}", tp, off);

Review Comment:
   You've used `till` in most places, so I'd replace the `at` here too.
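   Worth noting about the diff above: the prune job re-arms itself from its own completion callback rather than running on a fixed-rate schedule, so a new run never starts while the previous batch of async writes is still in flight. A minimal standalone sketch of that pattern, using java.util.concurrent stand-ins for Kafka's internal Timer/TimerTask; the interval, partition count, and class names here are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SelfReschedulingPruneSketch {
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor();
    private static final long PRUNE_INTERVAL_MS = 30_000L; // illustrative interval

    // One-shot schedule that re-arms itself: the next run is armed only after
    // the current batch of asynchronous prunes completes, mirroring the
    // "perpetual recursion, failure or not" comment in the diff.
    static void setupRecordPruning() {
        SCHEDULER.schedule(() -> {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int partition = 0; partition < 3; partition++) { // stand-in for activeTopicPartitions()
                futures.add(performRecordPruning(partition));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((res, exp) -> {
                    if (exp != null) {
                        System.err.println("Received error in share-group state topic prune: " + exp);
                    }
                    setupRecordPruning(); // re-arm, failure or not
                });
        }, PRUNE_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    // Placeholder for the real per-partition prune write.
    static CompletableFuture<Void> performRecordPruning(int partition) {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        setupRecordPruning();
    }
}
```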
##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java:
##########
@@ -50,6 +50,7 @@ private static Map<String, String> testConfigMapRaw() {
         configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
         configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
         configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
+        configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000"); // 30 seconds

Review Comment:
   This class doesn't contain any tests, so calling it XYZTest is peculiar. Please rename to `ShareCoordinatorTestUtils` or similar.