jeffkbkim commented on code in PR #15430:
URL: https://github.com/apache/kafka/pull/15430#discussion_r1503398332


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java:
##########
@@ -137,31 +136,43 @@ public void add(T event) throws RejectedExecutionException {
     }
 
     /**
-     * Returns the next {{@link Event}} available. This method block indefinitely until
-     * one event is ready or the accumulator is closed.
+     * Returns the next {{@link Event}} available or null if no event is
+     * available.
      *
-     * @return The next event.
+     * @return The next event available or null.
      */
     public T poll() {
-        return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+        lock.lock();
+        try {
+            K key = randomKey();
+            if (key == null) return null;
+
+            Queue<T> queue = queues.get(key);
+            T event = queue.poll();
+
+            if (queue.isEmpty()) queues.remove(key);
+            inflightKeys.add(key);
+            size--;
+
+            return event;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
-     * Returns the next {{@link Event}} available. This method blocks for the provided
-     * time and returns null of not event is available.
+     * Returns the next {{@link Event}} available. This method blocks until an
+     * event is available or the thread is interrupted.
      *
-     * @param timeout   The timeout.
-     * @param unit      The timeout unit.
      * @return The next event available or null.
      */
-    public T poll(long timeout, TimeUnit unit) {
+    public T take() {
         lock.lock();
         try {
             K key = randomKey();
-            long nanos = unit.toNanos(timeout);
-            while (key == null && !closed && nanos > 0) {
+            while (key == null && !closed) {
                 try {
-                    nanos = condition.awaitNanos(nanos);

Review Comment:
   It seems to me that the purpose of this PR is to remove this `awaitNanos` call.
   How much worse is using `awaitNanos` compared to `await`? I can imagine a subtle
   impact, but I'd like to know what difference you expect.
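
   For reference, here is a minimal sketch of the two waiting styles being compared. It is not the PR's code: `WaitSketch`, `notEmpty`, and the single-queue layout are hypothetical names made up for illustration, and the interrupt handling (restore the flag, return null) is an assumption. It only shows the structural difference: the timed variant has to carry the remaining time returned by `awaitNanos` through the loop, while the untimed variant just re-checks the condition.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration only; this is NOT EventAccumulator.
class WaitSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<T> queue = new ArrayDeque<>();

    void add(T item) {
        lock.lock();
        try {
            queue.add(item);
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Untimed wait: blocks until an item is available or the thread is interrupted.
    T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    notEmpty.await();
                } catch (InterruptedException e) {
                    // Assumption: restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    // Timed wait: awaitNanos returns an estimate of the time left, which must be
    // fed back into the next iteration to honor the overall deadline.
    T poll(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.isEmpty()) {
                if (nanos <= 0L) return null; // deadline elapsed, nothing available
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }
}
```

   With this sketch, `take()` on a non-empty queue returns immediately, and `poll(10, TimeUnit.MILLISECONDS)` on an empty queue returns null once the timeout elapses. Any measurable cost difference between the two would need to be benchmarked; the sketch makes no claim about it.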



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
