junrao commented on code in PR #17437:
URL: https://github.com/apache/kafka/pull/17437#discussion_r1795780640
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -341,7 +329,7 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
void addPurgatoryCheckAndCompleteDelayedActionToActionQueue(Set<TopicIdPartition> topicIdPartitions, String groupId) {
delayedActionsQueue.add(() -> {
topicIdPartitions.forEach(topicIdPartition ->
- delayedShareFetchPurgatory.checkAndComplete(
+ replicaManager.completeDelayedShareFetchRequest(
Review Comment:
Could we just reuse the delayedActionsQueue in ReplicaManager instead of
keeping another reference in this class?
##########
core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java:
##########
@@ -18,19 +18,21 @@
import kafka.server.DelayedOperationKey;
-import org.apache.kafka.common.Uuid;
-
import java.util.Objects;
/**
- * A key for delayed share fetch purgatory that refers to the topic partition.
+ * A key for delayed share fetch purgatory that refers to the topic partition. Since the below replicaManager functionalities
+ * use TopicPartition and not TopicIdPartition, hence we are using the same here.
Review Comment:
Agree that using topicId is probably more accurate since we could reset the
state in a share partition when a topic is recreated.
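To illustrate the point about topic recreation, here is a minimal Java sketch. `NameKey` and `IdKey` are hypothetical stand-ins, not the PR's `DelayedShareFetchPartitionKey`, and `java.util.UUID` stands in for Kafka's `Uuid`. A key built from the topic name alone compares equal before and after a delete-and-recreate, while a key that carries the topic ID does not.

```java
import java.util.Objects;
import java.util.UUID;

final class ShareFetchKeySketch {
    // Hypothetical key that identifies a partition by topic name only.
    static final class NameKey {
        final String topic;
        final int partition;

        NameKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof NameKey)) return false;
            NameKey other = (NameKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Hypothetical key that identifies a partition by topic ID (UUID stands in for Kafka's Uuid).
    static final class IdKey {
        final UUID topicId;
        final int partition;

        IdKey(UUID topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof IdKey)) return false;
            IdKey other = (IdKey) o;
            return partition == other.partition && topicId.equals(other.topicId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicId, partition);
        }
    }

    // A recreated topic keeps its name, so name-based keys for the old and new topic are equal.
    static boolean nameKeysCollide(String topic, int partition) {
        return new NameKey(topic, partition).equals(new NameKey(topic, partition));
    }

    // A recreated topic gets a new ID, so ID-based keys are equal only if the IDs match.
    static boolean idKeysCollide(UUID originalId, UUID recreatedId, int partition) {
        return new IdKey(originalId, partition).equals(new IdKey(recreatedId, partition));
    }
}
```

With name-based keys, delayed operations registered against the old topic would be looked up by requests for the recreated one; with ID-based keys the two are kept apart.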
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -950,6 +988,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedProducePurgatory.checkAndComplete(requestKey)
delayedFetchPurgatory.checkAndComplete(requestKey)
delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+ delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchPartitionKey(topicPartition.topic(), topicPartition.partition()))
Review Comment:
There are also paths in `Partition` where the high watermark (HWM) can
advance. We need to check the share fetch purgatory on those paths too.
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -3029,6 +3069,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")
partitionsToStartFetching.keySet.foreach(completeDelayedFetchOrProduceRequests)
+ partitionsToStartFetching.keySet.foreach(completeDelayedShareFetchRequest)
Review Comment:
1. It seems that every existing caller of
`completeDelayedFetchOrProduceRequests` also needs to check the share purgatory.
Perhaps we could just add the `completeDelayedShareFetchRequest` call inside
`completeDelayedFetchOrProduceRequests`.
2. When a partition is deleted or becomes a follower, we probably also want
to remove the sharePartition from `SharePartitionManager` to free up the
space it holds.
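A rough sketch of both suggestions, written in Java for illustration even though `ReplicaManager` is Scala. Only the method names `completeDelayedFetchOrProduceRequests` and `completeDelayedShareFetchRequest` come from the PR; the class, the `checks` list, the `sharePartitions` set, and `onBecomeFollowerOrDelete` are hypothetical stand-ins, and the purgatories are modeled as log entries rather than real `checkAndComplete` calls.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ReplicaManagerSketch {
    // Records each simulated purgatory check so the call order can be inspected.
    final List<String> checks = new ArrayList<>();

    // Hypothetical stand-in for the share partitions cached by SharePartitionManager.
    final Set<String> sharePartitions = new HashSet<>();

    void completeDelayedShareFetchRequest(String topicPartition) {
        checks.add("shareFetch:" + topicPartition);
    }

    // Suggestion 1: the existing entry point also checks the share purgatory,
    // so callers do not need a separate completeDelayedShareFetchRequest call.
    void completeDelayedFetchOrProduceRequests(String topicPartition) {
        checks.add("produce:" + topicPartition);
        checks.add("fetch:" + topicPartition);
        completeDelayedShareFetchRequest(topicPartition);
    }

    // Suggestion 2: when a partition is deleted or becomes a follower,
    // complete pending operations and drop the cached share partition.
    void onBecomeFollowerOrDelete(String topicPartition) {
        completeDelayedFetchOrProduceRequests(topicPartition);
        sharePartitions.remove(topicPartition);
    }
}
```

With this shape, the extra `foreach(completeDelayedShareFetchRequest)` line in the become-follower path would no longer be needed, since the existing `foreach(completeDelayedFetchOrProduceRequests)` call would cover it.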