junrao commented on code in PR #19862:
URL: https://github.com/apache/kafka/pull/19862#discussion_r2117642371
##########
server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java:
##########
@@ -91,7 +91,8 @@ public boolean tryComplete() {
             }
         });
         // check if every partition has satisfied at least one of case A or B
-        return deleteRecordsStatus.values().stream().noneMatch(DeleteRecordsPartitionStatus::acksPending) && forceComplete();
+        forceComplete();

Review Comment:
   This change seems incorrect. We need to wait until there is no pending ack for all partitions before we can call forceComplete(), right?


##########
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java:
##########
@@ -51,32 +51,15 @@ public DelayedOperation(long delayMs) {
      *
      * 1. The operation has been verified to be completable inside tryComplete()
      * 2. The operation has expired and hence needs to be completed right now
-     *
-     * Return true iff the operation is completed by the caller: note that
-     * concurrent threads can try to complete the same operation, but only
-     * the first thread will succeed in completing the operation and return
-     * true, others will still return false.
+     *
      */
-    public boolean forceComplete() {
+    public void forceComplete() {
         // Do not proceed if the operation is already completed.
-        if (completed) {
-            return false;
-        }
-        // Attain lock prior completing the request.
-        lock.lock();
-        try {
-            // Re-check, if the operation is already completed by some other thread.
-            if (!completed) {
-                completed = true;
-                // cancel the timeout timer
-                cancel();
-                onComplete();
-                return true;
-            } else {
-                return false;
-            }
-        } finally {
-            lock.unlock();
+        if (!completed) {

Review Comment:
   Both callers already check completed. So, this seems redundant?


##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -121,35 +124,58 @@ class DelayedFetch(
                 || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
                 || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
                 debug(s"Could not obtain last offset for leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
-                return forceComplete()
+                forceComplete()
+                return true
              } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
                debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " +
                  s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
-               return forceComplete()
+               forceComplete()
+               return true
             }
           }
         }
    } catch {
      case _: NotLeaderOrFollowerException =>  // Case A or Case B
        debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately")
-       return forceComplete()
+       if (isCompleted)

Review Comment:
   Could isCompleted be true if we get here? It seems that forceComplete() will not throw an exception.


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -379,7 +380,12 @@ public boolean tryComplete() {
                 partitionsAcquired.clear();
                 localPartitionsAlreadyFetched.clear();
             }
-            return forceComplete();
+            if (isCompleted()) {
+                return false;
+            } else {
+                forceComplete();
+                return true;
+            }

Review Comment:
   Where is the exception coming from? Can forceComplete() throw an exception?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
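
Regarding the DelayedDeleteRecords.java comment above, here is a minimal self-contained sketch of the gating the removed line performed: complete the delayed operation only once no partition still has an ack pending. PartitionStatus and DelayedDeleteRecordsSketch are simplified stand-ins for illustration, not the real Kafka classes, and forceComplete() is assumed to be void here, as the DelayedOperation hunk suggests.

import java.util.Map;

// Simplified stand-ins for DeleteRecordsPartitionStatus / DelayedDeleteRecords,
// used only to illustrate the review comment.
class PartitionStatus {
    private final boolean acksPending;
    PartitionStatus(boolean acksPending) { this.acksPending = acksPending; }
    boolean acksPending() { return acksPending; }
}

class DelayedDeleteRecordsSketch {
    private final Map<String, PartitionStatus> deleteRecordsStatus;
    private boolean completed = false;

    DelayedDeleteRecordsSketch(Map<String, PartitionStatus> deleteRecordsStatus) {
        this.deleteRecordsStatus = deleteRecordsStatus;
    }

    // The shape the comment asks for: force completion only when every
    // partition has satisfied case A or B, i.e. no acks are still pending.
    boolean tryComplete() {
        boolean allSatisfied = deleteRecordsStatus.values().stream()
            .noneMatch(PartitionStatus::acksPending);
        if (allSatisfied) {
            forceComplete();   // assumed to be void in this PR
            return true;
        }
        return false;          // keep the operation in purgatory
    }

    private void forceComplete() { completed = true; }

    boolean isCompleted() { return completed; }

    public static void main(String[] args) {
        var waiting = new DelayedDeleteRecordsSketch(
            Map.of("t-0", new PartitionStatus(true), "t-1", new PartitionStatus(false)));
        var satisfied = new DelayedDeleteRecordsSketch(
            Map.of("t-0", new PartitionStatus(false)));
        System.out.println(waiting.tryComplete());   // false: t-0 still has acks pending
        System.out.println(satisfied.tryComplete()); // true: every partition is satisfied
    }
}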
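
For the DelayedOperation.java comment, a standalone sketch of the first-caller-wins behaviour that the removed javadoc and lock describe: concurrent threads may race to complete the same operation, but only one of them completes it, and the boolean return told callers which one. OperationSketch is a hypothetical stand-in, not the real DelayedOperation.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in mirroring the removed forceComplete(): only the first
// thread to complete the operation sees true, and onComplete() runs exactly once.
class OperationSketch {
    private final Lock lock = new ReentrantLock();
    private volatile boolean completed = false;

    boolean forceComplete() {
        if (completed) {
            return false;              // fast path: already completed
        }
        lock.lock();
        try {
            if (!completed) {          // re-check under the lock
                completed = true;
                onComplete();
                return true;           // this caller completed the operation
            }
            return false;              // another thread got there first
        } finally {
            lock.unlock();
        }
    }

    void onComplete() {
        System.out.println(Thread.currentThread().getName() + " ran onComplete()");
    }

    public static void main(String[] args) throws InterruptedException {
        OperationSketch op = new OperationSketch();
        Runnable caller = () ->
            System.out.println(Thread.currentThread().getName() + " -> " + op.forceComplete());
        Thread t1 = new Thread(caller, "tryComplete-thread");
        Thread t2 = new Thread(caller, "expiration-thread");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Exactly one thread prints true; onComplete() is never invoked twice.
    }
}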
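
On the DelayedFetch.scala and DelayedShareFetch.java comments, a hedged caller-side sketch of the pattern the questions are probing: if forceComplete() only flips a flag and cannot throw, and the operation has not been completed earlier on this path, the added isCompleted() guard never takes the false branch there. TryCompleteSketch is a hypothetical stand-in, not the real classes.

// Hypothetical stand-in for the tail of tryComplete() in DelayedShareFetch /
// DelayedFetch, assuming forceComplete() only flips a flag and cannot throw.
class TryCompleteSketch {
    private boolean completed = false;

    boolean isCompleted() { return completed; }

    void forceComplete() { completed = true; }   // nothing here can throw

    // Shape added in the PR: guard on isCompleted() before forcing completion.
    boolean tryCompleteWithGuard() {
        if (isCompleted()) {
            return false;
        } else {
            forceComplete();
            return true;
        }
    }

    // Simpler shape implied by the questions: when nothing on this path can
    // throw and the operation has not been completed yet, the guard is a no-op.
    boolean tryCompleteSimple() {
        forceComplete();
        return true;
    }

    public static void main(String[] args) {
        System.out.println(new TryCompleteSketch().tryCompleteWithGuard()); // true
        System.out.println(new TryCompleteSketch().tryCompleteSimple());    // true
    }
}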