Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]
chia7712 merged PR #15843:
URL: https://github.com/apache/kafka/pull/15843

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588449485

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
     return position.offset;
     updateFetchPositions(timer);
+    timer.update();

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-16659
kirktrue commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588234154

## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ##
@@ -271,6 +272,19 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
     consumer.commitSync()
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Done.
kirktrue commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588233090

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
     return position.offset;
     updateFetchPositions(timer);
+    timer.update();

Review Comment:
   Thanks @chia7712 for catching that! Can you file a separate Jira to track? Thanks!
kirktrue commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588224603

## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ##
@@ -271,6 +272,19 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
     consumer.commitSync()
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   > Since there is no specific file for position-related tests, I would suggest we move it to the generic `PlaintextConsumerTest`. Makes sense?

   I agree. Before I stuck it there, I looked around for any other tests named `test*Position*`. The only test I saw was `testPositionAndCommit` in `PlaintextConsumerCommitTest`, so 🤷‍♂️
lianetm commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1587645917

## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ##
@@ -271,6 +272,19 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
     consumer.commitSync()
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   As I see it, this test covers a position API behaviour that is unrelated to committed offsets (we expect exactly the same behaviour when positions are retrieved from the partition offsets on the leader instead of the committed offsets from the coordinator), so I would say it shouldn't be in this file.
   Since there is no specific file for position-related tests, I would suggest we move it to the generic `PlaintextConsumerTest`. Makes sense?
chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1587003306

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
     return position.offset;
     updateFetchPositions(timer);
+    timer.update();

Review Comment:
   BTW, please let me know if you don't have time for this. I'm happy to fix it myself if the bug I described above is real.
chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1586996403

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
     return position.offset;
     updateFetchPositions(timer);
+    timer.update();

Review Comment:
   (This comment is unrelated to this PR.)

   It seems `AsyncKafkaConsumer#position` does not honour `WakeupException`? See the following test:

   ```scala
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   @Timeout(10)
   def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
     val topicPartition = new TopicPartition("abc", 15)
     val consumer = createConsumer()
     consumer.assign(List(topicPartition).asJava)

     // Wake the consumer up from another thread after one second.
     val service = Executors.newSingleThreadExecutor()
     service.execute(() => {
       TimeUnit.SECONDS.sleep(1)
       consumer.wakeup()
     })
     // position() should be interrupted by the wakeup before its 3-second timeout.
     try assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
     finally {
       service.shutdownNow()
       service.awaitTermination(1, TimeUnit.SECONDS)
     }
   }
   ```
[PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]
kirktrue opened a new pull request, #15843:
URL: https://github.com/apache/kafka/pull/15843

   The AsyncKafkaConsumer implementation of `position(TopicPartition, Duration)` was not updating its internal `Timer`, so the timer never expired and the loop ran forever. Adding a call to update the `Timer` at the bottom of the loop fixes the issue.

   An integration test was added to catch this case; it fails without the newly added call to `Timer.update()`.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
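
   To illustrate the failure mode described above, here is a minimal, self-contained sketch. It is not the Kafka code: `CachedTimer` is a hypothetical stand-in for a timer that caches its notion of "now" and only refreshes it when `update()` is called, and the fake clock, the 100 ms timeout, and the iteration cap are assumptions made so the demo terminates.

```java
import java.util.function.LongSupplier;

final class TimerLoopSketch {
    /**
     * Runs a retry loop whose work never succeeds and returns how many
     * iterations ran. The cap exists only so the buggy variant terminates here.
     */
    static int iterations(boolean callUpdate, int maxIterations) {
        long[] fakeNowMs = {0};  // fake clock, advanced by the simulated work
        CachedTimer timer = new CachedTimer(() -> fakeNowMs[0], 100);
        int count = 0;
        while (timer.notExpired() && count < maxIterations) {
            fakeNowMs[0] += 10;  // pretend each attempt takes 10 ms
            count++;
            if (callUpdate) {
                timer.update();  // analogue of the one-line fix in the PR
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("with update:    " + iterations(true, 1_000));   // prints 10
        System.out.println("without update: " + iterations(false, 1_000));  // prints 1000 (the cap)
    }
}

/** Hypothetical timer with a cached clock; NOT Kafka's actual Timer class. */
final class CachedTimer {
    private final LongSupplier clock;  // source of "now" in milliseconds
    private final long deadlineMs;
    private long cachedNowMs;          // changes only when update() is called

    CachedTimer(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        this.cachedNowMs = clock.getAsLong();
        this.deadlineMs = cachedNowMs + timeoutMs;
    }

    /** Refreshes the cached time from the clock. */
    void update() {
        cachedNowMs = clock.getAsLong();
    }

    /** Compares the cached time, not the live clock, against the deadline. */
    boolean notExpired() {
        return cachedNowMs < deadlineMs;
    }
}
```

   Without the `update()` call the loop condition keeps comparing a stale timestamp against the deadline, so only the artificial cap stops it; with the call, the loop exits once the simulated 100 ms have elapsed.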