kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2407857084
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -876,6 +871,67 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
}
+ /**
+ * {@code checkInflightPollResult()} manages the lifetime of the {@link
AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event
processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event
has reached a
+ * {@link AsyncPollEvent.State terminal state}. If it has, the result will
be processed accordingly.
+ */
+ public void checkInflightPollResult(Timer timer) {
+ if (inflightPoll == null) {
+ log.trace("No existing inflight async poll event, submitting a new
event");
+ submitEvent(timer);
+ }
+
+ try {
+ // Note: this is calling user-supplied code, so make sure to
handle possible errors.
+ offsetCommitCallbackInvoker.executeCallbacks();
+ processBackgroundEvents();
+
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Attempting to retrieve result from previously submitted
{} with {} remaining on timer",
+ inflightPoll,
+ timer.remainingMs()
+ );
+ }
+
+ // Result should be non-null and starts off as State.STARTED.
+ AsyncPollEvent.Result result = inflightPoll.result();
+ AsyncPollEvent.State state = result.state();
+
+ if (state == AsyncPollEvent.State.SUCCEEDED) {
+ // The async poll event has completed all the requisite
stages, though it does not imply that
+ // there is data in the FetchBuffer yet. Make sure to clear
out the inflight request.
+ log.trace("Event {} completed, clearing inflight",
inflightPoll);
+ inflightPoll = null;
+ } else if (state == AsyncPollEvent.State.FAILED) {
Review Comment:
I've removed the states, as requested.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]