AndrewJSchofield commented on code in PR #18014: URL: https://github.com/apache/kafka/pull/18014#discussion_r1867895458
########## server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java: ########## @@ -71,6 +72,10 @@ public class ShareCoordinatorConfig { public static final int APPEND_LINGER_MS_DEFAULT = 10; public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk."; + public static final String STATE_TOPIC_PRUNE_INTERVAL = "share.coordinator.state.topic.prune.interval.ms"; Review Comment: This should be `STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG` I think. All three of these should have `_MS_` as part of the name. ########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java: ########## @@ -107,4 +107,16 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification( short producerEpoch, short apiVersion ) throws KafkaException; + + /** + * Delete records from a topic partition until specified offset + * @param tp The partition to delete records from + * @param deleteUntilOffset Offset to delete until, starting from the beginning Review Comment: I suggest `deleteBeforeOffset` rather than `deleteUntilOffset`. The latter is clearly non-inclusive, while the latter is a bit more ambiguous. I think the effect we want here is that if I provide offset 10, then offsets up to and including 9 may be deleted, but not 10. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -240,9 +249,43 @@ public void startup( log.info("Starting up."); numPartitions = shareGroupTopicPartitionCount.getAsInt(); + setupRecordPruning(); log.info("Startup complete."); } + private void setupRecordPruning() { + timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) { + @Override + public void run() { + for (int i = 0; i < numPartitions; i++) { Review Comment: Would It not be the case that each shard should set up the pruning for its owned partitions? Shouldn't the pruning stop when a shard loses leadership of a partition? ########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java: ########## @@ -107,4 +107,16 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification( short producerEpoch, short apiVersion ) throws KafkaException; + + /** + * Delete records from a topic partition until specified offset + * @param tp The partition to delete records from + * @param deleteUntilOffset Offset to delete until, starting from the beginning + * @throws KafkaException Any KafkaException caught during the operation. + */ + void deleteRecords( + TopicPartition tp, + long deleteUntilOffset, + boolean allowInternalTopicDeletion Review Comment: This parameter is missing from the javadoc. ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig, * Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas; * the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset */ - private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = { + private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = { trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition)) offsetPerPartition.map { case (topicPartition, requestedOffset) => - // reject delete records operation on internal topics - if (Topic.isInternal(topicPartition.topic)) { + // reject delete records operation for internal topics if allowInternalTopicDeletion is false Review Comment: probably "unless allowInternalTopicDeletion is true" is clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org