lianetm commented on code in PR #16885: URL: https://github.com/apache/kafka/pull/16885#discussion_r1744261989
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ########## @@ -182,6 +215,222 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO result.fetchedOffsets)); } + /** + * Update fetch positions for assigned partitions that do not have a position. This will: + * <ul> + * <li>check if all assigned partitions already have fetch positions and return right away if that's the case</li> + * <li>trigger an async request to validate positions (detect log truncation)</li> + * <li>fetch committed offsets if enabled, and use the response to update the positions</li> + * <li>fetch partition offsets for partitions that may still require a position, and use the response to + * update the positions</li> + * </ul> + * + * @param deadlineMs Time in milliseconds when the triggering application event expires. Any error received after + * this will be saved, and used to complete the result exceptionally on the next call to this + * function. + * @return Future that will complete with a boolean indicating if all assigned partitions have positions (based + * on {@link SubscriptionState#hasAllFetchPositions()}). It will complete immediately, with true, if all positions + * are already available. If some positions are missing, the future will complete once the offsets are retrieved and positions are updated. + */ + public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) { + CompletableFuture<Boolean> result = new CompletableFuture<>(); + + try { + if (maybeCompleteWithPreviousException(result)) { + return result; + } + + validatePositionsIfNeeded(); + + if (subscriptionState.hasAllFetchPositions()) { + // All positions are already available + result.complete(true); + return result; + } + + // Some positions are missing, so trigger requests to fetch offsets and update them. 
+ updatePositionsWithOffsets(deadlineMs).whenComplete((__, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(subscriptionState.hasAllFetchPositions()); + } + }); + + } catch (Exception e) { + result.completeExceptionally(maybeWrapAsKafkaException(e)); + } + return result; + } + + private boolean maybeCompleteWithPreviousException(CompletableFuture<Boolean> result) { + Throwable cachedException = cachedUpdatePositionsException.getAndSet(null); + if (cachedException != null) { + result.completeExceptionally(cachedException); + return true; + } + return false; + } + + /** + * Generate requests to fetch offsets, and update positions once a response is received. + */ + private CompletableFuture<Void> updatePositionsWithOffsets(long deadlineMs) { + CompletableFuture<Void> result = new CompletableFuture<>(); + + cacheExceptionIfEventExpired(result, deadlineMs); + + // Reset positions using committed offsets retrieved from the group coordinator, for any + // partitions which do not have a valid position and are not awaiting reset. This will + // trigger an OffsetFetch request and update positions with the offsets retrieved. + if (commitRequestManager != null) { + CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(deadlineMs); + initWithCommittedOffsetsResult.whenComplete((__, error) -> { + if (error == null) { + initWithPartitionOffsetsIfNeeded(result); + } else { + result.completeExceptionally(error); + } + }); + } else { + initWithPartitionOffsetsIfNeeded(result); + } + return result; + } + + /** + * Save exception that may occur while updating fetch positions. Note that since the update fetch positions + * is triggered asynchronously, errors may be found when the triggering UpdateFetchPositionsEvent has already + * expired. In that case, the exception is saved in memory, to be thrown when processing the following + * UpdateFetchPositionsEvent. 
+ * + * @param result Update fetch positions future to get the exception from (if any) + * @param deadlineMs Deadline of the triggering application event, used to identify if the event has already + * expired when the error in the result future occurs. + */ + private void cacheExceptionIfEventExpired(CompletableFuture<Void> result, long deadlineMs) { + result.whenComplete((__, error) -> { + boolean updatePositionsExpired = time.milliseconds() >= deadlineMs; + if (error != null && updatePositionsExpired) { + cachedUpdatePositionsException.set(error); + } + }); + } + + /** + * If there are partitions still needing a position and a reset policy is defined, request + * reset using the default policy. + * + * @param result Future that will complete when the reset operation completes. + * @throws NoOffsetForPartitionException If no reset strategy is configured + */ + private void initWithPartitionOffsetsIfNeeded(CompletableFuture<Void> result) { + try { + // Mark partitions that need reset, using the configured reset strategy. If no + // strategy is defined, this will raise a NoOffsetForPartitionException exception. + subscriptionState.resetInitializingPositions(); + } catch (Exception e) { + result.completeExceptionally(e); + return; + } + + // For partitions awaiting reset, generate a ListOffset request to retrieve the partition + // offsets according to the strategy (ex. earliest, latest), and update the positions. + resetPositionsIfNeeded().whenComplete((resetResult, error) -> { + if (error == null) { + result.complete(null); + } else { + result.completeExceptionally(error); + } + }); + } + + /** + * Fetch the committed offsets for partitions that require initialization. Use them to set + * the fetch positions in the subscription state. 
+ * + * @throws TimeoutException If offsets could not be retrieved within the timeout + */ + private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long deadlineMs) { + final Set<TopicPartition> initializingPartitions = subscriptionState.initializingPartitions(); + + if (initializingPartitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); + CompletableFuture<Void> result = new CompletableFuture<>(); + + // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle + // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created + // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the + // case it times out, subsequent attempts will also use the event in order to wait for the results. + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture; + if (!canReusePendingOffsetFetchEvent(initializingPartitions)) { + // Need to generate a new request to fetch committed offsets + final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs); Review Comment: the trick here is that we want to extend the deadline (to the default_api_timeout, or the specific api call, whatever is larger). This pending request is mainly to solve the case of repeated calls with low timeout (ie. poll(0)), so we expect that we'll end up having the default_api_timeout used for deadline, but also considering the cases where the specific call had a larger timeout and we would use it in that case (so that's why the max). Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org