lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2441146431
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2686,12 +2686,11 @@ public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedExcept
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
- // requests: list-offset, fetch
- TestUtils.waitForCondition(() -> {
- boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
- boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
- return hasListOffsetRequest && hasFetchRequest;
- }, "No list-offset & fetch request sent");
Review Comment:
Hmm, this is suspicious. Why is the consumer not sending a fetch request if
it already has a position (set with `seek`)?
I think the missing bit is probably that on poll we now trigger an
AsyncPoll event that needs to update positions before generating FETCH
requests. But if the position is updated from the app thread (with `seek`),
we just reuse the same `inflightPoll` we had (still blocked on updating
positions), so we never send the FETCH that we should.
One option could be to always send the AsyncPoll to the background (so that
on every `consumer.poll` it can check whether it already has all positions),
and keep the logic to reuse the inflight poll only in the background?
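
To make the suggestion concrete, here is a rough, single-threaded toy model of what I mean. It is only a sketch: `AsyncPollSketch`, `Background`, `onAsyncPoll`, and the `"UPDATE_POSITIONS"` / `"FETCH"` strings are all made up for illustration and are not the actual consumer classes or request types. The assumption is that every `consumer.poll` forwards an event to the background, and only the background decides whether to reuse an in-flight position update.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, single-threaded model; none of these names are real Kafka classes.
final class AsyncPollSketch {

    /** Hypothetical stand-in for the state owned by the background thread. */
    static final class Background {
        final Map<String, Long> positions = new HashMap<>();
        final List<String> requests = new ArrayList<>();
        boolean positionUpdateInflight = false;

        /** Runs for every AsyncPoll event, i.e. once per consumer.poll(). */
        void onAsyncPoll(List<String> assigned) {
            if (positions.keySet().containsAll(assigned)) {
                // All positions are known (possibly set by seek): fetch now.
                positionUpdateInflight = false;
                requests.add("FETCH");
            } else if (!positionUpdateInflight) {
                // Positions are missing and nothing is in flight: start an update.
                positionUpdateInflight = true;
                requests.add("UPDATE_POSITIONS");
            }
            // Otherwise an update is already in flight: reuse it, send nothing.
        }
    }

    public static void main(String[] args) {
        Background background = new Background();
        List<String> assigned = List.of("tp0");

        background.onAsyncPoll(assigned);      // first poll: no position yet
        background.onAsyncPoll(assigned);      // second poll: reuses the in-flight update
        background.positions.put("tp0", 50L);  // models consumer.seek(tp0, 50L)
        background.onAsyncPoll(assigned);      // next poll sees the position

        System.out.println(background.requests); // prints [UPDATE_POSITIONS, FETCH]
    }
}
```

The point is that the reuse check lives next to the position check, so a position set from the app thread is noticed on the very next poll instead of being hidden behind the blocked in-flight poll.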