lianetm commented on code in PR #20521: URL: https://github.com/apache/kafka/pull/20521#discussion_r2399561924
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventInvoker.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import org.slf4j.Logger; + +import java.time.Duration; + +import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; + +/** + * {@code CompositePollEventInvoker} is executed on the application thread in the + * {@link AsyncKafkaConsumer#poll(Duration)}. 
+ */ +public class CompositePollEventInvoker { + + private final Logger log; + private final Time time; + private final ApplicationEventHandler applicationEventHandler; + private final Runnable applicationThreadCallbacks; + private CompositePollEvent inflight; + + public CompositePollEventInvoker(LogContext logContext, + Time time, + ApplicationEventHandler applicationEventHandler, + Runnable applicationThreadCallbacks) { Review Comment: this param ends up being used to pass the same thing all the time (trigger commit/rebalance callbacks). Do we add any value by parameterizing? Wondering if just directly triggering the callbacks on poll() here would be simpler ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -217,35 +235,109 @@ public void process(ApplicationEvent event) { } } - private void process(final PollEvent event) { - // Trigger a reconciliation that can safely commit offsets if needed to rebalance, - // as we're processing before any new fetching starts in the app thread - requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> - consumerMembershipManager.maybeReconcile(true)); - if (requestManagers.commitRequestManager.isPresent()) { - CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); - commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); - // all commit request generation points have been passed, - // so it's safe to notify the app thread could proceed and start fetching - event.markReconcileAndAutoCommitComplete(); - requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); - }); - requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); - }); - } else { - // safe to unblock - no auto-commit risk here: - 
// 1. commitRequestManager is not present - // 2. shareConsumer has no auto-commit mechanism - event.markReconcileAndAutoCommitComplete(); - requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); + private void process(final CompositePollEvent event) { + if (maybeFailCompositePoll(event) || maybePauseCompositePoll(event, ApplicationEvent.Type.POLL)) + return; + + ApplicationEvent.Type nextEventType = event.nextEventType(); + + if (nextEventType == ApplicationEvent.Type.POLL) { + log.debug("Processing {} logic for {}", nextEventType, event); + processPollEvent(event.pollTimeMs()); + nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA; + + if (maybeFailCompositePoll(event) || maybePauseCompositePoll(event, nextEventType)) + return; + } + + if (nextEventType == ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA) { + log.debug("Processing {} logic for {}", nextEventType, event); + processUpdatePatternSubscriptionEvent(); + nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS; + + if (maybeFailCompositePoll(event) || maybePauseCompositePoll(event, nextEventType)) + return; + } Review Comment: I see this hasn't been addressed. Seems like unneeded states/transitions that make the `CompositePollEvent` more complex, but I could be missing why we may need it like this? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -218,32 +242,35 @@ public void process(ApplicationEvent event) { } private void process(final PollEvent event) { + processPollEvent(event.pollTimeMs()); + event.markReconcileAndAutoCommitComplete(); Review Comment: I would expect this is not needed anymore, and actually doesn't seem used. 
This used to ensure we didn't move on to generating a fetch before retrieving the positions to commit (both steps were triggered independently from the app thread), but that's not the case anymore, right? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventInvoker.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import org.slf4j.Logger; + +import java.time.Duration; + +import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; + +/** + * {@code CompositePollEventInvoker} is executed on the application thread in the + * {@link AsyncKafkaConsumer#poll(Duration)}. 
+ */ +public class CompositePollEventInvoker { + + private final Logger log; + private final Time time; + private final ApplicationEventHandler applicationEventHandler; + private final Runnable applicationThreadCallbacks; + private CompositePollEvent inflight; + + public CompositePollEventInvoker(LogContext logContext, + Time time, + ApplicationEventHandler applicationEventHandler, + Runnable applicationThreadCallbacks) { + this.log = logContext.logger(getClass()); + this.time = time; + this.applicationEventHandler = applicationEventHandler; + this.applicationThreadCallbacks = applicationThreadCallbacks; + } + + /** + * {@code poll()} manages the lifetime of the {@link CompositePollEvent} processing. If it is called when + * no event is currently processing, it will start a new event processing asynchronously. A check is made + * during each invocation to see if the <em>inflight</em> event has reached a + * {@link CompositePollEvent.State terminal state}. If it has, the result will be processed accordingly. + */ + public void poll(Timer timer) { + if (inflight == null) { + log.trace("No existing inflight event, submitting a new event"); + submitEvent(ApplicationEvent.Type.POLL, timer); + } + + try { + if (log.isTraceEnabled()) { + log.trace( + "Attempting to retrieve result from previously submitted {} with {} remaining on timer", + inflight, + timer.remainingMs() + ); + } + + // Result should be non-null and starts off as State.STARTED. + CompositePollEvent.Result result = inflight.result(); + CompositePollEvent.State state = result.state(); + + if (state == CompositePollEvent.State.SUCCEEDED) { + // The composite event has completed all the requisite stages, though it does not imply that + // there is data in the FetchBuffer yet. Make sure to clear out the inflight request. 
+ log.trace("Event {} completed, clearing inflight", inflight); + inflight = null; + } else if (state == CompositePollEvent.State.FAILED) { + // The composite event failed at one of the stages. Make sure to clear out the inflight request + // before the underlying error is surfaced to the user. + log.trace("Event {} failed, clearing inflight", inflight); + inflight = null; + + throw result.asKafkaException(); + } else if (state == CompositePollEvent.State.CALLBACKS_REQUIRED) { + // The background thread detected that it needed to yield to the application thread to invoke + // callbacks. Even though the inflight reference _should_ be overwritten when the next stage of + // the event is submitted, go ahead and clear out the inflight request just to be sure. + log.trace("Event {} paused for callbacks, clearing inflight", inflight); + inflight = null; + + // Note: this is calling user-supplied code, so make sure to handle possible errors. + applicationThreadCallbacks.run(); + + // The application thread callbacks are complete. Create another event to resume the polling at + // the next stage. + submitEvent(result.asNextEventType(), timer); + } Review Comment: I don't quite get why we need this whole new state to trigger rebalance/commit callbacks? We already have thread-safe queues where elems are added IF there are callbacks that need to be executed, so I was expecting that we simply need to check if there is something on those queues and run it at the beginning of each poll iteration? ``` public void poll(Timer timer) { // trigger rebalance and commit callbacks if any - what applicationThreadCallbacks.run() does offsetCommitCallbackInvoker.executeCallbacks(); processBackgroundEvents(); if (inflight == null) { log.trace("No existing inflight event, submitting a new event"); submitEvent(ApplicationEvent.Type.POLL, timer); } .... ``` Would that work? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -726,6 +757,57 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) { requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event); } + private void process(final CompositePollEvent event) { + CompositePollEventProcessorContext context = compositePollContext.orElseThrow(IllegalArgumentException::new); + ApplicationEvent.Type nextEventType = event.startingEventType(); + + if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType)) + return; + + if (nextEventType == ApplicationEvent.Type.POLL) { + log.debug("Processing {} logic for {}", nextEventType, event); + processPollEvent(event.pollTimeMs()); + nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA; + + if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType)) Review Comment: Totally agree with `maybeCompleteWithCallbackRequired` here (right after triggering a reconciliation, we may have callbacks to run). But I'm still not convinced about the need for `maybeCompleteExceptionally` here. That one is to propagate metadata errors we may have received in MetadataResponses. The classic consumer would throw them on the first `client.poll` call made from `consumer.poll` (whether that call happens while dealing with the coordinator or later on). With the async consumer, I would imagine it should be enough to have a single point within `consumer.poll` where we "maybeThrowMetadataErrors". We already have an initial check to "maybeThrowMetadataErrors" (and the [comment](https://github.com/apache/kafka/pull/20521/files#r2399350196) suggests possibly even moving it to the app thread). Shouldn't that be enough?
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -726,6 +757,57 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) { requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event); } + private void process(final CompositePollEvent event) { + CompositePollEventProcessorContext context = compositePollContext.orElseThrow(IllegalArgumentException::new); + ApplicationEvent.Type nextEventType = event.startingEventType(); + + if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType)) + return; Review Comment: this logic is to maybe jump back to the app thread, but at this point we haven't really done anything in the background yet, so couldn't we avoid this hop and check for this in the app thread directly? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -218,32 +242,35 @@ public void process(ApplicationEvent event) { } private void process(final PollEvent event) { + processPollEvent(event.pollTimeMs()); + event.markReconcileAndAutoCommitComplete(); + } + + private void processPollEvent(final long pollTimeMs) { // Trigger a reconciliation that can safely commit offsets if needed to rebalance, // as we're processing before any new fetching starts in the app thread requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> consumerMembershipManager.maybeReconcile(true)); if (requestManagers.commitRequestManager.isPresent()) { CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); - commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); + commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs); // all commit request generation points have been passed, // so it's safe to notify the app thread could proceed and start fetching Review Comment: 
This comment should be removed (it refers to the `markReconcileAndAutoCommitComplete` call that was removed). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
