[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Description: 
If the consumer does not define an offset reset strategy, a call to poll 
should fail with NoOffsetForPartitionException. That works as expected on the 
new consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but the exception is not thrown when polling continuously with a ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    val consumer = createConsumer(configOverrides = this.consumerConfig)

    consumer.assign(List(tp).asJava)

    // continuous poll should eventually fail because there is no offset reset
    // strategy set (fail only when resetting positions after coordinator is known)
    TestUtils.tryUntilNoAssertionError() {
      assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
    }
  }
{code}
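For reference, the same behavior can also be sketched outside the test harness 
with a plain KafkaConsumer. This is a minimal, hypothetical reproduction (the 
bootstrap address, group id, and topic/partition are assumptions, and 
group.protocol selects the new consumer implementation): on the legacy 
consumer the loop throws NoOffsetForPartitionException quickly, while with 
this bug the new consumer keeps returning empty batches.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollZeroRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-zero-repro");         // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // run against the new consumer
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");  // no reset strategy defined

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("test-topic", 0)));
            // Poll continuously with ZERO timeout; the exception should eventually be
            // thrown once positions are reset after the coordinator is known.
            for (int i = 0; i < 1000; i++)
                consumer.poll(Duration.ZERO);
            System.out.println("Bug reproduced: no NoOffsetForPartitionException was thrown");
        } catch (NoOffsetForPartitionException e) {
            System.out.println("Expected behavior: " + e.getMessage());
        }
    }
}
{code}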
This is also covered by the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 which is currently enabled only for the LegacyConsumer. After fixing this 
issue we should be able to enable it for the new consumer too.

The issue seems to be that when poll is called with a ZERO timeout, even 
continuously, the consumer is never able to complete 
initWithCommittedOffsetsIfNeeded, so updateFetchPositions never reaches 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.

 

There is a related issue, https://issues.apache.org/jira/browse/KAFKA-16637, 
but I am filing this one to provide more context and point out the test 
failures and suggested new tests. All of them fail even with the current patch 
in KAFKA-16637, so this needs investigation.


[jira] [Created] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16777:
--

 Summary: New consumer should throw NoOffsetForPartitionException 
on continuous poll zero if no reset strategy
 Key: KAFKA-16777
 URL: https://issues.apache.org/jira/browse/KAFKA-16777
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Lianet Magrans


If the consumer does not define an offset reset strategy, a call to poll 
should fail with NoOffsetForPartitionException. That works as expected on the 
new consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but the exception is not thrown when polling continuously with a ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:

{code:java}
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    val consumer = createConsumer(configOverrides = this.consumerConfig)

    consumer.assign(List(tp).asJava)

    // continuous poll should eventually fail because there is no offset reset
    // strategy set (fail only when resetting positions after coordinator is known)
    TestUtils.tryUntilNoAssertionError() {
      assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
    }
  }
{code}

This is also covered by the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 which is currently enabled only for the LegacyConsumer. After fixing this 
issue we should be able to enable it for the new consumer too.

The issue seems to be that when poll is called with a ZERO timeout, even 
continuously, the consumer is never able to complete 
initWithCommittedOffsetsIfNeeded, so updateFetchPositions never reaches 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.






[jira] [Commented] (KAFKA-16637) AsyncKafkaConsumer removes offset fetch responses from cache too aggressively

2024-05-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846446#comment-17846446
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], sorry I had missed your last question. The new group 
rebalance protocol from KIP-848 is supported in KRaft mode only.
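For reference, a minimal sketch of the opt-in settings involved, based on the 
early-access notes linked in the issue below (property names are stated as an 
assumption and may vary by version, so treat this as a sketch rather than 
authoritative):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class NewProtocolOptIn {
    static Properties newProtocolConsumerProps() {
        Properties props = new Properties();
        // Opt the consumer into the KIP-848 rebalance protocol (default is "classic").
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        return props;
    }
    // Broker side (server.properties), KRaft mode only, per the early-access notes:
    //   group.coordinator.rebalance.protocols=classic,consumer
}
{code}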

> AsyncKafkaConsumer removes offset fetch responses from cache too aggressively
> -
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes]).
> However, it does not work well. You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here





[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16766:
---
Description: 
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when the offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagating it with the message specific to offsetsForTimes.

The same situation exists for beginningOffsets and endOffsets. All three 
functions show the same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but do not have a clear message in the AsyncConsumer, so we should fix all 
three.

With the fix, we should write tests for each function, like the ones defined 
for the Legacy Consumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
 Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the legacy consumer does not issue a FindCoordinator request in 
this case but the AsyncConsumer does, so the current tests do not account for 
that when matching requests/responses.
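As a sketch of the direction (illustrative names only, not the actual 
AsyncKafkaConsumer internals): catch the generic 
java.util.concurrent.TimeoutException and rethrow Kafka's TimeoutException 
with a message naming the operation, matching the legacy wording.
{code:java}
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;

public class TimeoutMessages {
    // Wait for an async result and, on timeout, surface a message naming the
    // operation, as the legacy consumer does.
    static <T> T getOrTimeout(Future<T> result, Duration timeout, String operation) {
        try {
            return result.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            throw new TimeoutException(
                "Failed to get " + operation + " in " + timeout.toMillis() + "ms", e);
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        } catch (ExecutionException e) {
            throw new KafkaException(e.getCause());
        }
    }
}
{code}
Calling getOrTimeout(result, timeout, "offsets by times") would then surface 
the legacy-style message, and beginningOffsets/endOffsets could pass their own 
operation names.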


[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16766:
--

 Summary: New consumer offsetsForTimes timeout exception does not 
have the proper message
 Key: KAFKA-16766
 URL: https://issues.apache.org/jira/browse/KAFKA-16766
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
 Fix For: 3.8.0


If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw an org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we should keep the 
same message that the LegacyConsumer shows in this case, ex: "Failed to get 
offsets by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when the offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagating it with the message specific to offsetsForTimes.

After the fix, we should be able to write a test like the 
[testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246]
 that exists for the legacy consumer. Note that we would need a different 
test, given that the legacy consumer does not issue a FindCoordinator request 
in this case but the AsyncConsumer does, so the test would have to account for 
that when matching requests/responses.





[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16764:
---
Description: 
A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example.The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])

This is probably what makes that 
[testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
 fails for the new consumer. Once this bug is fixed, we should be able to 
enable that test for the new consumer.
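A minimal standalone sketch of the reproduction described above (the bootstrap 
address and group id are assumptions): the legacy consumer throws from poll, 
while the new consumer currently does not.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InvalidTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "invalid-topic-repro");     // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("invalid topic")); // space makes it invalid
            consumer.poll(Duration.ofSeconds(5));
            System.out.println("Bug reproduced: no InvalidTopicException was thrown");
        } catch (InvalidTopicException e) {
            System.out.println("Expected behavior (legacy consumer): " + e.getMessage());
        }
    }
}
{code}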






[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16764:
--

 Summary: New consumer should throw InvalidTopicException on poll 
when invalid topic in metadata
 Key: KAFKA-16764
 URL: https://issues.apache.org/jira/browse/KAFKA-16764
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans


A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example. The new consumer does 
not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]).





[jira] [Updated] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16406:
---
Labels: consumer kip-848-client-support  (was: )

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer, kip-848-client-support
> Fix For: 3.8.0
>
>
> PlaintextConsumerTest contains integration tests for the consumer. Since the 
> introduction of the new consumer group protocol (KIP-848) and the new 
> KafkaConsumer, this test has been parametrized to run with multiple 
> combinations, making sure we test the logic for the old and new coordinator, 
> as well as for the legacy and new KafkaConsumer. 
> This made it one of the longest-running integration tests, so with the aim 
> of reducing the impact on build times we could split it to allow for 
> parallelization. The test covers multiple areas of the consumer logic in a 
> single file, so splitting based on the high-level features being tested 
> would be sensible and achieve the desired result.





[jira] [Closed] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16406.
--

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0
>
>
> PlaintextConsumerTest contains integration tests for the consumer. Since the 
> introduction of the new consumer group protocol (KIP-848) and the new 
> KafkaConsumer, this test has been parametrized to run with multiple 
> combinations, making sure we test the logic for the old and new coordinator, 
> as well as for the legacy and new KafkaConsumer. 
> This made it one of the longest-running integration tests, so with the aim 
> of reducing the impact on build times we could split it to allow for 
> parallelization. The test covers multiple areas of the consumer logic in a 
> single file, so splitting based on the high-level features being tested 
> would be sensible and achieve the desired result.





[jira] [Resolved] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16406.

Resolution: Fixed

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0
>
>
> PlaintextConsumerTest contains integration tests for the consumer. Since the 
> introduction of the new consumer group protocol (KIP-848) and the new 
> KafkaConsumer, this test has been parametrized to run with multiple 
> combinations, making sure we test the logic for the old and new coordinator, 
> as well as for the legacy and new KafkaConsumer. 
> This made it one of the longest-running integration tests, so with the aim 
> of reducing the impact on build times we could split it to allow for 
> parallelization. The test covers multiple areas of the consumer logic in a 
> single file, so splitting based on the high-level features being tested 
> would be sensible and achieve the desired result.





[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846315#comment-17846315
 ] 

Lianet Magrans commented on KAFKA-16758:


Hey, I would be happy to take on this one! I expect I could make time for it 
after 3.8. Thanks!

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out of the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment, 
> as noted in KAFKA-16514)
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members
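For context, a usage sketch of the existing KafkaStreams#close(CloseOptions) 
API that the proposed consumer option would mirror (the consumer-side 
equivalent does not exist yet; it is what this issue proposes):
{code:java}
import java.time.Duration;

import org.apache.kafka.streams.KafkaStreams;

public class StreamsCloseExample {
    // 'streams' is an already-running KafkaStreams instance; leave the group only
    // when scaling down, stay in it for a simple bounce.
    static void shutdown(KafkaStreams streams, boolean scalingDown) {
        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
            .timeout(Duration.ofSeconds(30))
            .leaveGroup(scalingDown);
        streams.close(options);
    }
}
{code}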





[jira] [Assigned] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16737:
--

Assignee: Lianet Magrans

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs were added, so we should review them all, enable tests for the new 
> protocol when applicable, and remove the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java testing logic specific to the 
> internals of the new consumer, but many tests in KafkaConsumerTest still 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 





[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16737:
---
Affects Version/s: 3.7.0

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs were added, so we should review them all, enable tests for the new 
> protocol when applicable, and remove the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java testing logic specific to the 
> internals of the new consumer, but many tests in KafkaConsumerTest still 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 





[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16737:
---
Fix Version/s: 3.8.0

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs were added, so we should review them all, enable tests for the new 
> protocol when applicable, and remove the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java testing logic specific to the 
> internals of the new consumer, but many tests in KafkaConsumerTest still 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 





[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16737:
---
Priority: Blocker  (was: Major)

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs were added, so we should review them all, enable tests for the new 
> protocol when applicable, and remove the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java testing logic specific to the 
> internals of the new consumer, but many tests in KafkaConsumerTest still 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 





[jira] [Created] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16737:
--

 Summary: Clean up KafkaConsumerTest TODOs enabling tests for new 
consumer
 Key: KAFKA-16737
 URL: https://issues.apache.org/jira/browse/KAFKA-16737
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Lianet Magrans


KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
only enabled for the CLASSIC protocol and should be reviewed and enabled for 
the new CONSUMER group protocol when applicable. Some tests also have TODOs to 
enable them for the new consumer when certain features/bugs are addressed. 

The new protocol and consumer implementation have evolved a lot since those 
TODOs were added, so we should review them all, enable tests for the new 
protocol when applicable, and remove the TODOs from the code. Note that there 
is another AsyncKafkaConsumerTest.java testing logic specific to the internals 
of the new consumer, but many tests in KafkaConsumerTest still apply to both 
the new and legacy consumer, and we should enable them for both. 





[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16695:
---
Fix Version/s: 3.8.0

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When a consumer poll iteration takes longer than max.poll.interval.ms, the 
> consumer logs a warning suggesting that the max.poll.interval.ms config was 
> exceeded, and proactively leaves the group. The log suggests considering 
> adjusting max.poll.interval.ms, which should help in cases of long 
> processing times. We should consider adding how much the interval was 
> exceeded by, since it could help guide the user to effectively adjust the 
> config. Other clients log this kind of message in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
> max.poll.interval.ms for long-running message processing): leaving 
> group{quote}
>   





[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16695:
---
Labels: kip-848-client-support  (was: )

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> When a consumer poll iteration takes longer than max.poll.interval.ms, the 
> consumer logs a warning suggesting that the max.poll.interval.ms config was 
> exceeded, and proactively leaves the group. The log suggests considering 
> adjusting max.poll.interval.ms, which should help in cases of long 
> processing times. We should consider adding how much the interval was 
> exceeded by, since it could help guide the user to effectively adjust the 
> config. Other clients log this kind of message in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
> max.poll.interval.ms for long-running message processing): leaving 
> group{quote}
>   





[jira] [Created] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16695:
--

 Summary: Improve expired poll interval logging by showing exceeded 
time
 Key: KAFKA-16695
 URL: https://issues.apache.org/jira/browse/KAFKA-16695
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


When a consumer poll iteration takes longer than max.poll.interval.ms, the 
consumer logs a warning suggesting that the max.poll.interval.ms config was 
exceeded, and proactively leaves the group. The log suggests considering 
adjusting max.poll.interval.ms, which should help in cases of long processing 
times. We should consider adding how much the interval was exceeded by, since 
it could help guide the user to effectively adjust the config. Other clients 
log this kind of message in this situation:
{quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
max.poll.interval.ms for long-running message processing): leaving group{quote}
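A minimal sketch of the suggested improvement (variable and method names are 
assumptions, not the actual consumer code): compute how far past the interval 
the poll loop is and include it in the warning.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollIntervalLogging {
    private static final Logger log = LoggerFactory.getLogger(PollIntervalLogging.class);

    // Compute how far past max.poll.interval.ms the poll loop is and include the
    // exceeded time in the warning, as other clients do.
    static void maybeWarnPollIntervalExceeded(long nowMs, long lastPollMs, long maxPollIntervalMs) {
        long exceededMs = nowMs - lastPollMs - maxPollIntervalMs;
        if (exceededMs > 0) {
            log.warn("Application maximum poll interval ({}ms) exceeded by {}ms (adjust "
                    + "max.poll.interval.ms for long-running message processing): leaving group",
                maxPollIntervalMs, exceededMs);
        }
    }
}
{code}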
  





[jira] [Closed] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-08 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16665.
--

> Fail to get partition's position from within onPartitionsAssigned callback in 
> new consumer 
> ---
>
> Key: KAFKA-16665
> URL: https://issues.apache.org/jira/browse/KAFKA-16665
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the 
> onPartitionsAssigned callback, the new consumer fails with a 
> TimeoutException. The expectation is that we should be able to retrieve the 
> position of newly assigned partitions, as happens with the legacy consumer, 
> which allows this call. This is actually used from places within Kafka 
> itself (ex. Connect 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
> The failure in the new consumer is because partitions that are assigned but 
> awaiting the onPartitionsAssigned callback are excluded from the list of 
> partitions that should initialize. We should allow those partitions to 
> initialize their positions, without allowing data to be fetched from them 
> (which is already achieved via the isFetchable flag in the subscription 
> state).
> Note that a partition's position can be updated from 2 places: a call to 
> consumer.position or a call to consumer.poll. Both will attempt to 
> updateFetchPositions if there is no valid position yet, but even after 
> having a valid position from those calls, the partition will remain 
> non-fetchable until the onPartitionsAssigned callback completes (fetchable 
> requires that the partition has a valid position AND is not awaiting the 
> callback)
>   
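A minimal sketch of the pattern this issue is about (the topic name and setup 
are assumptions): calling position from within onPartitionsAssigned, which 
works on the legacy consumer but timed out on the new consumer before this fix.
{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionInCallback {
    // 'consumer' is an already-configured consumer; "test-topic" is an assumed topic.
    static void subscribeAndCheckPositions(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Legacy consumer: returns the position of the newly assigned partition.
                // New consumer (before the fix): fails with a TimeoutException.
                for (TopicPartition tp : partitions)
                    System.out.println(tp + " position: " + consumer.position(tp));
            }
        });
        for (int i = 0; i < 100; i++)
            consumer.poll(Duration.ofMillis(100)); // callbacks run inside poll
    }
}
{code}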






[jira] [Commented] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887
 ] 

Lianet Magrans commented on KAFKA-16670:


Hey [~chickenchickenlove], thanks for trying this out! Some clarification in 
case it helps. In the flow you described, the new consumer will send a request 
to find the group coordinator (FindCoordinator) when it gets created, but even 
if there's a call to consumer.subscribe right after, it won't send a request 
to subscribe (HeartbeatRequest) until it gets a response to the initial 
FindCoordinator request (the HeartbeatManager skips sending requests if it 
does not know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response to the FindCoordinator, a HeartbeatRequest 
will be sent containing the new subscription. The consumer will then 
eventually receive the assignment, but we don't know exactly when from the 
consumer's point of view. The rebalance callbacks are what signal to the 
consumer that the call to subscribe completed with an assignment received. So 
it's only after the consumer gets the assignment that a call to poll can 
return the records that are available. 

So based on those expectations, and back to your example: we don't need to 
wait before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling only 10 times (vs. 100 times in the successful 
case)? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the assigned partitions). Note that even when you poll for 
1s in your test, a poll that happens before the assignment has been received 
will block for 1s but is doomed to return empty, because it is not waiting for 
records from the topics you're interested in (no partitions assigned yet). 
Could you make sure that the test is calling poll after the assignment has 
been received? (I would suggest just polling in a loop for a certain amount of 
time; no sleeping after the subscribe is needed.) 

This integration test for the consumer, 
[testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153],
 has very similar logic to the one you're trying to achieve (create consumer, 
subscribe right away, and consume), and since a new broker and consumer are 
set up for each test, the test will go down the same path of having to find a 
coordinator before sending the HeartbeatRequest with a subscription. The main 
difference from looking at both seems to be the limited number of polls in 
your failed test scenario, so let's try to rule that out to better isolate the 
situation. Hope it helps! Let me know.
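A sketch of the suggested polling pattern (the helper name is an assumption): 
poll in a loop until records arrive or a deadline passes, instead of a fixed 
small number of polls.
{code:java}
import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PollUntilRecords {
    // Poll repeatedly until records arrive or the deadline passes, so polls issued
    // before the assignment is received don't end the test early.
    static <K, V> ConsumerRecords<K, V> pollUntilRecords(Consumer<K, V> consumer, Duration maxWait) {
        Instant deadline = Instant.now().plus(maxWait);
        while (Instant.now().isBefore(deadline)) {
            ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty())
                return records;
        }
        throw new IllegalStateException("No records received within " + maxWait);
    }
}
{code}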

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> *Related Code*
>  * Consumer gets its assignment successfully:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
>  * Consumer gets stuck forever because of a concurrency issue:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>  
> *Unexpected behaviour*
>  * The broker is sufficiently slow.
>  * A KafkaConsumer is created and immediately subscribes to a topic.
> If both conditions are met, the {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and become stuck indefinitely.
> With a new broker and a new consumer, when the consumer is created, its 
> background thread sends a request to the broker (I guess the groupCoordinator 
> Heartbeat request). At that time, if the broker has not yet loaded metadata 
> from {{__consumer_offsets}}, it will schedule the metadata load. After the 
> broker loads the metadata completely, the consumer background thread 
> considers it a valid group coordinator.
> However, the consumer can send the {{subscribe}} request to the broker 
> before the broker replies to the {{groupCoordinator HeartBeat request}}. In 
> that case, the consumer seems to get stuck.

[jira] [Created] (KAFKA-16675) Move rebalance callback test for positions to callbacks test file

2024-05-06 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16675:
--

 Summary: Move rebalance callback test for positions to callbacks 
test file
 Key: KAFKA-16675
 URL: https://issues.apache.org/jira/browse/KAFKA-16675
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Integration test 
testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback was added 
to the PlaintextConsumerTest.scala in this PR 
https://github.com/apache/kafka/pull/15856, as there was no specific file for 
testing callbacks at the moment. Another PR is in-flight, adding the file for 
callback-related tests, https://github.com/apache/kafka/pull/15408. Once 15408 
gets merged, we should move 
testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback to it.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843324#comment-17843324
 ] 

Lianet Magrans commented on KAFKA-16474:


Yes, that makes sense. That PR addresses only point 1 of what you reported, so 
we still have the point about the poll frequency (maybe to address separately 
if we still believe it's a concern) 

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back. This 
> might cause the consumer to revoke its just-assigned partitions in some 
> cases. For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId=''.
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='', because it has not gotten any response and thus has not updated 
> its local state.
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.
>  
> The consumer receives the response from the second heartbeat and revokes all 
> of its partitions.
>  
> There are 2 issues associated with this bug:
>  # inflight logic
>  # rapid poll: in KAFKA-16389 we've observed the consumer polling interval to 
> be a few ms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843318#comment-17843318
 ] 

Lianet Magrans commented on KAFKA-16474:


Hey [~pnee], this is the PR https://github.com/apache/kafka/pull/15723 from 
[~kirktrue] that fixes a bug that was definitely leading to double heartbeats 
(I suspect it's what you were seeing here; worth double checking)

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back. This 
> might cause the consumer to revoke its just-assigned partitions in some 
> cases. For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId=''.
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='', because it has not gotten any response and thus has not updated 
> its local state.
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.
>  
> The consumer receives the response from the second heartbeat and revokes all 
> of its partitions.
>  
> There are 2 issues associated with this bug:
>  # inflight logic
>  # rapid poll: in KAFKA-16389 we've observed the consumer polling interval to 
> be a few ms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-03 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16665:
---
Description: 
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as happens with the legacy consumer, which allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing data to be fetched from them (which is 
already achieved based on the isFetchable flag in the subscription state).

Note that a partition position can be updated from 2 places: a call to 
consumer.position or a call to consumer.poll. Both will attempt to 
`updateFetchPositions` if there is no valid position yet, but even after having 
a valid position after those calls, the partition will remain non-fetchable 
until the onPartitionsAssigned callback completes (fetchable requires that the 
partition has a valid position AND is not awaiting the callback)
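
A minimal sketch of the failing pattern (illustrative only; the topic name is 
hypothetical and the consumer is assumed to be created/closed elsewhere):
{code:java}
// Calling position() for a newly assigned partition from within the callback.
// The legacy consumer supports this; the new consumer currently times out.
consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            long position = consumer.position(tp); // new consumer: TimeoutException
            System.out.println(tp + " starts at " + position);
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }
});
{code}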

  

  was:
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as happens with the legacy consumer, which allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing data to be fetched from them (which is 
already achieved based on the isFetchable flag in the subscription state).

  


> Fail to get partition's position from within onPartitionsAssigned callback in 
> new consumer 
> ---
>
> Key: KAFKA-16665
> URL: https://issues.apache.org/jira/browse/KAFKA-16665
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the 
> onPartitionsAssigned callback, the new consumer fails with a 
> TimeoutException. The expectation is that we should be able to retrieve the 
> position of newly assigned partitions, as happens with the legacy consumer, 
> which allows this call. This is actually used from places within Kafka itself 
> (ex. Connect 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
>  
> The failure in the new consumer is because the partitions that are assigned 
> but awaiting the onPartitionsAssigned callback are excluded from the list of 
> partitions that should initialize. We should allow the partitions to 
> initialize their positions, without allowing data to be fetched from them 
> (which is already achieved based on the isFetchable flag in the subscription 
> state).
> Note that a partition position can be updated from 2 places: a call to 
> consumer.position or a call to consumer.poll. Both will attempt to 
> `updateFetchPositions` if there is no valid position yet, but even after 
> having a valid position after those calls, the partition will remain 
> non-fetchable until the onPartitionsAssigned callback completes (fetchable 
> requires that the partition has a valid position AND is not awaiting the 
> callback)
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-03 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16665:
--

 Summary: Fail to get partition's position from within 
onPartitionsAssigned callback in new consumer 
 Key: KAFKA-16665
 URL: https://issues.apache.org/jira/browse/KAFKA-16665
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as happens with the legacy consumer, which allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing data to be fetched from them (which is 
already achieved based on the isFetchable flag in the subscription state).

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-05-01 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842765#comment-17842765
 ] 

Lianet Magrans commented on KAFKA-16514:


Just to answer the question above:
{quote}what is the purpose of -2 code? In the end, not sending any request, 
with a large enough session timeout, no rebalance would be triggered anyway? 
What does change is we send -2 instead of just not sending any leave group 
request on close()?{quote}

The purpose is just to state the intention explicitly at the protocol level 
(and not assume it). This is mainly to allow for richer semantics around the 
static membership leave in the future. It does not make a difference at the 
moment (over not sending the leave group), but it does allow us to cleanly 
extend the current logic if we ever want to, and to allow static members to 
leave permanently by sending a -1 epoch on the leave group. That would 
effectively allow removing a static member from a group (which can currently 
only be achieved either by waiting for the session timeout to expire, or via 
the admin API).

Anyways, that's just food for thought for now. The KIP extending the consumer 
close with options seems sensible to solve the current situation, and would 
align nicely with any future extension of the static leave semantics if we 
ever go down that path. 
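
To make the epoch semantics above concrete, a small sketch (the -1/-2 values 
follow the KIP-848 protocol; the constant names and helper are illustrative, 
not the actual client internals):
{code:java}
// Member epoch sent in the last ConsumerGroupHeartbeat when leaving:
static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // definitive leave (dynamic members)
static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; // temporary leave (static members)

// Today static members always leave temporarily. Letting a static member send
// -1 would be the future "permanent leave" extension discussed above.
int leaveEpochOnClose(boolean isStaticMember) {
    return isStaticMember ? LEAVE_GROUP_STATIC_MEMBER_EPOCH : LEAVE_GROUP_MEMBER_EPOCH;
}
{code}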

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 8:25 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your client code again, locally 
starting a broker in kraft mode, with the default config, only adding 
`group.coordinator.rebalance.protocols=consumer,classic`. 1 topic, 1 partition, 
1 instance of your consumer app running with the poll duration of 1s, and was 
able to consume messages as expected. I only changed to StringDeserializers for 
simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you're still facing issues, I would suggest taking a look 
at and sharing the broker logs to understand more about what's going on in your 
setup. If all looks good there, maybe provide a ConsumerRebalanceListener to 
the call to subscribe, just to check/print the partitions assigned to your 
consumer in the onPartitionsAssigned callback (see the sketch below). Hope it 
helps!
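
Something along these lines (a minimal sketch, reusing the topic from your 
example):
{code:java}
// Print what the coordinator actually assigns/revokes, to confirm the
// consumer ends up owning partitions of test-topic1.
consumer.subscribe(List.of("test-topic1"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
    }
});
{code}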


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you're still facing issues, I would suggest providing a 
ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, and 
also taking a look at and sharing the broker logs to understand more about 
what's going on in your setup. Hope it helps!

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | 

[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you're still facing issues, I would suggest providing a 
ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, and 
also taking a look at and sharing the broker logs to understand more about 
what's going on in your setup. Hope it helps!


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you're still facing issues, I would suggest providing a 
ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, and 
also taking a look at and sharing the broker logs to understand more about 
what's going on in your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> 

[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you're still facing issues, I would suggest providing a 
ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, and 
also taking a look at and sharing the broker logs to understand more about 
what's going on in your setup. Hope it helps


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

{quote}for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
topic1{quote}

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you're still facing issues, I would suggest providing a 
ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, and 
also taking a look at and sharing the broker logs to understand more about 
what's going on in your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> 

[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

{quote}for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
topic1{quote}

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking you're still facing issues, I would suggest providing a 
ConsumerRebalanceListener to the call to subscribe, just to check/print the 
partitions assigned to your consumer in the onPartitionsAssigned callback, and 
also taking a look at and sharing the broker logs to understand more about 
what's going on in your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842160#comment-17842160
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], thanks for reporting this. I wonder if you're 
hitting a known issue with the consumer api and timeouts (being fixed with 
https://issues.apache.org/jira/browse/KAFKA-16200 and 
https://issues.apache.org/jira/browse/KAFKA-15974). I tried your code, changing 
only the poll call from what you had, `kafkaConsumer.poll(Duration.ZERO)`, to 
provide a non-zero duration, and it all worked as expected. So I guess it could 
be related to the timeout enforcement issues being fixed on the consumer side. 
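
For reference, the only change I made (a sketch; the kafkaConsumer variable 
from your Main.java is assumed):
{code:java}
// Before (stuck): poll(Duration.ZERO) gives the consumer no time to complete
// the coordinator/assignment work while the timeout handling is being fixed.
// var records = kafkaConsumer.poll(Duration.ZERO);

// After (worked for me): any non-zero duration.
var records = kafkaConsumer.poll(Duration.ofSeconds(1));
{code}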

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Minor
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16465) New consumer does not invoke rebalance callbacks as expected in consumer_test.py system test

2024-04-29 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16465.
--

> New consumer does not invoke rebalance callbacks as expected in 
> consumer_test.py system test
> 
>
> Key: KAFKA-16465
> URL: https://issues.apache.org/jira/browse/KAFKA-16465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following error:
> {noformat}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.clean_shutdown=True.static_membership=False.bounce_mode=all.num_bounces=5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 29.511 seconds
> AssertionError('Total revoked count 0 does not match the expectation of 
> having 0 revokes as 0')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 254, in test_static_consumer_bounce
> (num_revokes_after_bounce, check_condition)
> AssertionError: Total revoked count 0 does not match the expectation of 
> having 0 revokes as 0
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16459) New consumer times out joining group in consumer_test.py system test

2024-04-29 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16459.
--

> New consumer times out joining group in consumer_test.py system test
> 
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote}
> * Consumers failed to join in a reasonable amount of time
> * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined from normal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841986#comment-17841986
 ] 

Lianet Magrans commented on KAFKA-16514:


If my understanding here is right, this seems like a fair requirement to have 
the ability to dynamically decide if a member should leave the group, 
regardless of its dynamic/static membership. We've actually had conversations 
about this while working on the new consumer group protocol, where static 
members do send a leave group request when leaving, and they send it with an 
epoch that explicitly indicates it's a temporary leaving (-2). For now this is 
the only way static members leave (temporarily), but the ground is set if ever 
in the future we decide that we want to allow static members to send a 
definitive leave group (ex. -1 epoch, like dynamic members do). So with this 
context, and back to the streams situation here, I wonder if we would be better 
overall if we bring a less hacky solution and enable a close with richer 
semantics at the consumer level, that would allow to have a close where 
options/params could be passed to dynamically indicate how to close 
(temporarily, definitely, ...), and then align this nicely with the new 
protocol (and make it work with the legacy one). Thoughts?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager

2024-04-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16628:
--

Assignee: Lianet Magrans

> Add system test for validating static consumer bounce and assignment when not 
> eager
> ---
>
> Key: KAFKA-16628
> URL: https://issues.apache.org/jira/browse/KAFKA-16628
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> The existing system 
> [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209]
>  includes a test for validating that partitions are not re-assigned when a 
> static member is bounced, but the test design and setup are intended for 
> testing this with the Eager assignment strategy only (based on the eager 
> protocol, where all dynamic members revoke their partitions when a rebalance 
> happens). 
> We should consider adding a test that ensures partitions are not re-assigned 
> when using the cooperative sticky assignor or the new consumer group protocol 
> assignments. 
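> As a sketch, these are the non-eager variants such a test would cover 
> (standard consumer configs; the surrounding test harness is not shown):
> {code:java}
> // Classic protocol with incremental cooperative rebalancing:
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>     CooperativeStickyAssignor.class.getName());
> 
> // Or the new group protocol (KIP-848), where assignment is broker-driven:
> props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
> {code}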



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager

2024-04-26 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16628:
--

 Summary: Add system test for validating static consumer bounce and 
assignment when not eager
 Key: KAFKA-16628
 URL: https://issues.apache.org/jira/browse/KAFKA-16628
 Project: Kafka
  Issue Type: Task
  Components: consumer, system tests
Reporter: Lianet Magrans


The existing system 
[test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209]
 includes a test for validating that partitions are not re-assigned when a 
static member is bounced, but the test design and setup are intended for 
testing this with the Eager assignment strategy only (based on the eager 
protocol, where all dynamic members revoke their partitions when a rebalance 
happens). 
We should consider adding a test that ensures partitions are not re-assigned 
when using the cooperative sticky assignor or the new consumer group protocol 
assignments. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16528) Reset member heartbeat interval when request sent

2024-04-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16528.
--

> Reset member heartbeat interval when request sent
> -
>
> Key: KAFKA-16528
> URL: https://issues.apache.org/jira/browse/KAFKA-16528
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Member should reset the heartbeat timer when the request is sent, rather than 
> when a response is received. This aims to ensure that we don't add to the 
> interval any delay there might be in a response. With this, we better respect 
> the contract of members sending HBs on the interval to remain in the group, 
> and avoid potential unwanted rebalances.
> Note that there is already logic in place to avoid sending a request if a 
> response hasn't been received. That will ensure that, even with the reset of 
> the interval on the send, the next HB will only be sent once the response is 
> received (it will go out on the next poll of the HB manager, respecting the 
> minimal backoff for sending consecutive requests). This will btw be consistent 
> with how the interval timing & in-flights are handled for auto-commits.
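> A sketch of the intended timing (illustrative only, not the actual 
> HeartbeatRequestManager code):
> {code:java}
> class HeartbeatTimerSketch {
>     private final long heartbeatIntervalMs;
>     private long nextHeartbeatMs;
>     private boolean heartbeatInFlight;
> 
>     HeartbeatTimerSketch(long heartbeatIntervalMs) {
>         this.heartbeatIntervalMs = heartbeatIntervalMs;
>     }
> 
>     void maybeSendHeartbeat(long nowMs) {
>         if (heartbeatInFlight || nowMs < nextHeartbeatMs) {
>             return; // response pending, or interval not expired yet
>         }
>         // ... build and send the HB request here ...
>         heartbeatInFlight = true;
>         nextHeartbeatMs = nowMs + heartbeatIntervalMs; // reset on send, not on response
>     }
> 
>     void onHeartbeatResponse() {
>         heartbeatInFlight = false; // next HB goes out on a later poll of the manager
>     }
> }
> {code}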



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-19 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839096#comment-17839096
 ] 

Lianet Magrans commented on KAFKA-16493:


Hey [~phuctran], any progress on this one? Even though it's a performance 
improvement, I'm afraid it's a very sensitive one given that it would affect 
the poll loop, so we need to make sure it makes it into 3.8 (it had the wrong 
fix version before; I just updated it). Let me know if you have any questions. 
Thanks!

> Avoid unneeded subscription regex check if metadata version unchanged
> -
>
> Key: KAFKA-16493
> URL: https://issues.apache.org/jira/browse/KAFKA-16493
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> When using pattern subscription (java pattern), the new consumer regularly 
> checks if the list of topics that match the regex has changed. This is done 
> as part of the consumer poll loop, and it evaluates the regex using the 
> latest cluster metadata. As an improvement, we should avoid evaluating the 
> regex if the metadata version hasn't changed (similar to what the legacy 
> coordinator does 
> [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
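> A sketch of the proposed gate (illustrative; assumes the Metadata#updateVersion 
> counter that the legacy coordinator uses for the same purpose):
> {code:java}
> private int lastSeenMetadataVersion = -1;
> 
> void maybeUpdatePatternSubscription(Metadata metadata, Pattern pattern) {
>     if (metadata.updateVersion() == lastSeenMetadataVersion) {
>         return; // metadata unchanged since last check: skip the regex evaluation
>     }
>     lastSeenMetadataVersion = metadata.updateVersion();
>     Set<String> matched = new HashSet<>();
>     for (String topic : metadata.fetch().topics()) {
>         if (pattern.matcher(topic).matches()) {
>             matched.add(topic);
>         }
>     }
>     // ... update the subscription with the matched topics ...
> }
> {code}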



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16493:
---
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Avoid unneeded subscription regex check if metadata version unchanged
> -
>
> Key: KAFKA-16493
> URL: https://issues.apache.org/jira/browse/KAFKA-16493
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When using pattern subscription (java pattern), the new consumer regularly 
> checks if the list of topics that match the regex has changed. This is done 
> as part of the consumer poll loop, and it evaluates the regex using the 
> latest cluster metadata. As an improvement, we should avoid evaluating the 
> regex if the metadata version hasn't changed (similar to what the legacy 
> coordinator does 
> [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Labels: kip-848-client-support system-tests  (was: kip-848-client-support)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using a 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocols, so the test should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> test covers (params, setup) applies to both protocols; it is the expected 
> results that are not the same. 
> When static members joining a group conflict:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  
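> A sketch of the diverging expectations (hypothetical helpers and member 
> variables; the actual test lives in consumer_test.py):
> {code:java}
> if ("classic".equals(groupProtocol)) {
>     // All conflicting members join with the same instance id; the original
>     // member is eventually fenced via a heartbeat error.
>     expectHeartbeatError(originalMember, FencedInstanceIdException.class);
> } else { // "consumer" protocol (KIP-848)
>     // The new member using an in-use instance id is rejected on its join HB.
>     expectHeartbeatError(newMember, UnreleasedInstanceIdException.class);
> }
> {code}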



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Component/s: clients
 consumer
 system tests

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using a 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocols, so the test should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> test covers (params, setup) applies to both protocols; it is the expected 
> results that are not the same. 
> When static members joining a group conflict:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:59 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new protocol/consumer. You're on the right page regarding the rest: Streams 
tests haven't been migrated yet because it's not integrated with the new 
protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using 
> conflicting instance ids. This behaviour is different in the classic and 
> consumer protocols, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests cover (params, setup) applies to both protocols; it is the expected 
> results that differ. 
> When static members join a group with conflicting instance ids:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive an HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not 
> able to join; it receives an UnreleasedInstanceIdException in the response 
> to the HB it sends to join the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Summary: Update consumer static membership fencing system test to support 
new protocol  (was: Update static membership fencing system test to support new 
protocol)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using 
> conflicting instance ids. This behaviour is different in the classic and 
> consumer protocols, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests cover (params, setup) applies to both protocols; it is the expected 
> results that differ. 
> When static members join a group with conflicting instance ids:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive an HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not 
> able to join; it receives an UnreleasedInstanceIdException in the response 
> to the HB it sends to join the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:34 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using 
> conflicting instance ids. This behaviour is different in the classic and 
> consumer protocols, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests cover (params, setup) applies to both protocols; it is the expected 
> results that differ. 
> When static members join a group with conflicting instance ids:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive an HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not 
> able to join; it receives an UnreleasedInstanceIdException in the response 
> to the HB it sends to join the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:31 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that was parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using 
> conflicting instance ids. This behaviour is different in the classic and 
> consumer protocols, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests cover (params, setup) applies to both protocols; it is the expected 
> results that differ. 
> When static members join a group with conflicting instance ids:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive an HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not 
> able to join; it receives an UnreleasedInstanceIdException in the response 
> to the HB it sends to join the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:30 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that was parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using 
> conflicting instance ids. This behaviour is different in the classic and 
> consumer protocols, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests cover (params, setup) applies to both protocols; it is the expected 
> results that differ. 
> When static members join a group with conflicting instance ids:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive an HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not 
> able to join; it receives an UnreleasedInstanceIdException in the response 
> to the HB it sends to join the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans commented on KAFKA-16566:


Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using 
> conflicting instance ids. This behaviour is different in the classic and 
> consumer protocols, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests cover (params, setup) applies to both protocols; it is the expected 
> results that differ. 
> When static members join a group with conflicting instance ids:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive an HB error with 
> FencedInstanceIdException.
> Consumer protocol: a new member with an instance id already in use is not 
> able to join; it receives an UnreleasedInstanceIdException in the response 
> to the HB it sends to join the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16566:
--

 Summary: Update static membership fencing system test to support 
new protocol
 Key: KAFKA-16566
 URL: https://issues.apache.org/jira/browse/KAFKA-16566
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
which verifies the sequence in which static members join a group when using 
conflicting instance ids. This behaviour is different in the classic and 
consumer protocols, so the tests should be updated to set the right 
expectations when running with the new consumer protocol. Note that what the 
tests cover (params, setup) applies to both protocols; it is the expected 
results that differ. 

When static members join a group with conflicting instance ids:

Classic protocol: all members join the group with the same group instance id, 
and then the first one will eventually receive an HB error with 
FencedInstanceIdException.

Consumer protocol: a new member with an instance id already in use is not able 
to join; it receives an UnreleasedInstanceIdException in the response to the 
HB it sends to join the group.
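
For illustration only, here is a minimal Java sketch of the 
conflicting-instance-id scenario the test exercises. The bootstrap address, 
group id and topic are placeholders, and which member sees which exception 
depends on the protocol in use (first member fenced under classic, second 
member rejected under consumer):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;

public class StaticFencingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "static-group");            // placeholder
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");     // same id on purpose
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> first = new KafkaConsumer<>(props);
             KafkaConsumer<String, String> second = new KafkaConsumer<>(props)) {
            first.subscribe(List.of("test-topic")); // placeholder topic
            first.poll(Duration.ofSeconds(1));      // first static member joins

            second.subscribe(List.of("test-topic"));
            try {
                // Classic protocol: the second join succeeds and the *first*
                // member is eventually fenced (FencedInstanceIdException on a
                // later poll). Consumer protocol: the *second* member's join
                // is rejected with UnreleasedInstanceIdException.
                second.poll(Duration.ofSeconds(1));
                first.poll(Duration.ofSeconds(5));
            } catch (FencedInstanceIdException | UnreleasedInstanceIdException e) {
                System.out.println("Conflicting instance id detected: " + e);
            }
        }
    }
}
{code}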



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16528) Reset member heartbeat interval when request sent

2024-04-11 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16528:
--

 Summary: Reset member heartbeat interval when request sent
 Key: KAFKA-16528
 URL: https://issues.apache.org/jira/browse/KAFKA-16528
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


The member should reset the heartbeat timer when the request is sent, rather 
than when a response is received. This ensures that we don't add any response 
delay on top of the interval. With this, we better respect the contract of 
members sending an HB on the interval to remain in the group, and avoid 
potential unwanted rebalances.

Note that there is already logic in place to avoid sending a request if a 
response hasn't been received. That ensures that, even with the interval reset 
on send, the next HB will only go out once the response is received (it will 
be sent on the next poll of the HB manager, respecting the minimal backoff for 
consecutive requests). This is also consistent with how interval timing and 
in-flight requests are handled for auto-commits.
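
A minimal sketch of the intended timing follows; the names are illustrative 
and do not match the real HeartbeatRequestManager internals:
{code:java}
public class HeartbeatTimingSketch {
    private final long intervalMs;
    private long lastSentMs = -1;      // -1 = no HB sent yet
    private boolean requestInFlight = false;

    HeartbeatTimingSketch(long intervalMs) { this.intervalMs = intervalMs; }

    // Called on every poll of the HB manager.
    void maybeSendHeartbeat(long nowMs) {
        boolean intervalExpired = lastSentMs < 0 || nowMs - lastSentMs >= intervalMs;
        if (intervalExpired && !requestInFlight) {
            lastSentMs = nowMs;        // interval resets at *send* time (the fix)
            requestInFlight = true;    // existing in-flight guard still applies
            System.out.println("HB sent at " + nowMs);
        }
    }

    // Called when the HB response arrives; deliberately does not touch lastSentMs.
    void onHeartbeatResponse() {
        requestInFlight = false;
    }

    public static void main(String[] args) {
        HeartbeatTimingSketch hb = new HeartbeatTimingSketch(3000);
        hb.maybeSendHeartbeat(0);      // sends; the interval starts at t=0
        hb.maybeSendHeartbeat(3000);   // interval expired but response pending: skipped
        hb.onHeartbeatResponse();      // slow response arrives at t=3500
        hb.maybeSendHeartbeat(3500);   // sends right away (3500 - 0 >= 3000),
                                       // instead of waiting until 3500 + 3000
    }
}
{code}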



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16493:
--

 Summary: Avoid unneeded subscription regex check if metadata 
version unchanged
 Key: KAFKA-16493
 URL: https://issues.apache.org/jira/browse/KAFKA-16493
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


When using pattern subscription (java pattern), the new consumer regularly 
checks if the list of topics that match the regex has changed. This is done as 
part of the consumer poll loop, and it evaluates the regex using the latest 
cluster metadata. As an improvement, we should avoid evaluating the regex if 
the metadata version hasn't changed (similar to what the legacy coordinator 
does 
[here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
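
A rough sketch of the suggested improvement; the class, field and method names 
here are hypothetical, not the actual consumer internals:
{code:java}
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class PatternSubscriptionSketch {
    private final Pattern pattern;
    private int lastSeenMetadataVersion = -1; // hypothetical version tracking
    private List<String> matchedTopics = List.of();

    PatternSubscriptionSketch(Pattern pattern) { this.pattern = pattern; }

    // Re-evaluate the regex only when the cluster metadata version changed.
    List<String> matchingTopics(int metadataVersion, List<String> clusterTopics) {
        if (metadataVersion == lastSeenMetadataVersion) {
            return matchedTopics; // metadata unchanged: skip the regex pass
        }
        lastSeenMetadataVersion = metadataVersion;
        matchedTopics = clusterTopics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
        return matchedTopics;
    }
}
{code}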




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-08 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16474:
--

Assignee: Lianet Magrans  (was: Philip Nee)

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back. This 
> might cause the consumer to revoke its just-assigned partitions. For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not received a response and thus has not updated 
> its local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat and revokes all 
> of its partitions.
>  
> There are 2 issues associated with this bug:
>  # inflight logic
>  # rapid poll: in KAFKA-16389 we've observed the consumer polling interval to 
> be a few ms.  
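
A hypothetical sketch of the missing in-flight guard described above; the 
names are illustrative, not the real MembershipManager:
{code:java}
class JoinGuardSketch {
    private String memberId = "";   // empty until the first HB response arrives
    private int memberEpoch = 0;
    private boolean hbInFlight = false;

    void maybeSendHeartbeat() {
        if (hbInFlight) {
            // Without this guard a second HB would also carry memberId="" and
            // epoch=0, which the broker interprets as another brand-new member
            // joining, and it answers with an empty assignment.
            return;
        }
        hbInFlight = true;
        // ... send HB carrying (memberId, memberEpoch)
    }

    void onHeartbeatResponse(String idFromBroker, int epochFromBroker) {
        // Local state is only updated once the response arrives, which is why
        // sending again before the response makes the second HB look like a join.
        memberId = idFromBroker;
        memberEpoch = epochFromBroker;
        hbInFlight = false;
    }
}
{code}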



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16418) Review/split long-running admin client integration tests

2024-04-02 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16418:
---
Summary: Review/split long-running admin client integration tests  (was: 
Split long-running admin client integration tests)

> Review/split long-running admin client integration tests
> 
>
> Key: KAFKA-16418
> URL: https://issues.apache.org/jira/browse/KAFKA-16418
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> Review PlaintextAdminIntegrationTest and attempt to split it to allow for 
> parallelization and improve build times. This test is the longest-running 
> integration test in kafka.api, so a similar approach to what has been done 
> with the consumer tests in PlaintextConsumerTest should be a good 
> improvement. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2024-03-28 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15843:
---
Priority: Minor  (was: Major)

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: callback, kip-848-client-support, reconciliation
> Fix For: 4.0.0
>
>
> The legacy coordinator triggers onPartitionsAssigned with an empty assignment 
> (which is not the case when triggering onPartitionsRevoked or Lost), and the 
> new consumer implementation maintains the same principle. We should review 
> this to fully understand whether it is really needed to call 
> onPartitionsAssigned with an empty assignment (or whether it should behave 
> consistently with onPartitionsRevoked/Lost).
> Note that the consumer integration tests rely on this call to 
> onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala).
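
For context, a minimal sketch of a user listener that tolerates the 
empty-assignment callback as the behaviour stands today:
{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class EmptyAwareListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            // Both the legacy and new consumer may invoke this with an empty
            // collection; skip any per-partition work in that case.
            return;
        }
        partitions.forEach(tp -> System.out.println("assigned " + tp));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Revoked (and lost) are not invoked with empty collections.
        partitions.forEach(tp -> System.out.println("revoked " + tp));
    }
}
{code}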



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-03-26 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831050#comment-17831050
 ] 

Lianet Magrans commented on KAFKA-15538:


Hey [~kirktrue], I noticed this went unanswered, but @phuc already has an 
approach in his PR, as part of the poll loop. That was not being done before 
(given that we had no regex support).

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribe with SubscriptionPattern is used, the client should just 
> send the regex to the broker, and it will be resolved on the server side.
>  * When subscribe with Pattern is used, the regex should be resolved on the 
> client side.
> As part of this task, we should re-enable all integration tests defined in 
> PlainTextAsyncConsumer that relate to subscription with a pattern and that 
> are currently disabled for the new consumer + new protocol.
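
A minimal sketch of what the client-side resolution amounts to; clusterTopics 
stands in for the topic names obtained from a metadata update, and the helper 
itself is hypothetical:
{code:java}
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;

class JavaRegexResolutionSketch {
    // Resolve the java Pattern against the topic names from the latest
    // metadata and hand the broker an explicit topic list.
    static void resolveAndSubscribe(Consumer<?, ?> consumer,
                                    Pattern pattern,
                                    Set<String> clusterTopics) {
        List<String> matched = clusterTopics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .sorted()
                .collect(Collectors.toList());
        consumer.subscribe(matched); // the broker sees concrete names, not the regex
    }
}
{code}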



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16418) Split long-running admin client integration tests

2024-03-25 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16418:
--

 Summary: Split long-running admin client integration tests
 Key: KAFKA-16418
 URL: https://issues.apache.org/jira/browse/KAFKA-16418
 Project: Kafka
  Issue Type: Task
  Components: clients
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Review PlaintextAdminIntegrationTest and attempt to split it to allow for 
parallelization and improve build times. This test is the longest-running 
integration test in kafka.api, so a similar approach to what has been done with 
the consumer tests in PlaintextConsumerTest should be a good improvement. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-25 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16375.

Resolution: Fixed

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining 
> while reconciling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16406) Split long-running consumer integration test

2024-03-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16406:
--

 Summary: Split long-running consumer integration test
 Key: KAFKA-16406
 URL: https://issues.apache.org/jira/browse/KAFKA-16406
 Project: Kafka
  Issue Type: Task
Reporter: Lianet Magrans
Assignee: Lianet Magrans


PlaintextConsumerTest contains integration tests for the consumer. Since the 
introduction of the new consumer group protocol (KIP-848) and the new 
KafkaConsumer, this test has been parametrized to run with multiple 
combinations, making sure we test the logic for the old and new coordinator, as 
well as for the legacy and new KafkaConsumer. 

This has made it one of the longest-running integration tests, so with the aim 
of reducing the impact on build times we could split it to allow for 
parallelization. The test covers multiple areas of the consumer logic in a 
single file, so splitting it based on the high-level features being tested 
would be sensible and achieve the desired result.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16375:
---
Description: The current implementation of the new consumer discards the 
result of a reconciliation if the member rejoined, based on a comparison of the 
member epoch at the start and end of the reconciliation. If the epochs changed 
the reconciliation is discarded. This is not right because the member epoch 
could be incremented without an assignment change. This should be fixed to 
ensure that the reconciliation is discarded if the member rejoined, probably 
based on a flag that truly reflects that it went through a transition to 
joining while reconciling.  (was: The current implementation of the new 
consumer discards the result of a reconciliation if the member rejoined, based 
on a comparison of the member epoch at the start and end of the reconciliation. 
If the epochs changed the reconciliation is discarded. This is not right 
because the member epoch could be incremented without an assignment change. 
This should be fixed to ensure that the reconciliation is discarded if the 
member rejoined, probably based on a flag that truly reflects that it went 
through a transition to joining. )

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining 
> while reconciling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16375:
---
Labels: client-transitions-issues kip-848-client-support  (was: 
kip-848-client-support)

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2024-03-18 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827990#comment-17827990
 ] 

Lianet Magrans commented on KAFKA-15551:


Also, given how tight the deadline is to get the fix/PR in, I would suggest we 
focus on the new consumer only. If we find things that could be improved in 
this sense in the old one, we could file a separate Jira for it and tackle it 
afterwards. 

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> These should be short-circuited, possibly at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.
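
A hypothetical sketch of an API-level short circuit for the first of the 
listed conditions; the event-sending helper is a stand-in, not the real 
consumer code:
{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitShortCircuitSketch {
    CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to commit: complete immediately instead of generating an
            // event for the background thread and a request to the broker.
            return CompletableFuture.completedFuture(null);
        }
        return sendCommitEvent(offsets); // hypothetical helper
    }

    private CompletableFuture<Void> sendCommitEvent(
            Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Stand-in for the real path (application event -> background thread).
        return CompletableFuture.completedFuture(null);
    }
}
{code}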



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2024-03-18 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827989#comment-17827989
 ] 

Lianet Magrans commented on KAFKA-15551:


Hey [~zxcoccer], thanks for jumping in! This one should definitely be a simple 
one, as I know that we already handle it for some API calls (ex. validate 
positions returns early if there are no positions to validate 
[here|https://github.com/apache/kafka/blob/5c929874b88b3b96f650de0f733d93d42ac535a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L227]).
 Still, we want to verify that we do the same for all the API calls, to ensure 
that we're not doing unneeded processing/requests.
We want to have this for 3.8, so the deadline is really tight, but if you have 
availability it would be a great help; feel free to re-assign it to yourself 
and ping me anytime if you have questions. Thanks!  

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> These should be short-circuited, possibly at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-15 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16375:
---
Fix Version/s: 3.8.0

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-15 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827589#comment-17827589
 ] 

Lianet Magrans commented on KAFKA-16375:


Yes, I just updated the version ;)

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16375:
---
Description: The current implementation of the new consumer discards the 
result of a reconciliation if the member rejoined, based on a comparison of the 
member epoch at the start and end of the reconciliation. If the epochs changed 
the reconciliation is discarded. This is not right because the member epoch 
could be incremented without an assignment change. This should be fixed to 
ensure that the reconciliation is discarded if the member rejoined, probably 
based on a flag that truly reflects that it went through a transition to 
joining.   (was: The current implementation of the new consumer discards the 
result of a reconciliation if the member rejoined, based on a comparison of the 
member epoch at the start and end of the reconciliation. If the epochs changed 
the reconciliation is discarded. This is not right because the member epoch 
could be incremented without an assignment change. This should be fixed to 
ensure that the reconciliation is discarded if the member rejoined, probably 
based on a flag that truly reflects that it went through a transition to 
joining. 
As a potential improvement, consider if the member could keep the 
reconciliation if it rejoined but got the same assignment.)

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16375:
---
Priority: Critical  (was: Major)

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining. 
> As a potential improvement, consider if the member could keep the 
> reconciliation if it rejoined but got the same assignment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16375:
--

 Summary: Fix logic for discarding reconciliation if member rejoined
 Key: KAFKA-16375
 URL: https://issues.apache.org/jira/browse/KAFKA-16375
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


The current implementation of the new consumer discards the result of a 
reconciliation if the member rejoined, based on a comparison of the member 
epoch at the start and end of the reconciliation. If the epochs changed the 
reconciliation is discarded. This is not right because the member epoch could 
be incremented without an assignment change. This should be fixed to ensure 
that the reconciliation is discarded if the member rejoined, probably based on 
a flag that truly reflects that it went through a transition to joining. 
As a potential improvement, consider if the member could keep the 
reconciliation if it rejoined but got the same assignment.
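
A rough sketch of the suggested flag-based fix; the names are illustrative 
only:
{code:java}
class ReconciliationSketch {
    private boolean rejoinedWhileReconciling = false;

    void onTransitionToJoining() {
        // Set when the member rejoins (e.g. after being fenced), regardless of
        // whether the epoch or the assignment changed.
        rejoinedWhileReconciling = true;
    }

    void onReconciliationCompleted() {
        if (rejoinedWhileReconciling) {
            // Discard the result: it was computed for a membership that no
            // longer exists. An epoch comparison is not a safe proxy, since
            // the epoch can be bumped without any assignment change.
            rejoinedWhileReconciling = false;
            return;
        }
        // ... ack the reconciled assignment
    }
}
{code}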



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-03-13 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826080#comment-17826080
 ] 

Lianet Magrans commented on KAFKA-15538:


Hey [~phuctran], any update regarding this one? We're getting close to the 
deadline, and given that this one is needed for feature parity with the old 
consumer, I think we should prioritize it over 
https://issues.apache.org/jira/browse/KAFKA-15561. Thanks!

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribe with SubscriptionPattern is used, the client should just 
> send the regex to the broker, and it will be resolved on the server side.
>  * When subscribe with Pattern is used, the regex should be resolved on the 
> client side.
> As part of this task, we should re-enable all integration tests defined in 
> PlainTextAsyncConsumer that relate to subscription with a pattern and that 
> are currently disabled for the new consumer + new protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16301) Review fenced member unsubscribe/subscribe callbacks interaction

2024-02-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16301:
--

 Summary: Review fenced member unsubscribe/subscribe callbacks 
interaction
 Key: KAFKA-16301
 URL: https://issues.apache.org/jira/browse/KAFKA-16301
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans


When a member gets fenced, it triggers the onPartitionsLost callback, if any, 
and then rejoins the group. If the member attempts to leave the group while 
the callback is still running (e.g. unsubscribe), the leave operation detects 
that the member has already been removed from the group (fenced), simply 
aligns the client state with the current broker state, and marks the client as 
UNSUBSCRIBED (the client-side state for not being in the group). 

This means that if the user calls subscribe, the member could rejoin the 
group, get an assignment, and trigger onPartitionsAssigned while 
onPartitionsLost may not have completed yet.

This approach keeps the client state machine simple, given that it does not 
need to block the new member (it will effectively be a new member because the 
old one got fenced). The new member can rejoin, get an assignment and make 
progress. The downside is that it could allow overlapping callback executions 
(lost and assigned) in the above edge case, which is not the behaviour of the 
old coordinator. Review and validate. The alternative would require more 
complex logic on the client to ensure that a new member is not allowed to 
rejoin until the fenced one completes its callback.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll

2024-02-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16298:
---
Description: When user-defined callbacks fail with an exception, the 
expectation is that the error should be propagated to the user as a 
KafkaException and break the poll loop (behaviour in the legacy coordinator). 
The new coordinator executes callbacks in the application thread, and sends an 
event to the background with the callback result and error if any, [passing the 
error along with the event 
here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882]
 to the background thread, but does not seem to propagate the exception to the 
user.   (was: When user-defined callbacks fail with an exception, the 
expectation is that the error should be propagated to the user as a 
KafkaException and break the poll loop (behaviour in the legacy coordinator). 
The new coordinator executes callbacks in the application thread, and sends an 
event to the background with the callback result 
([here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L252]),
 but does not seem to propagate the exception to the user. )

> Ensure user callbacks exceptions are propagated to the user on consumer poll
> 
>
> Key: KAFKA-16298
> URL: https://issues.apache.org/jira/browse/KAFKA-16298
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> When user-defined callbacks fail with an exception, the expectation is that 
> the error should be propagated to the user as a KafkaException and break the 
> poll loop (behaviour in the legacy coordinator). The new coordinator executes 
> callbacks in the application thread, and sends an event to the background 
> with the callback result and error if any, [passing the error along with the 
> event 
> here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882]
>  to the background thread, but does not seem to propagate the exception to 
> the user. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll

2024-02-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16298:
---
Description: When user-defined callbacks fail with an exception, the 
expectation is that the error should be propagated to the user as a 
KafkaException and break the poll loop (behaviour in the legacy coordinator). 
The new consumer executes callbacks in the application thread, and sends an 
event to the background with the callback result and error if any, [passing the 
error along with the event 
here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882]
 to the background thread, but does not seem to propagate the exception to the 
user.   (was: When user-defined callbacks fail with an exception, the 
expectation is that the error should be propagated to the user as a 
KafkaException and break the poll loop (behaviour in the legacy coordinator). 
The new coordinator executes callbacks in the application thread, and sends an 
event to the background with the callback result and error if any, [passing the 
error along with the event 
here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882]
 to the background thread, but does not seem to propagate the exception to the 
user. )

> Ensure user callbacks exceptions are propagated to the user on consumer poll
> 
>
> Key: KAFKA-16298
> URL: https://issues.apache.org/jira/browse/KAFKA-16298
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> When user-defined callbacks fail with an exception, the expectation is that 
> the error should be propagated to the user as a KafkaException and break the 
> poll loop (behaviour in the legacy coordinator). The new consumer executes 
> callbacks in the application thread, and sends an event to the background 
> with the callback result and error if any, [passing the error along with the 
> event 
> here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882]
>  to the background thread, but does not seem to propagate the exception to 
> the user. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll

2024-02-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16298:
--

 Summary: Ensure user callbacks exceptions are propagated to the 
user on consumer poll
 Key: KAFKA-16298
 URL: https://issues.apache.org/jira/browse/KAFKA-16298
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans


When user-defined callbacks fail with an exception, the expectation is that the 
error should be propagated to the user as a KafkaException and break the poll 
loop (behaviour in the legacy coordinator). The new coordinator executes 
callbacks in the application thread, and sends an event to the background with 
the callback result 
([here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L252]),
 but does not seem to propagate the exception to the user. 
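
A hypothetical sketch of the expected behaviour, i.e. storing the callback 
error and rethrowing it on the next poll; this is not the actual 
AsyncKafkaConsumer code:
{code:java}
import org.apache.kafka.common.KafkaException;

class CallbackErrorPropagationSketch {
    private volatile KafkaException callbackError;

    // Invoked when a user callback throws.
    void onCallbackFailure(RuntimeException cause) {
        callbackError = new KafkaException("User rebalance callback failed", cause);
    }

    // Called at the top of poll(): rethrow so the poll loop breaks, matching
    // the legacy consumer behaviour.
    void maybeThrowPendingCallbackError() {
        KafkaException e = callbackError;
        if (e != null) {
            callbackError = null;
            throw e;
        }
    }
}
{code}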



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16224:
---
Description: 
Current logic for auto-committing offsets when partitions are revoked will 
retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
member not completing the revocation in time. We should consider this as an 
indication of the topic being deleted, and in the context of committing offsets 
to revoke partitions, we should abort the commit attempt and move on to 
complete and ack the revocation (meaning that we would effectively consider 
UnknownTopicOrPartitionException as non-retriable, even though it extends 
RetriableException, only when committing offsets before revocation as part of 
this task) 
Note that legacy coordinator behaviour around this seems to be the same as the 
new consumer currently has, tracked with a related issue given that it would 
require a separate fix for the legacy consumer.

  was:
Current logic for auto-committing offsets when partitions are revoked will 
retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
member not completing the revocation in time. We should consider this as an 
indication of the topic being deleted, and in the context of committing offsets 
to revoke partitions, we should abort the commit attempt and move on to 
complete and ack the revocation (effectively considering 
UnknownTopicOrPartitionException as non-retriable in this context) 
Note that legacy coordinator behaviour around this seems to be the same as the 
new consumer currently has, tracked with a related issue given that it would 
require a separate fix for the legacy consumer.


> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (meaning that we would effectively 
> consider UnknownTopicOrPartitionException as non-retriable, even though it 
> extends RetriableException, only when committing offsets before revocation as 
> part of this task) 
> Note that legacy coordinator behaviour around this seems to be the same as 
> the new consumer currently has, tracked with a related issue given that it 
> would require a separate fix for the legacy consumer.
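
A rough sketch of the retry decision described above; the method and the flag 
are hypothetical:
{code:java}
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

class RevocationCommitRetrySketch {
    // committingBeforeRevocation is true only for the auto-commit that
    // precedes giving up partitions.
    static boolean shouldRetry(Throwable error, boolean committingBeforeRevocation) {
        if (error instanceof UnknownTopicOrPartitionException && committingBeforeRevocation) {
            // Topic likely deleted: abort the commit and ack the revocation
            // instead of retrying until the rebalance times out.
            return false;
        }
        return error instanceof RetriableException;
    }
}
{code}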



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16185:
---
Priority: Blocker  (was: Critical)

> Fix client reconciliation of same assignment received in different epochs 
> --
>
> Key: KAFKA-16185
> URL: https://issues.apache.org/jira/browse/KAFKA-16185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, the intention in the client state machine is that the client 
> always reconciles whatever it has pending and sends an ack for it, but in 
> cases where the same assignment is received in different epochs this does not 
> work as expected.
> 1 - Client might get stuck JOINING/RECONCILING, with a pending reconciliation 
> (delayed), and it receives the same assignment, but in a new epoch (ex. after 
> being FENCED). The first time it receives the assignment it takes no action, 
> as it already has it pending to reconcile, but when the reconciliation 
> completes it discards the result because the epoch changed, which is wrong. 
> Note that after sending the assignment with the new epoch once, the broker 
> continues to send null assignments. 
> Here is a sample sequence leading to the client stuck JOINING:
> - client joins, epoch 0
> - client receives assignment tp1, stuck RECONCILING, epoch 1
> - member gets FENCED on the coord, coord bumps epoch to 2
> - client tries to rejoin (JOINING), epoch 0 provided by the client
> - new member added to the group (group epoch bumped to 3), client receives 
> same assignment that is currently trying to reconcile (tp1), but with epoch 3
> - previous reconciliation completes, but will discard the result because it 
> will notice that the memberHasRejoined (memberEpochOnReconciliationStart != 
> memberEpoch). Client is stuck JOINING, with the server sending null target 
> assignment because it hasn't changed since the last one sent (tp1)
> We should end up with a test similar to the existing 
> #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case 
> that the member receives the same assignment after being fenced and rejoining
> 2 - Client is not sending ack back to the broker in cases where it finishes a 
> reconciliation for the same assignment that it sent in the last HB (builder 
> will not include the assignment). Following sequence:
> - client owns T1-1 (last HB sent included ack for T1-1)
> - client receives [T1-1, T2-1] and starts reconciling
> - client receives T1-1 (meaning T2-1 needs to be revoked)
> - ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
> - next reconciliation starts for T1-1 and completes, but the ack is not sent
> because the builder sees it is the same one sent in the last HB, leaving the
> broker waiting for an ack that won't arrive.
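
For illustration, here is a minimal standalone model of the faulty check (hypothetical names; the real logic lives in the client state machine around MembershipManagerImpl). It shows why a reconciliation started in one epoch is discarded after the member rejoins, even though the target assignment is unchanged:
{code:java}
// Standalone model, not the actual Kafka code: the discard check compares
// epochs only, so an identical assignment received in a new epoch still
// causes the completed reconciliation to be thrown away.
public class ReconciliationEpochModel {

    static int memberEpoch;                     // updated from HB responses
    static int memberEpochOnReconciliationStart;

    static void startReconciliation() {
        memberEpochOnReconciliationStart = memberEpoch;
    }

    static boolean discardsResult() {
        // Faulty: any epoch change discards the result, even when the
        // assignment being reconciled is the same one the broker resent.
        return memberEpochOnReconciliationStart != memberEpoch;
    }

    public static void main(String[] args) {
        memberEpoch = 1;            // assignment tp1 received, epoch 1
        startReconciliation();      // reconciliation of tp1 begins (delayed)
        memberEpoch = 3;            // fenced, rejoined, same tp1 in epoch 3
        System.out.println(discardsResult()); // true -> client stuck JOINING
    }
}
{code}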



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16185:
---
Description: 
Currently, the intention in the client state machine is that the client always 
reconciles whatever it has pending and sends an ack for it, but in cases where 
the same assignment is received in different epochs this does not work as 
expected.

1 - Client might get stuck JOINING/RECONCILING, with a pending (delayed) 
reconciliation, when it receives the same assignment again in a new epoch (ex. 
after being FENCED). The first time it receives the assignment it takes no 
action, as the assignment is already pending reconciliation, but when the 
reconciliation completes it discards the result because the epoch changed, 
which is wrong. Note that after sending the assignment with the new epoch once, 
the broker continues to send null assignments.

Here is a sample sequence leading to the client stuck JOINING:
- client joins, epoch 0
- client receives assignment tp1, stuck RECONCILING, epoch 1
- member gets FENCED on the coord, coord bumps epoch to 2
- client tries to rejoin (JOINING), epoch 0 provided by the client
- new member added to the group (group epoch bumped to 3); client receives the 
same assignment it is currently trying to reconcile (tp1), but with epoch 3
- previous reconciliation completes, but the result is discarded because it 
notices memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch). 
Client is stuck JOINING, with the server sending a null target assignment 
because it hasn't changed since the last one sent (tp1).

We should end up with a test similar to the existing 
#testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case 
where the member receives the same assignment after being fenced and rejoining.


2 - Client is not sending ack back to the broker in cases where it finishes a 
reconciliation for the same assignment that it sent in the last HB (builder 
will not include the assignment). Following sequence:
- client owns T1-1 (last HB sent included ack for T1-1)
- client receives [T1-1, T2-1] and starts reconciling
- client receives T1-1 (meaning T2-1 needs to be revoked)
- ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
- next reconciliation starts for T1-1 and completes, but the ack is not sent 
because the builder sees it is the same one sent in the last HB, leaving the 
broker waiting for an ack that won't arrive.



  was:
Currently, the intention in the client state machine is that the client always 
reconciles whatever it has pending and sends an ack for it, but in cases where 
the same assignment is received in different epochs this does not work as 
expected.

1 - Client might get stuck JOINING/RECONCILING, with a pending (delayed) 
reconciliation, when it receives the same assignment again in a new epoch (ex. 
after being FENCED). The first time it receives the assignment it takes no 
action, as the assignment is already pending reconciliation, but when the 
reconciliation completes it discards the result because the epoch changed, 
which is wrong. Note that after sending the assignment with the new epoch once, 
the broker continues to send null assignments.

Here is a sample sequence leading to the client stuck JOINING:
- client joins, epoch 0
- client receives assignment tp1, stuck RECONCILING, epoch 1
- member gets FENCED on the coord, coord bumps epoch to 2
- client tries to rejoin (JOINING), epoch 0 provided by the client
- new member added to the group (group epoch bumped to 3); client receives the 
same assignment it is currently trying to reconcile (tp1), but with epoch 3
- previous reconciliation completes, but the result is discarded because it 
notices memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch). 
Client is stuck JOINING, with the server sending a null target assignment 
because it hasn't changed since the last one sent (tp1).

We should end up with a test similar to the existing 
#testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case 
where the member receives the same assignment after being fenced and rejoining.


2 - Client is not sending ack back to the broker in cases where it finishes a 
reconciliation for the same assignment that it sent in the last HB (builder 
will not include the assignment). Following sequence:
- client owns T1-1 (last HB sent included ack for T1-1)
- client receives [T1-1, T2-1] and starts reconciling
- client receives T1-1 (meaning T2-1 needs to be revoked)
- ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
- next reconciliation starts for T1-1 and completes, but the ack is not sent 
because the builder sees it is the same one sent in the last HB, leaving the 
broker waiting for an ack that won't arrive.




> Fix client reconciliation of same assignment received in different epochs

[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16261:
---
Affects Version/s: 3.7.0

> MembershipManagerImpl.updateSubscription fails if already empty subscription
> 
>
> Key: KAFKA-16261
> URL: https://issues.apache.org/jira/browse/KAFKA-16261
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, consumer-threading-refactor, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The internal SubscriptionState object keeps track of whether the assignment 
> is user-assigned or auto-assigned. If there are no assigned partitions, the
> assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed
> in this state, it fails.
> The easiest thing is perhaps to check 
> SubscriptionState.hasAutoAssignedPartitions() to make sure that 
> assignFromSubscribed is going to be permitted.
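
A rough sketch of the suggested guard (the call site is simplified and hypothetical; SubscriptionState and the two methods are the internal names mentioned above):
{code:java}
// Sketch only: skip assignFromSubscribed when nothing is auto-assigned, so an
// already-empty subscription (assignment type NONE) doesn't throw.
if (subscriptions.hasAutoAssignedPartitions()) {
    subscriptions.assignFromSubscribed(assignedPartitions);
}
// else: assignment already reset to NONE; there is nothing to update.
{code}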



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16185:
---
Priority: Critical  (was: Blocker)

> Fix client reconciliation of same assignment received in different epochs 
> --
>
> Key: KAFKA-16185
> URL: https://issues.apache.org/jira/browse/KAFKA-16185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, the intention in the client state machine is that the client 
> always reconciles whatever it has pending and sends an ack for it, but in 
> cases where the same assignment is received in different epochs this does not 
> work as expected.
> 1 - Client might get stuck JOINING/RECONCILING, with a pending (delayed)
> reconciliation, when it receives the same assignment again in a new epoch (ex.
> after being FENCED). The first time it receives the assignment it takes no
> action, as the assignment is already pending reconciliation, but when the
> reconciliation completes it discards the result because the epoch changed,
> which is wrong. Note that after sending the assignment with the new epoch
> once, the broker continues to send null assignments.
> Here is a sample sequence leading to the client stuck JOINING:
> - client joins, epoch 0
> - client receives assignment tp1, stuck RECONCILING, epoch 1
> - member gets FENCED on the coord, coord bumps epoch to 2
> - client tries to rejoin (JOINING), epoch 0 provided by the client
> - new member added to the group (group epoch bumped to 3); client receives the
> same assignment it is currently trying to reconcile (tp1), but with epoch 3
> - previous reconciliation completes, but the result is discarded because it
> notices memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch).
> Client is stuck JOINING, with the server sending a null target assignment
> because it hasn't changed since the last one sent (tp1).
> We should end up with a test similar to the existing
> #testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case
> where the member receives the same assignment after being fenced and rejoining.
> 2 - Client is not sending ack back to the broker in cases where it finishes a 
> reconciliation for the same assignment that it sent in the last HB (builder 
> will not include the assignment). Following sequence:
> - client owns T1-1 (last HB sent included ack for T1-1)
> - client receives [T1-1, T2-1] and starts reconciling
> - client receives T1-1 (meaning T2-1 needs to be revoked)
> - ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
> - next reconciliation starts for T1-1 and completes, but the ack is not sent
> because the builder sees it is the same one sent in the last HB, leaving the
> broker waiting for an ack that won't arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16224:
---
Description: 
Current logic for auto-committing offsets when partitions are revoked will 
retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
member not completing the revocation in time. We should consider this as an 
indication of the topic being deleted, and in the context of committing offsets 
to revoke partitions, we should abort the commit attempt and move on to 
complete and ack the revocation (effectively considering 
UnknownTopicOrPartitionException as non-retriable in this context).
Note that the legacy coordinator behaviour around this seems to be the same as 
what the new consumer currently has; tracked in a related issue, given that it 
would require a separate fix for the legacy consumer.

  was:
Current logic for auto-committing offsets when partitions are revoked will 
retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
member not completing the revocation in time. We should consider this as an 
indication of the topic being deleted, and in the context of committing offsets 
to revoke partitions, we should abort the commit attempt and move on to 
complete and ack the revocation (effectively considering 
UnknownTopicOrPartitionException as non-retriable in this context).
Note that the legacy coordinator behaviour around this seems to be the same as 
what the new consumer currently has.


> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (effectively considering 
> UnknownTopicOrPartitionException as non-retriable in this context).
> Note that the legacy coordinator behaviour around this seems to be the same as
> what the new consumer currently has; tracked in a related issue, given that it
> would require a separate fix for the legacy consumer.
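
As a hedged sketch of the proposed handling (the helper names are invented; the real change belongs in the new consumer's commit/membership logic), the pre-revocation commit result could be handled like this:
{code:java}
// Sketch: during the auto-commit that runs before revocation, treat
// UNKNOWN_TOPIC_OR_PARTITION as "topic deleted", abort the commit retries,
// and let the revocation complete and be acked. Helper names are invented.
commitResult.whenComplete((response, error) -> {
    if (error instanceof UnknownTopicOrPartitionException) {
        // Non-retriable in this context only, even though the exception
        // extends RetriableException.
        log.warn("Topic no longer exists; aborting commit before revocation");
        completeRevocationAndAck();
    } else if (error instanceof RetriableException) {
        retryCommit(); // all other retriable errors keep the current behaviour
    } else {
        completeRevocationAndAck();
    }
});
{code}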



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16224:
---
Priority: Critical  (was: Major)

> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (effectively considering 
> UnknownTopicOrPartitionException as non-retriable in this context).
> Note that the legacy coordinator behaviour around this seems to be the same as
> what the new consumer currently has.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16185:
---
Description: 
Currently, the intention in the client state machine is that the client always 
reconciles whatever it has pending and sends an ack for it, but in cases where 
the same assignment is received in different epochs this does not work as 
expected.

1 - Client might get stuck JOINING/RECONCILING, with a pending (delayed) 
reconciliation, when it receives the same assignment again in a new epoch (ex. 
after being FENCED). The first time it receives the assignment it takes no 
action, as the assignment is already pending reconciliation, but when the 
reconciliation completes it discards the result because the epoch changed, 
which is wrong. Note that after sending the assignment with the new epoch once, 
the broker continues to send null assignments.

Here is a sample sequence leading to the client stuck JOINING:
- client joins, epoch 0
- client receives assignment tp1, stuck RECONCILING, epoch 1
- member gets FENCED on the coord, coord bumps epoch to 2
- client tries to rejoin (JOINING), epoch 0 provided by the client
- new member added to the group (group epoch bumped to 3); client receives the 
same assignment it is currently trying to reconcile (tp1), but with epoch 3
- previous reconciliation completes, but the result is discarded because it 
notices memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch). 
Client is stuck JOINING, with the server sending a null target assignment 
because it hasn't changed since the last one sent (tp1).

We should end up with a test similar to the existing 
#testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case 
where the member receives the same assignment after being fenced and rejoining.


2 - Client is not sending ack back to the broker in cases where it finishes a 
reconciliation for the same assignment that it sent in the last HB (builder 
will not include the assignment). Following sequence:
- client owns T1-1 (last HB sent included ack for T1-1)
- client receives [T1-1, T2-1] and starts reconciling
- client receives T1-1 (meaning T2-1 needs to be revoked)
- ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
- next reconciliation starts for T1-1 and completes, but the ack is not sent 
because the builder sees it is the same one sent in the last HB, leaving the 
broker waiting for an ack that won't arrive.



  was:
Currently, the intention in the client state machine is that the client always 
reconciles whatever it has pending and sends an ack for it, but in cases where 
the same assignment is received in different epochs this does not work as 
expected.

1 - Client might get stuck JOINING/RECONCILING, with a pending (delayed) 
reconciliation, when it receives the same assignment again in a new epoch (ex. 
after being FENCED). The first time it receives the assignment it takes no 
action, as the assignment is already pending reconciliation, but when the 
reconciliation completes it discards the result because the epoch changed, 
which is wrong. Note that after sending the assignment with the new epoch once, 
the broker continues to send null assignments.

Here is a sample sequence leading to the client stuck JOINING:
- client joins, epoch 0
- client receives assignment tp1, stuck RECONCILING, epoch 1
- member gets FENCED on the coord, coord bumps epoch to 2
- client tries to rejoin (JOINING), epoch 0 provided by the client
- new member added to the group (group epoch bumped to 3); client receives the 
same assignment it is currently trying to reconcile (tp1), but with epoch 3
- previous reconciliation completes, but the result is discarded because it 
notices memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch). 
Client is stuck JOINING, with the server sending a null target assignment 
because it hasn't changed since the last one sent (tp1).

(We should end up with a test similar to the existing 
#testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case 
where the member receives the same assignment after being fenced and rejoining.)


2 - Client is not sending ack back to the broker in cases where it finishes a 
reconciliation for the same assignment that it sent in the last HB (builder 
will not include the assignment). Following sequence:
- client owns T1-1 (last HB sent included ack for T1-1)
- client receives [T1-1, T2-1] and starts reconciling
- client receives T1-1 (meaning T2-1 needs to be revoked)
- ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
- next reconciliation starts for T1-1 and completes, but the ack is not sent 
because the builder sees it is the same one sent in the last HB, leaving the 
broker waiting for an ack that won't arrive.




> Fix client reconciliation of same assignment received in different epochs

[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16261:
---
Labels: client-transitions-issues consumer-threading-refactor 
kip-848-client-support  (was: consumer-threading-refactor 
kip-848-client-support)

> MembershipManagerImpl.updateSubscription fails if already empty subscription
> 
>
> Key: KAFKA-16261
> URL: https://issues.apache.org/jira/browse/KAFKA-16261
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, consumer-threading-refactor, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The internal SubscriptionState object keeps track of whether the assignment 
> is user-assigned or auto-assigned. If there are no assigned partitions, the
> assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed
> in this state, it fails.
> The easiest thing is perhaps to check 
> SubscriptionState.hasAutoAssignedPartitions() to make sure that 
> assignFromSubscribed is going to be permitted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16004) Review new consumer inflight offset commit logic

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16004:
---
Priority: Critical  (was: Major)

> Review new consumer inflight offset commit logic
> 
>
> Key: KAFKA-16004
> URL: https://issues.apache.org/jira/browse/KAFKA-16004
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: consumer-threading-refactor, offset
> Fix For: 3.8.0
>
>
> New consumer logic for committing offsets handles inflight requests, to
> validate that no commit requests are sent if a previous one hasn't received a
> response. Review how that logic is currently applied to both sync and async
> commits, and validate against the legacy coordinator, which seems to apply it
> only for async commits. Also review the auto-commit behaviour (the expected
> behaviour for auto-commit on the interval, on reconciliation, and on close).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16103) Review client logic for triggering offset commit callbacks

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16103:
---
Priority: Critical  (was: Major)

> Review client logic for triggering offset commit callbacks
> --
>
> Key: KAFKA-16103
> URL: https://issues.apache.org/jira/browse/KAFKA-16103
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support, offset
> Fix For: 3.8.0
>
>
> Review the logic for triggering commit callbacks, ensuring that all callbacks
> are triggered before returning from commitSync.
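
A minimal sketch of the invariant under review (hypothetical helper names, not the actual consumer code):
{code:java}
// Sketch: commitSync must drain and run every pending offset-commit callback,
// including callbacks from earlier async commits, before returning.
void commitSync(java.time.Duration timeout) {
    var commitFuture = submitSyncCommit();        // invented helper
    awaitResult(commitFuture, timeout);           // invented helper
    runPendingCommitCallbacks();                  // must run before returning
}
{code}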



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15839:
---
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Topic ID integration in consumer subscription state
> ---
>
> Key: KAFKA-15839
> URL: https://issues.apache.org/jira/browse/KAFKA-15839
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, reconciliation
> Fix For: 4.0.0
>
>
> With the new consumer group protocol, assignments received by the consumer 
> contain topic IDs instead of topic names. Topic IDs are used in the
> reconciliation path, integrated using TopicIdPartition. When reconciling,
> topic names are resolved via a metadata update, but they are also kept in a
> local #MembershipManager cache. This local cache keeps the assigned
> topicId-name pairs (whose topics might have been deleted from metadata, ex. on
> topic deletion).
> That is just an initial step towards spreading topic IDs internally in the
> consumer code. The next step, addressed by this task, is to include topic IDs
> in the subscription state, so that assigned topicId-name pairs can be accessed
> from other components without resolving names multiple times.
> Note that this task aims only at spreading topic IDs internally in the
> consumer; there are no changes to expose them at the API level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15847) Consider partial metadata requests for client reconciliation

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15847:
---
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Consider partial metadata requests for client reconciliation
> 
>
> Key: KAFKA-15847
> URL: https://issues.apache.org/jira/browse/KAFKA-15847
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, reconciliation
> Fix For: 4.0.0
>
>
> The new consumer implementing the KIP-848 protocol needs to resolve metadata
> for the topics received in the assignment. It does so by relying on the
> centralized metadata object. Currently, metadata updates requested through the
> metadata object fetch metadata for all topics. Consider allowing the partial
> updates that are already expressed as an intention in the Metadata class but
> are not fully supported (investigate the background, in case some specifics
> led to this intention not being fully implemented).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15843:
---
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: callback, kip-848-client-support, reconciliation
> Fix For: 4.0.0
>
>
> The legacy coordinator triggers onPartitionsAssigned with an empty assignment
> (which is not the case when triggering onPartitionsRevoked or onPartitionsLost),
> and the new consumer implementation maintains the same principle. We should
> review this to fully understand whether calling onPartitionsAssigned with an
> empty assignment is really needed (or whether it should behave consistently
> with onPartitionsRevoked/Lost).
> Note that the consumer integration tests rely on this call to 
> onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala)
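
At the public API level, this is the listener behaviour in question; defensive user code can simply ignore the empty call (standard ConsumerRebalanceListener API, shown here for illustration):
{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class ToleratesEmptyAssignment implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return; // may currently be invoked with an empty assignment
        }
        // set up per-partition state for the newly assigned partitions
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // not invoked with an empty collection by the legacy coordinator
    }
}
{code}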



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16185:
---
Priority: Critical  (was: Major)

> Fix client reconciliation of same assignment received in different epochs 
> --
>
> Key: KAFKA-16185
> URL: https://issues.apache.org/jira/browse/KAFKA-16185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, the intention in the client state machine is that the client 
> always reconciles whatever it has pending and sends an ack for it, but in 
> cases where the same assignment is received in different epochs this does not 
> work as expected.
> 1 - Client might get stuck JOINING/RECONCILING, with a pending (delayed)
> reconciliation, when it receives the same assignment again in a new epoch (ex.
> after being FENCED). The first time it receives the assignment it takes no
> action, as the assignment is already pending reconciliation, but when the
> reconciliation completes it discards the result because the epoch changed,
> which is wrong. Note that after sending the assignment with the new epoch
> once, the broker continues to send null assignments.
> Here is a sample sequence leading to the client stuck JOINING:
> - client joins, epoch 0
> - client receives assignment tp1, stuck RECONCILING, epoch 1
> - member gets FENCED on the coord, coord bumps epoch to 2
> - client tries to rejoin (JOINING), epoch 0 provided by the client
> - new member added to the group (group epoch bumped to 3); client receives the
> same assignment it is currently trying to reconcile (tp1), but with epoch 3
> - previous reconciliation completes, but the result is discarded because it
> notices memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch).
> Client is stuck JOINING, with the server sending a null target assignment
> because it hasn't changed since the last one sent (tp1).
> (We should end up with a test similar to the existing
> #testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case
> where the member receives the same assignment after being fenced and rejoining.)
> 2 - Client is not sending ack back to the broker in cases where it finishes a 
> reconciliation for the same assignment that it sent in the last HB (builder 
> will not include the assignment). Following sequence:
> - client owns T1-1 (last HB sent included ack for T1-1)
> - client receives [T1-1, T2-1] and starts reconciling
> - client receives T1-1 (meaning T2-1 needs to be revoked)
> - ongoing reconciliation for [T1-1, T2-1] fails, so the ack is never sent for it
> - next reconciliation starts for T1-1 and completes, but the ack is not sent
> because the builder sees it is the same one sent in the last HB, leaving the
> broker waiting for an ack that won't arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription

2024-02-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16261:
---
Priority: Critical  (was: Minor)

> MembershipManagerImpl.updateSubscription fails if already empty subscription
> 
>
> Key: KAFKA-16261
> URL: https://issues.apache.org/jira/browse/KAFKA-16261
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The internal SubscriptionState object keeps track of whether the assignment 
> is user-assigned or auto-assigned. If there are no assigned partitions, the
> assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed
> in this state, it fails.
> The easiest thing is perhaps to check 
> SubscriptionState.hasAutoAssignedPartitions() to make sure that 
> assignFromSubscribed is going to be permitted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription

2024-02-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16261:
---
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor)

> MembershipManagerImpl.updateSubscription fails if already empty subscription
> 
>
> Key: KAFKA-16261
> URL: https://issues.apache.org/jira/browse/KAFKA-16261
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Andrew Schofield
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> The internal SubscriptionState object keeps track of whether the assignment 
> is user-assigned or auto-assigned. If there are no assigned partitions, the
> assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed
> in this state, it fails.
> The easiest thing is perhaps to check 
> SubscriptionState.hasAutoAssignedPartitions() to make sure that 
> assignFromSubscribed is going to be permitted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16258:
---
Labels: client-transitions-issues kip-848-client-support  (was: 
kip-848-client-support)

> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that expect callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)
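
A sketch of the expected sequence on poll-timer expiration, mirroring the legacy coordinator order described above (method names are invented; the real ones live in the membership/heartbeat managers):
{code:java}
// Sketch: leave the group, run onPartitionsLost, and only clear the
// assignment once the callback completes, matching the legacy order.
void onPollTimerExpired() {
    sendLeaveGroupHeartbeat();                          // 1. leave group request
    invokeOnPartitionsLost(currentAssignment())         // 2. user callback
        .whenComplete((result, error) -> clearAssignment()); // 3. then clear
}
{code}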



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16258:
--

Assignee: Lianet Magrans

> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that expect callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16165:
---
Description: 
Running system tests with the new async consumer revealed an invalid transition 
related to the consumer not being polled within the interval in some scenario 
(maybe related to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)


We should review the poll expiration logic that triggers a leave group 
operation. It is currently applied in the HB manager poll without any 
validation, and since it depends on the consumer poll timer, it could happen at 
any time, regardless of the member's state. Ex.: the poll timer could expire 
while the member is leaving, leading to this leaving->stale invalid transition. 
We should probably make this proactive leave apply only when the consumer is 
not already leaving (prepare leaving or leaving)
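
A hedged sketch of the proposed validation (state names taken from the internal MemberState enum; the exact method and call site are assumptions):
{code:java}
// Sketch: skip the proactive leave triggered by poll-timer expiration when
// the member is already on its way out, avoiding LEAVING -> STALE.
void maybeTransitionToStaleOnPollTimerExpired() {
    if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
        return; // already leaving; don't start a second leave path
    }
    transitionToStale();
}
{code}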

  was:
Running system tests with the new async consumer revealed an invalid transition 
related to the consumer not being polled within the interval in some scenario 
(maybe related to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
at 

[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16258:
---
Labels: kip-848-client-support  (was: )

> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that expect callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-02-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817500#comment-17817500
 ] 

Lianet Magrans edited comment on KAFKA-16008 at 2/14/24 7:25 PM:
-

[~kirktrue] I just added the links to the issue that most probably is 
causing/related to this failure


was (Author: JIRAUSER300183):
[~kirktrue] I just added the links to the issue that most probably is causing 
this failure

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-02-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817500#comment-17817500
 ] 

Lianet Magrans commented on KAFKA-16008:


[~kirktrue] I just added the links to the issue that most probably is causing 
this failure

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16258:
---
Description: 
When the poll timer expires, the new consumer proactively leaves the group and 
clears its assignments, but it should also invoke the onPartitionsLost 
callback. The legacy coordinator does the following sequence on poll timer 
expiration: send leave group request 
([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
 invoke onPartitionsLost, and when it completes it clears the assignment 
(onJoinPrepare 
[here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).

This issue is most probably what is causing the failures in the integration 
tests that expect callbacks when the poll interval expires (like 
https://issues.apache.org/jira/browse/KAFKA-16008)

  was:When the poll timer expires, the new consumer proactively leaves the 
group and clears its assignments, but it should also invoke the 
onPartitionsLost callback. The legacy coordinator does the following sequence 
on poll timer expiration: send leave group request 
([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
 invoke onPartitionsLost, and when it completes it clears the assignment 
(onJoinPrepare 
[here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).


> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration
> tests that expect callbacks when the poll interval expires (like
> https://issues.apache.org/jira/browse/KAFKA-16008)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16258:
--

 Summary: Stale member should trigger onPartitionsLost when leaving 
group
 Key: KAFKA-16258
 URL: https://issues.apache.org/jira/browse/KAFKA-16258
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
 Fix For: 3.8.0


When the poll timer expires, the new consumer proactively leaves the group and 
clears its assignments, but it should also invoke the onPartitionsLost 
callback. The legacy coordinator does the following sequence on poll timer 
expiration: send leave group request 
([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
 invoke onPartitionsLost, and when it completes it clears the assignment 
(onJoinPrepare 
[here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16224:
---
Labels: client-transitions-issues kip-848-client-support  (was: 
kip-848-client-support)

> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (effectively considering 
> UnknownTopicOrPartitionException as non-retriable in this context).
> Note that the legacy coordinator behaviour around this seems to be the same as
> what the new consumer currently has.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

