Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
chia7712 merged PR #15803: URL: https://github.com/apache/kafka/pull/15803 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
chia7712 commented on PR #15803: URL: https://github.com/apache/kafka/pull/15803#issuecomment-2093997309 @lianetm any feedbacks? I'd like to merge it later :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
AndrewJSchofield commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1587067096 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) { // Update the timer before we head into the loop in case it took a while to get the lock. timer.update(); -if (timer.isExpired()) +if (timer.isExpired()) { +// If the thread was interrupted before we start waiting, it still counts as +// interrupted from the point of view of the KafkaConsumer.poll(Duration) contract. +// We only need to check this when we are not going to wait because waiting +// already checks whether the thread is interrupted. +if (Thread.interrupted()) +throw new InterruptException("Thread interrupted."); Review Comment: OK. Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
chia7712 commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1586968300 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) { // Update the timer before we head into the loop in case it took a while to get the lock. timer.update(); -if (timer.isExpired()) +if (timer.isExpired()) { +// If the thread was interrupted before we start waiting, it still counts as +// interrupted from the point of view of the KafkaConsumer.poll(Duration) contract. +// We only need to check this when we are not going to wait because waiting +// already checks whether the thread is interrupted. +if (Thread.interrupted()) +throw new InterruptException("Thread interrupted."); Review Comment: Maybe we should use the same exception message `Interrupted waiting for results from fetching records` for consistency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
AndrewJSchofield commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1585852808 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception { } } +/** + * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} + * causes {@link InterruptException} to be thrown. + */ +@Test +public void testPollThrowsInterruptExceptionIfInterrupted() { +consumer = newConsumer(); +final String topicName = "foo"; +final int partition = 3; +final TopicPartition tp = new TopicPartition(topicName, partition); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); +Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); +completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +consumer.assign(singleton(tp)); + +// interrupt the thread and call poll +try { +Thread.currentThread().interrupt(); +assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); +} finally { +// clear interrupted state again since this thread may be reused by JUnit Review Comment: By calling `Thread.interrupted()`, the code is ensuring that the test does not exit with the thread still in an interrupted state. I have updated the comment accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
lianetm commented on PR #15803: URL: https://github.com/apache/kafka/pull/15803#issuecomment-2083525470 Thanks for the catch and the fix @AndrewJSchofield, left a nit, but LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
lianetm commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1583641482 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception { } } +/** + * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} + * causes {@link InterruptException} to be thrown. + */ +@Test +public void testPollThrowsInterruptExceptionIfInterrupted() { +consumer = newConsumer(); +final String topicName = "foo"; +final int partition = 3; +final TopicPartition tp = new TopicPartition(topicName, partition); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); +Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); +completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +consumer.assign(singleton(tp)); + +// interrupt the thread and call poll +try { +Thread.currentThread().interrupt(); +assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); +} finally { +// clear interrupted state again since this thread may be reused by JUnit Review Comment: Just to make the comment accurate, I expect that we need to flip the interrupted flag here so that the assertion down below polling again does not throw. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
AndrewJSchofield commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1580787182 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception { } } +/** + * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} + * causes {@link InterruptException} to be thrown. + */ +@Test +public void testPollThrowsInterruptExceptionIfInterrupted() { Review Comment: Thanks for the review. KafkaConsumerTest is really in general very specific to the LegacyKafkaConsumer and the old consumer group protocol so I don't think merging this into that test is appropriate. But the TODO in that test is clearly incorrect. I will fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
chia7712 commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1580186766 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception { } } +/** + * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} + * causes {@link InterruptException} to be thrown. + */ +@Test +public void testPollThrowsInterruptExceptionIfInterrupted() { Review Comment: Is it possible to merge this new test into https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L1263? If not, maybe we can remove the TODO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
AndrewJSchofield opened a new pull request, #15803: URL: https://github.com/apache/kafka/pull/15803 The contract of KafkaConsumer.poll(Duration) says that it throws InterruptException "if the calling thread is interrupted before or while this function is called". The new KafkaConsumer implementation was not doing this if the thread was interrupted before the poll was called, specifically with a very short timeout. If it ever waited for records, it did check the thread state. If it did not wait for records because of a short timeout, it did not. Some of the log messages in the code were erroneously mentioned timeouts, when they really meant interruption. Also adds a test for this specific scenario. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org