lianetm commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1608487528
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * {@code CompletableEventReaper} is responsible for tracking {@link CompletableEvent time-bound events} and removing + * any that exceed their {@link CompletableEvent#deadlineMs() deadline} (unless they've already completed). This + * mechanism is used by the {@link AsyncKafkaConsumer} to enforce the timeout provided by the user in its API + * calls (e.g. {@link AsyncKafkaConsumer#commitSync(Duration)}). + */ +public class CompletableEventReaper { + + private final Logger log; + + /** + * List of tracked events that are candidates for expiration. + */ + private final List<CompletableEvent<?>> tracked; + + public CompletableEventReaper(LogContext logContext) { + this.log = logContext.logger(CompletableEventReaper.class); + this.tracked = new ArrayList<>(); + } + + /** + * Adds a new {@link CompletableEvent event} to track for later completion/expiration. + * + * @param event Event to track + */ + public void add(CompletableEvent<?> event) { + tracked.add(Objects.requireNonNull(event, "Event to track must be non-null")); + } + + /** + * This method performs a two-step process to "complete" {@link CompletableEvent events} that have either expired + * or completed normally: + * + * <ol> + * <li> + * For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an + * instance of {@link TimeoutException} is created and passed to + * {@link CompletableFuture#completeExceptionally(Throwable)}. + * </li> + * <li> + * For each tracked event of which its {@link CompletableEvent#future() future} is already in the + * {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events. + * </li> + * </ol> + * + * <p/> + * + * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper. + * + * @param currentTimeMs <em>Current</em> time with which to compare against the + * <em>{@link CompletableEvent#deadlineMs() expiration time}</em> + */ + public void reap(long currentTimeMs) { + Consumer<CompletableEvent<?>> expireEvent = event -> { + long pastDueMs = currentTimeMs - event.deadlineMs(); + TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, event.deadlineMs())); + + if (event.future().completeExceptionally(error)) { + log.debug("Event {} completed exceptionally since its expiration of {} passed {} ms ago", event, event.deadlineMs(), pastDueMs); + } else { + log.trace("Event {} not completed exceptionally since it was previously completed", event); + } + }; + + // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete. + tracked.stream() + .filter(e -> !e.future().isDone()) + .filter(e -> currentTimeMs > e.deadlineMs()) Review Comment: Don't we want >= here when identifying expired events? I would expect so (that's the semantic applied in the `Timer` class [isExpired](https://github.com/apache/kafka/blob/9fe3932e5c110443f7fa545fcf0b8f78574f2f73/clients/src/main/java/org/apache/kafka/common/utils/Timer.java#L71) for instance) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org