chia7712 commented on code in PR #19759:
URL: https://github.com/apache/kafka/pull/19759#discussion_r2097557356


##########
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java:
##########
@@ -68,24 +67,36 @@ public DelayedOperation(long delayMs, Lock lock) {
      * Return true iff the operation is completed by the caller: note that
      * concurrent threads can try to complete the same operation, but only
      * the first thread will succeed in completing the operation and return
-     * true, others will still return false
+     * true, others will still return false.
      */
     public boolean forceComplete() {
-        if (completed.compareAndSet(false, true)) {
-            // cancel the timeout timer
-            cancel();
-            onComplete();
-            return true;
-        } else {
+        // Do not proceed if the operation is already completed.
+        if (completed) {
             return false;
         }
+        // Attain lock prior completing the request.
+        lock.lock();

Review Comment:
   After this change, the lock must be reentrant, correct? If so, we should 
consider replacing the `Lock` type with `ReentrantLock` to prevent potential 
deadlocks.
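
To illustrate the concern, here is a minimal sketch, assuming some caller already holds the operation's lock when it reaches `forceComplete()` (that call path is not visible in this hunk). The class `ReentrancySketch` and the helper `canReacquire` are hypothetical names used only for this example. `tryLock()` stands in for the nested `lock.lock()` so the demo reports the outcome instead of hanging.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;

public class ReentrancySketch {

    // Hypothetical helper: reports whether a thread that already holds `lock`
    // can acquire it a second time, which is what a nested lock.lock() requires.
    static boolean canReacquire(Lock lock) {
        lock.lock(); // outer acquisition, standing in for the caller
        try {
            // Inner acquisition, standing in for lock.lock() inside forceComplete().
            // With a plain lock() a non-reentrant lock would block forever here.
            boolean acquired = lock.tryLock();
            if (acquired) {
                lock.unlock(); // release the inner hold
            }
            return acquired;
        } finally {
            lock.unlock(); // release the outer hold
        }
    }

    public static void main(String[] args) {
        // ReentrantLock lets the owning thread acquire again.
        System.out.println("ReentrantLock: " + canReacquire(new ReentrantLock()));
        // The write-lock view of StampedLock is a Lock, but it is not reentrant.
        System.out.println("StampedLock write view: "
                + canReacquire(new StampedLock().asWriteLock()));
    }
}
```

Both arguments satisfy the `Lock` interface, yet only the first one survives nested acquisition. Narrowing the parameter type to `ReentrantLock` would let the compiler reject the second kind of lock.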
   
   Additionally, `DelayedOperation(long delayMs, Optional<Lock> lockOpt)` seems 
to serve no purpose once the old group coordinator is removed. We might want to 
clean that up.
   
   These points can be addressed in a follow-up.


