lianetm commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2800855155


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -277,6 +277,28 @@ void updateSubscriptionState(Map<TopicPartition, 
OffsetFetcherUtils.ListOffsetDa
                     log.trace("Updating high watermark for partition {} to 
{}", partition, offset);
                     subscriptionState.updateHighWatermark(partition, offset);
                 }
+            } else {
+                if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+                    log.warn("Not updating last stable offset for partition {} 
as it is no longer assigned", partition);
+                } else {
+                    log.warn("Not updating high watermark for partition {} as 
it is no longer assigned", partition);
+                }
+            }
+        }
+    }
+
+    /**
+     * If any of the given partitions are assigned, this will clear the 
partition's 'end offset requested' flag so
+     * that the next attempt to look up the lag will properly issue another 
<code>LIST_OFFSETS</code> request. This
+     * is only intended to be called when <code>LIST_OFFSETS</code> fails. 
Successful <code>LIST_OFFSETS</code> calls
+     * should use {@link #updateSubscriptionState(Map, IsolationLevel)}.
+     *
+     * @param partitions Partitions for which the 'end offset requested' flag 
should be cleared (if still assigned)
+     */
+    void clearPartitionEndOffsetRequests(Collection<TopicPartition> 
partitions) {
+        for (final TopicPartition partition : partitions) {
+            if 
(subscriptionState.maybeClearPartitionEndOffsetRequested(partition)) {
+                log.trace("Clearing partition end offset requested for 
partition {}", partition);

Review Comment:
   a bit confusing, dup "partition"?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -153,11 +162,14 @@ public void onSuccess(ListOffsetResult value) {
                         
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
 
                         
offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, 
isolationLevel);
+                        
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());

Review Comment:
   here we're clearing the flag for the partitions that didn't get offsets yet. 
I agree we need this if we don't have any time left to retry. But if there's 
still time, the do-while will try again. In that case, do we want to clear the 
flag here? 
   
   I would imagine we don't, because we'll continue retrying while there is 
time. It could be the case of missing leader info for instance: we want to keep 
the flag on for those partitions, hit the `client.awaitMetadataUpdate(timer)` 
below, and try again in the next iteration of the do-while, right?
   
   If so, I imagine we could take the timer into consideration here? (clear the 
flag for the failed partitions only if timer expired?). Thoughts?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -153,11 +162,14 @@ public void onSuccess(ListOffsetResult value) {
                         
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
 
                         
offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, 
isolationLevel);
+                        
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
                     }
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
+                    
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());

Review Comment:
   same as above



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -659,14 +659,25 @@ public synchronized Long 
partitionEndOffset(TopicPartition tp, IsolationLevel is
 
     public synchronized void requestPartitionEndOffset(TopicPartition tp) {
         TopicPartitionState topicPartitionState = assignedState(tp);
-        topicPartitionState.requestEndOffset();
+        topicPartitionState.setRequestEndOffset(true);
     }
 
     public synchronized boolean partitionEndOffsetRequested(TopicPartition tp) 
{
         TopicPartitionState topicPartitionState = assignedState(tp);
         return topicPartitionState.endOffsetRequested();
     }
 
+    public synchronized boolean 
maybeClearPartitionEndOffsetRequested(TopicPartition tp) {
+        TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
+
+        if (topicPartitionState != null && 
topicPartitionState.endOffsetRequested()) {
+            topicPartitionState.setRequestEndOffset(false);
+            return true;
+        } else {
+            return false;

Review Comment:
   this return type seems to only be used to log if we performed the action or 
not. Should we move the log here instead? (simplify this func and the caller) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to