kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2482655079


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,108 @@ public ConsumerRecords<K, V> poll(final Duration timeout) 
{
         }
     }
 
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link 
AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event 
processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event 
has completed. If it has, it will be
+     * processed accordingly.
+     */
+    @SuppressWarnings({"CyclomaticComplexity"})
+    public void checkInflightPoll(Timer timer, boolean firstPass) {
+        // Handle the case where there's an inflight poll from the *previous* 
invocation of AsyncKafkaConsumer.poll().
+        if (firstPass && inflightPoll != null) {
+            if (inflightPoll.isComplete()) {
+                Optional<KafkaException> errorOpt = inflightPoll.error();
+
+                if (errorOpt.isPresent()) {
+                    // If the previous inflight event is complete, check if it 
resulted in an error. If there was
+                    // an error, throw it without delay.
+                    log.trace("Previous inflight event {} completed with an 
error, clearing", inflightPoll);
+                    inflightPoll = null;
+                    throw errorOpt.get();
+                } else {
+                    if (fetchBuffer.isEmpty()) {
+                        // If it completed without error, but without 
populating the fetch buffer, clear the event
+                        // so that a new event will be enqueued below.
+                        log.trace("Previous inflight event {} completed 
without filling the buffer, clearing", inflightPoll);
+                        inflightPoll = null;
+                    } else {
+                        // However, if the event completed, and it populated 
the buffer, *don't* create a new event.
+                        // This is to prevent an edge case of starvation when 
poll() is called with a timeout of 0.
+                        // If a new event was created on *every* poll, each 
time the event would have to complete the
+                        // validate positions stage before the data in the 
fetch buffer is used. Because there is
+                        // no blocking, and effectively a 0 wait, the data in 
the fetch buffer is continuously ignored
+                        // leading to no data ever being returned from poll().
+                        log.trace("Previous inflight event {} completed and 
filled the buffer, not clearing", inflightPoll);
+                    }
+                }
+            } else if (time.milliseconds() >= inflightPoll.deadlineMs() && 
inflightPoll.isValidatePositionsComplete()) {

Review Comment:
   I refactored `checkInflightPoll()` and pulled out 
`maybeClearPreviousInflightPoll()` and `maybeClearCurrentInflightPoll()` helper 
methods. PTAL.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,108 @@ public ConsumerRecords<K, V> poll(final Duration timeout) 
{
         }
     }
 
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link 
AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event 
processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event 
has completed. If it has, it will be
+     * processed accordingly.
+     */
+    @SuppressWarnings({"CyclomaticComplexity"})
+    public void checkInflightPoll(Timer timer, boolean firstPass) {
+        // Handle the case where there's an inflight poll from the *previous* 
invocation of AsyncKafkaConsumer.poll().
+        if (firstPass && inflightPoll != null) {
+            if (inflightPoll.isComplete()) {
+                Optional<KafkaException> errorOpt = inflightPoll.error();
+
+                if (errorOpt.isPresent()) {
+                    // If the previous inflight event is complete, check if it 
resulted in an error. If there was
+                    // an error, throw it without delay.
+                    log.trace("Previous inflight event {} completed with an 
error, clearing", inflightPoll);
+                    inflightPoll = null;
+                    throw errorOpt.get();
+                } else {
+                    if (fetchBuffer.isEmpty()) {
+                        // If it completed without error, but without 
populating the fetch buffer, clear the event
+                        // so that a new event will be enqueued below.
+                        log.trace("Previous inflight event {} completed 
without filling the buffer, clearing", inflightPoll);
+                        inflightPoll = null;
+                    } else {
+                        // However, if the event completed, and it populated 
the buffer, *don't* create a new event.
+                        // This is to prevent an edge case of starvation when 
poll() is called with a timeout of 0.
+                        // If a new event was created on *every* poll, each 
time the event would have to complete the
+                        // validate positions stage before the data in the 
fetch buffer is used. Because there is
+                        // no blocking, and effectively a 0 wait, the data in 
the fetch buffer is continuously ignored
+                        // leading to no data ever being returned from poll().
+                        log.trace("Previous inflight event {} completed and 
filled the buffer, not clearing", inflightPoll);
+                    }
+                }
+            } else if (time.milliseconds() >= inflightPoll.deadlineMs() && 
inflightPoll.isValidatePositionsComplete()) {

Review Comment:
   I refactored `checkInflightPoll()` and pulled out 
`maybeClearPreviousInflightPoll()` and `maybeClearCurrentInflightPoll()` helper 
methods. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to