cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579221285


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -769,7 +730,8 @@ public void commitAsync(OffsetCommitCallback callback) {
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
-            AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
+            Timer timer = time.timer(Long.MAX_VALUE);
+            AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets, timer);

Review Comment:
   Why do you use a timer here? The `asyncCommit()` does not throw any timeout exception, does it? If you need to pass the timer to the `CommitEvent` or further up the class hierarchy, you can create the timer in the constructor of `AsyncCommitEvent` or even further up the class hierarchy.
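   As a minimal sketch of that suggestion (the constructor shape and the `Time` parameter are illustrative assumptions, not the actual signature in this PR), the timer creation could live inside the event itself:

   ```java
   import java.util.Collections;
   import java.util.Map;

   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.utils.Time;

   // Hypothetical shape: the caller passes Time, and the event builds its own
   // "never expires" timer, so commitAsync() no longer repeats that logic.
   public class AsyncCommitEvent extends CompletableApplicationEvent<Void> {

       private final Map<TopicPartition, OffsetAndMetadata> offsets;

       public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final Time time) {
           // Long.MAX_VALUE encodes "an async commit has no client-side timeout".
           super(Type.COMMIT_ASYNC, time.timer(Long.MAX_VALUE));
           this.offsets = Collections.unmodifiableMap(offsets);
       }

       public Map<TopicPartition, OffsetAndMetadata> offsets() {
           return offsets;
       }
   }
   ```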



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java:
##########
@@ -27,19 +30,34 @@
 public abstract class CompletableBackgroundEvent<T> extends BackgroundEvent implements CompletableEvent<T> {
 
     private final CompletableFuture<T> future;
+    private final long deadlineMs;
 
-    protected CompletableBackgroundEvent(final Type type) {
+    protected CompletableBackgroundEvent(final Type type, final Timer timer) {
         super(type);
         this.future = new CompletableFuture<>();
+        Objects.requireNonNull(timer);
+
+        long currentTimeMs = timer.currentTimeMs();
+        long remainingMs = timer.remainingMs();
+
+        if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+            this.deadlineMs = Long.MAX_VALUE;
+        else
+            this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
   Same questions as above.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java:
##########
@@ -16,120 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.IdempotentCloser;
-import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.io.Closeable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 
 /**
- * An {@link EventProcessor} is the means by which events <em>produced</em> by thread <em>A</em> are
- * <em>processed</em> by thread <em>B</em>. By definition, threads <em>A</em> and <em>B</em> run in parallel to
- * each other, so a mechanism is needed with which to receive and process the events from the other thread. That
- * communication channel is formed around {@link BlockingQueue a shared queue} into which thread <em>A</em>
- * enqueues events and thread <em>B</em> reads and processes those events.
+ * An {@code EventProcessor} is the means by which events are <em>processed</em>, the meaning of which is left
+ * intentionally loose. This is in large part to keep the {@code EventProcessor} focused on what it means to process
+ * the events, and <em>not</em> link itself too closely with the rest of the surrounding application.
+ *
+ * <p/>
+ *
+ * The {@code EventProcessor} is envisaged as a stateless service that acts as a conduit, receiving an event and
+ * dispatching to another block of code to process. The semantic meaning of each event is different, so the
+ * {@code EventProcessor} will need to interact with other parts of the system that maintain state. The
+ * implementation should not be concerned with the mechanism by which an event arrived for processing. While the
+ * events are shuffled around the consumer subsystem by means of {@link BlockingQueue shared queues}, it should
+ * be considered an anti-pattern to need to know how it arrived or what happens after it is processed.
  */
-public abstract class EventProcessor<T> implements Closeable {
-
-    private final Logger log;
-    private final BlockingQueue<T> eventQueue;
-    private final IdempotentCloser closer;
-
-    protected EventProcessor(final LogContext logContext, final BlockingQueue<T> eventQueue) {
-        this.log = logContext.logger(EventProcessor.class);
-        this.eventQueue = eventQueue;
-        this.closer = new IdempotentCloser();
-    }
-
-    public abstract boolean process();
-
-    protected abstract void process(T event);
-
-    @Override
-    public void close() {
-        closer.close(this::closeInternal, () -> log.warn("The event processor was already closed"));
-    }
-
-    protected interface ProcessHandler<T> {
-
-        void onProcess(T event, Optional<KafkaException> error);
-    }
+public interface EventProcessor<T> extends AutoCloseable {
 
     /**
-     * Drains all available events from the queue, and then processes them in order. If any errors are thrown while
-     * processing the individual events, these are submitted to the given {@link ProcessHandler}.
+     * Process an event that is received.
      */
-    protected boolean process(ProcessHandler<T> processHandler) {
-        closer.assertOpen("The processor was previously closed, so no further processing can occur");
-
-        List<T> events = drain();
-
-        if (events.isEmpty()) {
-            log.trace("No events to process");
-            return false;
-        }
+    void process(T event);
 
-        try {
-            log.trace("Starting processing of {} event{}", events.size(), 
events.size() == 1 ? "" : "s");
-
-            for (T event : events) {
-                try {
-                    Objects.requireNonNull(event, "Attempted to process a null 
event");
-                    log.trace("Processing event: {}", event);
-                    process(event);
-                    processHandler.onProcess(event, Optional.empty());
-                } catch (Throwable t) {
-                    KafkaException error = 
ConsumerUtils.maybeWrapAsKafkaException(t);
-                    processHandler.onProcess(event, Optional.of(error));
-                }
-            }
-        } finally {
-            log.trace("Completed processing");
-        }
-
-        return true;
-    }
-
-    /**
-     * It is possible for the consumer to close before complete processing all the events in the queue. In
-     * this case, we need to throw an exception to notify the user the consumer is closed.
-     */
-    private void closeInternal() {
-        log.trace("Closing event processor");
-        List<T> incompleteEvents = drain();
-
-        if (incompleteEvents.isEmpty())
-            return;
-
-        KafkaException exception = new KafkaException("The consumer is closed");
-
-        // Check each of the events and if it has a Future that is incomplete, complete it exceptionally.
-        incompleteEvents
-                .stream()
-                .filter(e -> e instanceof CompletableEvent)
-                .map(e -> ((CompletableEvent<?>) e).future())
-                .filter(f -> !f.isDone())
-                .forEach(f -> {
-                    log.debug("Completing {} with exception {}", f, 
exception.getMessage());
-                    f.completeExceptionally(exception);
-                });
-
-        log.debug("Discarding {} events because the consumer is closing", 
incompleteEvents.size());
-    }
-
-    /**
-     * Moves all the events from the queue to the returned list.
-     */
-    private List<T> drain() {
-        LinkedList<T> events = new LinkedList<>();
-        eventQueue.drainTo(events);
-        return events;
+    @Override
+    default void close() {
+        // Do nothing by default...

Review Comment:
   Wouldn't it be safer not to have a default implementation here, as a reminder that each implementation must handle closing?
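   As a sketch, the interface from the diff would then become (same names as above; only the default body is removed):

   ```java
   /**
    * An event processor that receives events and dispatches them for handling.
    */
   public interface EventProcessor<T> extends AutoCloseable {

       /**
        * Process an event that is received.
        */
       void process(T event);

       /**
        * No default body: each implementation must decide how to release its
        * resources. Overridden from AutoCloseable to drop the checked exception.
        */
       @Override
       void close();
   }
   ```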



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1338,7 +1339,14 @@ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(Consume
                                                                              Set<TopicPartition> partitions) {
         SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         sortedPartitions.addAll(partitions);
-        CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+
+        // We don't yet have the concept of having an expiring callback, but we will likely want that eventually.
+        Timer timer = time.timer(Long.MAX_VALUE);
+        CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(
+            methodName,
+            sortedPartitions,
+            timer
+        );

Review Comment:
   Here I have a similar comment as with `AsyncCommitEvent`: I would move the creation of the timer into the constructor of `ConsumerRebalanceListenerCallbackNeededEvent`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1270,6 +1230,20 @@ private void close(Duration timeout, boolean swallowException) {
         if (applicationEventHandler != null)
             closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
         closeTimer.update();
+
+        if (backgroundEventReaper != null && backgroundEventQueue != null) {

Review Comment:
   Why should those two fields ever be `null`?
   They seem necessary for the consumer to function correctly. If my statement is correct, the constructors should ensure that those fields are never `null`.
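   For illustration, a fail-fast constructor along these lines would make the `null` checks in `close()` unnecessary (a hypothetical excerpt; the real `AsyncKafkaConsumer` constructors take many more arguments):

   ```java
   import java.util.Objects;
   import java.util.concurrent.BlockingQueue;

   // Hypothetical excerpt of AsyncKafkaConsumer: both collaborators are
   // required at construction time, so they can never be null later in close().
   public class AsyncKafkaConsumerExcerpt {

       private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
       private final CompletableEventReaper<CompletableBackgroundEvent<?>> backgroundEventReaper;

       AsyncKafkaConsumerExcerpt(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
                                 final CompletableEventReaper<CompletableBackgroundEvent<?>> backgroundEventReaper) {
           // Fail fast here instead of null-checking during shutdown.
           this.backgroundEventQueue = Objects.requireNonNull(backgroundEventQueue, "backgroundEventQueue must be non-null");
           this.backgroundEventReaper = Objects.requireNonNull(backgroundEventReaper, "backgroundEventReaper must be non-null");
       }
   }
   ```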



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -273,9 +301,20 @@ void cleanup() {
             log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
         } finally {
             sendUnsentRequests(timer);
+
+            // Copy over the completable events to a separate list, then reap 
any incomplete
+            // events on that list.

Review Comment:
   I think that is clear from the code. We do not need the comment.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -128,7 +138,19 @@ void runOnce() {
         // Process the events—if any—that were produced by the application thread. It is possible that when processing
         // an event generates an error. In such cases, the processor will log an exception, but we do not want those
         // errors to be propagated to the caller.
-        applicationEventProcessor.process();
+        LinkedList<ApplicationEvent> events = new LinkedList<>();
+        applicationEventQueue.drainTo(events);
+
+        for (ApplicationEvent event : events) {
+            try {
+                if (event instanceof CompletableApplicationEvent)
+                    applicationEventReaper.add((CompletableApplicationEvent<?>) event);
+
+                applicationEventProcessor.process(event);
+            } catch (Throwable t) {
+                log.warn("Error processing event {}", t.getMessage(), t);
+            }
+        }

Review Comment:
   Could you extract these lines into a method `processApplicationEvents()` or similar? I think it makes the code more readable.
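   For example, using exactly the lines from the diff (only the method name `processApplicationEvents` is the suggested addition):

   ```java
   // runOnce() would then just call processApplicationEvents().
   private void processApplicationEvents() {
       LinkedList<ApplicationEvent> events = new LinkedList<>();
       applicationEventQueue.drainTo(events);

       for (ApplicationEvent event : events) {
           try {
               // Track completable events so the reaper can expire them later.
               if (event instanceof CompletableApplicationEvent)
                   applicationEventReaper.add((CompletableApplicationEvent<?>) event);

               applicationEventProcessor.process(event);
           } catch (Throwable t) {
               log.warn("Error processing event {}", t.getMessage(), t);
           }
       }
   }
   ```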



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -144,6 +166,12 @@ void runOnce() {
                 .map(Optional::get)
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
+
+        // "Complete" any events that have expired. This cleanup step should 
only be called after the network I/O
+        // thread has made at least one call to poll. This is done to emulate 
the behavior of the legacy consumer's
+        // handling of timeouts. The legacy consumer makes at least one 
attempt to satisfy any network requests
+        // before checking if a timeout has expired.

Review Comment:
   Do we need this comment? I am not a big fan of inline comments in general. All the information about the behavior in the comment should be clear from the corresponding unit tests. I do not think we need the references to the legacy consumer. Once the legacy consumer is gone, we will need to remove these references, which is work we can avoid by simply not writing those comments.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+    private final Logger log;
+
+    /**
+     * List of tracked events that are candidates to expire or cancel when reviewed.
+     */
+    private final List<T> tracked;
+
+    public CompletableEventReaper(LogContext logContext) {
+        this.log = logContext.logger(CompletableEventReaper.class);
+        this.tracked = new ArrayList<>();
+    }
+
+    /**
+     * Adds a new {@link CompletableEvent event} to track for later completion/expiration.
+     *
+     * @param event Event to track
+     */
+    public void add(T event) {
+        tracked.add(Objects.requireNonNull(event, "Event to track must be non-null"));
+    }
+
+    /**
+     * This method "completes" any {@link CompletableEvent}s that have either expired or completed normally. So this
+     * is a two-step process:
+     *
+     * <ol>
+     *     <li>
+     *         For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+     *         instance of {@link TimeoutException} is created and passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *     </li>
+     *     <li>
+     *         For each tracked event whose {@link CompletableEvent#future() future} is already in the
+     *         {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+     *     </li>
+     * </ol>
+     *
+     * <p/>
+     *
+     * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper.
+     *
+     * @param currentTimeMs <em>Current</em> time with which to compare against the
+     *                      <em>{@link CompletableEvent#deadlineMs() expiration time}</em>
+     */
+    public void reapExpiredAndCompleted(long currentTimeMs) {
+        log.trace("Reaping expired events");
+
+        Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+            TimeoutException error = new TimeoutException(String.format("%s could not be completed within its timeout", e.getClass().getSimpleName()));
+            long pastDueMs = currentTimeMs - e.deadlineMs();
+            log.debug("Completing event {} exceptionally since it expired {} ms ago", e, pastDueMs);
+            CompletableFuture<?> f = e.future();
+            f.completeExceptionally(error);
+        };
+
+        // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
+        tracked
+                .stream()
+                .filter(e -> !e.future().isDone())
+                .filter(e -> currentTimeMs > e.deadlineMs())
+                .forEach(timeoutEvent);
+

Review Comment:
   nit:
   IMO, this is more readable, but feel free to leave it if you do not share my taste.
   ```suggestion
           tracked.stream()
               .filter(e -> !e.future().isDone())
               .filter(e -> currentTimeMs > e.deadlineMs())
               .forEach(timeoutEvent);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##########
@@ -36,13 +36,14 @@ protected CompletableApplicationEvent(final Type type, final Timer timer) {
         super(type);
         this.future = new CompletableFuture<>();
         Objects.requireNonNull(timer);
-        this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
-    }

-    protected CompletableApplicationEvent(final Type type, final long deadlineMs) {
-        super(type);
-        this.future = new CompletableFuture<>();
-        this.deadlineMs = deadlineMs;
+        long currentTimeMs = timer.currentTimeMs();
+        long remainingMs = timer.remainingMs();
+
+        if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+            this.deadlineMs = Long.MAX_VALUE;
+        else
+            this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
   I have two questions here:
   
   1. Why is this code not in `CompletableEvent`?
   2. Why do you not keep the timer in an object field?
   
   Regarding 2, I have the feeling this PR contains code that already exists in the timer. You could have a method on `CompletableEvent` that checks for the expiration without exposing the timer, something like `isExpired()` or `isTimedOut()`.
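   To sketch the idea (the helper names `isExpired()` and `calculateDeadlineMs()` are hypothetical; the deadline arithmetic is taken from the constructors in this diff):

   ```java
   import java.util.concurrent.CompletableFuture;

   import org.apache.kafka.common.utils.Timer;

   public interface CompletableEvent<T> {

       CompletableFuture<T> future();

       long deadlineMs();

       // Hypothetical: expiry is answered by the event itself, so callers such
       // as the reaper never need access to the underlying Timer.
       default boolean isExpired(final long currentTimeMs) {
           return currentTimeMs > deadlineMs();
       }

       // Hypothetical: the saturating deadline calculation is shared here
       // instead of being duplicated in each subclass constructor.
       static long calculateDeadlineMs(final Timer timer) {
           long currentTimeMs = timer.currentTimeMs();
           long remainingMs = timer.remainingMs();

           if (currentTimeMs > Long.MAX_VALUE - remainingMs)
               return Long.MAX_VALUE;
           else
               return currentTimeMs + remainingMs;
       }
   }
   ```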


