dajac commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1871349386
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -165,4 +166,21 @@ class CoordinatorPartitionWriter(
     // Required offset.
     partitionResult.lastOffset + 1
   }
+
+  override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): Unit = {
+    var deleteResults: Map[TopicPartition, DeleteRecordsPartitionResult] = Map.empty
+    replicaManager.deleteRecords(
+      timeout = 0L,

Review Comment:
   I have doubts about using timeout=0 here. My understanding is that it will delete records from the local log but it won't wait for the replicas to acknowledge the deletion. Hence, I suppose that it will always return an error. I suppose that we don't really care about replication in this case, but it may spam the logs with unnecessary errors.

##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,60 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    // visibility for tests
+    void setupRecordPruning() {
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                runtime.activeTopicPartitions().forEach(tp -> {
+                    performRecordPruning(tp);
+                });
+                // perpetual recursion
+                setupRecordPruning();
+            }
+        });
+    }
+
+    // visibility for tests
+    void performRecordPruning(TopicPartition tp) {
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup threw an error.", exception);
+                }
+                return;
+            }
+            result.ifPresent(
+                off -> {
+                    // guard and optimization
+                    if (off == Long.MAX_VALUE || off <= 0) {
+                        log.warn("Last redundant offset value {} not suitable to make delete call for {}.", off, tp);
+                        return;
+                    }
+
+                    log.info("Pruning records in {} till offset {}.", tp, off);
+                    try {
+                        writer.deleteRecords(tp, off);
+                    } catch (Exception e) {
+                        log.error("Failed to delete records in {} till offset {}.", tp, off, e);

Review Comment:
   I believe this will be printed all the time.

##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -219,6 +232,8 @@ private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value)
                 snapshotUpdateCount.put(mapKey, 0);
             }
         }
+
+        offsetsManager.updateState(mapKey, offset);

Review Comment:
   My understanding is that you don't use a timeline data structure in the offset manager. The risk is that a snapshot may be replayed and hence update the offset here, even though at this point we don't know whether the snapshot will be durably stored. Let's imagine the worst case: the write of the snapshot fails for whatever reason. In this case, the snapshot registry is reverted to undo all the uncommitted changes related to the write, so all of the state is reverted except the state in the offset manager. Now, another operation may come in before the snapshot write is retried, so your offset would be completely wrong.

   It seems to me that there is a risk of trimming the log at a wrong offset due to this.
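   Editor's note: a minimal sketch of the kind of timeline-backed offsets manager the comment above alludes to, assuming the manager can be handed the shard's SnapshotRegistry. The class name, the String key, and the method names are hypothetical; only TimelineHashMap and SnapshotRegistry (from org.apache.kafka.timeline) are existing Kafka types.

```java
import java.util.Optional;

import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

// Hypothetical sketch: track per-key offsets in a timeline map so that reverting the
// shard's snapshot registry (e.g. after a failed snapshot write) also reverts any
// offsets recorded while replaying that uncommitted write.
public class TimelineOffsetsManager {
    private final TimelineHashMap<String, Long> offsets;

    public TimelineOffsetsManager(SnapshotRegistry snapshotRegistry) {
        this.offsets = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    // Called from replay, mirroring offsetsManager.updateState(mapKey, offset) above.
    public void updateState(String key, long offset) {
        offsets.put(key, offset);
    }

    // The smallest tracked offset: everything strictly below it should be redundant.
    public Optional<Long> lastRedundantOffset() {
        return offsets.values().stream().min(Long::compareTo);
    }
}
```

   Because the map is registered with the same registry the coordinator runtime reverts when a write fails, the offset state rolls back together with the rest of the shard, which would avoid the wrong-offset trimming risk described above.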
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,
   def deleteRecords(timeout: Long,

Review Comment:
   Let's add a unit test for this new flag in ReplicaManagerTest.
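   Editor's note, relating back to the timeout=0 and "printed all the time" comments earlier in this review: if the immediate timeout surfaces as REQUEST_TIMED_OUT even though the local deletion went through, the prune job's catch block will log an error on essentially every cycle. Below is a minimal sketch, not the PR's code, of filtering out expected errors; the helper class and the chosen set of expected errors are assumptions, while Errors and TopicPartition are existing Kafka types.

```java
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: only surface unexpected failures from the periodic prune job.
public final class PruneErrorFilter {
    private static final Logger LOG = LoggerFactory.getLogger(PruneErrorFilter.class);

    // Errors assumed to be part of normal operation (coordinator movement, load in
    // progress, or the replication ack never being awaited because timeout = 0).
    private static final Set<Errors> EXPECTED = Set.of(
        Errors.REQUEST_TIMED_OUT,
        Errors.NOT_COORDINATOR,
        Errors.COORDINATOR_LOAD_IN_PROGRESS
    );

    public static void logIfUnexpected(TopicPartition tp, long offset, Exception e) {
        Errors error = Errors.forException(e);
        if (EXPECTED.contains(error)) {
            LOG.debug("Expected error {} while pruning {} up to offset {}.", error, tp, offset);
        } else {
            LOG.error("Failed to delete records in {} till offset {}.", tp, offset, e);
        }
    }
}
```

   The catch block in performRecordPruning could then call PruneErrorFilter.logIfUnexpected(tp, off, e) instead of logging every failure unconditionally.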