AndrewJSchofield commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1733563763


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1577,27 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
-     *             defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
         try {
-            // Validate positions using the partition leader end offsets, to detect if any partition
-            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the group coordinator, for any
-            // partitions which do not have a valid position and are not awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which have missing
-            // positions, so a consumer with manually assigned partitions can avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an initial position.
-            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset policy is defined,
-            // request reset using the default policy. If no reset strategy is defined and there
-            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
+            wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
+            cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(updateFetchPositionsEvent);

Review Comment:
   Previously, this variable was set from `subscriptions.hasAllFetchPositions()` and did not depend on successfully retrieving the result of the new event. I wonder whether it has become stickier as a result, which might not be desirable. Perhaps you should set it to false before waiting for the result of the event, so that it remains false if the result does not arrive. What do you think the "default" value should be here?
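   To illustrate the suggestion, here is a minimal, self-contained sketch (class and field names are invented stand-ins for the real `AsyncKafkaConsumer` members, not the actual implementation): the cached flag is reset to false before blocking on the event result, so a timeout leaves it pessimistic rather than stuck at a stale value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the "default to false before waiting" pattern.
class PositionsCache {
    volatile boolean cachedHasAllFetchPositions = false;

    // Returns true iff the event result arrived before the timeout.
    boolean updateFetchPositions(CompletableFuture<Boolean> eventResult, long timeoutMs) {
        // Reset before blocking: if the result never arrives, the cached flag
        // is false and the next poll() will not wrongly skip the position check.
        cachedHasAllFetchPositions = false;
        try {
            cachedHasAllFetchPositions = eventResult.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

   With this shape, a sticky true value cannot survive a timed-out attempt.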



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {

Review Comment:
   This `CompletableFuture<Boolean>` is a slightly tricky thing. The boolean, I think, refers to whether we have all of the offset information we need. But there are a lot of "results" flying around, and I'm finding it tricky to follow this boolean through the various methods. I wonder whether using one specific name for the future that carries the "do we have all the offset information" flag, at every point it appears (whether as a local variable or a method argument), would help my tiny brain.
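   As a sketch of the naming idea (all names here are invented for illustration, not the real `OffsetsRequestManager` API): the same identifier is threaded through every method that touches the future, so its Boolean payload always reads as "do we have all fetch positions".

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical flow showing one consistently named future at every hand-off.
class UpdatePositionsFlow {

    CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
        CompletableFuture<Boolean> hasAllFetchPositionsResult = new CompletableFuture<>();
        initPositionsIfNeeded(hasAllFetchPositionsResult);
        return hasAllFetchPositionsResult;
    }

    // The argument keeps the same name, so a reader can follow the flag.
    private void initPositionsIfNeeded(CompletableFuture<Boolean> hasAllFetchPositionsResult) {
        // In the real code, the committed-offsets and list-offsets stages would
        // complete this future; the sketch completes it directly to stay runnable.
        hasAllFetchPositionsResult.complete(true);
    }
}
```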



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -194,14 +402,15 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
      * an error is received in the response, it will be saved to be thrown on the next call to
      * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
      */
-    public CompletableFuture<Void> resetPositionsIfNeeded() {
+    protected CompletableFuture<Void> resetPositionsIfNeeded() {

Review Comment:
   I always feel that `protected` carries an implication of subclassing. I don't believe this class is being subclassed, so I'd prefer package-private, which should still give the accessibility needed for testing. Of course, this is a matter of taste, so feel free to ignore this :)
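   For concreteness, a minimal sketch of the visibility being suggested (the class and body are simplified stand-ins, not the real `OffsetsRequestManager`): a package-private method is declared with no modifier at all.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in class demonstrating package-private visibility.
class OffsetsRequestManagerSketch {

    // Package-private (no modifier): reachable from tests in the same package,
    // without the subclassing connotation that `protected` carries.
    CompletableFuture<Void> resetPositionsIfNeeded() {
        return CompletableFuture.completedFuture(null);
    }
}
```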



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1577,27 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
-     *             defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
         try {
-            // Validate positions using the partition leader end offsets, to detect if any partition
-            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the group coordinator, for any
-            // partitions which do not have a valid position and are not awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which have missing
-            // positions, so a consumer with manually assigned partitions can avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an initial position.
-            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset policy is defined,
-            // request reset using the default policy. If no reset strategy is defined and there
-            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));

Review Comment:
   I believe that this will create an `UpdateFetchPositionsEvent` for every iteration of the inner loop in `AsyncKafkaConsumer.poll(Duration)`. Previously, `cachedSubscriptionHasAllFetchPositions` was an attempt at optimising this area, which was probably not entirely satisfactory, but it was well intentioned. Generally, once the positions have been validated, there's no need for any of this unless something nasty happens. I feel there's an optimisation waiting here. We know when the set of assigned partitions changed. We also know when an error such as log truncation occurred. We know when an `UpdateFetchPositionsEvent` completed successfully. So, I think we can probably be a bit more deliberate about creating the event only when it's needed.
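   One possible shape for this (purely a sketch; every name here is invented, and the real wiring into `poll(Duration)` and the background thread would need care): a "positions dirty" flag that is set on assignment changes or detected truncation, cleared on successful completion, and checked before constructing a new event.

```java
// Hypothetical gate deciding when an UpdateFetchPositionsEvent is worth creating.
class UpdateFetchPositionsGate {

    // Start dirty: nothing has been validated yet.
    private volatile boolean positionsDirty = true;

    void onAssignmentChanged() { positionsDirty = true; }
    void onLogTruncationDetected() { positionsDirty = true; }
    void onUpdateFetchPositionsCompleted() { positionsDirty = false; }

    // poll() would check this before constructing a new event, skipping the
    // event entirely while positions are known to be valid.
    boolean needsUpdateFetchPositionsEvent() {
        return positionsDirty;
    }
}
```

   The flag is deliberately conservative: any event that might invalidate positions re-arms it, so correctness does not depend on the optimisation firing.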



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
