lianetm commented on code in PR #16982:
URL: https://github.com/apache/kafka/pull/16982#discussion_r1733248177
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2481,23 +2489,31 @@ public void testCurrentLag(GroupProtocol groupProtocol) {
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch
- assertEquals(2, client.inFlightRequestCount());
+ TestUtils.waitForCondition(() -> {
+ boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
+ boolean fetchRequest = requestGenerated(client, ApiKeys.FETCH);
+ return hasListOffsetRequest && fetchRequest;
+ }, "No list-offset & fetch request sent");
// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
// poll once again, which should return the list-offset response
// and hence next call would return correct lag result
- client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+ Optional<ClientRequest> listOffsetRequest = client.requests().stream().filter(request -> request.requestBuilder().apiKey().equals(ApiKeys.LIST_OFFSETS)).findFirst();
Review Comment:
This line is tricky to read, and we need it twice (for now). Would it be
clearer to have a helper along the lines of `findRequest(client,
ApiKeys.LIST_OFFSETS)`?
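
To make the suggestion concrete, here is a rough, self-contained sketch of what such a helper might look like. The `ApiKeys`, `ClientRequest`, and `MockClient` types below are simplified stand-ins written only for this example (the real test would use the Kafka classes and go through `request.requestBuilder().apiKey()`), and the class name `FindRequestSketch` and the `send` method are hypothetical. Having `requestGenerated` delegate to the helper is also just one possible arrangement, not necessarily how the PR implements it.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

public class FindRequestSketch {

    // Stand-in for Kafka's ApiKeys enum; only the two keys used in the test.
    enum ApiKeys { LIST_OFFSETS, FETCH }

    // Stand-in for ClientRequest; here the API key is stored directly
    // instead of being reached through requestBuilder().
    static final class ClientRequest {
        private final ApiKeys apiKey;

        ClientRequest(ApiKeys apiKey) {
            this.apiKey = apiKey;
        }

        ApiKeys apiKey() {
            return apiKey;
        }
    }

    // Stand-in for MockClient; it only tracks in-flight requests in order.
    static final class MockClient {
        private final Queue<ClientRequest> requests = new ArrayDeque<>();

        void send(ClientRequest request) {
            requests.add(request);
        }

        Queue<ClientRequest> requests() {
            return requests;
        }
    }

    // Suggested helper: first in-flight request with the given API key, if any.
    static Optional<ClientRequest> findRequest(MockClient client, ApiKeys apiKey) {
        return client.requests().stream()
            .filter(request -> request.apiKey() == apiKey)
            .findFirst();
    }

    // Assumption: the boolean check could be expressed through the same helper.
    static boolean requestGenerated(MockClient client, ApiKeys apiKey) {
        return findRequest(client, apiKey).isPresent();
    }

    public static void main(String[] args) {
        MockClient client = new MockClient();
        client.send(new ClientRequest(ApiKeys.LIST_OFFSETS));
        client.send(new ClientRequest(ApiKeys.FETCH));

        // Both lookups read the same way at the call site.
        System.out.println(findRequest(client, ApiKeys.LIST_OFFSETS).isPresent());
        System.out.println(requestGenerated(client, ApiKeys.FETCH));
    }
}
```

With something like this, both call sites in the test would shrink to a single readable call, and the stream/filter chain would live in one place.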