AndrewJSchofield commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1881037714


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -194,12 +202,17 @@ public ShareCoordinatorService(
         ShareCoordinatorConfig config,
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime,
         ShareCoordinatorMetrics shareCoordinatorMetrics,
-        Time time) {
+        Time time,
+        Timer timer,
+        PartitionWriter writer) {

Review Comment:
   nit: New line please so the arguments and the method body do not run into each other.
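
   A minimal sketch of the suggested layout (class and field names here are hypothetical): when a parameter list spans several lines, a blank line before the first statement keeps the arguments and the body visually separate.

   ```java
   // Hypothetical class illustrating the layout nit only.
   public class LayoutSketch {
       private final String name;
       private final int count;

       public LayoutSketch(
           String name,
           int count) {

           // Blank line above separates the wrapped parameter list
           // from the constructor body.
           this.name = name;
           this.count = count;
       }

       public static void main(String[] args) {
           LayoutSketch s = new LayoutSketch("demo", 3);
           System.out.println(s.name + ":" + s.count);
       }
   }
   ```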



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");

Review Comment:
   `share-group state topic` is what we use in most places.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning();
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging.
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+
+                if (lastPrunedOffsets.containsKey(tp) && Objects.equals(lastPrunedOffsets.get(tp), off)) {

Review Comment:
   I suggest `lastPrunedOffsets.get(tp).longValue() == off`. Using the generic object equality method seems odd for just a pair of longs.
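
   A standalone illustration of the suggested comparison (map contents and keys here are hypothetical). Unboxing one operand forces a primitive `long` comparison, whereas raw `==` on two boxed `Long`s compares references and is only reliable inside the small `Long.valueOf` cache range:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class BoxedLongCompare {
       public static void main(String[] args) {
           Map<String, Long> lastPrunedOffsets = new HashMap<>();
           // 12345 is well outside the Long.valueOf cache (-128..127),
           // so boxing produces a fresh object.
           lastPrunedOffsets.put("tp-0", 12345L);

           Long off = 12345L;
           // Reference equality on boxed Longs: distinct objects, so false.
           System.out.println(lastPrunedOffsets.get("tp-0") == off);
           // Unboxing one side makes this a primitive long comparison: true.
           System.out.println(lastPrunedOffsets.get("tp-0").longValue() == off);
       }
   }
   ```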



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);

Review Comment:
   `share-group state topic`
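
   For context, the fan-out-then-reschedule pattern in the quoted code can be sketched standalone (class and method names here are hypothetical, iterations are bounded for demonstration, and the real job reschedules via the coordinator timer rather than plain recursion):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   public class PruneLoopSketch {
       static int runs = 0;

       // Hypothetical stand-in for one per-partition prune; like the PR's
       // performRecordPruning, it always completes (normally here).
       static CompletableFuture<Void> performPrune(int partition) {
           return CompletableFuture.completedFuture(null);
       }

       // One iteration: fan out per partition, then reschedule
       // unconditionally once all futures settle, failure or not.
       static void setupPruning(int remainingIterations) {
           if (remainingIterations == 0) return; // bounded only in this sketch
           List<CompletableFuture<Void>> futures = new ArrayList<>();
           for (int p = 0; p < 3; p++) {
               futures.add(performPrune(p));
           }
           CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
               .whenComplete((res, exp) -> {
                   runs++;
                   setupPruning(remainingIterations - 1);
               });
       }

       public static void main(String[] args) {
           // All futures complete synchronously here, so both iterations
           // have run by the time we print.
           setupPruning(2);
           System.out.println(runs);
       }
   }
   ```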



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning();
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging.
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+
+                if (lastPrunedOffsets.containsKey(tp) && Objects.equals(lastPrunedOffsets.get(tp), off)) {
+                    log.debug("{} already pruned at offset {}", tp, off);

Review Comment:
   You've used `till` in most places, so I'd replace the `at` here too.



##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java:
##########
@@ -50,6 +50,7 @@ private static Map<String, String> testConfigMapRaw() {
         configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
         configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
         configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
+        configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000");  // 30 seconds

Review Comment:
   This class doesn't contain any tests, so calling it XYZTest is peculiar. Please rename to `ShareCoordinatorTestUtils` or similar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
