Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-03 Thread via GitHub


chia7712 merged PR #15843:
URL: https://github.com/apache/kafka/pull/15843


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-02 Thread via GitHub


chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588449485


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
 return position.offset;
 
 updateFetchPositions(timer);
+timer.update();

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-16659






Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-02 Thread via GitHub


kirktrue commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588234154


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -271,6 +272,19 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
 consumer.commitSync()
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Done.






Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-02 Thread via GitHub


kirktrue commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588233090


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
 return position.offset;
 
 updateFetchPositions(timer);
+timer.update();

Review Comment:
   Thanks @chia7712 for catching that! Can you file a separate Jira to track? 
Thanks!






Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-02 Thread via GitHub


kirktrue commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1588224603


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -271,6 +272,19 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
 consumer.commitSync()
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   > Since there is no specific file for position-related tests, I would 
suggest we move it to the generic `PlaintextConsumerTest`. Makes sense?
   
   I agree. Before I put it there, I looked around for any other tests named 
`test*Position*`. The only test I saw was `testPositionAndCommit` in 
`PlaintextConsumerCommitTest`, so 🤷‍♂️






Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-02 Thread via GitHub


lianetm commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1587645917


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -271,6 +272,19 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
 consumer.commitSync()
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   This test covers a position API behaviour that, as I see it, is unrelated to 
committed offsets (we expect exactly the same behaviour when positions are 
retrieved from the partition offsets on the leader rather than from the 
committed offsets on the coordinator), so I would say it shouldn't be in this 
file. Since there is no specific file for position-related tests, I would 
suggest we move it to the generic `PlaintextConsumerTest`. Makes sense?






Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1587003306


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
 return position.offset;
 
 updateFetchPositions(timer);
+timer.update();

Review Comment:
   BTW, please let me know if you don't have time for this. I'm happy to fix it 
myself if the bug I described above is real.






Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1586996403


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
 return position.offset;
 
 updateFetchPositions(timer);
+timer.update();

Review Comment:
   (this comment is unrelated to this PR)
   
   It seems `AsyncKafkaConsumer#position` does not honour `WakeupException`? See 
the following test:
   ```scala
   // Expect position() to be interrupted by wakeup() well before its 3-second timeout.
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   @Timeout(10)
   def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
     val topicPartition = new TopicPartition("abc", 15)
     val consumer = createConsumer()
     consumer.assign(List(topicPartition).asJava)
     val service = Executors.newSingleThreadExecutor()
     // Wake the consumer from another thread after one second.
     service.execute(() => {
       TimeUnit.SECONDS.sleep(1)
       consumer.wakeup()
     })
     try assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
     finally {
       service.shutdownNow()
       service.awaitTermination(1, TimeUnit.SECONDS)
     }
   }
   ```






[PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-01 Thread via GitHub


kirktrue opened a new pull request, #15843:
URL: https://github.com/apache/kafka/pull/15843

   The AsyncKafkaConsumer implementation of `position(TopicPartition, 
Duration)` was not updating its internal `Timer`, causing it to execute the 
loop forever. Adding a call to update the `Timer` at the bottom of the loop 
fixes the issue.
   
   An integration test was added to catch this case; it fails without the newly 
added call to `Timer.update(long)`.
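
To illustrate the failure mode described above, here is a minimal, self-contained sketch. It is not the actual `AsyncKafkaConsumer` code: `CachedTimer`, `TimerLoopSketch`, `attempts`, and the fake clock are hypothetical names introduced for illustration. The sketch assumes a timer that caches the current time and only re-reads the clock when `update()` is called, so a `do { ... } while (timer.notExpired())` loop that never calls `update()` cannot observe that time has passed.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a timer that caches "now"; not Kafka's actual Timer class. */
final class CachedTimer {
    private final LongSupplier clock;
    private final long deadlineMs;
    private long cachedNowMs;

    CachedTimer(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        this.cachedNowMs = clock.getAsLong();
        this.deadlineMs = cachedNowMs + timeoutMs;
    }

    /** Re-reads the clock. Without this call the timer never sees elapsed time. */
    void update() {
        cachedNowMs = clock.getAsLong();
    }

    boolean notExpired() {
        return cachedNowMs < deadlineMs;
    }
}

final class TimerLoopSketch {
    /**
     * Runs a retry loop whose result never becomes available and returns the
     * number of iterations executed. Each iteration advances a fake clock by
     * stepMs. maxIterations is a safety cap so the buggy variant terminates here.
     */
    static int attempts(boolean updateTimer, long timeoutMs, long stepMs, int maxIterations) {
        long[] now = {0L}; // fake clock, in milliseconds
        CachedTimer timer = new CachedTimer(() -> now[0], timeoutMs);
        int iterations = 0;
        do {
            now[0] += stepMs; // simulate the time one attempt takes
            iterations++;
            if (updateTimer) {
                timer.update(); // the kind of call the fix adds at the bottom of the loop
            }
        } while (timer.notExpired() && iterations < maxIterations);
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println("with update:    " + attempts(true, 100, 10, 1_000));
        System.out.println("without update: " + attempts(false, 100, 10, 1_000));
    }
}
```

With a 100 ms timeout and 10 ms per attempt, the variant that calls `update()` stops after 10 iterations, while the variant that skips it only stops at the artificial cap of 1000. In a real loop without such a cap, that corresponds to spinning forever.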
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

