[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16777: --- Description: If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (it passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:
{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // Continuous poll should eventually fail because there is no offset reset strategy set
  // (it fails only when resetting positions after the coordinator is known).
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}
Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], which is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.
The issue seems to be that when calling poll with ZERO timeout, even continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown. There is a related issue, https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All of them fail even with the current patch in KAFKA-16637, so this needs investigation. was: If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer).
We should add it as part of the fix, for better coverage: {code:java} @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") val consumer = createConsumer(configOverrides = this.consumerConfig) consumer.assign(List(tp).asJava) // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known) TestUtils.tryUntilNoAssertionError() { assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO)) } } {code} Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be that when calling poll with ZERO timeout, even continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.
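The suggested test above relies on TestUtils.tryUntilNoAssertionError to tolerate the fact that poll(ZERO) may need several calls before positions are reset and the expected exception surfaces. As a hedged illustration (not Kafka's actual Scala TestUtils, just a minimal Python sketch of the same retry pattern), the helper can be modeled like this:

```python
import time

def try_until_no_assertion_error(action, timeout_s=15.0, interval_s=0.1):
    """Keep invoking `action` until it stops raising AssertionError,
    or until the deadline expires (then the last AssertionError propagates)."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            return action()
        except AssertionError:
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval_s)
```

This pattern is what lets the integration test assert "eventually throws" rather than "throws on the first poll", which is exactly the distinction this bug is about.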
[jira] [Created] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
Lianet Magrans created KAFKA-16777: -- Summary: New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy Key: KAFKA-16777 URL: https://issues.apache.org/jira/browse/KAFKA-16777 Project: Kafka Issue Type: Bug Components: consumer Reporter: Lianet Magrans If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (it passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:
{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // Continuous poll should eventually fail because there is no offset reset strategy set
  // (it fails only when resetting positions after the coordinator is known).
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}
Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], which is currently enabled only for the LegacyConsumer.
After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be that when calling poll with ZERO timeout, even continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown. -- This message was sent by Atlassian Jira (v8.20.10#820010)
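The sequencing described above can be made concrete with a deliberately simplified model (this is not Kafka code; the names mirror the consumer internals but the mechanics are illustrative assumptions): positions can only be reset once the coordinator lookup has completed, so a single poll(ZERO) returns empty instead of raising, and only a later poll reaches the resetInitializingPositions step and throws.

```python
class NoOffsetForPartitionException(Exception):
    pass

class ModelConsumer:
    """Toy model: poll(ZERO) gives up before the coordinator is known."""

    def __init__(self, polls_until_coordinator_known=3):
        self._polls_left = polls_until_coordinator_known
        self._committed = {}           # no committed offsets for the partition
        self._reset_strategy = "none"  # auto.offset.reset=none

    def poll_zero(self):
        if self._polls_left > 0:
            # Coordinator not yet known: the zero-timeout poll returns early
            # without ever attempting to reset positions.
            self._polls_left -= 1
            return []
        # Coordinator known: resetting initializing positions now runs,
        # and with no committed offset and no reset strategy it must throw.
        if not self._committed and self._reset_strategy == "none":
            raise NoOffsetForPartitionException("tp-0")
        return []
```

This is why the test has to retry: the first few poll(ZERO) calls legitimately return empty, and the bug is that the real new consumer never progresses past that early-return stage.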
[jira] [Commented] (KAFKA-16637) AsyncKafkaConsumer removes offset fetch responses from cache too aggressively
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846446#comment-17846446 ] Lianet Magrans commented on KAFKA-16637: Hey [~chickenchickenlove], sorry I had missed your last question. The new group rebalance protocol from KIP-848 is supported in KRaft mode only. > AsyncKafkaConsumer removes offset fetch responses from cache too aggressively > - > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my setup below.
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
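The comment above notes that the KIP-848 rebalance protocol is supported in KRaft mode only. As a hedged sketch (exact property names and defaults may vary between Kafka versions; treat these as assumptions to verify against the release notes for your version), the opt-in configuration looks roughly like this:

```properties
# Client side (consumer configuration): opt in to the KIP-848 protocol.
group.protocol=consumer

# Broker side: the broker must run in KRaft mode (the new protocol is not
# available under ZooKeeper), with the new coordinator protocol enabled:
group.coordinator.rebalance.protocols=classic,consumer
```

A consumer stuck after "Cluster ID" with repeated __consumer_offsets auto-creation attempts, as in the logs above, is consistent with the broker side not accepting the new-protocol heartbeats.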
[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
[ https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16766: --- Description: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagating it with the message specific to offsetsForTimes. The same situation exists for beginningOffsets and endOffsets. All 3 funcs show the same timeout message in the LegacyConsumer (defined [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), but do not have a clear message in the AsyncKafkaConsumer, so we should fix all 3. With the fix, we should write tests for each func, like the ones defined for the Legacy Consumer ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]). Note that we would need different tests, added to AsyncKafkaConsumerTest, given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the current tests do not account for that when matching requests/responses.
was: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. After the fix, we should be able to write a test like the [testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246] that exist for the legacy consumer. Note that we would need a different test given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the test would have to account for that when matching requests/responses. > New consumer offsetsForTimes timeout exception does not have the proper > message > --- > > Key: KAFKA-16766 > URL: https://issues.apache.org/jira/browse/KAFKA-16766 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer > will throw a org.apache.kafka.common.errors.TimeoutException as expected, but > with the following as message: "java.util.concurrent.TimeoutException". 
> We should provide a clearer message, and I would even say we keep the same > message that the LegacyConsumer shows in this case, ex: "Failed to get > offsets by times in 6ms". > To fix this we should consider catching the timeout exception in the consumer > when offsetsForTimes result times out > ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), > and propagate it with the message specific to offsetsForTimes. > Same situation exists for beginningOffsets and endOffsets. All 3 funcs show > the same timeout message in the LegacyConsumer (defined > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), > but do not have a clear message in the Async, so we should fix all 3. > With the fix, we should write tests for each func, like the ones defined > for the Legacy Consumer.
[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
Lianet Magrans created KAFKA-16766: -- Summary: New consumer offsetsForTimes timeout exception does not have the proper message Key: KAFKA-16766 URL: https://issues.apache.org/jira/browse/KAFKA-16766 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Fix For: 3.8.0 If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagating it with the message specific to offsetsForTimes. After the fix, we should be able to write a test like the [testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246] that exists for the legacy consumer. Note that we would need a different test given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the test would have to account for that when matching requests/responses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
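The suggested fix is to catch the raw future timeout where the offsetsForTimes result expires and re-raise it with the operation-specific message. A minimal hedged sketch of that wrap-and-rethrow pattern (Python stand-ins, not the actual Kafka classes; KafkaTimeoutException here plays the role of org.apache.kafka.common.errors.TimeoutException):

```python
import concurrent.futures

class KafkaTimeoutException(Exception):
    """Stand-in for org.apache.kafka.common.errors.TimeoutException."""

def offsets_for_times(result_future, timeout_ms):
    try:
        # Wait for the background event's result, bounded by the API timeout.
        return result_future.result(timeout=timeout_ms / 1000.0)
    except concurrent.futures.TimeoutError as e:
        # Re-raise with the operation-specific message instead of letting the
        # raw "java.util.concurrent.TimeoutException" text leak through.
        raise KafkaTimeoutException(
            f"Failed to get offsets by times in {timeout_ms}ms") from e
```

The same wrapping would apply to beginningOffsets and endOffsets, each with its own message, matching the legacy consumer's behavior.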
[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16764: --- Description: A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does. The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]). This is probably what makes [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] fail for the new consumer. Once this bug is fixed, we should be able to enable that test for the new consumer. was: A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does.
The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]). > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example. The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) > This is probably what makes > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fail for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
Lianet Magrans created KAFKA-16764: -- Summary: New consumer should throw InvalidTopicException on poll when invalid topic in metadata Key: KAFKA-16764 URL: https://issues.apache.org/jira/browse/KAFKA-16764 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does. The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]). -- This message was sent by Atlassian Jira (v8.20.10#820010)
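The legacy behavior described above amounts to a per-iteration check: before doing anything else, each network-client pass inspects the latest metadata for fatal topic errors and surfaces them to the poll caller. A hedged, simplified model of that check (Python stand-ins, not actual Kafka code; the metadata dict shape is an assumption for illustration):

```python
class InvalidTopicException(Exception):
    pass

def maybe_throw_metadata_errors(metadata):
    """Mimics ConsumerNetworkClient's per-iteration metadata error check."""
    invalid = metadata.get("invalid_topics") or set()
    if invalid:
        raise InvalidTopicException(f"Invalid topics: {sorted(invalid)}")

def poll_once(metadata):
    # The check runs on every iteration, before fetching records, which is
    # why the legacy consumer surfaces the error promptly from poll().
    maybe_throw_metadata_errors(metadata)
    return []
```

The fix for the new consumer would need an equivalent propagation path from the background thread's metadata updates back to the application thread's poll.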
[jira] [Updated] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16406: --- Labels: consumer kip-848-client-support (was: ) > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer, kip-848-client-support > Fix For: 3.8.0 > > > PlaintextConsumerTest contains integration tests for the consumer. Since the > introduction of the new consumer group protocol (KIP-848) and the new > KafkaConsumer, this test has been parametrized to run with multiple > combinations, making sure we test the logic for the old and new coordinator, > as well as for the legacy and new KafkaConsumer. > This led to this being one of the longest-running integration tests, so with > the aim of reducing the impact on the build times we could split it to allow > for parallelization. The tests cover multiple areas of the consumer logic > in a single file, so splitting based on the high-level features being tested > would be sensible and achieve the desired result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans closed KAFKA-16406. -- > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.8.0 > > > PlaintextConsumerTest contains integration tests for the consumer. Since the > introduction of the new consumer group protocol (KIP-848) and the new > KafkaConsumer, this test has been parametrized to run with multiple > combinations, making sure we test the logic for the old and new coordinator, > as well as for the legacy and new KafkaConsumer. > This led to this being one of the longest-running integration tests, so with > the aim of reducing the impact on the build times we could split it to allow > for parallelization. The tests cover multiple areas of the consumer logic > in a single file, so splitting based on the high-level features being tested > would be sensible and achieve the desired result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16406. Resolution: Fixed > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.8.0 > > > PlaintextConsumerTest contains integration tests for the consumer. Since the > introduction of the new consumer group protocol (KIP-848) and the new > KafkaConsumer, this test has been parametrized to run with multiple > combinations, making sure we test the logic for the old and new coordinator, > as well as for the legacy and new KafkaConsumer. > This led to this being one of the longest-running integration tests, so with > the aim of reducing the impact on the build times we could split it to allow > for parallelization. The tests cover multiple areas of the consumer logic > in a single file, so splitting based on the high-level features being tested > would be sensible and achieve the desired result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846315#comment-17846315 ] Lianet Magrans commented on KAFKA-16758: Hey, I would be happy to take on this one! I expect I could make time for it after 3.8. Thanks! > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in or out to the LeaveGroup when close is called. This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514 ) > This has several benefits over the current situation: > # It allows plain consumer apps to opt-out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node. 
This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
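Since this feature still needs a KIP, there is no real `Consumer#close(CloseOptions)` API yet. Purely as a hypothetical sketch (the `GroupMembershipOperation` and `ShutdownReason` names below are invented for illustration, mirroring the spirit of `KafkaStreams#close(CloseOptions)`), benefit #2 above could look like letting the caller pick the action dynamically:

```java
// Hypothetical sketch only: Consumer#close(CloseOptions) does not exist yet
// (the feature needs a KIP). This models how a caller might choose whether
// the member leaves the group depending on why the client is shutting down.
public class CloseOptionsSketch {

    // Hypothetical option, in the spirit of KafkaStreams#close(CloseOptions).
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP }

    enum ShutdownReason { RESTART, SCALE_DOWN, APP_SHUTDOWN }

    // During a simple restart the member should stay in the group so the
    // broker does not trigger an unnecessary rebalance; on scale-down or a
    // final shutdown it should leave so its partitions are reassigned.
    static GroupMembershipOperation operationFor(ShutdownReason reason) {
        return reason == ShutdownReason.RESTART
                ? GroupMembershipOperation.REMAIN_IN_GROUP
                : GroupMembershipOperation.LEAVE_GROUP;
    }

    public static void main(String[] args) {
        for (ShutdownReason r : ShutdownReason.values()) {
            System.out.println(r + " -> " + operationFor(r));
        }
    }
}
```

Unlike the immutable `internal.leave.group.on.close` config, this choice can be made per close call, which is exactly what benefit #2 asks for.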
[jira] [Assigned] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16737: -- Assignee: Lianet Magrans > Clean up KafkaConsumerTest TODOs enabling tests for new consumer > > > Key: KAFKA-16737 > URL: https://issues.apache.org/jira/browse/KAFKA-16737 > Project: Kafka > Issue Type: Task > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are > only enabled for the CLASSIC protocol and should be reviewed and enabled for > the new CONSUMER group protocol when applicable. Some tests also have TODOs > to enable them for the new consumer when certain features/bugs are addressed. > The new protocol and consumer implementation have evolved a lot since those > TODOs were added, so we should review them all, enable tests for the new > protocol when applicable, and remove the TODOs from the code. Note that > there is another AsyncKafkaConsumerTest.java, testing logic specific to the > internals of the new consumer, but many tests in KafkaConsumerTest still > apply to both the new and legacy consumer, and we should enable them for > both. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16737: --- Affects Version/s: 3.7.0 > Clean up KafkaConsumerTest TODOs enabling tests for new consumer > > > Key: KAFKA-16737 > URL: https://issues.apache.org/jira/browse/KAFKA-16737 > Project: Kafka > Issue Type: Task > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > > KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are > only enabled for the CLASSIC protocol and should be reviewed and enabled for > the new CONSUMER group protocol when applicable. Some tests also have TODOs > to enable them for the new consumer when certain features/bugs are addressed. > The new protocol and consumer implementation have evolved a lot since those > TODOs were added, so we should review them all, enable tests for the new > protocol when applicable, and remove the TODOs from the code. Note that > there is another AsyncKafkaConsumerTest.java, testing logic specific to the > internals of the new consumer, but many tests in KafkaConsumerTest still > apply to both the new and legacy consumer, and we should enable them for > both. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16737: --- Fix Version/s: 3.8.0 > Clean up KafkaConsumerTest TODOs enabling tests for new consumer > > > Key: KAFKA-16737 > URL: https://issues.apache.org/jira/browse/KAFKA-16737 > Project: Kafka > Issue Type: Task > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are > only enabled for the CLASSIC protocol and should be reviewed and enabled for > the new CONSUMER group protocol when applicable. Some tests also have TODOs > to enable them for the new consumer when certain features/bugs are addressed. > The new protocol and consumer implementation have evolved a lot since those > TODOs were added, so we should review them all, enable tests for the new > protocol when applicable, and remove the TODOs from the code. Note that > there is another AsyncKafkaConsumerTest.java, testing logic specific to the > internals of the new consumer, but many tests in KafkaConsumerTest still > apply to both the new and legacy consumer, and we should enable them for > both. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16737: --- Priority: Blocker (was: Major) > Clean up KafkaConsumerTest TODOs enabling tests for new consumer > > > Key: KAFKA-16737 > URL: https://issues.apache.org/jira/browse/KAFKA-16737 > Project: Kafka > Issue Type: Task > Components: consumer >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > > KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are > only enabled for the CLASSIC protocol and should be reviewed and enabled for > the new CONSUMER group protocol when applicable. Some tests also have TODOs > to enable them for the new consumer when certain features/bugs are addressed. > The new protocol and consumer implementation have evolved a lot since those > TODOs were added, so we should review them all, enable tests for the new > protocol when applicable, and remove the TODOs from the code. Note that > there is another AsyncKafkaConsumerTest.java, testing logic specific to the > internals of the new consumer, but many tests in KafkaConsumerTest still > apply to both the new and legacy consumer, and we should enable them for > both. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer
Lianet Magrans created KAFKA-16737: -- Summary: Clean up KafkaConsumerTest TODOs enabling tests for new consumer Key: KAFKA-16737 URL: https://issues.apache.org/jira/browse/KAFKA-16737 Project: Kafka Issue Type: Task Components: consumer Reporter: Lianet Magrans KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are only enabled for the CLASSIC protocol and should be reviewed and enabled for the new CONSUMER group protocol when applicable. Some tests also have TODOs to enable them for the new consumer when certain features/bugs are addressed. The new protocol and consumer implementation have evolved a lot since those TODOs were added, so we should review them all, enable tests for the new protocol when applicable, and remove the TODOs from the code. Note that there is another AsyncKafkaConsumerTest.java, testing logic specific to the internals of the new consumer, but many tests in KafkaConsumerTest still apply to both the new and legacy consumer, and we should enable them for both. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time
[ https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16695: --- Fix Version/s: 3.8.0 > Improve expired poll interval logging by showing exceeded time > -- > > Key: KAFKA-16695 > URL: https://issues.apache.org/jira/browse/KAFKA-16695 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > When a consumer poll iteration takes longer than the max.poll.interval, the > consumer logs a warning indicating that the max.poll.interval config was > exceeded, and proactively leaves the group. The log suggests adjusting the > max.poll.interval.ms config, which should help in cases of long processing > times. We should consider adding information about how much the interval was > exceeded, since it could help guide the user to effectively adjust the > config. Other clients already do this, logging this kind of message in the > same situation: > {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust > max.poll.interval.ms for long-running message processing): leaving > group{quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time
[ https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16695: --- Labels: kip-848-client-support (was: ) > Improve expired poll interval logging by showing exceeded time > -- > > Key: KAFKA-16695 > URL: https://issues.apache.org/jira/browse/KAFKA-16695 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > When a consumer poll iteration takes longer than the max.poll.interval, the > consumer logs a warning indicating that the max.poll.interval config was > exceeded, and proactively leaves the group. The log suggests adjusting the > max.poll.interval.ms config, which should help in cases of long processing > times. We should consider adding information about how much the interval was > exceeded, since it could help guide the user to effectively adjust the > config. Other clients already do this, logging this kind of message in the > same situation: > {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust > max.poll.interval.ms for long-running message processing): leaving > group{quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time
Lianet Magrans created KAFKA-16695: -- Summary: Improve expired poll interval logging by showing exceeded time Key: KAFKA-16695 URL: https://issues.apache.org/jira/browse/KAFKA-16695 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans When a consumer poll iteration takes longer than the max.poll.interval, the consumer logs a warning indicating that the max.poll.interval config was exceeded, and proactively leaves the group. The log suggests adjusting the max.poll.interval.ms config, which should help in cases of long processing times. We should consider adding information about how much the interval was exceeded, since it could help guide the user to effectively adjust the config. Other clients already do this, logging this kind of message in the same situation: {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group{quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
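The improvement is simple arithmetic: subtract the configured interval from the time actually elapsed between polls and include the difference in the warning. A minimal sketch (not the actual Kafka client logging code; the method and parameter names here are illustrative):

```java
// Sketch: building a poll-interval expiration warning that includes how much
// the interval was exceeded, in the spirit of the message quoted above.
public class PollIntervalLogSketch {

    // maxPollIntervalMs mirrors the max.poll.interval.ms config;
    // timeSinceLastPollMs is the measured gap between two poll calls.
    static String expiredPollWarning(long maxPollIntervalMs, long timeSinceLastPollMs) {
        long exceededByMs = timeSinceLastPollMs - maxPollIntervalMs;
        return String.format(
            "Application maximum poll interval (%dms) exceeded by %dms "
                + "(adjust max.poll.interval.ms for long-running message processing): leaving group",
            maxPollIntervalMs, exceededByMs);
    }

    public static void main(String[] args) {
        // With the default 5-minute interval and a poll gap of 300255ms,
        // the warning reports the 255ms overshoot.
        System.out.println(expiredPollWarning(300_000, 300_255));
    }
}
```

Seeing "exceeded by 255ms" tells the user immediately whether a small buffer or a fundamentally longer interval is needed.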
[jira] [Closed] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans closed KAFKA-16665. -- > Fail to get partition's position from within onPartitionsAssigned callback in > new consumer > --- > > Key: KAFKA-16665 > URL: https://issues.apache.org/jira/browse/KAFKA-16665 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If we attempt to call consumer.position(tp) from within the > onPartitionsAssigned callback, the new consumer fails with a > TimeoutException. The expectation is that we should be able to retrieve the > position of newly assigned partitions, as happens with the legacy consumer, > which allows this call. This is actually used from places within Kafka itself > (ex. Connect > [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715]) > > The failure in the new consumer is because the partitions that are assigned > but awaiting the onPartitionsAssigned callback are excluded from the list of > partitions that should initialize. We should allow the partitions to > initialize their positions, without allowing fetching data from them (which > is already achieved based on the isFetchable flag in the subscription state). > Note that a partition position can be updated from two places: a call to > consumer.position or a call to consumer.poll.
Both will attempt to > `updateFetchPositions` if there is no valid position yet, but even after > having a valid position after those calls, the partition will remain > non-fetchable until the onPartitionsAssigned callback completes (fetchable > requires that the partition has a valid position AND is not awaiting the > callback) > -- This message was sent by Atlassian Jira (v8.20.10#820010)
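The two conditions described above can be sketched as plain predicates. This is illustrative only (the method names are invented, not the actual `SubscriptionState` fields): a partition may initialize its position while the `onPartitionsAssigned` callback is still running, but it only becomes fetchable once it has a valid position AND the callback has completed.

```java
// Sketch of the subscription-state logic described in the issue.
public class FetchableSketch {

    // Proposed fix: positions may initialize even while the partition is
    // awaiting the onPartitionsAssigned callback, so consumer.position(tp)
    // can succeed from within the callback.
    static boolean shouldInitializePosition(boolean hasValidPosition) {
        return !hasValidPosition;
    }

    // Fetching stays blocked until the callback completes: a partition is
    // fetchable only with a valid position AND no pending callback.
    static boolean isFetchable(boolean hasValidPosition, boolean awaitingCallback) {
        return hasValidPosition && !awaitingCallback;
    }

    public static void main(String[] args) {
        // Position initialized during the callback: valid position, not yet fetchable.
        System.out.println(isFetchable(true, true));   // false
        // Callback completed: now fetchable.
        System.out.println(isFetchable(true, false));  // true
    }
}
```

Separating "has a position" from "is fetchable" is what lets `position()` work inside the callback without prematurely fetching data.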
[jira] [Comment Edited] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887 ] Lianet Magrans edited comment on KAFKA-16670 at 5/6/24 9:01 PM: Hey [~chickenchickenlove], thanks for trying this out! Some clarification in case it helps. In the flow you described, the new consumer will send a request to find the group coordinator (FindCoordinator) when it gets created, but even if there's a call to consumer.subscribe right after, it won't send a request to subscribe (HeartbeatRequest) until it gets a response to the initial FindCoordinator request (HeartbeatManager skips sending requests if it does not know the coordinator [here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]). Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest will be sent containing the new subscription. The consumer will then eventually receive the assignment, but we don't know exactly when from the consumer's point of view. The rebalance callbacks are what signal to the consumer that the call to subscribe completed with an assignment received. So it's only after the consumer gets the assignment that a call to poll can return the records that are available. So based on those expectations and back to your example, we don't need to wait before calling subscribe (that's handled internally by the HeartbeatRequestManager as described above). I wonder if it's the fact that in the failed case you're polling 10 times only (vs. 100 times in the successful case)? In order to receive records, we do need to make sure that we are calling poll after the assignment has been received (so the consumer issues a fetch request for the partitions assigned).
Note that even when you poll for 1s in your test, a poll that happens before the assignment has been received will block for 1s but is doomed to return empty, because it is not waiting for records from the topics you're interested in (no partitions assigned yet). Could you make sure that the test is calling poll after the assignment has been received? (I would suggest just polling while true for a certain amount of time, no sleeping after the subscribe needed). This integration test for the consumer [testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153] has very similar logic to the one you're trying to achieve (create consumer, subscribe right away and consume), and since a new broker and consumer are set up for each test, the test will go down the same path of having to find a coordinator before sending the HeartbeatRequest with a subscription. The main difference from looking at both seems to be the limited number of polls in your failed test scenario, so let's try to rule that out to better isolate the situation. Hope it helps! Let me know
[jira] [Commented] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887 ] Lianet Magrans commented on KAFKA-16670: Hey [~chickenchickenlove], thanks for trying this out! Some clarification in case it helps. In the flow you described, the new consumer will send a request to find the group coordinator (FindCoordinator) when it gets created, but even if there's a call to consumer.subscribe right after, it won't send a request to subscribe until it gets a response to the initial FindCoordinator request (HeartbeatManager skips sending requests if it does not know the coordinator [here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]). Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest will be sent containing the new subscription. The consumer will then eventually receive the assignment, but we don't know exactly when from the consumer's point of view. The rebalance callbacks are what signal to the consumer that the call to subscribe completed with an assignment received. So it's only after the consumer gets the assignment that a call to poll can return the records that are available. So based on those expectations and back to your example, we don't need to wait before calling subscribe (that's handled internally by the HeartbeatRequestManager as described above). I wonder if it's the fact that in the failed case you're polling 10 times only (vs. 100 times in the successful case)? In order to receive records, we do need to make sure that we are calling poll after the assignment has been received (so the consumer issues a fetch request for the partitions assigned).
Note that even when you poll for 1s in your test, a poll that happens before the assignment has been received will block for 1s but is doomed to return empty, because it is not waiting for records from the topics you're interested in (no partitions assigned yet). Could you make sure that the test is calling poll after the assignment has been received? (I would suggest just polling while true for a certain amount of time, no sleeping after the subscribe needed). This integration test for the consumer [testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153] has very similar logic to the one you're trying to achieve (create consumer, subscribe right away and consume), and since a new broker and consumer are set up for each test, the test will go down the same path of having to find a coordinator before sending the HeartbeatRequest with a subscription. The main difference from looking at both seems to be the limited number of polls in your failed test scenario, so let's try to rule that out to better isolate the situation. Hope it helps! Let me know > KIP-848 : Consumer will not receive assignment forever because of concurrent > issue. > --- > > Key: KAFKA-16670 > URL: https://issues.apache.org/jira/browse/KAFKA-16670 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Priority: Major > > *Related Code* > * Consumer gets assignment successfully: > ** > [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57] > * Consumer gets stuck forever because of a concurrency issue: > ** > [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79] > > *Unexpected behaviour* > * > The broker is sufficiently slow.
> * When a KafkaConsumer is created and immediately subscribes to a topic > If both conditions are met, the {{Consumer}} can potentially never receive > {{TopicPartition}} assignments and become stuck indefinitely. > In the case of a new broker and a new consumer, when the consumer is created, > the consumer background thread sends a request to the broker (I guess the > groupCoordinator Heartbeat request). At that time, if the broker has not > loaded metadata from {{__consumer_offset}}, it will schedule the metadata > load. After the broker loads the metadata completely, the consumer background > thread considers this broker a valid group coordinator. > However, the consumer can send a {{subscribe}} request to the broker before > the {{broker}} replies to the {{groupCoordinator HeartBeat Request}}. In that > case, the consumer seems to get stuck. > If both conditions are met, the {{Consumer}} can potentially never receive > {{TopicPartition}} assignments and may become indefinitely stuck. In the case >
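The polling pattern suggested in the comment above (keep polling until records arrive or an overall deadline expires, rather than a fixed small number of polls) can be sketched as follows. This is an illustrative sketch only: the `Supplier` stands in for `consumer.poll(...).count()` so the loop shape can be shown without a running broker, and the helper name is invented.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Sketch: poll until records are received or the deadline expires.
public class PollUntilRecords {

    static int pollUntilRecords(Supplier<Integer> poll, Duration deadline) {
        long deadlineNanos = System.nanoTime() + deadline.toNanos();
        while (System.nanoTime() < deadlineNanos) {
            int count = poll.get(); // stands in for consumer.poll(Duration.ofMillis(100)).count()
            if (count > 0) {
                return count; // records received: assignment must have completed
            }
        }
        return 0; // deadline expired with no records
    }

    public static void main(String[] args) {
        // Fake poll: empty for the first few calls (coordinator lookup and
        // assignment still in flight), then records arrive.
        int[] calls = {0};
        int received = pollUntilRecords(() -> ++calls[0] < 5 ? 0 : 3, Duration.ofSeconds(5));
        System.out.println("received " + received + " records");
    }
}
```

With a generous overall deadline, the loop tolerates however long the FindCoordinator round-trip and assignment take, which is exactly what a fixed count of 10 short polls does not.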
[jira] [Created] (KAFKA-16675) Move rebalance callback test for positions to callbacks test file
Lianet Magrans created KAFKA-16675: -- Summary: Move rebalance callback test for positions to callbacks test file Key: KAFKA-16675 URL: https://issues.apache.org/jira/browse/KAFKA-16675 Project: Kafka Issue Type: Task Components: consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Integration test testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback was added to the PlaintextConsumerTest.scala in this PR https://github.com/apache/kafka/pull/15856, as there was no specific file for testing callbacks at the moment. Another PR is in-flight, adding the file for callback-related tests, https://github.com/apache/kafka/pull/15408. Once 15408 gets merged, we should move testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback to it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843324#comment-17843324 ] Lianet Magrans commented on KAFKA-16474: Yes, makes sense, that PR addresses only point 1 of what you reported, so we still have the point about the poll frequency (maybe to address separately if we still believe it's a concern) > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: failing_results.zip > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions in some > cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated > its local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all > of its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
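The "inflight logic" fix described above amounts to a guard: never send a new heartbeat while a previous one is still awaiting its response, so the member id and epoch from the first response are applied to local state before the next request goes out. A minimal sketch (illustrative only, not the actual HeartbeatRequestManager code; all names below are invented):

```java
// Sketch of an inflight guard preventing back-to-back heartbeats with stale
// epoch=0 / memberId='' state, which is what triggers the empty assignment.
public class HeartbeatInflightSketch {

    private boolean requestInFlight = false;
    private String memberId = "";
    private int epoch = 0;

    // Called on every iteration of the network thread: returns whether a
    // heartbeat was actually sent.
    boolean maybeSendHeartbeat() {
        if (requestInFlight) {
            return false; // wait for the outstanding response first
        }
        requestInFlight = true;
        return true;
    }

    // Called when the broker responds: update local state, then allow the
    // next heartbeat to carry the fresh memberId/epoch.
    void onHeartbeatResponse(String newMemberId, int newEpoch) {
        memberId = newMemberId;
        epoch = newEpoch;
        requestInFlight = false;
    }

    public static void main(String[] args) {
        HeartbeatInflightSketch hb = new HeartbeatInflightSketch();
        System.out.println(hb.maybeSendHeartbeat()); // true: first heartbeat sent
        System.out.println(hb.maybeSendHeartbeat()); // false: still in flight
        hb.onHeartbeatResponse("member-1", 1);
        System.out.println(hb.maybeSendHeartbeat()); // true: state updated, send again
    }
}
```

With the guard in place, the second heartbeat can never be the epoch=0/empty-memberId duplicate that the broker mistakes for a new member joining.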
[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843318#comment-17843318 ] Lianet Magrans commented on KAFKA-16474: Hey [~pnee], this is the PR https://github.com/apache/kafka/pull/15723 from [~kirktrue] that fixes a bug that was certainly leading to double heartbeats (I guess that's what you were seeing here; worth double-checking) > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: failing_results.zip > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions in some > cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated > its local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all > of its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16665: --- Description: If we attempt to call consumer.position(tp) from within the onPartitionsAssigned callback, the new consumer fails with a TimeoutException. The expectation is that we should be able to retrieve the position of newly assigned partitions, as happens with the legacy consumer, which allows this call. This is actually used from places within Kafka itself (ex. Connect [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715]) The failure in the new consumer is because the partitions that are assigned but awaiting the onPartitionsAssigned callback are excluded from the list of partitions that should initialize. We should allow the partitions to initialize their positions, without allowing fetching data from them (which is already achieved based on the isFetchable flag in the subscription state). Note that a partition position can be updated from two places: a call to consumer.position or a call to consumer.poll. Both will attempt to `updateFetchPositions` if there is no valid position yet, but even after having a valid position after those calls, the partition will remain non-fetchable until the onPartitionsAssigned callback completes (fetchable requires that the partition has a valid position AND is not awaiting the callback) > Fail to get partition's position from within onPartitionsAssigned callback in > new consumer > --- > > Key: KAFKA-16665 > URL: https://issues.apache.org/jira/browse/KAFKA-16665 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If we attempt to call consumer.position(tp) from within the > onPartitionsAssigned callback, the new consumer fails with a > TimeoutException. The expectation is that we should be able to retrieve the > position of newly assigned partitions, as happens with the legacy consumer, > which allows this call. This is actually used from places within Kafka itself > (ex. Connect > [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715]) > > The failure in the new consumer is because the partitions that are assigned > but awaiting the onPartitionsAssigned callback are excluded from the list of > partitions that should initialize. We should allow the partitions to > initialize their positions, without allowing fetching data from them (which > is already achieved based on the isFetchable flag in the subscription state).
> Note that a partition's position can be updated from 2 places: a call to > consumer.position or a call to consumer.poll. Both will attempt to > `updateFetchPositions` if there is no valid position yet, but even after > obtaining a valid position from those calls, the partition will remain > non-fetchable until the onPartitionsAssigned callback completes (fetchable > requires that the partition has a valid position AND is not awaiting the > callback) > -- This message was sent by Atlassian Jira (v8.20.10#820010)
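The failing scenario described above can be sketched with a minimal snippet (a hypothetical example: the topic, group id, and bootstrap server are placeholders, and it assumes a reachable broker):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PositionInCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Works with the legacy consumer; with the new consumer this call
                    // fails with a TimeoutException until the issue above is fixed.
                    for (TopicPartition tp : partitions) {
                        System.out.println(tp + " starts at position " + consumer.position(tp));
                    }
                }

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            });
            consumer.poll(Duration.ofSeconds(1)); // triggers the rebalance and the callback
        }
    }
}
```

This mirrors the Connect WorkerSinkTask pattern linked above, where position() is read for newly assigned partitions from inside the callback.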
[jira] [Created] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer
Lianet Magrans created KAFKA-16665: -- Summary: Fail to get partition's position from within onPartitionsAssigned callback in new consumer Key: KAFKA-16665 URL: https://issues.apache.org/jira/browse/KAFKA-16665 Project: Kafka Issue Type: Task Components: clients, consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 If we attempt to call consumer.position(tp) from within the onPartitionsAssigned callback, the new consumer fails with a TimeoutException. The expectation is that we should be able to retrieve the position of newly assigned partitions, as it happens with the legacy consumer, that allows this call. This is actually used from places within Kafka itself (ex. Connect [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715]) The failure in the new consumer is because the partitions that are assigned but awaiting the onPartitionsAssigned callback, are excluded from the list of partitions that should initialize. We should allow the partitions to initialize their positions, without allowing to fetch data from them (which is already achieve based on the isFetchable flag in the subscription state). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842765#comment-17842765 ] Lianet Magrans commented on KAFKA-16514: Just to answer the question above: {quote}what is the purpose of -2 code? In the end, not sending any request, with a large enough session timeout, no rebalance would be triggered anyway? What does change is we send -2 instead of just not sending any leave group request on close()?{quote} The purpose is just to set the intention explicitly at the protocol level (and not assume it). This is mainly to allow for richer semantics around the static membership leave in the future. It does not make a difference at the moment (over not sending the leave group), but it does allow us to cleanly extend the current logic if we ever want to, and let static members leave permanently by sending a -1 epoch on the leave group. That would effectively allow removing a static member from a group (which today can only be achieved either by waiting for the session timeout to expire, or via the admin API) Anyways, that's just food for thought for now. The KIP extending the consumer close with options seems sensible to solve the current situation, and would align nicely with any future extension of the static leave semantics if we ever go down that path. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
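For reference, the admin-API removal of a static member mentioned in the comment above can be sketched as follows (a hedged example: the group id, group.instance.id, and bootstrap server are placeholders, and it assumes a reachable broker):

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class RemoveStaticMember {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Remove the static member identified by its group.instance.id,
            // forcing it out of the group without waiting for the session timeout.
            RemoveMembersFromConsumerGroupOptions options =
                new RemoveMembersFromConsumerGroupOptions(List.of(new MemberToRemove("instance-1")));
            admin.removeMembersFromConsumerGroup("example-group", options).all().get();
        }
    }
}
```

This is the existing escape hatch for static members; the discussion above is about whether the client itself should be able to express the same intent on close().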
[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470 ] Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 8:25 PM: - Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your client code again, locally starting a broker in kraft mode, with the default config, only adding `group.coordinator.rebalance.protocols=consumer,classic`. 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 Then the consumer app with your code printed the 10 messages as expected. If after double checking you're still facing issues, I would suggest taking a look at and sharing the broker logs to understand more about what's going on in your setup. If all looks good there, maybe provide a ConsumerRebalanceListener in the call to subscribe, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback. Hope it helps! was (Author: JIRAUSER300183): Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. 
I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 Then the consumer app with your code printed the 10 messages as expected. If after double checking you're still facing issues, I would suggest providing a ConsumerRebalanceListener in the call to subscribe, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback, and also taking a look at and sharing the broker logs to understand more about what's going on in your setup. Hope it helps! > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my condition. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker |
[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470 ] Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM: - Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 Then the consumer app with your code printed the 10 messages as expected. If after double checking you're still facing issues, I would suggest providing a ConsumerRebalanceListener in the call to subscribe, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback, and also taking a look at and sharing the broker logs to understand more about what's going on in your setup. Hope it helps! was (Author: JIRAUSER300183): Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. 
I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 Then the consumer app with your code printed the 10 messages as expected. If after double checking you're still facing issues, I would suggest providing a ConsumerRebalanceListener in the call to subscribe, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback, and also taking a look at and sharing the broker logs to understand more about what's going on in your setup. Hope it helps > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my condition. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) >
[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470 ] Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM: - Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 Then the consumer app with your code printed the 10 messages as expected. If after double checking you're still facing issues, I would suggest providing a ConsumerRebalanceListener in the call to subscribe, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback, and also taking a look at and sharing the broker logs to understand more about what's going on in your setup. Hope it helps was (Author: JIRAUSER300183): Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. 
I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: {quote}for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1{quote} Then the consumer app with your code printed the 10 messages as expected. If after double checking you're still facing issues, I would suggest providing a ConsumerRebalanceListener in the call to subscribe, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback, and also taking a look at and sharing the broker logs to understand more about what's going on in your setup. Hope it helps > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my condition. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. >
[jira] [Commented] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470 ] Lianet Magrans commented on KAFKA-16637: Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: {quote}for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1{quote} Then the consumer app with your code printed the 10 messages as expected. If after double checking you're still facing issues, I would suggest providing a ConsumerRebalanceListener in the call to subscribe, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback, and also taking a look at and sharing the broker logs to understand more about what's going on in your setup. 
Hope it helps > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. > > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
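The debugging suggestion above (passing a ConsumerRebalanceListener to subscribe to print the assigned partitions) could look roughly like this (an illustrative sketch: names match the reported setup, and it assumes a local broker):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignmentDebug {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic1"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Assigned: " + partitions); // expect [test-topic1-0]
                }

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Revoked: " + partitions);
                }
            });
            for (int i = 0; i < 30; i++) {
                consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
```

If onPartitionsAssigned never fires, the problem is on the group-membership side (broker config or another member owning the partition) rather than in the fetch path.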
[jira] [Commented] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842160#comment-17842160 ] Lianet Magrans commented on KAFKA-16637: Hey [~chickenchickenlove], thanks for reporting this. I wonder if you're hitting a known issue with the consumer API and timeouts (being fixed with https://issues.apache.org/jira/browse/KAFKA-16200 and https://issues.apache.org/jira/browse/KAFKA-15974). I tried your code, changing only the poll call from `kafkaConsumer.poll(Duration.ZERO)` to a non-zero duration, and it all worked as expected. So I guess it could be related to the timeout enforcement issues being fixed on the consumer side. > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Priority: Minor > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my condition. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-16465) New consumer does not invoke rebalance callbacks as expected in consumer_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans closed KAFKA-16465. -- > New consumer does not invoke rebalance callbacks as expected in > consumer_test.py system test > > > Key: KAFKA-16465 > URL: https://issues.apache.org/jira/browse/KAFKA-16465 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test fails with the following error: > {noformat} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.clean_shutdown=True.static_membership=False.bounce_mode=all.num_bounces=5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 1 minute 29.511 seconds > AssertionError('Total revoked count 0 does not match the expectation of > having 0 revokes as 0') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 254, in test_static_consumer_bounce > (num_revokes_after_bounce, check_condition) > AssertionError: Total revoked count 0 does not match the 
expectation of > having 0 revokes as 0 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-16459) New consumer times out joining group in consumer_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans closed KAFKA-16459. -- > New consumer times out joining group in consumer_test.py system test > > > Key: KAFKA-16459 > URL: https://issues.apache.org/jira/browse/KAFKA-16459 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test fails with two different errors related > to consumers joining the consumer group in a timely fashion. > {quote} > * Consumers failed to join in a reasonable amount of time > * Timed out waiting for consumers to join, expected total X joined, but only > see Y joined fromnormal consumer group and Z from conflict consumer > group{quote} > Affected tests: > * {{test_fencing_static_consumer}} > * {{test_static_consumer_bounce}} > * {{test_static_consumer_persisted_after_rejoin}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841986#comment-17841986 ] Lianet Magrans commented on KAFKA-16514: If my understanding here is right, this seems like a fair requirement: the ability to dynamically decide if a member should leave the group, regardless of its dynamic/static membership. We've actually had conversations about this while working on the new consumer group protocol, where static members do send a leave group request when leaving, and they send it with an epoch that explicitly indicates it's a temporary leaving (-2). For now this is the only way static members leave (temporarily), but the ground is set if ever in the future we decide that we want to allow static members to send a definitive leave group (ex. -1 epoch, like dynamic members do). So with this context, and back to the streams situation here, I wonder if we would be better off overall bringing in a less hacky solution: enable a close with richer semantics at the consumer level, where options/params could be passed to dynamically indicate how to close (temporarily, definitively, ...), and then align this nicely with the new protocol (and make it work with the legacy one). Thoughts? > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shut down and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior: there is an assumption that any > associated StateStores would be persisted to disk and that, in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
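For illustration only: the richer close semantics floated in the comment could map a close option onto the leave-group epochs of the new consumer protocol (-2 for a temporary leave; -1 is only mentioned as a possible future definitive leave for static members). Every name below is hypothetical, not an actual Kafka API:

```java
// Hypothetical sketch of consumer-level close semantics; NOT a real Kafka API.
// Epoch values come from the comment above: -2 = temporary leave (static
// members today), -1 = definitive leave (as dynamic members do).
public class GroupCloseOptions {
    public enum LeaveMode { TEMPORARY, DEFINITIVE }

    // Map the chosen close mode to the epoch a leave-group heartbeat would carry.
    public static int leaveEpochFor(LeaveMode mode) {
        return mode == LeaveMode.TEMPORARY ? -2 : -1;
    }
}
```

Such an option could then back both `stream.close(CloseOptions)` in Streams and a consumer-level close, instead of the undocumented internal config.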
[jira] [Assigned] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager
[ https://issues.apache.org/jira/browse/KAFKA-16628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16628: -- Assignee: Lianet Magrans > Add system test for validating static consumer bounce and assignment when not > eager > --- > > Key: KAFKA-16628 > URL: https://issues.apache.org/jira/browse/KAFKA-16628 > Project: Kafka > Issue Type: Task > Components: consumer, system tests >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > > Existing system > [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209] > includes a test for validating that partitions are not re-assigned when a > static member is bounced, but the test design and setup are intended for > testing this for the Eager assignment strategy only (based on the eager > protocol where all dynamic members revoke their partitions when a rebalance > happens). > We should consider adding a test that would ensure that partitions are not > re-assigned when using the cooperative sticky assignor or the new consumer > group protocol assignments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager
Lianet Magrans created KAFKA-16628: -- Summary: Add system test for validating static consumer bounce and assignment when not eager Key: KAFKA-16628 URL: https://issues.apache.org/jira/browse/KAFKA-16628 Project: Kafka Issue Type: Task Components: consumer, system tests Reporter: Lianet Magrans Existing system [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209] includes a test for validating that partitions are not re-assigned when a static member is bounced, but the test design and setup are intended for testing this for the Eager assignment strategy only (based on the eager protocol where all dynamic members revoke their partitions when a rebalance happens). We should consider adding a test that would ensure that partitions are not re-assigned when using the cooperative sticky assignor or the new consumer group protocol assignments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-16528) Reset member heartbeat interval when request sent
[ https://issues.apache.org/jira/browse/KAFKA-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans closed KAFKA-16528. -- > Reset member heartbeat interval when request sent > - > > Key: KAFKA-16528 > URL: https://issues.apache.org/jira/browse/KAFKA-16528 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > The member should reset the heartbeat timer when the request is sent, rather than > when a response is received. This aims to ensure that we don't add to the > interval any delay there might be in a response. With this, we better respect > the contract of members sending HB on the interval to remain in the group, > and avoid potential unwanted rebalances. > Note that there is already logic in place to avoid sending a request if a > response hasn't been received. That will ensure that, even with the reset > of the interval on the send, the next HB will only be sent once the > response is received. (It will be sent out on the next poll of the HB manager, > respecting the minimal backoff for sending consecutive requests.) This > will also be consistent with how the interval timing & in-flights are handled > for auto-commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
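The timing contract described in the issue can be sketched minimally as follows. The class and method names are illustrative only (the real logic lives in the consumer's heartbeat request manager): the interval is measured from the last send, while an in-flight flag prevents back-to-back requests until the response arrives.

```java
// Sketch: reset the heartbeat interval at send time, not at response time.
// Names are illustrative, not the actual Kafka consumer internals.
public class HeartbeatRequestState {
    private final long intervalMs;
    private long lastSentMs;
    private boolean requestInFlight = false;

    public HeartbeatRequestState(long intervalMs) {
        this.intervalMs = intervalMs;
        this.lastSentMs = -intervalMs; // allow the first send immediately
    }

    // A heartbeat may go out only if none is in flight and the interval,
    // counted from the last *send*, has elapsed.
    public boolean canSendAt(long nowMs) {
        return !requestInFlight && nowMs - lastSentMs >= intervalMs;
    }

    public void onSend(long nowMs) {
        lastSentMs = nowMs;      // interval timer reset on send
        requestInFlight = true;  // existing guard: no send until a response
    }

    public void onResponse() {
        requestInFlight = false;
    }
}
```

With this shape, a slow response delays the next heartbeat only via the in-flight guard; it no longer adds its latency on top of the interval.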
[jira] [Commented] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged
[ https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839096#comment-17839096 ] Lianet Magrans commented on KAFKA-16493: Hey [~phuctran], any progress on this one? Even though it's a performance improvement I'm afraid it's a very sensitive one given that it would affect the poll loop, so we need to make sure it makes it into 3.8 (it had the wrong fix version before, I just updated it). Let me know if you have any questions. Thanks! > Avoid unneeded subscription regex check if metadata version unchanged > - > > Key: KAFKA-16493 > URL: https://issues.apache.org/jira/browse/KAFKA-16493 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support > Fix For: 4.0.0 > > > When using pattern subscription (java pattern), the new consumer regularly > checks if the list of topics that match the regex has changed. This is done > as part of the consumer poll loop, and it evaluates the regex using the > latest cluster metadata. As an improvement, we should avoid evaluating the > regex if the metadata version hasn't changed (similar to what the legacy > coordinator does > [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
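The improvement described in the issue can be sketched with a small, hypothetical cache (names are illustrative, not the actual consumer code): the java Pattern is re-evaluated against the cluster's topic names only when the metadata update version changes.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Sketch: skip regex evaluation in the poll loop when the cluster metadata
// version is unchanged. Illustrative names only.
public class SubscriptionRegexCache {
    private int lastEvaluatedVersion = -1;
    private Set<String> matchedTopics = new TreeSet<>();

    // Returns true if the regex was (re-)evaluated, false if it was skipped.
    public boolean maybeEvaluate(Pattern pattern, int metadataVersion, List<String> clusterTopics) {
        if (metadataVersion == lastEvaluatedVersion)
            return false; // metadata unchanged: reuse the previous match result
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics)
            if (pattern.matcher(topic).matches())
                matched.add(topic);
        matchedTopics = matched;
        lastEvaluatedVersion = metadataVersion;
        return true;
    }

    public Set<String> matchedTopics() {
        return matchedTopics;
    }
}
```

This mirrors the legacy coordinator's check linked in the issue: the expensive part (matching every topic name) runs only on a metadata change, not on every poll.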
[jira] [Updated] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged
[ https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16493: --- Fix Version/s: 3.8.0 (was: 4.0.0) > Avoid unneeded subscription regex check if metadata version unchanged > - > > Key: KAFKA-16493 > URL: https://issues.apache.org/jira/browse/KAFKA-16493 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > When using pattern subscription (java pattern), the new consumer regularly > checks if the list of topics that match the regex has changed. This is done > as part of the consumer poll loop, and it evaluates the regex using the > latest cluster metadata. As an improvement, we should avoid evaluating the > regex if the metadata version hasn't changed (similar to what the legacy > coordinator does > [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16566: --- Labels: kip-848-client-support system-tests (was: kip-848-client-support) > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using a > conflicting instance id. This behaviour is different in the classic and > consumer protocols, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests cover (params, setup) applies to both protocols; it is the expected > results that are not the same. > When there are conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException. > Consumer protocol: a new member with an instance id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16566: --- Component/s: clients consumer system tests > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using a > conflicting instance id. This behaviour is different in the classic and > consumer protocols, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests cover (params, setup) applies to both protocols; it is the expected > results that are not the same. > When there are conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException. > Consumer protocol: a new member with an instance id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826 ] Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:59 PM: - Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new protocol/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. Classic protocol refers to the existing group protocol, and Consumer protocol refers to the new one introduced with KIP-848 (just using the names proposed to be used in the configs to switch between both) was (Author: JIRAUSER300183): Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. Classic protocol refers to the existing group protocol, and Consumer protocol refers to the new one introduced with KIP-848 (just using the names proposed to be used in the configs to switch between both) > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. 
This behaviour is different in the classic and > consumer protocols, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests cover (params, setup) applies to both protocols; it is the expected > results that are not the same. > When there are conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException. > Consumer protocol: a new member with an instance id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16566: --- Summary: Update consumer static membership fencing system test to support new protocol (was: Update static membership fencing system test to support new protocol) > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using a > conflicting instance id. This behaviour is different in the classic and > consumer protocols, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests cover (params, setup) applies to both protocols; it is the expected > results that are not the same. > When there are conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException. > Consumer protocol: a new member with an instance id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826 ] Lianet Magrans commented on KAFKA-16566: Hey [~ableegoldman], this is specific to static membership, and related to the consumer system tests only, which were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. > Update static membership fencing system test to support new protocol > > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using a > conflicting instance id. This behaviour is different in the classic and > consumer protocols, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests cover (params, setup) applies to both protocols; it is the expected > results that are not the same. > When there are conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException. > Consumer protocol: a new member with an instance id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16566) Update static membership fencing system test to support new protocol
Lianet Magrans created KAFKA-16566: -- Summary: Update static membership fencing system test to support new protocol Key: KAFKA-16566 URL: https://issues.apache.org/jira/browse/KAFKA-16566 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer that verifies the sequence in which static members join a group when using a conflicting instance id. This behaviour is different in the classic and consumer protocols, so the tests should be updated to set the right expectations when running with the new consumer protocol. Note that what the tests cover (params, setup) applies to both protocols; it is the expected results that are not the same. When there are conflicts between static members joining a group: Classic protocol: all members join the group with the same group instance id, and then the first one will eventually receive a HB error with FencedInstanceIdException. Consumer protocol: a new member with an instance id already in use is not able to join, receiving an UnreleasedInstanceIdException in the response to the HB to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16528) Reset member heartbeat interval when request sent
Lianet Magrans created KAFKA-16528: -- Summary: Reset member heartbeat interval when request sent Key: KAFKA-16528 URL: https://issues.apache.org/jira/browse/KAFKA-16528 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 The member should reset the heartbeat timer when the request is sent, rather than when a response is received. This aims to ensure that we don't add to the interval any delay there might be in a response. With this, we better respect the contract of members sending HB on the interval to remain in the group, and avoid potential unwanted rebalances. Note that there is already logic in place to avoid sending a request if a response hasn't been received. That will ensure that, even with the reset of the interval on the send, the next HB will only be sent once the response is received. (It will be sent out on the next poll of the HB manager, respecting the minimal backoff for sending consecutive requests.) This will also be consistent with how the interval timing & in-flights are handled for auto-commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged
Lianet Magrans created KAFKA-16493: -- Summary: Avoid unneeded subscription regex check if metadata version unchanged Key: KAFKA-16493 URL: https://issues.apache.org/jira/browse/KAFKA-16493 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans When using pattern subscription (java pattern), the new consumer regularly checks if the list of topics that match the regex has changed. This is done as part of the consumer poll loop, and it evaluates the regex using the latest cluster metadata. As an improvement, we should avoid evaluating the regex if the metadata version hasn't changed (similar to what the legacy coordinator does [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16474: -- Assignee: Lianet Magrans (was: Philip Nee) > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all of > its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: in KAFKA-16389 we've observed consumer polling intervals to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
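The failure sequence can be reproduced in miniature: the local member state (epoch, memberId) is only updated from a response, so a second heartbeat sent before the first response arrives carries the stale join values (epoch=0, empty memberId), which the broker treats as a new member joining. A hedged sketch of the in-flight guard (names are illustrative, not the actual AsyncKafkaConsumer code):

```java
// Sketch of the in-flight guard for heartbeats. Without the guard, a second
// request would go out with stale (epoch=0, memberId="") join values.
public class HeartbeatMember {
    private int epoch = 0;
    private String memberId = "";
    private boolean inFlight = false;

    // Returns the heartbeat payload "epoch/memberId", or null when a request
    // is already in flight and sending again is forbidden.
    public String trySend() {
        if (inFlight) return null;
        inFlight = true;
        return epoch + "/" + memberId;
    }

    // Member state is only updated from the broker's response.
    public void onResponse(int newEpoch, String newMemberId) {
        inFlight = false;
        epoch = newEpoch;
        memberId = newMemberId;
    }
}
```

With the guard in place, the second send is suppressed until the response updates the local epoch and memberId, so the broker never sees a duplicate join.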
[jira] [Updated] (KAFKA-16418) Review/split long-running admin client integration tests
[ https://issues.apache.org/jira/browse/KAFKA-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16418: --- Summary: Review/split long-running admin client integration tests (was: Split long-running admin client integration tests) > Review/split long-running admin client integration tests > > > Key: KAFKA-16418 > URL: https://issues.apache.org/jira/browse/KAFKA-16418 > Project: Kafka > Issue Type: Task > Components: clients >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > > Review PlaintextAdminIntegrationTest and attempt to split it to allow for > parallelization and improve build times. This test is the longest-running > integration test in kafka.api, so a similar approach to what has been done > with the consumer tests in PlaintextConsumerTest should be a good > improvement. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15843: --- Priority: Minor (was: Major) > Review consumer onPartitionsAssigned called with empty partitions > - > > Key: KAFKA-15843 > URL: https://issues.apache.org/jira/browse/KAFKA-15843 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Minor > Labels: callback, kip-848-client-support, reconciliation > Fix For: 4.0.0 > > > The legacy coordinator triggers onPartitionsAssigned with an empty assignment (which > is not the case when triggering onPartitionsRevoked or Lost). This is the > behaviour of the legacy coordinator, and the new consumer implementation > maintains the same principle. We should review this to fully understand whether it > is really needed to call onPartitionsAssigned with an empty assignment (or whether it > should behave consistently with onPartitionsRevoked/Lost). > Note that the consumer integration tests rely on this call to > onPartitionsAssigned in #awaitRebalance (AbstractConsumerTest.scala) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831050#comment-17831050 ] Lianet Magrans commented on KAFKA-15538: Hey [~kirktrue], I noticed this stayed without a response but @phuc already had an approach in his PR, as part of the poll loop. That was not being done before (given that we had no regex support) > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Blocker > Labels: kip-848-client-support, newbie, regex > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscribePattern`. The > subscribe using a java `Pattern` will be still supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
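The client-side resolution that KAFKA-15538 describes (subscribe with a java `Pattern` resolved on the client into an explicit topic list for the broker) can be sketched as follows. This is an illustrative sketch only, not the actual Kafka client code; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: resolve a java Pattern subscription on the client by
// matching it against the topic names known from metadata, yielding the
// explicit topic-name list that would be sent to the broker.
class PatternResolution {
    static List<String> resolve(Pattern pattern, List<String> clusterTopics) {
        return clusterTopics.stream()
                .filter(t -> pattern.matcher(t).matches()) // full-name match against the regex
                .collect(Collectors.toList());
    }
}
```

By contrast, a `SubscriptionPattern` (RE2/J) subscription would skip this step entirely and send the regex itself to the broker for server-side resolution.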
[jira] [Created] (KAFKA-16418) Split long-running admin client integration tests
Lianet Magrans created KAFKA-16418: -- Summary: Split long-running admin client integration tests Key: KAFKA-16418 URL: https://issues.apache.org/jira/browse/KAFKA-16418 Project: Kafka Issue Type: Task Components: clients Reporter: Lianet Magrans Assignee: Lianet Magrans Review PlaintextAdminIntegrationTest and attempt to split it to allow for parallelization and improve build times. This test is the longest-running integration test in kafka.api, so a similar approach to what has been done with the consumer tests in PlaintextConsumerTest should be a good improvement. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16375. Resolution: Fixed > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining > while reconciling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
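The fix that KAFKA-16375 proposes, discarding a reconciliation based on an explicit rejoin flag rather than comparing member epochs, can be sketched as below. Names are hypothetical and this is not the actual Kafka membership-manager code; it only illustrates why the flag is more precise than an epoch comparison (the epoch can be bumped without an assignment change).

```java
// Hypothetical sketch: track whether the member actually transitioned to
// joining while a reconciliation was in flight, instead of inferring a
// rejoin from a changed member epoch.
class ReconciliationTracker {
    private boolean rejoinedWhileReconciling = false;

    void onReconciliationStart() {
        // reset at the start of each reconciliation
        rejoinedWhileReconciling = false;
    }

    void onTransitionToJoining() {
        // set only on a real rejoin (e.g. after being fenced)
        rejoinedWhileReconciling = true;
    }

    boolean shouldDiscardResult() {
        // an epoch bump alone (no rejoin) no longer discards the result
        return rejoinedWhileReconciling;
    }
}
```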
[jira] [Created] (KAFKA-16406) Split long-running consumer integration test
Lianet Magrans created KAFKA-16406: -- Summary: Split long-running consumer integration test Key: KAFKA-16406 URL: https://issues.apache.org/jira/browse/KAFKA-16406 Project: Kafka Issue Type: Task Reporter: Lianet Magrans Assignee: Lianet Magrans PlaintextConsumerTest contains integration tests for the consumer. Since the introduction of the new consumer group protocol (KIP-848) and the new KafkaConsumer, this test has been parametrized to run with multiple combinations, making sure we test the logic for the old and new coordinator, as well as for the legacy and new KafkaConsumer. This led to this being one of the longest-running integration tests, so with the aim of reducing the impact on build times we could split it to allow for parallelization. The test covers multiple areas of the consumer logic in a single file, so splitting based on the high-level features being tested would be sensible and achieve the desired result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16375: --- Description: The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining while reconciling. (was: The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining. ) > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. 
This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining > while reconciling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16375: --- Labels: client-transitions-issues kip-848-client-support (was: kip-848-client-support) > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827990#comment-17827990 ] Lianet Magrans commented on KAFKA-15551: Also, given how tight the deadline is to get the fix/PR in, I would suggest we focus on the new consumer only. If we find things that could be improved in this sense in the old one, we could file a separate Jira for it and tackle it afterwards. > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > For conditions like: > * Committing empty offsets > * Fetching offsets for empty partitions > * Getting empty topic partition position > These should be short-circuited, possibly at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827989#comment-17827989 ] Lianet Magrans commented on KAFKA-15551: Hey [~zxcoccer], thanks for jumping in! This one should definitely be a simple one, as I know that we do handle it already for some API calls (ex. validate positions returns early if there are no positions to validate [here|https://github.com/apache/kafka/blob/5c929874b88b3b96f650de0f733d93d42ac535a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L227]). Still, we wanted to verify that we are doing similarly for all the API calls, to ensure that we're not doing unneeded processing/requests. We want to have this for 3.8, so the deadline is really tight, but if you have availability it would be a great help; feel free to re-assign it to yourself and ping me anytime if you have questions. Thanks! > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > For conditions like: > * Committing empty offsets > * Fetching offsets for empty partitions > * Getting empty topic partition position > These should be short-circuited, possibly at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
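The kind of short circuit KAFKA-15551 asks for, e.g. for an empty offset commit, can be sketched as follows. This is a hedged sketch with hypothetical names, not the AsyncKafkaConsumer implementation: the point is simply to complete at the API boundary without enqueuing an event or sending a request.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of short-circuiting an empty commit at the API level:
// return an already-completed future instead of building a request.
class CommitShortCircuit {
    static CompletableFuture<Void> commit(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // nothing to commit: no event, no network round trip
            return CompletableFuture.completedFuture(null);
        }
        // otherwise hand off to the real request path (stubbed here)
        return sendCommitRequest(offsets);
    }

    static CompletableFuture<Void> sendCommitRequest(Map<String, Long> offsets) {
        return new CompletableFuture<>(); // placeholder: completes when the broker responds
    }
}
```

The same pattern applies to fetching offsets for an empty partition set or asking for positions of no partitions.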
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16375: --- Fix Version/s: 3.8.0 > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827589#comment-17827589 ] Lianet Magrans commented on KAFKA-16375: Yes, I just updated the version ;) > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16375: --- Description: The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining. (was: The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining. As a potential improvement, consider if the member could keep the reconciliation if it rejoined but got the same assignment.) > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. 
This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16375: --- Priority: Critical (was: Major) > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining. > As a potential improvement, consider if the member could keep the > reconciliation if it rejoined but got the same assignment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
Lianet Magrans created KAFKA-16375: -- Summary: Fix logic for discarding reconciliation if member rejoined Key: KAFKA-16375 URL: https://issues.apache.org/jira/browse/KAFKA-16375 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining. As a potential improvement, consider if the member could keep the reconciliation if it rejoined but got the same assignment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826080#comment-17826080 ] Lianet Magrans commented on KAFKA-15538: Hey [~phuctran], any update regarding this one? We're getting close to the deadline, and given that this one is needed for feature parity with the old consumer, I think we should prioritize it over https://issues.apache.org/jira/browse/KAFKA-15561. Thanks! > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Blocker > Labels: kip-848-client-support, newbie, regex > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscribePattern`. The > subscribe using a java `Pattern` will be still supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16301) Review fenced member unsubscribe/subscribe callbacks interaction
Lianet Magrans created KAFKA-16301: -- Summary: Review fenced member unsubscribe/subscribe callbacks interaction Key: KAFKA-16301 URL: https://issues.apache.org/jira/browse/KAFKA-16301 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans When a member gets fenced, it triggers the onPartitionsLost callback if any, and then rejoins the group. If while the callback completes the member attempts to leave the group (ex. unsubscribe), the leave operation detects that the member is already removed from the group (fenced), and just aligns the client state with the current broker state, and marks the client as UNSUBSCRIBED (client side state for not in group). This means that the member could attempt to rejoin the group if the user calls subscribe, get an assignment, and trigger onPartitionsAssigned, when maybe the onPartitionsLost hasn't completed. This approach keeps the client state machine simple given that it does not need to block the new member (it will effectively be a new member because the old one got fenced). The new member could rejoin, get an assignment and make progress. Downside is that it would potentially allow for overlapped callback executions (lost and assign) in the above edge case, which is not the behaviour in the old coordinator. Review and validate. Alternative would definitely require more complex logic on the client to ensure that we do not allow a new member to rejoin until the fenced one completes the callback -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll
[ https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16298: --- Description: When user-defined callbacks fail with an exception, the expectation is that the error should be propagated to the user as a KafkaException and break the poll loop (behaviour in the legacy coordinator). The new coordinator executes callbacks in the application thread, and sends an event to the background with the callback result and error if any, [passing the error along with the event here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] to the background thread, but does not seem to propagate the exception to the user. (was: When user-defined callbacks fail with an exception, the expectation is that the error should be propagated to the user as a KafkaException and break the poll loop (behaviour in the legacy coordinator). The new coordinator executes callbacks in the application thread, and sends an event to the background with the callback result ([here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L252]), but does not seem to propagate the exception to the user. ) > Ensure user callbacks exceptions are propagated to the user on consumer poll > > > Key: KAFKA-16298 > URL: https://issues.apache.org/jira/browse/KAFKA-16298 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > > When user-defined callbacks fail with an exception, the expectation is that > the error should be propagated to the user as a KafkaException and break the > poll loop (behaviour in the legacy coordinator). 
The new coordinator executes > callbacks in the application thread, and sends an event to the background > with the callback result and error if any, [passing the error along with the > event > here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] > to the background thread, but does not seem to propagate the exception to > the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll
[ https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16298: --- Description: When user-defined callbacks fail with an exception, the expectation is that the error should be propagated to the user as a KafkaException and break the poll loop (behaviour in the legacy coordinator). The new consumer executes callbacks in the application thread, and sends an event to the background with the callback result and error if any, [passing the error along with the event here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] to the background thread, but does not seem to propagate the exception to the user. (was: When user-defined callbacks fail with an exception, the expectation is that the error should be propagated to the user as a KafkaException and break the poll loop (behaviour in the legacy coordinator). The new coordinator executes callbacks in the application thread, and sends an event to the background with the callback result and error if any, [passing the error along with the event here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] to the background thread, but does not seem to propagate the exception to the user. 
) > Ensure user callbacks exceptions are propagated to the user on consumer poll > > > Key: KAFKA-16298 > URL: https://issues.apache.org/jira/browse/KAFKA-16298 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > > When user-defined callbacks fail with an exception, the expectation is that > the error should be propagated to the user as a KafkaException and break the > poll loop (behaviour in the legacy coordinator). The new consumer executes > callbacks in the application thread, and sends an event to the background > with the callback result and error if any, [passing the error along with the > event > here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] > to the background thread, but does not seem to propagate the exception to > the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll
Lianet Magrans created KAFKA-16298: -- Summary: Ensure user callbacks exceptions are propagated to the user on consumer poll Key: KAFKA-16298 URL: https://issues.apache.org/jira/browse/KAFKA-16298 Project: Kafka Issue Type: Sub-task Components: clients, consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans When user-defined callbacks fail with an exception, the expectation is that the error should be propagated to the user as a KafkaException and break the poll loop (behaviour in the legacy coordinator). The new coordinator executes callbacks in the application thread, and sends an event to the background with the callback result ([here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L252]), but does not seem to propagate the exception to the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
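The propagation behaviour KAFKA-16298 expects, surface a user-callback failure from the next poll() instead of swallowing it, can be sketched like this. Hypothetical names only; this is not the AsyncKafkaConsumer code, and the real consumer would wrap the error in a KafkaException.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: record the first failure from a user callback run in
// the application thread, then rethrow it from the next poll() so the error
// breaks the poll loop as it does in the legacy consumer.
class CallbackErrorPropagation {
    private final AtomicReference<RuntimeException> firstError = new AtomicReference<>();

    void invokeCallback(Runnable userCallback) {
        try {
            userCallback.run();
        } catch (RuntimeException e) {
            // keep the first failure; poll() surfaces it to the user
            firstError.compareAndSet(null, e);
        }
    }

    void poll() {
        RuntimeException e = firstError.getAndSet(null);
        if (e != null) {
            // the real consumer would throw a KafkaException wrapping e
            throw new RuntimeException("User callback failed", e);
        }
        // ... normal poll processing would follow
    }
}
```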
[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
[ https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16224: --- Description: Current logic for auto-committing offsets when partitions are revoked will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the member not completing the revocation in time. We should consider this as an indication of the topic being deleted, and in the context of committing offsets to revoke partitions, we should abort the commit attempt and move on to complete and ack the revocation (meaning that we would effectively consider UnknownTopicOrPartitionException as non-retriable, even though it extends RetriableException, only when committing offsets before revocation as part of this task) Note that legacy coordinator behaviour around this seems to be the same as the new consumer currently has, tracked with a related issue given that it would require a separate fix for the legacy consumer. was: Current logic for auto-committing offsets when partitions are revoked will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the member not completing the revocation in time. We should consider this as an indication of the topic being deleted, and in the context of committing offsets to revoke partitions, we should abort the commit attempt and move on to complete and ack the revocation (effectively considering UnknownTopicOrPartitionException as non-retriable in this context) Note that legacy coordinator behaviour around this seems to be the same as the new consumer currently has, tracked with a related issue given that it would require a separate fix for the legacy consumer. 
> Fix handling of deleted topic when auto-committing before revocation > > > Key: KAFKA-16224 > URL: https://issues.apache.org/jira/browse/KAFKA-16224 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Current logic for auto-committing offsets when partitions are revoked will > retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the > member not completing the revocation in time. We should consider this as an > indication of the topic being deleted, and in the context of committing > offsets to revoke partitions, we should abort the commit attempt and move on > to complete and ack the revocation (meaning that we would effectively > consider UnknownTopicOrPartitionException as non-retriable, even though it > extends RetriableException, only when committing offsets before revocation as > part of this task) > Note that legacy coordinator behaviour around this seems to be the same as > the new consumer currently has, tracked with a related issue given that it > would require a separate fix for the legacy consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
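The context-dependent retriability KAFKA-16224 describes, UNKNOWN_TOPIC_OR_PARTITION stays retriable in general but aborts the commit when it happens before revocation, can be sketched as below. The enum and class names are illustrative, not the actual Kafka commit-manager code.

```java
// Hypothetical sketch: the same error code is treated differently depending
// on why the commit is being made.
enum CommitContext { REGULAR, BEFORE_REVOCATION }

class RevocationCommitPolicy {
    static boolean isRetriable(String errorCode, CommitContext ctx) {
        if ("UNKNOWN_TOPIC_OR_PARTITION".equals(errorCode)) {
            // normally retriable, but when auto-committing before revocation
            // we take it as a sign the topic was deleted: abort the commit so
            // the member can complete and ack the revocation in time
            return ctx != CommitContext.BEFORE_REVOCATION;
        }
        return true; // other retriable errors unchanged (simplified)
    }
}
```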
[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16185: --- Priority: Blocker (was: Critical) > Fix client reconciliation of same assignment received in different epochs > -- > > Key: KAFKA-16185 > URL: https://issues.apache.org/jira/browse/KAFKA-16185 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Blocker > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Currently, the intention in the client state machine is that the client > always reconciles whatever it has pending and sends an ack for it, but in > cases where the same assignment is received in different epochs this does not > work as expected. > 1 - Client might get stuck JOINING/RECONCILING, with a pending reconciliation > (delayed), and it receives the same assignment, but in a new epoch (ex. after > being FENCED). First time it receives the assignment it takes no action, as > it already has it as pending to reconcile, but when the reconciliation > completes it discards the result because the epoch changed. And this is > wrong. Note that after sending the assignment with the new epoch one time, > the broker continues to send null assignments. > Here is a sample sequence leading to the client stuck JOINING: > - client joins, epoch 0 > - client receives assignment tp1, stuck RECONCILING, epoch 1 > - member gets FENCED on the coord, coord bumps epoch to 2 > - client tries to rejoin (JOINING), epoch 0 provided by the client > - new member added to the group (group epoch bumped to 3), client receives > same assignment that is currently trying to reconcile (tp1), but with epoch 3 > - previous reconciliation completes, but will discard the result because it > will notice that the memberHasRejoined (memberEpochOnReconciliationStart != > memberEpoch). 
> Client is stuck JOINING, with the server sending null target assignment because it hasn't changed since the last one sent (tp1).
> We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins, but for the case where the member receives the same assignment after being fenced and rejoining.
> 2 - Client is not sending an ack back to the broker in cases where it finishes a reconciliation for the same assignment that it sent in the last HB (the builder will not include the assignment). Following sequence:
> - client owns T1-1 (last HB sent included ack for T1-1)
> - client receives [T1-1, T2-1] and starts reconciling
> - client receives T1-1 (meaning T2-1 needs to be revoked)
> - ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
> - next reconciliation starts for T1-1 and completes, but the ack is not sent because the builder sees it's the same as what it sent on the last HB, leaving the broker waiting for an ack that won't arrive.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
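The discard condition described in this ticket can be modeled in isolation. Below is a minimal sketch (hypothetical names, not the actual MembershipManagerImpl code) contrasting the current epoch-based check, which drops the result of reconciling tp1 once the epoch has moved from 1 to 3, with an assignment-based check that would let the same assignment still be acked:

```java
// Illustrative model of the reconciliation-result check; hypothetical names,
// not the actual MembershipManagerImpl logic.
import java.util.Set;

class ReconciliationCheck {

    // Current behavior: discard the reconciliation result if the member epoch
    // moved since it started (memberEpochOnReconciliationStart != memberEpoch).
    static boolean discardOnEpochBump(int epochAtStart, int currentEpoch) {
        return epochAtStart != currentEpoch;
    }

    // Sketch of an alternative: discard only if the target assignment itself
    // changed, so the same assignment received in a new epoch can still be acked.
    static boolean discardOnAssignmentChange(Set<String> reconciled, Set<String> currentTarget) {
        return !reconciled.equals(currentTarget);
    }

    public static void main(String[] args) {
        Set<String> tp1 = Set.of("tp1");
        // Ticket sequence: reconciliation of tp1 started at epoch 1; the member
        // was fenced and rejoined; the same assignment arrived with epoch 3.
        System.out.println(discardOnEpochBump(1, 3));            // true: result dropped, client stuck JOINING
        System.out.println(discardOnAssignmentChange(tp1, tp1)); // false: result kept, ack sent
    }
}
```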
[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16185: --- Description: Currently, the intention in the client state machine is that the client always reconciles whatever it has pending and sends an ack for it, but in cases where the same assignment is received in different epochs this does not work as expected.
1 - The client might get stuck JOINING/RECONCILING, with a pending (delayed) reconciliation, when it receives the same assignment in a new epoch (ex. after being FENCED). The first time it receives the assignment it takes no action, as it already has it pending to reconcile, but when the reconciliation completes it discards the result because the epoch changed, which is wrong. Note that after sending the assignment with the new epoch once, the broker continues to send null assignments.
Here is a sample sequence leading to the client stuck JOINING:
- client joins, epoch 0
- client receives assignment tp1, stuck RECONCILING, epoch 1
- member gets FENCED on the coord, coord bumps epoch to 2
- client tries to rejoin (JOINING), epoch 0 provided by the client
- new member added to the group (group epoch bumped to 3), client receives the same assignment it is currently trying to reconcile (tp1), but with epoch 3
- previous reconciliation completes, but will discard the result because it will notice that memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch).
Client is stuck JOINING, with the server sending null target assignment because it hasn't changed since the last one sent (tp1).
We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins, but for the case where the member receives the same assignment after being fenced and rejoining.
2 - Client is not sending an ack back to the broker in cases where it finishes a reconciliation for the same assignment that it sent in the last HB (the builder will not include the assignment). Following sequence:
- client owns T1-1 (last HB sent included ack for T1-1)
- client receives [T1-1, T2-1] and starts reconciling
- client receives T1-1 (meaning T2-1 needs to be revoked)
- ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
- next reconciliation starts for T1-1 and completes, but the ack is not sent because the builder sees it's the same as what it sent on the last HB, leaving the broker waiting for an ack that won't arrive.
was: Currently, the intention in the client state machine is that the client always reconciles whatever it has pending and sends an ack for it, but in cases where the same assignment is received in different epochs this does not work as expected. 1 - Client might get stuck JOINING/RECONCILING, with a pending reconciliation (delayed), and it receives the same assignment, but in a new epoch (ex. after being FENCED). First time it receives the assignment it takes no action, as it already has it as pending to reconcile, but when the reconciliation completes it discards the result because the epoch changed. And this is wrong. Note that after sending the assignment with the new epoch one time, the broker continues to send null assignments. 
Here is a sample sequence leading to the client stuck JOINING: - client joins, epoch 0 - client receives assignment tp1, stuck RECONCILING, epoch 1 - member gets FENCED on the coord, coord bumps epoch to 2 - client tries to rejoin (JOINING), epoch 0 provided by the client - new member added to the group (group epoch bumped to 3), client receives same assignment that is currently trying to reconcile (tp1), but with epoch 3 - previous reconciliation completes, but will discard the result because it will notice that the memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch). Client is stuck JOINING, with the server sending null target assignment because it hasn't changed since the last one sent (tp1) We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case that the member receives the same assignment after being fenced and rejoining 2 - Client is not sending ack back to the broker in cases where it finishes a reconciliation for the same assignment that it sent in the last HB (builder will not include the assignment). Following sequence: - client owns T1-1 (last HB sent included ack for T1-1) - client receives [T1-1, T2-1] and start reconciling - client receives T1-1 (meaning T2-1 needs to be revoked) - ongoing reconciliation for [T1-1, T2-1] fails so ack never sent for it - next reconciliation starts for T1-1 and completes, but ack not sent because the builder sees it's the same it sent on the last HB, leaving the broker waiting for an ack that won't arrive. > Fix client reconciliation of same assignment received in
[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription
[ https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16261: --- Affects Version/s: 3.7.0 > MembershipManagerImpl.updateSubscription fails if already empty subscription > > > Key: KAFKA-16261 > URL: https://issues.apache.org/jira/browse/KAFKA-16261 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, consumer-threading-refactor, > kip-848-client-support > Fix For: 3.8.0 > > > The internal SubscriptionState object keeps track of whether the assignment > is user-assigned, or auto-assigned. If there are no assigned partitions, the > assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed > in this state it fails. > The easiest thing is perhaps to check > SubscriptionState.hasAutoAssignedPartitions() to make sure that > assignFromSubscribed is going to be permitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
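The guard suggested at the end of the ticket can be sketched with a simplified stand-in for SubscriptionState (illustrative only; the real class lives in the consumer internals and has a richer state model):

```java
// Simplified stand-in for the SubscriptionState behavior described above;
// illustrative only, not the real Kafka internals class.
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class SubscriptionStateSketch {
    enum AssignmentType { NONE, USER_ASSIGNED, AUTO_ASSIGNED }

    private AssignmentType type = AssignmentType.NONE;
    private final Set<String> assigned = new HashSet<>();

    // Subscribing to topics puts the state in auto-assignment mode.
    void subscribe() { type = AssignmentType.AUTO_ASSIGNED; }

    boolean hasAutoAssignedPartitions() { return type == AssignmentType.AUTO_ASSIGNED; }

    // Mirrors the failure mode: fails unless the state is auto-assigned, and an
    // empty assignment resets the state back to NONE.
    void assignFromSubscribed(Collection<String> partitions) {
        if (!hasAutoAssignedPartitions())
            throw new IllegalStateException("Attempt to assign partitions while not auto-subscribed");
        assigned.clear();
        assigned.addAll(partitions);
        if (assigned.isEmpty()) type = AssignmentType.NONE;
    }

    // The fix suggested in the ticket: check the state first, so updating an
    // already-empty subscription (e.g. clearing it again) is a no-op, not a failure.
    void updateAssignmentSafely(Collection<String> partitions) {
        if (hasAutoAssignedPartitions()) assignFromSubscribed(partitions);
    }
}
```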
[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16185: --- Priority: Critical (was: Blocker) > Fix client reconciliation of same assignment received in different epochs > -- > > Key: KAFKA-16185 > URL: https://issues.apache.org/jira/browse/KAFKA-16185 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Currently, the intention in the client state machine is that the client > always reconciles whatever it has pending and sends an ack for it, but in > cases where the same assignment is received in different epochs this does not > work as expected. > 1 - Client might get stuck JOINING/RECONCILING, with a pending reconciliation > (delayed), and it receives the same assignment, but in a new epoch (ex. after > being FENCED). First time it receives the assignment it takes no action, as > it already has it as pending to reconcile, but when the reconciliation > completes it discards the result because the epoch changed. And this is > wrong. Note that after sending the assignment with the new epoch one time, > the broker continues to send null assignments. > Here is a sample sequence leading to the client stuck JOINING: > - client joins, epoch 0 > - client receives assignment tp1, stuck RECONCILING, epoch 1 > - member gets FENCED on the coord, coord bumps epoch to 2 > - client tries to rejoin (JOINING), epoch 0 provided by the client > - new member added to the group (group epoch bumped to 3), client receives > same assignment that is currently trying to reconcile (tp1), but with epoch 3 > - previous reconciliation completes, but will discard the result because it > will notice that the memberHasRejoined (memberEpochOnReconciliationStart != > memberEpoch). 
> Client is stuck JOINING, with the server sending null target assignment because it hasn't changed since the last one sent (tp1).
> We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins, but for the case where the member receives the same assignment after being fenced and rejoining.
> 2 - Client is not sending an ack back to the broker in cases where it finishes a reconciliation for the same assignment that it sent in the last HB (the builder will not include the assignment). Following sequence:
> - client owns T1-1 (last HB sent included ack for T1-1)
> - client receives [T1-1, T2-1] and starts reconciling
> - client receives T1-1 (meaning T2-1 needs to be revoked)
> - ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
> - next reconciliation starts for T1-1 and completes, but the ack is not sent because the builder sees it's the same as what it sent on the last HB, leaving the broker waiting for an ack that won't arrive.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
[ https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16224: --- Description: Current logic for auto-committing offsets when partitions are revoked will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the member not completing the revocation in time. We should consider this as an indication of the topic being deleted, and in the context of committing offsets to revoke partitions, we should abort the commit attempt and move on to complete and ack the revocation (effectively considering UnknownTopicOrPartitionException as non-retriable in this context) Note that legacy coordinator behaviour around this seems to be the same as the new consumer currently has, tracked with a related issue given that it would require a separate fix for the legacy consumer. was: Current logic for auto-committing offsets when partitions are revoked will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the member not completing the revocation in time. We should consider this as an indication of the topic being deleted, and in the context of committing offsets to revoke partitions, we should abort the commit attempt and move on to complete and ack the revocation (effectively considering UnknownTopicOrPartitionException as non-retriable in this context) Note that legacy coordinator behaviour around this seems to be the same as the new consumer currently has. 
> Fix handling of deleted topic when auto-committing before revocation > > > Key: KAFKA-16224 > URL: https://issues.apache.org/jira/browse/KAFKA-16224 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Current logic for auto-committing offsets when partitions are revoked will > retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the > member not completing the revocation in time. We should consider this as an > indication of the topic being deleted, and in the context of committing > offsets to revoke partitions, we should abort the commit attempt and move on > to complete and ack the revocation (effectively considering > UnknownTopicOrPartitionException as non-retriable in this context) > Note that legacy coordinator behaviour around this seems to be the same as > the new consumer currently has, tracked with a related issue given that it > would require a separate fix for the legacy consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
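The behavior change this ticket describes can be sketched as a small retry predicate (hypothetical helper with stand-in error codes, not the real Kafka Errors enum; the actual fix belongs in the consumer's commit logic):

```java
// Sketch of the retry policy described above: UNKNOWN_TOPIC_OR_PARTITION is
// treated as non-retriable only when the commit runs before revocation, so the
// member can complete and ack the revocation instead of retrying forever.
class RevocationCommitPolicy {
    enum Error { COORDINATOR_LOAD_IN_PROGRESS, REQUEST_TIMED_OUT, UNKNOWN_TOPIC_OR_PARTITION, GROUP_AUTHORIZATION_FAILED }

    static boolean shouldRetry(Error error, boolean committingBeforeRevocation) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
            case REQUEST_TIMED_OUT:
                return true; // generally retriable
            case UNKNOWN_TOPIC_OR_PARTITION:
                // Likely a deleted topic: abort the commit attempt in the
                // revocation path and move on to ack the revocation.
                return !committingBeforeRevocation;
            default:
                return false; // fatal errors are never retried
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(Error.UNKNOWN_TOPIC_OR_PARTITION, true));  // false
        System.out.println(shouldRetry(Error.UNKNOWN_TOPIC_OR_PARTITION, false)); // true
    }
}
```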
[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
[ https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16224: --- Priority: Critical (was: Major) > Fix handling of deleted topic when auto-committing before revocation > > > Key: KAFKA-16224 > URL: https://issues.apache.org/jira/browse/KAFKA-16224 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Current logic for auto-committing offsets when partitions are revoked will > retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the > member not completing the revocation in time. We should consider this as an > indication of the topic being deleted, and in the context of committing > offsets to revoke partitions, we should abort the commit attempt and move on > to complete and ack the revocation (effectively considering > UnknownTopicOrPartitionException as non-retriable in this context) > Note that legacy coordinator behaviour around this seems to be the same as > the new consumer currently has. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16185: --- Description: Currently, the intention in the client state machine is that the client always reconciles whatever it has pending and sends an ack for it, but in cases where the same assignment is received in different epochs this does not work as expected.
1 - The client might get stuck JOINING/RECONCILING, with a pending (delayed) reconciliation, when it receives the same assignment in a new epoch (ex. after being FENCED). The first time it receives the assignment it takes no action, as it already has it pending to reconcile, but when the reconciliation completes it discards the result because the epoch changed, which is wrong. Note that after sending the assignment with the new epoch once, the broker continues to send null assignments.
Here is a sample sequence leading to the client stuck JOINING:
- client joins, epoch 0
- client receives assignment tp1, stuck RECONCILING, epoch 1
- member gets FENCED on the coord, coord bumps epoch to 2
- client tries to rejoin (JOINING), epoch 0 provided by the client
- new member added to the group (group epoch bumped to 3), client receives the same assignment it is currently trying to reconcile (tp1), but with epoch 3
- previous reconciliation completes, but will discard the result because it will notice that memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch).
Client is stuck JOINING, with the server sending null target assignment because it hasn't changed since the last one sent (tp1).
We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins, but for the case where the member receives the same assignment after being fenced and rejoining.
2 - Client is not sending an ack back to the broker in cases where it finishes a reconciliation for the same assignment that it sent in the last HB (the builder will not include the assignment). Following sequence:
- client owns T1-1 (last HB sent included ack for T1-1)
- client receives [T1-1, T2-1] and starts reconciling
- client receives T1-1 (meaning T2-1 needs to be revoked)
- ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
- next reconciliation starts for T1-1 and completes, but the ack is not sent because the builder sees it's the same as what it sent on the last HB, leaving the broker waiting for an ack that won't arrive.
was: Currently, the intention in the client state machine is that the client always reconciles whatever it has pending and sends an ack for it, but in cases where the same assignment is received in different epochs this does not work as expected. 1 - Client might get stuck JOINING/RECONCILING, with a pending reconciliation (delayed), and it receives the same assignment, but in a new epoch (ex. after being FENCED). First time it receives the assignment it takes no action, as it already has it as pending to reconcile, but when the reconciliation completes it discards the result because the epoch changed. And this is wrong. Note that after sending the assignment with the new epoch one time, the broker continues to send null assignments. 
Here is a sample sequence leading to the client stuck JOINING: - client joins, epoch 0 - client receives assignment tp1, stuck RECONCILING, epoch 1 - member gets FENCED on the coord, coord bumps epoch to 2 - client tries to rejoin (JOINING), epoch 0 provided by the client - new member added to the group (group epoch bumped to 3), client receives same assignment that is currently trying to reconcile (tp1), but with epoch 3 - previous reconciliation completes, but will discard the result because it will notice that the memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch). Client is stuck JOINING, with the server sending null target assignment because it hasn't changed since the last one sent (tp1) (We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case that the member receives the same assignment after being fenced and rejoining) 2 - Client is not sending ack back to the broker in cases where it finishes a reconciliation for the same assignment that it sent in the last HB (builder will not include the assignment). Following sequence: - client owns T1-1 (last HB sent included ack for T1-1) - client receives [T1-1, T2-1] and start reconciling - client receives T1-1 (meaning T2-1 needs to be revoked) - ongoing reconciliation for [T1-1, T2-1] fails so ack never sent for it - next reconciliation starts for T1-1 and completes, but ack not sent because the builder sees it's the same it sent on the last HB, leaving the broker waiting for an ack that won't arrive. > Fix client reconciliation of same assignment received in
[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription
[ https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16261: --- Labels: client-transitions-issues consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > MembershipManagerImpl.updateSubscription fails if already empty subscription > > > Key: KAFKA-16261 > URL: https://issues.apache.org/jira/browse/KAFKA-16261 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, consumer-threading-refactor, > kip-848-client-support > Fix For: 3.8.0 > > > The internal SubscriptionState object keeps track of whether the assignment > is user-assigned, or auto-assigned. If there are no assigned partitions, the > assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed > in this state it fails. > The easiest thing is perhaps to check > SubscriptionState.hasAutoAssignedPartitions() to make sure that > assignFromSubscribed is going to be permitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16004) Review new consumer inflight offset commit logic
[ https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16004: --- Priority: Critical (was: Major) > Review new consumer inflight offset commit logic > > > Key: KAFKA-16004 > URL: https://issues.apache.org/jira/browse/KAFKA-16004 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: consumer-threading-refactor, offset > Fix For: 3.8.0 > > > New consumer logic for committing offsets handles inflight requests, to > validate that no commit requests are sent if a previous one hasn't received a > response. Review how that logic is currently applied to both sync and async > commits, and validate against the legacy coordinator, which seems to apply it > only for async commits. Review considering the expected behaviour for > auto-commit (auto-commit on the interval, auto-commit on reconciliation, and > auto-commit on close). -- This message was sent by Atlassian Jira (v8.20.10#820010)
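The inflight rule under review (no new commit request while a previous one is unanswered) can be sketched as a guard that queues commits until the outstanding response arrives. This is an illustrative model, not the actual commit request manager:

```java
// Illustrative inflight guard: at most one commit request on the wire; later
// commits are queued until the outstanding response (success or error) arrives.
import java.util.ArrayDeque;
import java.util.Deque;

class InflightCommitGuard {
    private final Deque<Runnable> pending = new ArrayDeque<>();
    private boolean inflight = false;

    // Request a commit; it is sent immediately only if nothing is inflight.
    void send(Runnable commit) {
        if (inflight) {
            pending.add(commit);
            return;
        }
        inflight = true;
        commit.run();
    }

    // Called when the broker responds; the next queued commit (if any) is sent.
    void onResponse() {
        inflight = false;
        Runnable next = pending.poll();
        if (next != null) {
            inflight = true;
            next.run();
        }
    }

    boolean hasInflight() { return inflight; }
}
```

Whether such a guard should apply to sync commits, async commits, or both, and how auto-commit on the interval, on reconciliation, and on close interact with it, is exactly what the ticket asks to review.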
[jira] [Updated] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16103: --- Priority: Critical (was: Major) > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
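The invariant being reviewed can be modeled as draining a queue of pending commit callbacks before commitSync completes. This is an illustrative sketch, not the actual consumer code:

```java
// Illustrative model: async commit completions enqueue their user callbacks,
// and commitSync drains every pending callback before it returns.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class CommitCallbackDrain {
    private final Deque<Runnable> pendingCallbacks = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // An async commit completed in the background; its user callback must run
    // later, on the application thread.
    void onAsyncCommitCompleted(String id) {
        pendingCallbacks.add(() -> log.add("callback:" + id));
    }

    // commitSync fires all previously completed callbacks before returning.
    void commitSync() {
        Runnable cb;
        while ((cb = pendingCallbacks.poll()) != null) cb.run();
        log.add("commitSync-returned");
    }
}
```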
[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state
[ https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15839: --- Fix Version/s: 4.0.0 (was: 3.8.0) > Topic ID integration in consumer subscription state > --- > > Key: KAFKA-15839 > URL: https://issues.apache.org/jira/browse/KAFKA-15839 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 4.0.0 > > > With the new consumer group protocol, assignments received by the consumer > contain topic IDs instead of topic names. Topic Ids are used in the > reconciliation path, integrated using TopicIdPartition. When reconciling, > topic names are resolved via a metadata update, but they are also kept in a > local #MembershipManager cache. This local cache serves the purpose of > keeping assigned topicId-names (that might have been deleted from metadata, > ex. topic deleted). > That's just an initial step towards spreading topic IDs internally in the > consumer code. Next step to address with this task would be to include topic > IDs in the subscription state, so that assigned topicId-names can be accessed > from other components without the need of resolving names multiple times. > Note that this task aims only at spreading topic IDs internally in the > consumer, no changes to expose them at the API level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
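The local cache of assigned topicId-to-name mappings described above can be sketched as follows (a hypothetical simplified structure; the real cache lives in the membership manager and uses TopicIdPartition):

```java
// Hypothetical simplified cache of assigned topic IDs to names: entries survive
// the topic disappearing from a metadata update (e.g. deleted topic) and are
// removed only when the topic leaves the assignment.
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

class AssignedTopicNamesCache {
    private final Map<UUID, String> assignedTopicNames = new HashMap<>();

    // Name resolved via a metadata update while reconciling the assignment.
    void onNameResolved(UUID topicId, String name) {
        assignedTopicNames.put(topicId, name);
    }

    // Lookup that does not depend on the topic still existing in metadata.
    Optional<String> nameFor(UUID topicId) {
        return Optional.ofNullable(assignedTopicNames.get(topicId));
    }

    // Entries are dropped only when the topic is no longer assigned.
    void onTopicUnassigned(UUID topicId) {
        assignedTopicNames.remove(topicId);
    }
}
```

Moving these mappings into the subscription state, as the ticket proposes, would let other components read the resolved names without re-resolving them.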
[jira] [Updated] (KAFKA-15847) Consider partial metadata requests for client reconciliation
[ https://issues.apache.org/jira/browse/KAFKA-15847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15847: --- Fix Version/s: 4.0.0 (was: 3.8.0) > Consider partial metadata requests for client reconciliation > > > Key: KAFKA-15847 > URL: https://issues.apache.org/jira/browse/KAFKA-15847 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, reconciliation > Fix For: 4.0.0 > > > New consumer implementing KIP-848 protocol needs to resolve metadata for the > topics received in the assignment. It does so by relying on the centralized > metadata object. Currently metadata updates requested through the metadata > object, request metadata for all topics. Consider allowing the partial > updates that are already expressed as an intention in the Metadata class but > not fully supported (investigate background in case there were some specifics > that led to this intention not being fully implemented) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15843: --- Fix Version/s: 4.0.0 (was: 3.8.0) > Review consumer onPartitionsAssigned called with empty partitions > - > > Key: KAFKA-15843 > URL: https://issues.apache.org/jira/browse/KAFKA-15843 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: callback, kip-848-client-support, reconciliation > Fix For: 4.0.0 > > > Legacy coordinator triggers onPartitionsAssigned with empty assignment (which > is not the case when triggering onPartitionsRevoked or Lost). This is the > behaviour of the legacy coordinator, and the new consumer implementation > maintains the same principle. We should review this to fully understand if it > is really needed to call onPartitionsAssigned with empty assignment (or if it > should behave consistently with the onRevoke/Lost). > Note that the consumer integration tests rely on this call to > onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16185: --- Priority: Critical (was: Major) > Fix client reconciliation of same assignment received in different epochs > -- > > Key: KAFKA-16185 > URL: https://issues.apache.org/jira/browse/KAFKA-16185 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Currently, the intention in the client state machine is that the client > always reconciles whatever it has pending and sends an ack for it, but in > cases where the same assignment is received in different epochs this does not > work as expected. > 1 - Client might get stuck JOINING/RECONCILING, with a pending reconciliation > (delayed), and it receives the same assignment, but in a new epoch (ex. after > being FENCED). First time it receives the assignment it takes no action, as > it already has it as pending to reconcile, but when the reconciliation > completes it discards the result because the epoch changed. And this is > wrong. Note that after sending the assignment with the new epoch one time, > the broker continues to send null assignments. > Here is a sample sequence leading to the client stuck JOINING: > - client joins, epoch 0 > - client receives assignment tp1, stuck RECONCILING, epoch 1 > - member gets FENCED on the coord, coord bumps epoch to 2 > - client tries to rejoin (JOINING), epoch 0 provided by the client > - new member added to the group (group epoch bumped to 3), client receives > same assignment that is currently trying to reconcile (tp1), but with epoch 3 > - previous reconciliation completes, but will discard the result because it > will notice that the memberHasRejoined (memberEpochOnReconciliationStart != > memberEpoch). 
> Client is stuck JOINING, with the server sending null target assignment because it hasn't changed since the last one sent (tp1).
> (We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins, but for the case where the member receives the same assignment after being fenced and rejoining.)
> 2 - Client is not sending an ack back to the broker in cases where it finishes a reconciliation for the same assignment that it sent in the last HB (the builder will not include the assignment). Following sequence:
> - client owns T1-1 (last HB sent included ack for T1-1)
> - client receives [T1-1, T2-1] and starts reconciling
> - client receives T1-1 (meaning T2-1 needs to be revoked)
> - ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
> - next reconciliation starts for T1-1 and completes, but the ack is not sent because the builder sees it's the same as what it sent on the last HB, leaving the broker waiting for an ack that won't arrive.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription
[ https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16261: --- Priority: Critical (was: Minor) > MembershipManagerImpl.updateSubscription fails if already empty subscription > > > Key: KAFKA-16261 > URL: https://issues.apache.org/jira/browse/KAFKA-16261 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Lianet Magrans >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The internal SubscriptionState object keeps track of whether the assignment > is user-assigned, or auto-assigned. If there are no assigned partitions, the > assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed > in this state it fails. > The easiest thing is perhaps to check > SubscriptionState.hasAutoAssignedPartitions() to make sure that > assignFromSubscribed is going to be permitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription
[ https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16261: --- Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor) > MembershipManagerImpl.updateSubscription fails if already empty subscription > > > Key: KAFKA-16261 > URL: https://issues.apache.org/jira/browse/KAFKA-16261 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Andrew Schofield >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > The internal SubscriptionState object keeps track of whether the assignment > is user-assigned, or auto-assigned. If there are no assigned partitions, the > assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed > in this state it fails. > The easiest thing is perhaps to check > SubscriptionState.hasAutoAssignedPartitions() to make sure that > assignFromSubscribed is going to be permitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16258:
-----------------------------------
    Labels: client-transitions-issues kip-848-client-support  (was: kip-848-client-support)

> Stale member should trigger onPartitionsLost when leaving group
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16258
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16258
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: client-transitions-issues, kip-848-client-support
>             Fix For: 3.8.0
>
> When the poll timer expires, the new consumer proactively leaves the group
> and clears its assignments, but it should also invoke the onPartitionsLost
> callback. The legacy coordinator does the following sequence on poll timer
> expiration: send leave group request
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
> invoke onPartitionsLost, and when it completes it clears the assignment
> (onJoinPrepare
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that fail expecting callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)
[jira] [Assigned] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans reassigned KAFKA-16258:
--------------------------------------
    Assignee: Lianet Magrans

> Stale member should trigger onPartitionsLost when leaving group
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16258
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16258
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
> When the poll timer expires, the new consumer proactively leaves the group
> and clears its assignments, but it should also invoke the onPartitionsLost
> callback. The legacy coordinator does the following sequence on poll timer
> expiration: send leave group request
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
> invoke onPartitionsLost, and when it completes it clears the assignment
> (onJoinPrepare
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that fail expecting callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)
[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval
[ https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16165:
-----------------------------------
    Description: 
Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled within the poll interval in some scenario (maybe related to consumer close, since the transition is leaving->stale).

Log trace:
{code}
[2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
	at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
	at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
	at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
{code}
We should review the poll expiration logic that triggers a leave-group operation. It is currently applied in the HB Manager poll without any validation, and since it depends on the consumer poll timer, it could fire at any time, regardless of the state of the member. For example, the poll timer could expire while the member is leaving, leading to this leaving->stale invalid transition. We should probably ensure that this proactive leave only applies when the consumer is not already leaving (prepare leaving or leaving).

was:
Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled within the poll interval in some scenario (maybe related to consumer close, since the transition is leaving->stale).

Log trace:
{code}
[2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
	at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
	at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
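The guard proposed in the description can be sketched as a minimal state machine. This is an illustrative stand-in for `MembershipManagerImpl`, not the real class, and the transition table is simplified to just the states involved in this report:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the membership state machine from KAFKA-16165. The fix
// suggested above: the poll-timer expiry path should only force the STALE
// transition when the member is not already leaving.
public class MembershipStateMachine {

    enum MemberState { STABLE, PREPARE_LEAVING, LEAVING, STALE }

    // Simplified transition table: LEAVING -> STALE is deliberately absent,
    // which is exactly the transition the reported IllegalStateException hit.
    private static final Map<MemberState, Set<MemberState>> VALID =
        new EnumMap<>(MemberState.class);
    static {
        VALID.put(MemberState.STABLE, EnumSet.of(MemberState.PREPARE_LEAVING, MemberState.STALE));
        VALID.put(MemberState.PREPARE_LEAVING, EnumSet.of(MemberState.LEAVING));
        VALID.put(MemberState.LEAVING, EnumSet.noneOf(MemberState.class));
        VALID.put(MemberState.STALE, EnumSet.noneOf(MemberState.class));
    }

    private MemberState state = MemberState.STABLE;

    MemberState state() {
        return state;
    }

    private void transitionTo(MemberState next) {
        if (!VALID.get(state).contains(next))
            throw new IllegalStateException(
                "Invalid state transition from " + state + " to " + next);
        state = next;
    }

    // Voluntary leave, e.g. on unsubscribe or close.
    void startLeaving() {
        transitionTo(MemberState.PREPARE_LEAVING);
        transitionTo(MemberState.LEAVING);
    }

    // Called when the poll timer expires. Checking the current state first
    // avoids the unguarded LEAVING -> STALE transition seen in the log trace.
    void onPollTimerExpired() {
        if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING)
            return; // already leaving voluntarily; no proactive leave needed
        transitionTo(MemberState.STALE);
    }
}
```

With the guard, a poll-timer expiry that races with a voluntary leave becomes a no-op instead of crashing the consumer network thread.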
[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16258:
-----------------------------------
    Labels: kip-848-client-support  (was: )

> Stale member should trigger onPartitionsLost when leaving group
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16258
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16258
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Lianet Magrans
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
> When the poll timer expires, the new consumer proactively leaves the group
> and clears its assignments, but it should also invoke the onPartitionsLost
> callback. The legacy coordinator does the following sequence on poll timer
> expiration: send leave group request
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
> invoke onPartitionsLost, and when it completes it clears the assignment
> (onJoinPrepare
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that fail expecting callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)
[jira] [Comment Edited] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817500#comment-17817500 ]

Lianet Magrans edited comment on KAFKA-16008 at 2/14/24 7:25 PM:
-----------------------------------------------------------------
[~kirktrue] I just added the links to the issue that most probably is causing/related to this failure

was (Author: JIRAUSER300183):
[~kirktrue] I just added the links to the issue that most probably is causing this failure

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> -----------------------------------------------
>
>                 Key: KAFKA-16008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16008
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Critical
>              Labels: consumer-threading-refactor, integration-tests, timeout
>             Fix For: 3.8.0
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance completed
> 	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> 	at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> 	at kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> 	at kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code}
> I don't know if that's related or not.
[jira] [Commented] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817500#comment-17817500 ]

Lianet Magrans commented on KAFKA-16008:
----------------------------------------
[~kirktrue] I just added the links to the issue that most probably is causing this failure

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> -----------------------------------------------
>
>                 Key: KAFKA-16008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16008
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Critical
>              Labels: consumer-threading-refactor, integration-tests, timeout
>             Fix For: 3.8.0
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance completed
> 	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> 	at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> 	at kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> 	at kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code}
> I don't know if that's related or not.
[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16258:
-----------------------------------
    Description: 
When the poll timer expires, the new consumer proactively leaves the group and clears its assignments, but it should also invoke the onPartitionsLost callback. The legacy coordinator does the following sequence on poll timer expiration: send leave group request ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), invoke onPartitionsLost, and when it completes it clears the assignment (onJoinPrepare [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).

This issue is most probably what is causing the failures in the integration tests that fail expecting callbacks when the poll interval expires (like https://issues.apache.org/jira/browse/KAFKA-16008)

was:
When the poll timer expires, the new consumer proactively leaves the group and clears its assignments, but it should also invoke the onPartitionsLost callback. The legacy coordinator does the following sequence on poll timer expiration: send leave group request ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), invoke onPartitionsLost, and when it completes it clears the assignment (onJoinPrepare [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).

> Stale member should trigger onPartitionsLost when leaving group
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16258
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16258
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Lianet Magrans
>            Priority: Major
>             Fix For: 3.8.0
>
> When the poll timer expires, the new consumer proactively leaves the group
> and clears its assignments, but it should also invoke the onPartitionsLost
> callback. The legacy coordinator does the following sequence on poll timer
> expiration: send leave group request
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
> invoke onPartitionsLost, and when it completes it clears the assignment
> (onJoinPrepare
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that fail expecting callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)
[jira] [Created] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
Lianet Magrans created KAFKA-16258:
--------------------------------------

             Summary: Stale member should trigger onPartitionsLost when leaving group
                 Key: KAFKA-16258
                 URL: https://issues.apache.org/jira/browse/KAFKA-16258
             Project: Kafka
          Issue Type: Sub-task
          Components: clients, consumer
    Affects Versions: 3.7.0
            Reporter: Lianet Magrans
             Fix For: 3.8.0

When the poll timer expires, the new consumer proactively leaves the group and clears its assignments, but it should also invoke the onPartitionsLost callback. The legacy coordinator does the following sequence on poll timer expiration: send leave group request ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), invoke onPartitionsLost, and when it completes it clears the assignment (onJoinPrepare [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
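The ordering described in KAFKA-16258 can be sketched as follows. This is a hypothetical helper written for illustration (the listener interface and method names are assumptions, not Kafka's actual coordinator code); the point is the sequence: leave request first, then the callback, and only after the callback completes is the assignment cleared:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Sketch of the poll-timer-expiry sequence the legacy coordinator follows,
// which the ticket says the new consumer should mirror.
public class PollTimerExpiryHandler {

    // Stand-in for ConsumerRebalanceListener#onPartitionsLost.
    interface PartitionsLostListener {
        void onPartitionsLost(Collection<String> lost);
    }

    private final List<String> assignment = new ArrayList<>();
    private final PartitionsLostListener listener;
    private boolean leaveGroupRequestSent = false;

    public PollTimerExpiryHandler(PartitionsLostListener listener) {
        this.listener = listener;
    }

    public void assign(Collection<String> partitions) {
        assignment.addAll(partitions);
    }

    public List<String> assignment() {
        return List.copyOf(assignment);
    }

    public boolean leaveGroupRequestSent() {
        return leaveGroupRequestSent;
    }

    // Stand-in for sending the actual leave-group request.
    private void sendLeaveGroupRequest() {
        leaveGroupRequestSent = true;
    }

    // Poll-timer expiry: (1) send the leave-group request, (2) invoke
    // onPartitionsLost with the partitions being dropped, and only then
    // (3) clear the assignment, matching the legacy onJoinPrepare ordering.
    public void onPollTimerExpired() {
        sendLeaveGroupRequest();
        List<String> lost = List.copyOf(assignment);
        listener.onPartitionsLost(lost);
        assignment.clear();
    }
}
```

Clearing the assignment before the callback (the behavior the ticket reports in the new consumer) would hand `onPartitionsLost` an empty collection, which is what the failing integration tests observe.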
[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
[ https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16224:
-----------------------------------
    Labels: client-transitions-issues kip-848-client-support  (was: kip-848-client-support)

> Fix handling of deleted topic when auto-committing before revocation
> --------------------------------------------------------------------
>
>                 Key: KAFKA-16224
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16224
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: client-transitions-issues, kip-848-client-support
>
> Current logic for auto-committing offsets when partitions are revoked will
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the
> member not completing the revocation in time. We should consider this as an
> indication of the topic being deleted, and in the context of committing
> offsets to revoke partitions, we should abort the commit attempt and move on
> to complete and ack the revocation (effectively considering
> UnknownTopicOrPartitionException as non-retriable in this context)
> Note that legacy coordinator behaviour around this seems to be the same as
> the new consumer currently has.
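The retry policy proposed in KAFKA-16224 can be sketched as a small decision function. This is an illustrative model, not the actual Kafka commit path; the enum values and the `committingBeforeRevocation` flag are assumptions made for the example:

```java
// Sketch of the proposed commit-retry policy: during the auto-commit that
// runs before revocation, UNKNOWN_TOPIC_OR_PARTITION is treated as
// non-retriable so the member can complete and ack the revocation in time
// instead of retrying forever against a deleted topic.
public class RevocationCommitPolicy {

    enum CommitError { NONE, COORDINATOR_LOAD_IN_PROGRESS, UNKNOWN_TOPIC_OR_PARTITION }

    static boolean shouldRetry(CommitError error, boolean committingBeforeRevocation) {
        switch (error) {
            case NONE:
                return false; // commit succeeded, nothing to retry
            case COORDINATOR_LOAD_IN_PROGRESS:
                return true;  // genuinely transient, retry in any context
            case UNKNOWN_TOPIC_OR_PARTITION:
                // Likely a deleted topic. When the commit is blocking a
                // revocation, abort the attempt and let the revocation
                // complete; outside that context the error may still be
                // treated as retriable, as it is today.
                return !committingBeforeRevocation;
            default:
                return false;
        }
    }
}
```

The key line is the revocation-context check: the same error code yields different retriability depending on whether the commit is holding up a rebalance.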