lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2117763853

   Hey @cadonna, the tricky bit is that, for some events, the request managers 
do expire requests too, so in this flow you described:
   
   > The event is processed in the ApplicationEventHandler and a request is 
added to the commit request manager. **Then the commit request manager is 
polled**, the requests are added to the network client and the network 
client is polled
   
   When the manager is polled, if the event had timeout 0, it will be 
expired/cancelled before making it to the network thread. Currently we have 2 
managers that do this (that I can remember): 
[TopicMetadataRequestManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L86)
 and 
[CommitRequestManager](https://github.com/apache/kafka/blob/f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1168).
 So for those events, even with this PR, if they have timeout 0, they won't 
have a chance to complete.
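
   To make the ordering issue concrete, here is a minimal sketch of a manager that expires requests when it is polled. It is a simplified model under stated assumptions, not the actual Kafka code: `ExpiringRequestManagerSketch`, `PendingRequest`, and the deadline check are all hypothetical names and logic chosen for illustration. It only shows why a request created with timeout 0 is already past its deadline at the first poll and so never reaches the network client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of a request manager that expires requests
// on poll. These are NOT the real Kafka classes.
public class ExpiringRequestManagerSketch {

    /** A pending request with an absolute deadline (creation time + timeout). */
    static final class PendingRequest {
        final String name;
        final long deadlineMs;
        final CompletableFuture<String> future = new CompletableFuture<>();

        PendingRequest(String name, long createdMs, long timeoutMs) {
            this.name = name;
            this.deadlineMs = createdMs + timeoutMs;
        }

        boolean isExpired(long nowMs) {
            // Assumption: a request is expired once the deadline is reached.
            return nowMs >= deadlineMs;
        }
    }

    private final Queue<PendingRequest> unsent = new ArrayDeque<>();

    void add(PendingRequest request) {
        unsent.add(request);
    }

    /** Fails overdue requests, then returns the rest for the network client. */
    List<PendingRequest> poll(long nowMs) {
        List<PendingRequest> toSend = new ArrayList<>();
        PendingRequest request;
        while ((request = unsent.poll()) != null) {
            if (request.isExpired(nowMs)) {
                // Expired before it was ever handed to the network client.
                request.future.completeExceptionally(
                    new TimeoutException(request.name + " expired before send"));
            } else {
                toSend.add(request);
            }
        }
        return toSend;
    }

    public static void main(String[] args) {
        ExpiringRequestManagerSketch manager = new ExpiringRequestManagerSketch();
        long nowMs = 1_000L;

        PendingRequest zeroTimeout = new PendingRequest("FetchCommittedOffsets", nowMs, 0L);
        PendingRequest withTimeout = new PendingRequest("FetchCommittedOffsets", nowMs, 30_000L);
        manager.add(zeroTimeout);
        manager.add(withTimeout);

        // Poll at the same instant the events were created.
        List<PendingRequest> toSend = manager.poll(nowMs);
        System.out.println("requests handed to network client: " + toSend.size());
        System.out.println("zero-timeout request failed: "
            + zeroTimeout.future.isCompletedExceptionally());
    }
}
```

   In this model, only the request with a non-zero timeout is returned from `poll`; the zero-timeout one is failed on the first poll, no matter how often the caller retries with timeout 0.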
   
   My point is not to bring more changes into this PR, only to have the whole 
situation in mind so we can address it properly (with multiple PRs). This other 
[PR](https://github.com/apache/kafka/pull/15844) attempts to address the 
situation I described, but only in the `CommitRequestManager`. We 
still have to align on the approach there, and also handle it in the 
`TopicMetadataRequestManager`, I would say. I would expect that a combination 
of this PR and those others would get us to a better point (right now, even 
with this PR, a consumer that is continuously polled with timeout 0 cannot make 
basic progress, because `FetchCommittedOffsets` is always expired by the 
manager, for instance). I can easily repro it with the following integration 
test (a case I was surprised we do not cover; it goes unnoticed because 
TestUtils always polls with a non-zero timeout):
   
   
   ```
     // Ensure TestUtils polls with ZERO. This fails for the new consumer only.
     @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
     @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
     def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = {
       val numMessages = 100
       val producer = createProducer()
       sendRecords(producer, numMessages, tp)
   
       val consumer = createConsumer()
       consumer.subscribe(Set(topic).asJava)
       val records = awaitNonEmptyRecords(consumer, tp)
       assertEquals(numMessages, records.count())
     }
   ```
   
   
   Makes sense?
   


