Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-05-06 Thread via GitHub


chia7712 merged PR #15803:
URL: https://github.com/apache/kafka/pull/15803


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on PR #15803:
URL: https://github.com/apache/kafka/pull/15803#issuecomment-2093997309

   @lianetm any feedback? I'd like to merge it later :)





Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-05-01 Thread via GitHub


AndrewJSchofield commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1587067096


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) {
             // Update the timer before we head into the loop in case it took a while to get the lock.
             timer.update();
 
-            if (timer.isExpired())
+            if (timer.isExpired()) {
+                // If the thread was interrupted before we start waiting, it still counts as
+                // interrupted from the point of view of the KafkaConsumer.poll(Duration) contract.
+                // We only need to check this when we are not going to wait because waiting
+                // already checks whether the thread is interrupted.
+                if (Thread.interrupted())
+                    throw new InterruptException("Thread interrupted.");

Review Comment:
   OK. Done.






Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1586968300


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) {
             // Update the timer before we head into the loop in case it took a while to get the lock.
             timer.update();
 
-            if (timer.isExpired())
+            if (timer.isExpired()) {
+                // If the thread was interrupted before we start waiting, it still counts as
+                // interrupted from the point of view of the KafkaConsumer.poll(Duration) contract.
+                // We only need to check this when we are not going to wait because waiting
+                // already checks whether the thread is interrupted.
+                if (Thread.interrupted())
+                    throw new InterruptException("Thread interrupted.");

Review Comment:
   Maybe we should use the same exception message `Interrupted waiting for results from fetching records` for consistency.






Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-30 Thread via GitHub


AndrewJSchofield commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1585852808


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
         }
     }
 
+    /**
+     * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)}
+     * causes {@link InterruptException} to be thrown.
+     */
+    @Test
+    public void testPollThrowsInterruptExceptionIfInterrupted() {
+        consumer = newConsumer();
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        consumer.assign(singleton(tp));
+
+        // interrupt the thread and call poll
+        try {
+            Thread.currentThread().interrupt();
+            assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
+        } finally {
+            // clear interrupted state again since this thread may be reused by JUnit

Review Comment:
   By calling `Thread.interrupted()`, the code is ensuring that the test does 
not exit with the thread still in an interrupted state. I have updated the 
comment accordingly.
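
The difference between the two JDK interrupt checks is what makes this cleanup work. A minimal, standalone sketch (the class name `InterruptFlagDemo` is made up for illustration and is not part of the PR): `Thread.isInterrupted()` only reads the flag, while the static `Thread.interrupted()` reads it and clears it.

```java
class InterruptFlagDemo {
    /**
     * Sets the current thread's interrupt flag, then records what each
     * check observes, in order.
     */
    static boolean[] observe() {
        Thread.currentThread().interrupt();

        // Instance method: reports the flag and leaves it set.
        boolean seenByIsInterrupted = Thread.currentThread().isInterrupted();

        // Static method: reports the flag and clears it as a side effect.
        boolean seenByInterrupted = Thread.interrupted();

        // The flag is now clear, so a later blocking call would not throw.
        boolean afterClear = Thread.currentThread().isInterrupted();

        return new boolean[] {seenByIsInterrupted, seenByInterrupted, afterClear};
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true false"
    }
}
```

Calling `Thread.interrupted()` in a `finally` block therefore leaves the thread in a clean state whether or not the code under test consumed the interrupt itself.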






Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-29 Thread via GitHub


lianetm commented on PR #15803:
URL: https://github.com/apache/kafka/pull/15803#issuecomment-2083525470

   Thanks for the catch and the fix @AndrewJSchofield, left a nit, but LGTM. 





Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-29 Thread via GitHub


lianetm commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1583641482


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
         }
     }
 
+    /**
+     * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)}
+     * causes {@link InterruptException} to be thrown.
+     */
+    @Test
+    public void testPollThrowsInterruptExceptionIfInterrupted() {
+        consumer = newConsumer();
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        consumer.assign(singleton(tp));
+
+        // interrupt the thread and call poll
+        try {
+            Thread.currentThread().interrupt();
+            assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
+        } finally {
+            // clear interrupted state again since this thread may be reused by JUnit

Review Comment:
   Just to make the comment accurate: I expect that we need to clear the
interrupted flag here so that the assertion below, which polls again, does not
throw.






Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-26 Thread via GitHub


AndrewJSchofield commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1580787182


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
         }
     }
 
+    /**
+     * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)}
+     * causes {@link InterruptException} to be thrown.
+     */
+    @Test
+    public void testPollThrowsInterruptExceptionIfInterrupted() {

Review Comment:
   Thanks for the review.
   
   KafkaConsumerTest is, in general, very specific to the LegacyKafkaConsumer
and the old consumer group protocol, so I don't think merging this into that
test is appropriate. But the TODO in that test is clearly incorrect. I will
fix it.






Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1580186766


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
         }
     }
 
+    /**
+     * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)}
+     * causes {@link InterruptException} to be thrown.
+     */
+    @Test
+    public void testPollThrowsInterruptExceptionIfInterrupted() {

Review Comment:
   Is it possible to merge this new test into
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L1263 ?
If not, maybe we can remove the TODO.






[PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-25 Thread via GitHub


AndrewJSchofield opened a new pull request, #15803:
URL: https://github.com/apache/kafka/pull/15803

   The contract of KafkaConsumer.poll(Duration) says that it throws 
InterruptException "if the calling thread is interrupted before or while this 
function is called". The new KafkaConsumer implementation was not doing this if 
the thread was interrupted before the poll was called, specifically with a very 
short timeout. If it ever waited for records, it did check the thread state. If 
it did not wait for records because of a short timeout, it did not.
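
The gap described above can be illustrated with a small standalone sketch. This is not the Kafka code: `AwaitSketch`, `SketchInterruptException`, and `markNotEmpty` are hypothetical names, and a plain `ReentrantLock` with a `Condition` stands in for the real fetch buffer. The point is only that a timed wait notices a pending interrupt by itself, whereas the zero-timeout path has to check the flag explicitly.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitSketch {
    /** Hypothetical unchecked stand-in for Kafka's InterruptException. */
    static class SketchInterruptException extends RuntimeException {
        SketchInterruptException(String message) {
            super(message);
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private boolean empty = true;

    /** Marks the buffer as non-empty and wakes any waiting thread. */
    void markNotEmpty() {
        lock.lock();
        try {
            empty = false;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to timeoutMs for data; returns true if data is available. */
    boolean awaitNotEmpty(long timeoutMs) {
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            if (remainingNanos <= 0) {
                // No wait will happen, so nothing else would notice a pending
                // interrupt. Check the flag explicitly (this also clears it).
                if (Thread.interrupted())
                    throw new SketchInterruptException("Interrupted before waiting");
                return !empty;
            }
            while (empty && remainingNanos > 0) {
                // awaitNanos throws InterruptedException if the flag is already
                // set on entry or becomes set while waiting.
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            return !empty;
        } catch (InterruptedException e) {
            throw new SketchInterruptException("Interrupted while waiting");
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, a caller that sets the interrupt flag and then calls `awaitNotEmpty(0)` gets the exception, which matches the "before or while this function is called" wording of the contract.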
   
   Some of the log messages in the code erroneously mentioned timeouts,
when they really meant interruption.
   
   Also adds a test for this specific scenario.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

