FrankYang0529 commented on code in PR #16982:
URL: https://github.com/apache/kafka/pull/16982#discussion_r1776899718
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol)
{
// poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0));
+ TestUtils.waitForCondition(() -> requestGenerated(client,
ApiKeys.FIND_COORDINATOR),
+ "No metadata requests sent");
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, metadata.fetch().nodes().get(0)));
// no error for no current position
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
- assertEquals(0, client.inFlightRequestCount());
-
+ if (groupProtocol == GroupProtocol.CLASSIC) {
+ // Classic consumer does not send the LIST_OFFSETS right away
(requires an explicit poll),
+ // different from the new async consumer, that will send the
LIST_OFFSETS request in the background thread
+ // on the next background thread poll.
+ assertEquals(0, client.inFlightRequestCount());
+ }
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch
- assertEquals(2, client.inFlightRequestCount());
+ TestUtils.waitForCondition(() -> {
+ boolean hasListOffsetRequest = requestGenerated(client,
ApiKeys.LIST_OFFSETS);
+ boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+ return hasListOffsetRequest && hasFetchRequest;
+ }, "No list-offset & fetch request sent");
// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
// poll once again, which should return the list-offset response
// and hence next call would return correct lag result
- client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+ ClientRequest listOffsetRequest = findRequest(client,
ApiKeys.LIST_OFFSETS);
+ client.respondToRequest(listOffsetRequest,
listOffsetsResponse(singletonMap(tp0, 90L)));
consumer.poll(Duration.ofMillis(0));
- assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+ // For AsyncKafkaConsumer, subscription sate is updated in background,
so the result will eventually be updated.
+ TestUtils.waitForCondition(() -> {
+ OptionalLong result = consumer.currentLag(tp0);
+ return result.isPresent() && result.getAsLong() == 40L;
+ }, "Subscription state is not updated");
// requests: fetch
- assertEquals(1, client.inFlightRequestCount());
+ TestUtils.waitForCondition(() -> requestGenerated(client,
ApiKeys.FETCH), "No fetch request sent");
// one successful fetch should update the log end offset and the
position
+ ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
- client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+ client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0,
fetchInfo)));
final ConsumerRecords<String, String> records =
(ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
// correct lag result
- assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
+ // For AsyncKafkaConsumer, subscription sate is updated in background,
so the result will eventually be updated.
+ TestUtils.waitForCondition(() -> {
Review Comment:
Yes, you're right: if `consumer.position` can get `45`, then the subscription
state has already been updated. I'll remove `TestUtils.waitForCondition` here.
Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]