lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2445477968
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2686,12 +2686,11 @@ public void testCurrentLag(GroupProtocol groupProtocol)
throws InterruptedExcept
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
- // requests: list-offset, fetch
- TestUtils.waitForCondition(() -> {
- boolean hasListOffsetRequest = requestGenerated(client,
ApiKeys.LIST_OFFSETS);
- boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
- return hasListOffsetRequest && hasFetchRequest;
- }, "No list-offset & fetch request sent");
Review Comment:
> The call to seek() still requires a LIST_OFFSETS RPC to be sent.
uhm no, not really, why? Seek sets the position without leader epoch exactly
for that purpose, to ensure that we skip validation and can start fetching
right away after the user manually sets the position with `seek` (no need to
List_Offsets)
-
https://github.com/apache/kafka/blob/35bd71cfc2c3b3c2ebe16bb7830d9ee660d4a7a2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L789
-
https://github.com/apache/kafka/blob/35bd71cfc2c3b3c2ebe16bb7830d9ee660d4a7a2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L1111-L1112
The difference I'm concerned about is that, with the current state of the
PR, seems we're delaying sending FETCH in a new way right? (the consumer not
FETCH-ing as soon as it is polled with a manually set position)
And that reveals a gap, we're not checking if we have all positions on every
poll anymore (as we used to before this PR). We're only checking for
`hasAllPositions` when we trigger a new `inflightPoll` (and`inflightPoll` is
not triggered on every consumer.poll). Is that check on every poll what we're
maybe missing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]