kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2482655989
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,108 @@ public ConsumerRecords<K, V> poll(final Duration timeout)
{
}
}
+ /**
+ * {@code checkInflightPoll()} manages the lifetime of the {@link
AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event
processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event
has completed. If it has, it will be
+ * processed accordingly.
+ */
+ @SuppressWarnings({"CyclomaticComplexity"})
+ public void checkInflightPoll(Timer timer, boolean firstPass) {
+ // Handle the case where there's an inflight poll from the *previous*
invocation of AsyncKafkaConsumer.poll().
+ if (firstPass && inflightPoll != null) {
+ if (inflightPoll.isComplete()) {
+ Optional<KafkaException> errorOpt = inflightPoll.error();
+
+ if (errorOpt.isPresent()) {
+ // If the previous inflight event is complete, check if it
resulted in an error. If there was
+ // an error, throw it without delay.
+ log.trace("Previous inflight event {} completed with an
error, clearing", inflightPoll);
+ inflightPoll = null;
+ throw errorOpt.get();
+ } else {
+ if (fetchBuffer.isEmpty()) {
+ // If it completed without error, but without
populating the fetch buffer, clear the event
+ // so that a new event will be enqueued below.
+ log.trace("Previous inflight event {} completed
without filling the buffer, clearing", inflightPoll);
+ inflightPoll = null;
+ } else {
+ // However, if the event completed, and it populated
the buffer, *don't* create a new event.
+ // This is to prevent an edge case of starvation when
poll() is called with a timeout of 0.
+ // If a new event was created on *every* poll, each
time the event would have to complete the
+ // validate positions stage before the data in the
fetch buffer is used. Because there is
+ // no blocking, and effectively a 0 wait, the data in
the fetch buffer is continuously ignored
+ // leading to no data ever being returned from poll().
+ log.trace("Previous inflight event {} completed and
filled the buffer, not clearing", inflightPoll);
+ }
+ }
+ } else if (time.milliseconds() >= inflightPoll.deadlineMs() &&
inflightPoll.isValidatePositionsComplete()) {
Review Comment:
Added an `isExpired()` method to `AsyncPollEvent`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,108 @@ public ConsumerRecords<K, V> poll(final Duration timeout)
{
}
}
+ /**
+ * {@code checkInflightPoll()} manages the lifetime of the {@link
AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event
processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event
has completed. If it has, it will be
+ * processed accordingly.
+ */
+ @SuppressWarnings({"CyclomaticComplexity"})
Review Comment:
I refactored `checkInflightPoll()` and pulled out
`maybeClearPreviousInflightPoll()` and `maybeClearCurrentInflightPoll()` helper
methods. PTAL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]