kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2118578191
> Hey @cadonna, the tricky bit is that, for some events, the request managers do expire requests too, so in this flow you described:
>
> > The event is processed in the ApplicationEventHandler and a request is added to the commit request manager. **Then the commit request manager is polled**, the requests are added to the network client and the network client is polled
>
> When the manager is polled, if the event had timeout 0, it will be expired/cancelled before making it to the network thread. Currently we have 2 managers that do this (that I can remember): [TopicMetadataManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L86) and [CommitRequestManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1168). So for those events, even with this PR, if they have timeout 0, they won't have a chance to complete.
>
> My point is not to bring more changes into this PR, only to have the whole situation in mind so we can address it properly (with multiple PRs). This other [PR](https://github.com/apache/kafka/pull/15844) attempts to address this situation I described, but only in the `CommitRequestManager` for instance. We still have to align on the approach there, and also handle it in the `TopicMetadataManager` I would say. I would expect that a combination of this PR and those others would allow us to get to a better point (now, even with this PR, we cannot make basic progress with a consumer being continuously polled with timeout 0 because `FetchCommittedOffsets` is always expired by the manager, for instance). I can easily repro it with the following integration test + poll(ZERO) (that I was surprised we have not covered, because TestUtils always polls with a non-zero timeout)
>
> ```
> // Ensure TestUtils polls with ZERO. This fails for the new consumer only.
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = {
>   val numMessages = 100
>   val producer = createProducer()
>   sendRecords(producer, numMessages, tp)
>
>   val consumer = createConsumer()
>   consumer.subscribe(Set(topic).asJava)
>   val records = awaitNonEmptyRecords(consumer, tp)
>   assertEquals(numMessages, records.count())
> }
> ```
>
> Makes sense?

Yes, the network layer changes are captured in KAFKA-16200 and build on top of this PR.
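
For anyone following along, here is a rough sketch of the expiry behavior described in the quoted comment. It is a toy model only: `ExpiringRequestManagerSketch`, `PendingRequest`, `RequestManager`, and `reachesNetwork` are hypothetical names and are not the actual `CommitRequestManager` or `TopicMetadataRequestManager` code. It also assumes that a request counts as expired once the current time reaches its deadline, which may differ from the real check.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: every type here is hypothetical, not a Kafka class. */
final class ExpiringRequestManagerSketch {

    /** A pending request with an absolute deadline derived from the caller's timeout. */
    static final class PendingRequest {
        final String name;
        final long deadlineMs;
        boolean expired;

        PendingRequest(String name, long createdAtMs, long timeoutMs) {
            this.name = name;
            this.deadlineMs = createdAtMs + timeoutMs;
        }
    }

    /** Hypothetical manager that expires requests when it is polled. */
    static final class RequestManager {
        private final List<PendingRequest> pending = new ArrayList<>();

        void add(PendingRequest request) {
            pending.add(request);
        }

        /** Returns the requests to hand to the network layer; expired ones are dropped. */
        List<PendingRequest> poll(long nowMs) {
            List<PendingRequest> toSend = new ArrayList<>();
            for (PendingRequest request : pending) {
                if (nowMs >= request.deadlineMs) {
                    // Assumed expiry rule: deadline reached means the request is cancelled.
                    request.expired = true;
                } else {
                    toSend.add(request);
                }
            }
            pending.clear();
            return toSend;
        }
    }

    /** True if a request created with the given timeout survives the manager's poll. */
    static boolean reachesNetwork(long createdAtMs, long timeoutMs, long polledAtMs) {
        RequestManager manager = new RequestManager();
        manager.add(new PendingRequest("fetch-committed-offsets", createdAtMs, timeoutMs));
        return !manager.poll(polledAtMs).isEmpty();
    }

    public static void main(String[] args) {
        // Timeout 0: the deadline equals the creation time, so the first poll expires it.
        System.out.println("timeout=0 reaches network: " + reachesNetwork(1000L, 0L, 1000L));
        // Non-zero timeout: the request is still live when the manager is polled.
        System.out.println("timeout=30000 reaches network: " + reachesNetwork(1000L, 30000L, 1000L));
    }
}
```

Under those assumptions, the timeout-0 request never gets past the manager's poll, even when that poll happens at the same instant the request is created, while the request with a non-zero timeout is handed on to the network layer.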