AndrewJSchofield commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1867895458


##########
server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java:
##########
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
     public static final int APPEND_LINGER_MS_DEFAULT = 10;
     public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";
 
+    public static final String STATE_TOPIC_PRUNE_INTERVAL = "share.coordinator.state.topic.prune.interval.ms";

Review Comment:
   This should be `STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG` I think. All three of 
these should have `_MS_` as part of the name.
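For illustration, the three constants with the suggested `_MS_` naming would follow the `APPEND_LINGER_MS_*` pattern already in the file. The default value and doc string below are placeholders for the sketch, not the PR's actual values:

```java
public class ShareCoordinatorConfigSketch {
    // Suggested rename following the existing *_MS_* convention
    // (compare APPEND_LINGER_MS_DEFAULT / APPEND_LINGER_MS_DOC above).
    public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG =
        "share.coordinator.state.topic.prune.interval.ms";

    // Placeholder default and doc string, assumed for illustration only.
    public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000;
    public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC =
        "The duration in milliseconds between prune operations on share-group state topic partitions.";
}
```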



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java:
##########
@@ -107,4 +107,16 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
         short producerEpoch,
         short apiVersion
     ) throws KafkaException;
+
+    /**
+     * Delete records from a topic partition until specified offset
+     * @param tp                The partition to delete records from
+     * @param deleteUntilOffset Offset to delete until, starting from the beginning

Review Comment:
   I suggest `deleteBeforeOffset` rather than `deleteUntilOffset`. The former is clearly non-inclusive, while the latter is a bit more ambiguous. I think the effect we want here is that if I provide offset 10, then offsets up to and including 9 may be deleted, but not 10.
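The exclusive semantics described above can be shown with a toy model of a log (this is an illustration of the suggested contract, not the PR's code):

```java
import java.util.ArrayList;
import java.util.List;

public class DeleteBeforeOffsetDemo {
    // Toy model of the suggested "deleteBeforeOffset" semantics: the named
    // offset is exclusive, so only offsets strictly below it are removed.
    static List<Long> remainingOffsets(List<Long> offsets, long deleteBeforeOffset) {
        List<Long> remaining = new ArrayList<>();
        for (long offset : offsets) {
            if (offset >= deleteBeforeOffset) { // the named offset itself survives
                remaining.add(offset);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<Long> log = List.of(7L, 8L, 9L, 10L, 11L);
        // deleteBeforeOffset = 10: offsets up to and including 9 are deleted
        System.out.println(remainingOffsets(log, 10L)); // [10, 11]
    }
}
```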



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,43 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                for (int i = 0; i < numPartitions; i++) {

Review Comment:
   Would it not be the case that each shard should set up the pruning for its owned partitions? Shouldn't the pruning stop when a shard loses leadership of a partition?
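A rough sketch of the shard-owned approach this comment suggests, using hypothetical names (`onElection`, `onResignation`, `prunablePartitions`) rather than the PR's actual runtime hooks — the key point is that a partition drops out of the prunable set as soon as leadership is lost:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ShardPruningSketch {
    // Hypothetical shard-local view of the partitions this shard currently leads.
    private final Set<Integer> ownedPartitions = ConcurrentHashMap.newKeySet();

    void onElection(int partition)    { ownedPartitions.add(partition); }
    void onResignation(int partition) { ownedPartitions.remove(partition); }

    // Consulted by the periodic timer task: only owned partitions are pruned,
    // so pruning stops naturally once leadership of a partition moves away.
    List<Integer> prunablePartitions() {
        List<Integer> result = new ArrayList<>(ownedPartitions);
        result.sort(null);
        return result;
    }
}
```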



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java:
##########
@@ -107,4 +107,16 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
         short producerEpoch,
         short apiVersion
     ) throws KafkaException;
+
+    /**
+     * Delete records from a topic partition until specified offset
+     * @param tp                The partition to delete records from
+     * @param deleteUntilOffset Offset to delete until, starting from the beginning
+     * @throws KafkaException   Any KafkaException caught during the operation.
+     */
+    void deleteRecords(
+        TopicPartition tp,
+        long deleteUntilOffset,
+        boolean allowInternalTopicDeletion

Review Comment:
   This parameter is missing from the javadoc.
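For completeness, the javadoc could document the third parameter along these lines (the wording is a suggestion, not text from the PR):

```java
/**
 * Delete records from a topic partition until specified offset
 * @param tp                         The partition to delete records from
 * @param deleteUntilOffset          Offset to delete until, starting from the beginning
 * @param allowInternalTopicDeletion Whether deletion is permitted when {@code tp} is an internal topic
 * @throws KafkaException            Any KafkaException caught during the operation.
 */
void deleteRecords(
    TopicPartition tp,
    long deleteUntilOffset,
    boolean allowInternalTopicDeletion
) throws KafkaException;
```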



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
    * Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
    * the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
    */
-  private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+  private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = {
     trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
     offsetPerPartition.map { case (topicPartition, requestedOffset) =>
-      // reject delete records operation on internal topics
-      if (Topic.isInternal(topicPartition.topic)) {
+      // reject delete records operation for internal topics if allowInternalTopicDeletion is false

Review Comment:
   Probably "unless allowInternalTopicDeletion is true" is clearer.

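The guard being discussed reduces to a simple boolean condition; here is a toy restatement of it (not the PR's code), which makes the "unless" phrasing concrete:

```java
public class InternalTopicGuardDemo {
    // Toy restatement of the guard in deleteRecordsOnLocalLog: a delete-records
    // request on an internal topic is rejected unless allowInternalTopicDeletion
    // is true; non-internal topics always pass this particular check.
    static boolean deletionAllowed(boolean isInternalTopic, boolean allowInternalTopicDeletion) {
        return !isInternalTopic || allowInternalTopicDeletion;
    }
}
```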


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
