chia7712 commented on code in PR #19862:
URL: https://github.com/apache/kafka/pull/19862#discussion_r2128155550

##########
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java:
##########
@@ -51,32 +51,15 @@ public DelayedOperation(long delayMs) {
      *
      * 1. The operation has been verified to be completable inside tryComplete()
      * 2. The operation has expired and hence needs to be completed right now
-     *
-     * Return true iff the operation is completed by the caller: note that
-     * concurrent threads can try to complete the same operation, but only
-     * the first thread will succeed in completing the operation and return
-     * true, others will still return false.
+     *
      */
-    public boolean forceComplete() {
+    public void forceComplete() {

Review Comment:
   Please update the docs of `tryComplete`.

##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -121,35 +124,58 @@ class DelayedFetch(
                     || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
                     || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
                   debug(s"Could not obtain last offset for leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
-                  return forceComplete()
+                  forceComplete()
+                  return true
                 } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
                   debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " +
                     s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
-                  return forceComplete()
+                  forceComplete()
+                  return true
                 }
               }
             }
           } catch {
             case _: NotLeaderOrFollowerException =>  // Case A or Case B
               debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately")
-              return forceComplete()
+              if (isCompleted)

Review Comment:
   `forceComplete` calls `onComplete`, which then reads from the log, so the `forceComplete` call itself can throw an exception.

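One reading of the comment above, as a minimal Java sketch: if `onComplete` throws, the exception surfaces at the `forceComplete` call site, including when that call sits inside a `catch` block. Everything here is hypothetical and only mirrors the control flow under discussion: `FailingOperation`, `ExceptionPathDemo`, and `handleLeadershipError` are illustration names, the `RuntimeException` stands in for `NotLeaderOrFollowerException`, and setting `completed` before calling `onComplete` is an assumption taken from the ordering in the removed code.

```java
// Hypothetical stand-in for a delayed operation; NOT the Kafka class.
class FailingOperation {
    private boolean completed = false;

    boolean isCompleted() {
        return completed;
    }

    // Simulates a completion callback whose log read fails.
    void onComplete() {
        throw new IllegalStateException("simulated log read failure");
    }

    // Assumed ordering: mark completed first, then run the callback.
    void forceComplete() {
        if (!completed) {
            completed = true;
            onComplete();
        }
    }
}

class ExceptionPathDemo {
    // Mirrors the shape of the catch branch in the diff: the handler
    // calls forceComplete(), which can itself throw.
    static boolean handleLeadershipError(FailingOperation op) {
        try {
            // Stand-in for NotLeaderOrFollowerException.
            throw new RuntimeException("simulated leadership change");
        } catch (RuntimeException e) {
            op.forceComplete(); // the IllegalStateException escapes from here
            return true;        // never reached in this sketch
        }
    }
}
```

Under these assumptions, a caller of `handleLeadershipError` sees the `IllegalStateException` instead of a return value, while `isCompleted()` already reports `true`.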
##########
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java:
##########
@@ -51,32 +51,15 @@ public DelayedOperation(long delayMs) {
      *
      * 1. The operation has been verified to be completable inside tryComplete()
      * 2. The operation has expired and hence needs to be completed right now
-     *
-     * Return true iff the operation is completed by the caller: note that
-     * concurrent threads can try to complete the same operation, but only
-     * the first thread will succeed in completing the operation and return
-     * true, others will still return false.
+     *
      */
-    public boolean forceComplete() {
+    public void forceComplete() {
         // Do not proceed if the operation is already completed.
-        if (completed) {
-            return false;
-        }
-        // Attain lock prior completing the request.
-        lock.lock();
-        try {
-            // Re-check, if the operation is already completed by some other thread.
-            if (!completed) {
-                completed = true;
-                // cancel the timeout timer
-                cancel();
-                onComplete();
-                return true;
-            } else {
-                return false;
-            }
-        } finally {
-            lock.unlock();
+        if (!completed) {

Review Comment:
   It seems to me the code gets more complicated after removing the boolean return value from `forceComplete`. This PR leads to the following code style in each delayed operation.
   ```
   forceComplete()
   return true
   ```
   ```
   if (isCompleted)
     return false
   else {
     forceComplete()
     return true
   }
   ```
   If `forceComplete` returned a boolean indicating whether the operation was completed by the caller, the snippets above could be reverted to a simpler version.
   ```
   return forceComplete()
   ```

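The trade-off described in the last comment can be sketched in Java as follows. This is an illustration under stated assumptions, not the Kafka implementation: `SketchDelayedOperation`, `CountingOperation`, and `canComplete` are hypothetical names, and the locking follows the double-check pattern visible in the removed lines of the diff, with the timer cancellation left out.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a delayed operation; NOT the Kafka class.
abstract class SketchDelayedOperation {
    private final ReentrantLock lock = new ReentrantLock();
    private volatile boolean completed = false;

    // Subclasses decide whether the operation can complete now.
    abstract boolean canComplete();

    // Subclasses define the completion side effect.
    abstract void onComplete();

    boolean isCompleted() {
        return completed;
    }

    // Returns true only for the caller that actually performs the completion.
    boolean forceComplete() {
        if (completed) {
            return false;
        }
        lock.lock();
        try {
            // Re-check under the lock in case another thread completed first.
            if (completed) {
                return false;
            }
            completed = true;
            onComplete();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // With a boolean result, the call site stays a single expression.
    boolean tryComplete() {
        return canComplete() && forceComplete();
    }
}

// Hypothetical concrete operation that counts how often onComplete runs.
class CountingOperation extends SketchDelayedOperation {
    int completions = 0;

    @Override
    boolean canComplete() {
        return true;
    }

    @Override
    void onComplete() {
        completions++;
    }
}
```

In this sketch, the first `tryComplete()` call on a `CountingOperation` returns `true` and every later call returns `false`, so callers need neither a separate `isCompleted` check nor an explicit `return true`.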