lianetm commented on code in PR #16982:
URL: https://github.com/apache/kafka/pull/16982#discussion_r1776792978


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) 
{
 
         // poll once to update with the current metadata
         consumer.poll(Duration.ofMillis(0));
+        TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
+                "No metadata requests sent");
         client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
 
         // no error for no current position
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-        assertEquals(0, client.inFlightRequestCount());
-
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background thread
+            // on the next background thread poll.
+            assertEquals(0, client.inFlightRequestCount());
+        }
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
         consumer.poll(Duration.ofMillis(0));
         // requests: list-offset, fetch
-        assertEquals(2, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> {
+            boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
+            boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+            return hasListOffsetRequest && hasFetchRequest;
+        }, "No list-offset & fetch request sent");
 
         // no error for no end offset (so unknown lag)
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
 
         // poll once again, which should return the list-offset response
         // and hence next call would return correct lag result
-        client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+        ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+        client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(singletonMap(tp0, 90L)));
         consumer.poll(Duration.ofMillis(0));
 
-        assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+        // For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.
+        TestUtils.waitForCondition(() -> {
+            OptionalLong result = consumer.currentLag(tp0);
+            return result.isPresent() && result.getAsLong() == 40L;
+        }, "Subscription state is not updated");
         // requests: fetch
-        assertEquals(1, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FETCH), "No fetch request sent");
 
         // one successful fetch should update the log end offset and the 
position
+        ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
         final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
-        client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+        client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, 
fetchInfo)));
 
         final ConsumerRecords<String, String> records = 
(ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
         assertEquals(5, records.count());
         assertEquals(55L, consumer.position(tp0));
 
         // correct lag result
-        assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
+        // For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.

Review Comment:
   typo state



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) 
{
 
         // poll once to update with the current metadata
         consumer.poll(Duration.ofMillis(0));
+        TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
+                "No metadata requests sent");
         client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
 
         // no error for no current position
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-        assertEquals(0, client.inFlightRequestCount());
-
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background thread
+            // on the next background thread poll.
+            assertEquals(0, client.inFlightRequestCount());
+        }
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
         consumer.poll(Duration.ofMillis(0));
         // requests: list-offset, fetch
-        assertEquals(2, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> {
+            boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
+            boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+            return hasListOffsetRequest && hasFetchRequest;
+        }, "No list-offset & fetch request sent");
 
         // no error for no end offset (so unknown lag)
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
 
         // poll once again, which should return the list-offset response
         // and hence next call would return correct lag result
-        client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+        ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+        client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(singletonMap(tp0, 90L)));
         consumer.poll(Duration.ofMillis(0));
 
-        assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+        // For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.
+        TestUtils.waitForCondition(() -> {
+            OptionalLong result = consumer.currentLag(tp0);
+            return result.isPresent() && result.getAsLong() == 40L;
+        }, "Subscription state is not updated");
         // requests: fetch
-        assertEquals(1, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FETCH), "No fetch request sent");
 
         // one successful fetch should update the log end offset and the 
position
+        ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
         final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
-        client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+        client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, 
fetchInfo)));
 
         final ConsumerRecords<String, String> records = 
(ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
         assertEquals(5, records.count());
         assertEquals(55L, consumer.position(tp0));
 
         // correct lag result
-        assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
+        // For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.
+        TestUtils.waitForCondition(() -> {

Review Comment:
   Is this change really needed? In this case we just did a successful fetch, 
so position is updated to 55. We should be able to retrieve the lag of 45 (end 
offsets is already known to be 100). (Is not exactly the same case as above, 
where we needed to allow for the ListOffsets response to be processed in the 
background). Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to