junrao commented on code in PR #17177:
URL: https://github.com/apache/kafka/pull/17177#discussion_r1759249757
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -50,21 +52,32 @@ public class DelayedShareFetch extends DelayedOperation {
private final ReplicaManager replicaManager;
private final Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap;
private Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionDataFromTryComplete = new LinkedHashMap<>();
+ private final DelayedActionQueue delayedActionQueue;
+ private final DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory;
private static final Logger log =
LoggerFactory.getLogger(DelayedShareFetch.class);
DelayedShareFetch(
SharePartitionManager.ShareFetchPartitionData
shareFetchPartitionData,
ReplicaManager replicaManager,
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap) {
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap,
+ DelayedActionQueue delayedActionQueue,
+ DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory) {
super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchPartitionData = shareFetchPartitionData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
+ this.delayedActionQueue = delayedActionQueue;
+ this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
Review Comment:
Instead of creating a separate DelayedActionQueue, it would be better to
share the one that already exists in ReplicaManager.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -50,21 +52,32 @@ public class DelayedShareFetch extends DelayedOperation {
private final ReplicaManager replicaManager;
private final Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap;
private Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionDataFromTryComplete = new LinkedHashMap<>();
+ private final DelayedActionQueue delayedActionQueue;
+ private final DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory;
private static final Logger log =
LoggerFactory.getLogger(DelayedShareFetch.class);
DelayedShareFetch(
SharePartitionManager.ShareFetchPartitionData
shareFetchPartitionData,
ReplicaManager replicaManager,
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap) {
+ Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap,
+ DelayedActionQueue delayedActionQueue,
+ DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory) {
super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchPartitionData = shareFetchPartitionData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
+ this.delayedActionQueue = delayedActionQueue;
+ this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
}
+ /**
+ * Complete the delayed share fetch actions that were added to the queue.
Since onExpiration serves as a callback for
+ * forceComplete, it should not lead to infinite call stack.
+ */
@Override
public void onExpiration() {
+ delayedActionQueue.tryCompleteActions();
Review Comment:
I am not sure why we need this. We only need to call tryCompleteActions()
after a new item has been added to the queue. In KafkaApis, processing a
request could add an item to the queue, so we need to call
tryCompleteActions() after each request (we already do that for the existing
action queue).
In https://github.com/apache/kafka/pull/8657, we also call
tryCompleteActions() in DelayedJoin.onExpiration(). This is because that
method calls `coordinator.onExpireJoin()`, which could call
ReplicaManager.appendRecords() and thus add an item to the queue. Since
onExpiration() in this class has no logic that adds an item to the queue,
there is no need to call tryCompleteActions() in it.
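To illustrate the point above, here is a minimal sketch of the drain-after-enqueue pattern. It is not Kafka's actual implementation: `SketchQueue`, `handleRequest`, and the `onExpiration` helper below are hypothetical names made up for this example, and the real action queue may behave differently. The only idea it tries to show is that a drain call is useful right after code that may have enqueued something, and is a no-op otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch; none of these names come from the Kafka code base.
class ActionQueueSketch {

    // Stand-in for an action queue: callers add deferred actions and a
    // later call drains and runs whatever has been queued so far.
    static final class SketchQueue {
        private final ConcurrentLinkedQueue<Runnable> actions = new ConcurrentLinkedQueue<>();

        void add(Runnable action) {
            actions.add(action);
        }

        // Runs every queued action and returns how many were run.
        int tryCompleteActions() {
            int ran = 0;
            Runnable action;
            while ((action = actions.poll()) != null) {
                action.run();
                ran++;
            }
            return ran;
        }
    }

    // Assumed request path: handling a request may enqueue a follow-up
    // action, so the caller should drain the queue afterwards.
    static void handleRequest(String name, SketchQueue queue, List<String> log) {
        log.add("handled " + name);
        queue.add(() -> log.add("completed follow-up for " + name));
    }

    // Assumed expiration callback that enqueues nothing, so draining the
    // queue from here would find no new work.
    static void onExpiration(List<String> log) {
        log.add("expired");
    }

    public static void main(String[] args) {
        SketchQueue queue = new SketchQueue();
        List<String> log = new ArrayList<>();

        handleRequest("fetch-1", queue, log);
        queue.tryCompleteActions(); // needed: the handler may have enqueued work

        onExpiration(log);          // no drain here: nothing was enqueued

        log.forEach(System.out::println);
    }
}
```

If an expiration callback did enqueue work (as described for DelayedJoin.onExpiration() above), the same drain call would belong right after it.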
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -524,6 +533,7 @@ private void addDelayedShareFetch(DelayedShareFetch
delayedShareFetch, Set<Objec
@Override
public void close() throws Exception {
+ this.delayedActionsQueue.tryCompleteActions();
Review Comment:
This seems unnecessary since we don't do that when closing the
ReplicaManager.