cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579221285
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) {
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
-            AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
+            Timer timer = time.timer(Long.MAX_VALUE);
+            AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, timer);

Review Comment:
Why do you use a timer here? `commitAsync()` does not throw any timeout exception, does it? If you need to pass the timer to `CommitEvent` or further up the class hierarchy, you can create the timer in the constructor of `AsyncCommitEvent` or even further up the class hierarchy.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java:
##########
@@ -27,19 +30,34 @@ public abstract class CompletableBackgroundEvent<T> extends BackgroundEvent implements CompletableEvent<T> {
     private final CompletableFuture<T> future;
+    private final long deadlineMs;
-    protected CompletableBackgroundEvent(final Type type) {
+    protected CompletableBackgroundEvent(final Type type, final Timer timer) {
         super(type);
         this.future = new CompletableFuture<>();
+        Objects.requireNonNull(timer);
+
+        long currentTimeMs = timer.currentTimeMs();
+        long remainingMs = timer.remainingMs();
+
+        if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+            this.deadlineMs = Long.MAX_VALUE;
+        else
+            this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
Same questions as above.
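The suggestion to create the timer inside the event's constructor (or further up the class hierarchy) rather than at the `commitAsync()` call site could look roughly like the following. This is a hypothetical, self-contained sketch: `EventDeadlineSketch`, `CompletableEventBase`, `AsyncCommitEventSketch`, `saturatedDeadline` and `isExpired` are illustrative names, not Kafka classes, and a `LongSupplier` clock stands in for Kafka's `Time`/`Timer`. Because the sketch starts from a raw timeout, it needs the same saturating addition the diff uses: `now + Long.MAX_VALUE` would otherwise wrap to a negative deadline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified stand-ins for the consumer event classes; these are NOT
 * the real org.apache.kafka types. A LongSupplier clock replaces Kafka's Time/Timer.
 */
public class EventDeadlineSketch {

    /** Adds without overflowing (assumes timeoutMs >= 0): saturates at Long.MAX_VALUE. */
    static long saturatedDeadline(long currentTimeMs, long timeoutMs) {
        if (currentTimeMs > Long.MAX_VALUE - timeoutMs)
            return Long.MAX_VALUE;
        return currentTimeMs + timeoutMs;
    }

    /** Shared base class: the deadline is derived once, in a single place. */
    abstract static class CompletableEventBase<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();
        private final long deadlineMs;

        protected CompletableEventBase(LongSupplier clock, long timeoutMs) {
            this.deadlineMs = saturatedDeadline(clock.getAsLong(), timeoutMs);
        }

        CompletableFuture<T> future() {
            return future;
        }

        long deadlineMs() {
            return deadlineMs;
        }

        /** Expiration check that hides how the deadline was computed. */
        boolean isExpired(long currentTimeMs) {
            return currentTimeMs > deadlineMs;
        }
    }

    /** An async commit has no timeout, so the subclass picks the "infinite" timeout itself. */
    static final class AsyncCommitEventSketch extends CompletableEventBase<Void> {
        AsyncCommitEventSketch(LongSupplier clock) {
            super(clock, Long.MAX_VALUE);
        }
    }

    public static void main(String[] args) {
        AsyncCommitEventSketch event = new AsyncCommitEventSketch(() -> 1_000L);
        // Saturated instead of wrapping around to a negative value.
        System.out.println(event.deadlineMs() == Long.MAX_VALUE);
        System.out.println(event.isExpired(2_000L));
    }
}
```

With this shape, the caller only constructs the event and never handles a timer, which is the point of moving the timer creation into the constructor.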
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java:
##########
@@ -16,120 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.IdempotentCloser;
-import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.io.Closeable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 /**
- * An {@link EventProcessor} is the means by which events <em>produced</em> by thread <em>A</em> are
- * <em>processed</em> by thread <em>B</em>. By definition, threads <em>A</em> and <em>B</em> run in parallel to
- * each other, so a mechanism is needed with which to receive and process the events from the other thread. That
- * communication channel is formed around {@link BlockingQueue a shared queue} into which thread <em>A</em>
- * enqueues events and thread <em>B</em> reads and processes those events.
+ * An {@code EventProcessor} is the means by which events are <em>processed</em>, the meaning of which is left
+ * intentionally loose. This is in large part to keep the {@code EventProcessor} focused on what it means to process
+ * the events, and <em>not</em> linking itself too closely with the rest of the surrounding application.
+ *
+ * <p/>
+ *
+ * The {@code EventProcessor} is envisaged as a stateless service that acts as a conduit, receiving an event and
+ * dispatching to another block of code to process. The semantic meaning of each event is different, so the
+ * {@code EventProcessor} will need to interact with other parts of the system that maintain state. The
+ * implementation should not be concerned with the mechanism by which an event arrived for processing. While the
+ * events are shuffled around the consumer subsystem by means of {@link BlockingQueue shared queues}, it should
+ * be considered an anti-pattern to need to know how it arrived or what happens after its is processed.
  */
-public abstract class EventProcessor<T> implements Closeable {
-
-    private final Logger log;
-    private final BlockingQueue<T> eventQueue;
-    private final IdempotentCloser closer;
-
-    protected EventProcessor(final LogContext logContext, final BlockingQueue<T> eventQueue) {
-        this.log = logContext.logger(EventProcessor.class);
-        this.eventQueue = eventQueue;
-        this.closer = new IdempotentCloser();
-    }
-
-    public abstract boolean process();
-
-    protected abstract void process(T event);
-
-    @Override
-    public void close() {
-        closer.close(this::closeInternal, () -> log.warn("The event processor was already closed"));
-    }
-
-    protected interface ProcessHandler<T> {
-
-        void onProcess(T event, Optional<KafkaException> error);
-    }
+public interface EventProcessor<T> extends AutoCloseable {
     /**
-     * Drains all available events from the queue, and then processes them in order. If any errors are thrown while
-     * processing the individual events, these are submitted to the given {@link ProcessHandler}.
+     * Process an event that is received.
      */
-    protected boolean process(ProcessHandler<T> processHandler) {
-        closer.assertOpen("The processor was previously closed, so no further processing can occur");
-
-        List<T> events = drain();
-
-        if (events.isEmpty()) {
-            log.trace("No events to process");
-            return false;
-        }
+    void process(T event);
-        try {
-            log.trace("Starting processing of {} event{}", events.size(), events.size() == 1 ? "" : "s");
-
-            for (T event : events) {
-                try {
-                    Objects.requireNonNull(event, "Attempted to process a null event");
-                    log.trace("Processing event: {}", event);
-                    process(event);
-                    processHandler.onProcess(event, Optional.empty());
-                } catch (Throwable t) {
-                    KafkaException error = ConsumerUtils.maybeWrapAsKafkaException(t);
-                    processHandler.onProcess(event, Optional.of(error));
-                }
-            }
-        } finally {
-            log.trace("Completed processing");
-        }
-
-        return true;
-    }
-
-    /**
-     * It is possible for the consumer to close before complete processing all the events in the queue. In
-     * this case, we need to throw an exception to notify the user the consumer is closed.
-     */
-    private void closeInternal() {
-        log.trace("Closing event processor");
-        List<T> incompleteEvents = drain();
-
-        if (incompleteEvents.isEmpty())
-            return;
-
-        KafkaException exception = new KafkaException("The consumer is closed");
-
-        // Check each of the events and if it has a Future that is incomplete, complete it exceptionally.
-        incompleteEvents
-                .stream()
-                .filter(e -> e instanceof CompletableEvent)
-                .map(e -> ((CompletableEvent<?>) e).future())
-                .filter(f -> !f.isDone())
-                .forEach(f -> {
-                    log.debug("Completing {} with exception {}", f, exception.getMessage());
-                    f.completeExceptionally(exception);
-                });
-
-        log.debug("Discarding {} events because the consumer is closing", incompleteEvents.size());
-    }
-
-    /**
-     * Moves all the events from the queue to the returned list.
-     */
-    private List<T> drain() {
-        LinkedList<T> events = new LinkedList<>();
-        eventQueue.drainTo(events);
-        return events;
+    @Override
+    default void close() {
+        // Do nothing by default...

Review Comment:
Wouldn't it be safer not to have a default implementation here, as a reminder to implement closing?
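Without a default `close()`, the compiler forces every `EventProcessor` implementation to state how it closes, which is the safety being asked for. The hypothetical sketch below redeclares `close()` abstractly (it also narrows it to throw no checked exception, an assumption of the sketch, not something the diff shows) and pairs it with a caller-side loop that drains a `BlockingQueue` and processes events one at a time, similar to the loop the PR moves into `ConsumerNetworkThread.runOnce()`. `RecordingProcessor` and `processQueuedEvents` are illustrative names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EventProcessorSketch {

    /** Variant of the interface in the diff with no default close(): implementations must provide one. */
    interface EventProcessor<T> extends AutoCloseable {
        void process(T event);

        @Override
        void close(); // abstract, and narrowed to throw no checked exception (sketch assumption)
    }

    /** Hypothetical processor that records each event and rejects empty ones. */
    static final class RecordingProcessor implements EventProcessor<String> {
        final List<String> processed = new ArrayList<>();
        boolean closed;

        @Override
        public void process(String event) {
            if (event.isEmpty())
                throw new IllegalArgumentException("empty event");
            processed.add(event);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /**
     * Drains everything currently queued and hands each event to the processor, so one
     * failing event does not stop the others. Returns the number of failed events.
     */
    static <T> int processQueuedEvents(BlockingQueue<T> queue, EventProcessor<T> processor) {
        List<T> events = new ArrayList<>();
        queue.drainTo(events);
        int failures = 0;
        for (T event : events) {
            try {
                processor.process(event);
            } catch (RuntimeException e) {
                // The loop in the diff catches Throwable and logs a warning; here we only count.
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "", "b"));
        try (RecordingProcessor processor = new RecordingProcessor()) {
            int failures = processQueuedEvents(queue, processor);
            System.out.println(processor.processed + " failures=" + failures); // [a, b] failures=1
        }
    }
}
```

Because `close()` is abstract, deleting `RecordingProcessor.close()` would be a compile error rather than a silently skipped cleanup.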
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1338,7 +1339,14 @@ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(Consume
                                                                           Set<TopicPartition> partitions) {
         SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         sortedPartitions.addAll(partitions);
-        CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+
+        // We don't yet have the concept of having an expiring callback, but we will likely want that eventually.
+        Timer timer = time.timer(Long.MAX_VALUE);
+        CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(
+                methodName,
+                sortedPartitions,
+                timer
+        );

Review Comment:
Here I have a similar comment as for `AsyncCommitEvent`: I would move the creation of the timer into the constructor of `ConsumerRebalanceListenerCallbackNeededEvent`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1270,6 +1230,20 @@ private void close(Duration timeout, boolean swallowException) {
         if (applicationEventHandler != null)
             closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
         closeTimer.update();
+
+        if (backgroundEventReaper != null && backgroundEventQueue != null) {

Review Comment:
Why should those two fields ever be `null`? They seem necessary for the consumer to function correctly. If my statement is correct, the constructors should ensure that those fields are never `null`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -273,9 +301,20 @@ void cleanup() {
             log.error("Unexpected error during shutdown. Proceed with closing.", e);
         } finally {
             sendUnsentRequests(timer);
+
+            // Copy over the completable events to a separate list, then reap any incomplete
+            // events on that list.

Review Comment:
I think that is clear from the code. We do not need the comment.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -128,7 +138,19 @@ void runOnce() {
         // Process the events—if any—that were produced by the application thread. It is possible that when processing
         // an event generates an error. In such cases, the processor will log an exception, but we do not want those
         // errors to be propagated to the caller.
-        applicationEventProcessor.process();
+        LinkedList<ApplicationEvent> events = new LinkedList<>();
+        applicationEventQueue.drainTo(events);
+
+        for (ApplicationEvent event : events) {
+            try {
+                if (event instanceof CompletableApplicationEvent)
+                    applicationEventReaper.add((CompletableApplicationEvent<?>) event);
+
+                applicationEventProcessor.process(event);
+            } catch (Throwable t) {
+                log.warn("Error processing event {}", t.getMessage(), t);
+            }
+        }

Review Comment:
Could you extract these lines into a method `processApplicationEvents()` or similar? I think it would make the code more readable.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -144,6 +166,12 @@ void runOnce() {
                 .map(Optional::get)
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
+
+        // "Complete" any events that have expired. This cleanup step should only be called after the network I/O
+        // thread has made at least one call to poll. This is done to emulate the behavior of the legacy consumer's
+        // handling of timeouts. The legacy consumer makes at least one attempt to satisfy any network requests
+        // before checking if a timeout has expired.

Review Comment:
Do we need this comment? I am not a big fan of inline comments in general.
All the information about the behavior described in the comment should be clear from the corresponding unit tests. I do not think we need the references to the legacy consumer. Once the legacy consumer is gone, we would need to remove these references, which is work we can avoid by simply not writing those comments.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+    private final Logger log;
+
+    /**
+     * List of tracked events that we are candidates to expire or cancel when reviewed.
+     */
+    private final List<T> tracked;
+
+    public CompletableEventReaper(LogContext logContext) {
+        this.log = logContext.logger(CompletableEventReaper.class);
+        this.tracked = new ArrayList<>();
+    }
+
+    /**
+     * Adds a new {@link CompletableEvent event} to track for later completion/expiration.
+     *
+     * @param event Event to track
+     */
+    public void add(T event) {
+        tracked.add(Objects.requireNonNull(event, "Event to track must be non-null"));
+    }
+
+    /**
+     * This method "completes" any {@link CompletableEvent}s that have either expired or completed normally. So this
+     * is a two-step process:
+     *
+     * <ol>
+     *     <li>
+     *         For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+     *         instance of {@link TimeoutException} is created and passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *     </li>
+     *     <li>
+     *         For each tracked event of which its {@link CompletableEvent#future() future} is already in the
+     *         {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+     *     </li>
+     * </ol>
+     *
+     * <p/>
+     *
+     * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper.
+     *
+     * @param currentTimeMs <em>Current</em> time with which to compare against the
+     *                      <em>{@link CompletableEvent#deadlineMs() expiration time}</em>
+     */
+    public void reapExpiredAndCompleted(long currentTimeMs) {
+        log.trace("Reaping expired events");
+
+        Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+            TimeoutException error = new TimeoutException(String.format("%s could not be completed within its timeout", e.getClass().getSimpleName()));
+            long pastDueMs = currentTimeMs - e.deadlineMs();
+            log.debug("Completing event {} exceptionally since it expired {} ms ago", e, pastDueMs);
+            CompletableFuture<?> f = e.future();
+            f.completeExceptionally(error);
+        };
+
+        // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
+        tracked
+                .stream()
+                .filter(e -> !e.future().isDone())
+                .filter(e -> currentTimeMs > e.deadlineMs())
+                .forEach(timeoutEvent);
+

Review Comment:
nit: IMO, this is more readable, but feel free to leave it if you do not share my taste.
```suggestion
        tracked.stream()
            .filter(e -> !e.future().isDone())
            .filter(e -> currentTimeMs > e.deadlineMs())
            .forEach(timeoutEvent);
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##########
@@ -36,13 +36,14 @@ protected CompletableApplicationEvent(final Type type, final Timer timer) {
         super(type);
         this.future = new CompletableFuture<>();
         Objects.requireNonNull(timer);
-        this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
-    }
-    protected CompletableApplicationEvent(final Type type, final long deadlineMs) {
-        super(type);
-        this.future = new CompletableFuture<>();
-        this.deadlineMs = deadlineMs;
+        long currentTimeMs = timer.currentTimeMs();
+        long remainingMs = timer.remainingMs();
+
+        if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+            this.deadlineMs = Long.MAX_VALUE;
+        else
+            this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
I have two questions here:
1. Why is this code not in `CompletableEvent`?
2. Why do you not keep the timer in an object field?

Regarding 2, I have a feeling this PR contains code that already exists in the timer. You could have a method on `CompletableEvent` that checks for expiration without exposing the timer, something like `isExpired()` or `isTimedOut()`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org