[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-21 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848363#comment-17848363
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16586:


By the way – if this test is failing, it's highly likely that this is a real 
bug. This test was designed to be completely deterministic (hence the seed) and 
doesn't set up any actual clients or clusters or anything that might make it 
flaky. [~mjsax] if you see this again can you report the failure here 
(including both the seed and rackAwareStrategy parameter)?

> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
>   at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479)
> {code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-21 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848358#comment-17848358
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16586:


Note: when reporting a failure of this test, please include both the seed 
(contained in the stack trace, e.g. 
"runRandomizedScenario(-3595932977400264775)") and the full test name including 
parameters (not present in the original ticket description, e.g. 
"rackAwareStrategy=balance_subtopology").

We need the test name to figure out which variant of the assignor was running. 
This will help narrow down the issue to a specific rackAwareStrategy, as well 
as enable reproduction of the failed test.
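
To illustrate why the seed alone is enough to replay a run, here is a minimal sketch of a seeded scenario generator. It is not the code of TaskAssignorConvergenceTest; the class name, the method name and the list of actions are hypothetical and only show the general idea that every random choice is derived from a single seed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration only; not taken from TaskAssignorConvergenceTest.
final class SeededScenarioSketch {

    // Derives a sequence of cluster "perturbations" purely from the seed,
    // so the same seed always replays the same sequence.
    static List<String> perturbations(long seed, int steps) {
        Random random = new Random(seed);
        // Made-up action names standing in for whatever the real test perturbs.
        String[] actions = {"addClient", "removeClient", "addThread", "removeThread"};
        List<String> result = new ArrayList<>();
        for (int i = 0; i < steps; i++) {
            result.add(actions[random.nextInt(actions.length)]);
        }
        return result;
    }

    public static void main(String[] args) {
        long seed = -3595932977400264775L; // seed quoted in the failure report
        // Two runs with the same seed produce identical sequences.
        System.out.println(perturbations(seed, 5).equals(perturbations(seed, 5)));
    }
}
```

The seed does not capture the test parameter, which is why the rackAwareStrategy value has to be reported alongside it.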

> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
>   at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479)
> {code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).





[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-21 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848351#comment-17848351
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16586:


Failed again:

 
{code:java}
randomClusterPerturbationsShouldConverge[rackAwareStrategy=balance_subtopology] – org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest

Stacktrace
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-3595932977400264775)`.
{code}
 

> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
>   at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479)
> {code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).





[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846467#comment-17846467
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-16361 at 5/15/24 12:24 AM:
--

Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
KAFKA-15170.

Fortunately I just merged that fix and cherry-picked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1, but 3.8 is 
just a day from KIP freeze, which means that if all goes well, it will be 
available in a little over a month.

If you need an immediate resolution in the meantime then you have two options:

1) Disable rack-awareness, which will effectively make the assignor just skip 
over the buggy code.

2) If you can build from source and don't require an official release, just 
cherry-pick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will, though.
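
For option 1, the comment does not say how to disable rack-awareness. One plausible way, stated here as an assumption rather than something confirmed in this thread, is to leave the consumer's `client.rack` setting unset, since the rack-aware assignment relies on consumers advertising a rack. The sketch below only manipulates a plain `java.util.Properties` object; the class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper; assumes that a consumer without "client.rack"
// does not take part in rack-aware assignment.
final class RackAwarenessSketch {

    // Returns a copy of the consumer configuration without "client.rack",
    // leaving the original Properties object untouched.
    static Properties withoutClientRack(Properties original) {
        Properties copy = new Properties();
        copy.putAll(original);
        copy.remove("client.rack");
        return copy;
    }
}
```

Note that `client.rack` may also be used for other purposes (for example fetching from the closest replica), so removing it can have side effects beyond the assignor.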


was (Author: ableegoldman):
Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
[KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170]

 

Fortunately I just merged that fix and cherrypicked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is 
just a day from KIP freeze which means if all goes well, it will be available 
in a little over a month.

 

If you need an immediate resolution in the meantime then you have two options:

1) disable rack-awareness which will effectively make the assignor just skip 
over the buggy code

2) if you can build from source and don't require an official release, just 
cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will though

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios the rack aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon the error is deterministic, and 
> so persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely 
> would also be reproducible there) 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 

[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846467#comment-17846467
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16361:


Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
[KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170].

Fortunately I just merged that fix and cherry-picked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1, but 3.8 is 
just a day from KIP freeze, which means that if all goes well, it will be 
available in a little over a month.

If you need an immediate resolution in the meantime then you have two options:

1) Disable rack-awareness, which will effectively make the assignor just skip 
over the buggy code.

2) If you can build from source and don't require an official release, just 
cherry-pick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will, though.

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios the rack aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon the error is deterministic, and 
> so persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely 
> would also be reproducible there) 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: 
> rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader 
> = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 
> 1, replicas = 

[jira] [Updated] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15170:
---
Fix Version/s: 3.8.0
   3.7.1

> CooperativeStickyAssignor cannot adjust assignment correctly
> 
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the 
> assignment when all consumers in the group subscribe to the same topic list, 
> but it does not add every partition whose owner moves to another consumer to 
> ``partitionsWithMultiplePreviousOwners``.
>  
> The reason is that the function assignOwnedPartitions does not add partitions 
> that have a rack mismatch with their previous owner to allRevokedPartitions, 
> and only partitions in that list are added to 
> partitionsWithMultiplePreviousOwners.
>  
> In a cooperative rebalance, partitions that have changed owner must be removed 
> from the final assignment, otherwise consumption will behave incorrectly. I 
> have already raised a PR, please take a look, thanks.
>  
> [https://github.com/apache/kafka/pull/13965]
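
The cooperative rule described in the ticket (a partition whose owner changes must be left out of the final assignment for this round) can be sketched roughly as follows. This is an illustrative simplification, not the code of AbstractStickyAssignor or of the linked PR; the class name, method name and the string-based partition identifiers are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the cooperative adjustment step.
final class CooperativeAdjustSketch {

    // previousOwner: partition -> consumer that owned it before the rebalance
    // proposed:      consumer  -> partitions the assignor wants it to own
    // Returns the proposed assignment minus every partition that would move
    // from one consumer to a different one in this round.
    static Map<String, List<String>> withholdMovedPartitions(
            Map<String, String> previousOwner,
            Map<String, List<String>> proposed) {
        Map<String, List<String>> adjusted = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : proposed.entrySet()) {
            String consumer = entry.getKey();
            List<String> kept = new ArrayList<>();
            for (String partition : entry.getValue()) {
                String owner = previousOwner.get(partition);
                // Keep the partition only if it was unowned or stays with the same consumer.
                if (owner == null || owner.equals(consumer)) {
                    kept.add(partition);
                }
            }
            adjusted.put(consumer, kept);
        }
        return adjusted;
    }
}
```

In this sketch a withheld partition is simply dropped for the current round; the expectation is that a follow-up rebalance hands it to its new owner once the previous owner has revoked it.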





[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15170.

Resolution: Fixed

> CooperativeStickyAssignor cannot adjust assignment correctly
> 
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the 
> assignment when all consumers in the group subscribe to the same topic list, 
> but it does not add every partition whose owner moves to another consumer to 
> ``partitionsWithMultiplePreviousOwners``.
>  
> The reason is that the function assignOwnedPartitions does not add partitions 
> that have a rack mismatch with their previous owner to allRevokedPartitions, 
> and only partitions in that list are added to 
> partitionsWithMultiplePreviousOwners.
>  
> In a cooperative rebalance, partitions that have changed owner must be removed 
> from the final assignment, otherwise consumption will behave incorrectly. I 
> have already raised a PR, please take a look, thanks.
>  
> [https://github.com/apache/kafka/pull/13965]





[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16277:
---
Fix Version/s: 3.7.1

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Assignee: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer in the group 
> is very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two consumers, 6 partitions from each topic go to each consumer
>  * With three consumers, 4 partitions from each topic go to each consumer
>  * With four consumers, 3 partitions from each topic go to each consumer
>  
> Of note, we only saw this behaviour after migrating to the 
> `CooperativeStickyAssignor`. It was not an issue with the default partition 
> assignment strategy.
>  
> It is possible this may be intended behaviour. In which case, what is the 
> preferred workaround for our scenario? Our current workaround if we decide to 
> go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
> consumers so they only subscribe to one topic, and have two consumer threads 
> per instance of the application.  
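
The even per-topic spread proposed in the description above (6, 4 or 3 partitions of each topic per consumer) can be sketched with a simple round-robin per topic. This is only an illustration of the target distribution; it ignores stickiness and the cooperative protocol, and it is not how CooperativeStickyAssignor works. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-topic round-robin spread.
final class PerTopicSpreadSketch {

    // Deals each topic's partitions round-robin across the consumers and
    // returns consumer -> list of "<topic>-<partition>" names.
    static Map<String, List<String>> spread(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String consumer = consumers.get(p % consumers.size());
                assignment.get(consumer).add(topic.getKey() + "-" + p);
            }
        }
        return assignment;
    }

    // Counts how many of the given partition names belong to the topic.
    static long countForTopic(List<String> partitions, String topic) {
        return partitions.stream().filter(name -> name.startsWith(topic + "-")).count();
    }
}
```

With `topic-1` and `topic-2` at 12 partitions each, every consumer ends up with 12 divided by the group size partitions of each topic, so the 10x traffic difference between the topics is shared across the group.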





[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846180#comment-17846180
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16758:


I'm not super familiar with the new consumer, but for the legacy one at least I 
can say this should be very straightforward to implement, so I was thinking 
about applying the "newbie" label. Does anyone have a sense of whether this 
would be straightforward in the new consumer as well?

FWIW I personally do not have time to pick up the KIP anytime soon (though I'd 
be happy to help shepherd/review it) but I do frequently get asked about good 
starter work by new contributors, so I'm just wondering if this one would be a 
reasonable addition to the list. [~lianetm] [~schofielaj] [~dajac] any thoughts?

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in to or out of the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment, 
> as noted in KAFKA-16514).
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-05-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846178#comment-17846178
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:


FYI I filed https://issues.apache.org/jira/browse/KAFKA-16758 to cover the KIP 
for this new consumer feature. Let's use this ticket to track the issue in 
Streams specifically, which is obviously dependent on/blocked by this KIP.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a Streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens only after the appropriate timeout configuration has 
> expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>     "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
> {code}
>  





[jira] [Created] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16758:
--

 Summary: Extend Consumer#close with option to leave the group or 
not
 Key: KAFKA-16758
 URL: https://issues.apache.org/jira/browse/KAFKA-16758
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Reporter: A. Sophie Blee-Goldman


See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the full 
context.

Essentially we would get rid of the "internal.leave.group.on.close" config that 
is used as a backdoor by Kafka Streams right now to prevent closed consumers 
from leaving the group, thus reducing unnecessary task movements after a simple 
bounce. 

This would be replaced by an actual public API that would allow the caller to 
opt in to or out of the LeaveGroup when close is called. This would be similar 
to the KafkaStreams#close(CloseOptions) API, and in fact would be how that API 
will be implemented (since it only works for static groups at the moment, as 
noted in KAFKA-16514).

This has several benefits over the current situation:
 # It allows plain consumer apps to opt-out of leaving the group when closed, 
which is currently not possible through any public API (only an internal 
backdoor config)
 # It enables the caller to dynamically select the appropriate action depending 
on why the client is being closed – for example, you would not want the 
consumer to leave the group during a simple restart, but would want it to leave 
the group when shutting down the app or if scaling down the node. This is not 
possible today, even with the internal config, since configs are immutable
 # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
that the user's choice to leave the group during close will be respected for 
non-static members
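
To make benefit 2 concrete, here is a rough sketch of choosing the leave-group behavior at close time based on why the client is closing. The API shape is entirely hypothetical, since the actual design is left to the KIP: `CloseReason`, `shouldLeaveGroup` and `FakeConsumer` are made-up names, and `FakeConsumer` only records the decision instead of talking to a broker.

```java
// Hypothetical sketch; the real API is to be defined by the KIP.
final class CloseOptionSketch {

    // Made-up reasons a client might be closed.
    enum CloseReason { RESTART, APP_SHUTDOWN, SCALE_DOWN }

    // Decide per call whether to leave the group: stay in the group for a
    // simple restart, leave when shutting down the app or scaling down.
    static boolean shouldLeaveGroup(CloseReason reason) {
        return reason != CloseReason.RESTART;
    }

    // Stand-in for a consumer; records whether a LeaveGroup would have been sent.
    static final class FakeConsumer {
        private boolean closed = false;
        private boolean leftGroup = false;

        void close(boolean leaveGroup) {
            closed = true;
            leftGroup = leaveGroup;
        }

        boolean isClosed() { return closed; }

        boolean leftGroup() { return leftGroup; }
    }
}
```

A caller could then write `consumer.close(CloseOptionSketch.shouldLeaveGroup(reason))`, which a static, immutable config cannot express.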





[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846075#comment-17846075
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16361:


[~luked] it would help narrow down whether this is a regression or a 
long-standing issue if you can try to reproduce it with some earlier versions. 
For example, I believe the rack-aware assignment was added to the StickyAssignor 
in 3.5, so the first step might be to see if it's reproducible in 3.4. If so, 
then try a much older version to see if it's always been around – 2.3 would be 
a good place to start, since we first implemented the "constrained" algorithm 
back in 2.4.
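
For context on the minQuota check named in the exception, the quota arithmetic of the "constrained" algorithm can be sketched as below. This reflects my understanding of the idea (every member gets at least the floor of partitions per consumer, and the remainder determines how many members get one extra) and is not copied from AbstractStickyAssignor; the class and method names are hypothetical.

```java
// Hypothetical sketch of the quota arithmetic; assumes non-negative
// partition counts and at least one consumer.
final class QuotaSketch {

    // Minimum number of partitions every member should receive (floor).
    static int minQuota(int totalPartitions, int consumers) {
        return totalPartitions / consumers;
    }

    // Maximum number of partitions any member should receive (ceiling).
    static int maxQuota(int totalPartitions, int consumers) {
        return (totalPartitions + consumers - 1) / consumers;
    }

    // Number of members expected to end up with more than minQuota partitions.
    static int expectedMembersOverMinQuota(int totalPartitions, int consumers) {
        return totalPartitions % consumers;
    }
}
```

For example, 10 partitions over 3 consumers gives a minQuota of 3, a maxQuota of 4, and exactly one member expected above minQuota. The reported IllegalStateException corresponds to running out of unassigned partitions before that expected count is reached.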

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios the rack aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon the error is deterministic, and 
> so persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely 
> would also be reproducible there) 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: 
> rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader 
> = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 
> 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 8, leader 

[jira] [Comment Edited] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842504#comment-17842504
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-16644 at 4/30/24 8:57 PM:
-

[~mjsax] is KAFKA-14778 the correct issue that introduced a regression? That 
seems to link to an unrelated (and also unresolved) ticket


was (Author: ableegoldman):
[~mjsax] is KAFKA-14778 the correct issue? It seems to link to an unrelated 
(and also unresolved) ticket

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression in the 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected by the unit tests, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255]).
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: the 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  
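To illustrate why a `Map`-based check can miss the duplicate described in the ticket, here is a minimal, self-contained sketch that uses plain Java collections in place of the real integration-test helpers. The class name `DuplicateTombstoneCheck`, the key `lhs1`, and the `asMap` helper are hypothetical and do not come from the Kafka code base; a tombstone is modelled simply as a record with a `null` value.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain collections stand in for records read from the output topic.
final class DuplicateTombstoneCheck {

    // Collapses records by key, the way a Map-based verification would.
    static Map<String, String> asMap(List<Map.Entry<String, String>> records) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> record : records) {
            // A later record with the same key overwrites the earlier one.
            result.put(record.getKey(), record.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> emitted = new ArrayList<>();
        emitted.add(new SimpleEntry<>("lhs1", null)); // expected tombstone
        emitted.add(new SimpleEntry<>("lhs1", null)); // duplicate tombstone (the regression)

        System.out.println("records seen as List: " + emitted.size());        // 2
        System.out.println("records seen as Map:  " + asMap(emitted).size()); // 1
    }
}
```

Collecting into a `List` keeps both tombstones visible, so an assertion on the exact sequence of records would fail, whereas the `Map` view looks identical to the correct single-tombstone output.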



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842504#comment-17842504
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16644:


[~mjsax] is KAFKA-14778 the correct issue? It seems to link to an unrelated 
(and also unresolved) ticket



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841374#comment-17841374
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:


I opened a quick example PR to show more clearly what I'm proposing here. 
It's definitely rather hacky, but as I said already, this functionality of not 
leaving the group when a consumer is closed was done in a hacky way to begin 
with (i.e. via an internal consumer config introduced for use by Kafka Streams 
only). So we may as well fix this issue so that the Streams CloseOptions have 
correct semantics.

The more I think about this, the more strongly I feel that it's just silly for 
Streams users to be unable to opt out of this "don't leave the group on close" 
behavior. It's not even possible to use the internal config, since Streams 
strictly overrides it inside StreamsConfig. You can work around this by 
plugging in your own consumers via KafkaClientSupplier, though that does feel a 
bit extreme. More importantly, you'd still have to choose up front 
whether or not to leave the group on close, when you obviously can't know 
whether it makes sense to leave until you're actually calling close and know 
_why_ you're calling close (specifically, whether it's a temporary 
restart/bounce or a "permanent" close).

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a Streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens only after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841369#comment-17841369
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:


The immutable internal config thing is definitely a bummer.

To recap: if we want to solve this so that the current Streams API – i.e. the 
#close(closeOptions) API – works as intended, i.e. for non-static members as 
well, we'd need to change the way the consumer works. Or wait for mutable 
configs, which would be nice, but realistically that's not happening soon 
enough.

To do this "right" we'd probably need to introduce a new public consumer API of 
some sort, which would mean going through a KIP, which could be a bit messy.

But as a slightly hacky alternative, would it be possible to just introduce an 
internal API that works similarly to the existing internal config, 
and just have Kafka Streams use that internal API without making it a "real" 
API and having to do a KIP? I mean, that's basically what the internal config is 
anyway – an internal config not exposed to or intended for use by consumer 
applications, and only introduced for Kafka Streams to use. It doesn't seem like 
that big a deal to just switch from this immutable config to a new internal overload 
of #close (or even an internal #leaveGroupOnClose API that can be toggled 
on/off). Thoughts?

[~mjsax] [~cadonna] maybe you can raise this with someone who works on the 
clients to see if there are any concerns/make sure no one would object to this 
approach?
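
As a rough illustration of the idea floated in this comment, the sketch below contrasts a leave-group decision fixed at construction time with one that can be toggled just before closing. It is purely hypothetical: `FakeConsumer`, `leaveGroupOnClose`, and `closeWith` are made-up names for this example and are not part of the Kafka consumer or Kafka Streams APIs.

```java
// Hypothetical sketch: none of these types are Kafka classes.
final class LeaveGroupToggleSketch {

    // Stand-in for a consumer whose leave-group behaviour can be decided at close time.
    static final class FakeConsumer {
        private final boolean leaveGroupOnCloseConfig; // fixed at construction, like the internal config
        private Boolean leaveGroupOverride;            // null until the caller decides at close time
        private boolean leftGroup;

        FakeConsumer(boolean leaveGroupOnCloseConfig) {
            this.leaveGroupOnCloseConfig = leaveGroupOnCloseConfig;
        }

        // Hypothetical internal toggle, in the spirit of the "#leaveGroupOnClose" idea.
        void leaveGroupOnClose(boolean leave) {
            this.leaveGroupOverride = leave;
        }

        void close() {
            // The override, when set, wins over the construction-time config.
            leftGroup = (leaveGroupOverride != null) ? leaveGroupOverride : leaveGroupOnCloseConfig;
        }

        boolean leftGroup() {
            return leftGroup;
        }
    }

    // Mimics a Streams-style close that forwards the caller's choice to the consumer.
    static boolean closeWith(FakeConsumer consumer, boolean leaveGroup) {
        consumer.leaveGroupOnClose(leaveGroup);
        consumer.close();
        return consumer.leftGroup();
    }

    public static void main(String[] args) {
        // The config says "don't leave", but the caller knows this is a permanent close.
        System.out.println(closeWith(new FakeConsumer(false), true));
        // A temporary bounce: stay in the group.
        System.out.println(closeWith(new FakeConsumer(false), false));
    }
}
```

The point of the toggle is that the decision moves from configuration time to the moment of the close call, which is when the caller actually knows whether the shutdown is a bounce or permanent.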



[jira] [Commented] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close

2024-04-22 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839850#comment-17839850
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16600:


Thanks [~aleung181] – definitely sounds like a bug. Hard to diagnose without 
more context though – can you share a bit more of the logs? A minute or so 
leading up to the error message should hopefully be enough 

> Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state 
> is PENDING_SHUTDOWN" during streams close
> ---
>
> Key: KAFKA-16600
> URL: https://issues.apache.org/jira/browse/KAFKA-16600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Alex Leung
>Priority: Minor
>
> From time to time, we observe the following ERROR message during streams 
> close:
> {code:java}
> 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to 
> PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN
> {code}
> These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have 
> not changed any code related to streams shutdown.
> When the problem does not occur (most of the time), it looks like the 
> following:
> {code:java}
> 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in 
> PENDING_SHUTDOWN, all resources are being closed and the client will be 
> stopped. {code}





[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837820#comment-17837820
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16566:


I'm a bit confused – I thought we hadn't migrated Streams over to the new 
consumer rebalancing protocol yet. Or is this referring to something else? What is 
the "classic" vs "consumer" protocol? And when/why did we migrate our system 
tests to using it? Does it have to do with static membership specifically?

 

Sorry for being out of the loop here

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, 
> which verifies the sequence in which static members join a group when using 
> conflicting instance ids. This behaviour differs between the classic and 
> consumer protocols, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests cover (params, setup) applies to both protocols; only the expected 
> results differ. 
> When static members joining a group conflict:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: a new member with an instance id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836736#comment-17836736
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:


I haven't gone back and re-read the KIP, but IIRC the reason for adding these 
CloseOptions was specific to solving an issue with static membership, which is 
why it only takes effect there.

That said – I completely agree that there's no reason why this should only work 
with static membership, and the decision to not leave the group for 
non-static-membership is one of those biases that Kafka Streams has in assuming 
persistent state stores.

I would fully support changing the behavior to work with non-static-members 
rather than just updating the javadocs to explain this. [~mjsax] would this 
need a KIP? Or can we just consider this a "bug" (especially since the javadocs 
make no mention that it's intended to only work on static members) and since we 
don't need any API changes, simply make the change without a KIP?

Either way – [~sal.sorrentino] would you be interested in picking this up?



[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2024-04-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833758#comment-17833758
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12506:


[~goyarpit] yes, it seems this ticket has been abandoned. Feel free to pick it 
up (and let me know if you have any questions)

 

[~kebab-mai-haddi] if you still want to work on this, just let me know, I'm 
sure there are multiple improvements that could be made here in parallel

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500
> See the umbrella ticket for the list of improvements we need to make this a 
> more effective integration test





[jira] [Assigned] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2024-04-03 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12506:
--

Assignee: Arpit Goyal  (was: Aviral Srivastava)



[jira] [Commented] (KAFKA-16458) Add contains method in KeyValue store interface

2024-04-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833749#comment-17833749
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16458:


This could be nice to have, but unfortunately given the overly complex way that 
state stores are implemented and the multi-layer hierarchy, something that 
should be simple – ie just adding a basic API to the StateStore interface – 
actually ends up being a massive amount of work.

We do plan to overhaul the state store hierarchy at some point in order to 
simplify the codebase, both for maintenance and new features, although there's 
no clear roadmap or promise for this to happen anytime soon. That said, I would 
personally suggest we hold off on adding any new APIs that don't add 
strictly-new functionality until after we've simplified the state store 
implementation. 

Of course, if this is something you really want, you're always free to kick off 
a KIP discussion whenever. Just wanted to provide some context and warn that 
this would not be as straightforward as it might seem to actually implement.

To your final question: I do think in some sense the reality is that yes, this 
API is not offered on purpose, in order to keep the interface as simple as 
possible. But this in itself would be less of a concern if the state store 
hierarchy were not such a hassle to expand and maintain, which is why I think 
the community would be open to it once we get around to cleaning up the 
store implementation.

> Add contains method in KeyValue store interface
> ---
>
> Key: KAFKA-16458
> URL: https://issues.apache.org/jira/browse/KAFKA-16458
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Ayoub Omari
>Priority: Minor
>  Labels: needs-kip
>
> In some stream processors, we sometimes just want to check if a key exists in 
> the state store or not.
>  
> I find calling .get() and checking whether the return value is null a little 
> bit verbose:
> {code:java}
> if (store.get(key) != null) {
> }{code}
>  
> But I am not sure whether this method is left out on purpose, in order to 
> keep the store interface simple.
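
Until such a method exists, the check can be wrapped in a small helper on the caller's side. The sketch below is a minimal illustration under stated assumptions: `ReadOnlyStore` is a hypothetical stand-in that only assumes a `get(key)` method, and `contains` is a made-up helper name, not an existing Kafka Streams API. It also assumes that a stored key never maps to a `null` value, so "non-null result" can be read as "key exists".

```java
import java.util.HashMap;
import java.util.Map;

final class StoreContainsSketch {

    // Hypothetical stand-in for a read-only key-value store; only get() is assumed.
    interface ReadOnlyStore<K, V> {
        V get(K key);
    }

    // Treats "get returns non-null" as "key exists".
    static <K, V> boolean contains(ReadOnlyStore<K, V> store, K key) {
        return store.get(key) != null;
    }

    public static void main(String[] args) {
        Map<String, Long> backing = new HashMap<>();
        backing.put("a", 1L);
        // A HashMap lookup plays the role of the store for this example.
        ReadOnlyStore<String, Long> store = backing::get;

        System.out.println(contains(store, "a")); // true
        System.out.println(contains(store, "b")); // false
    }
}
```

A helper like this only hides the verbosity at the call site; it still performs a full `get`, so it does not avoid fetching or deserializing the value.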





[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820880#comment-17820880
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16277:


FYI I added you as a contributor so you should be able to self-assign tickets 
from now on. Thanks again for contributing a fix!

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Assignee: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer in the group 
> is very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two consumers, 6 partitions from each topic go to each consumer
>  * With three consumers, 4 partitions from each topic go to each consumer
>  * With four consumers, 3 partitions from each topic go to each consumer
>  
> Of note, we only saw this behaviour after migrating to the 
> `CooperativeStickyAssignor`. It was not an issue with the default partition 
> assignment strategy.
>  
> It is possible this may be intended behaviour. In which case, what is the 
> preferred workaround for our scenario? Our current workaround if we decide to 
> go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
> consumers so they only subscribe to one topic, and have two consumer threads 
> per instance of the application.  
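
The arithmetic in the scenario above can be checked with a short sketch. This is not the `CooperativeStickyAssignor` algorithm; it only spreads each topic's partitions round-robin to reproduce the per-topic counts the reporter proposes, and it compares the resulting load using an assumed weight of 10 units per `topic-1` partition and 1 unit per `topic-2` partition (taken from the "approximately 10 times" figure). The class and method names are hypothetical.

```java
import java.util.Arrays;

// Illustrative arithmetic only; not the real assignor logic.
final class PerTopicSpread {

    // Number of partitions of a single topic each consumer owns under a round-robin spread.
    static int[] spread(int partitions, int consumers) {
        int[] counts = new int[consumers];
        for (int p = 0; p < partitions; p++) {
            counts[p % consumers]++;
        }
        return counts;
    }

    // Relative load of one consumer, assuming topic-1 partitions carry 10x the traffic of topic-2.
    static int load(int topic1Partitions, int topic2Partitions) {
        return topic1Partitions * 10 + topic2Partitions;
    }

    public static void main(String[] args) {
        for (int consumers = 2; consumers <= 4; consumers++) {
            System.out.println(consumers + " consumers: " + Arrays.toString(spread(12, consumers)));
        }
        // Observed with two consumers: one owns all of topic-1, the other all of topic-2.
        System.out.println("observed loads: " + load(12, 0) + " vs " + load(0, 12));
        // Proposed with two consumers: six partitions of each topic per consumer.
        System.out.println("proposed loads: " + load(6, 6) + " vs " + load(6, 6));
    }
}
```

Under these assumptions the observed two-consumer assignment gives loads of 120 versus 12, while the per-topic spread gives 66 for each consumer, even though both assignments hand every consumer 12 partitions.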





[jira] [Resolved] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-16277.

Resolution: Fixed



[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16277:
---
Fix Version/s: 3.8.0

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>





[jira] [Assigned] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-16277:
--

Assignee: Cameron Redpath

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Assignee: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>





[jira] [Updated] (KAFKA-16295) Align RocksDB and in-memory store init() sequence

2024-02-21 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16295:
---
Labels: newbie  (was: )

> Align RocksDB and in-memory store init() sequence
> -
>
> Key: KAFKA-16295
> URL: https://issues.apache.org/jira/browse/KAFKA-16295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: newbie
>
> Cf [https://lists.apache.org/thread/f4z1vmpb21xhyxl6966xtcb3958fyx5d] 
> {quote}For RocksDB stores, we open the store first and then call #register, 
> [...] However the in-memory store actually registers itself *first*, before 
> marking itself as open, [...].
> I suppose it would make sense to align the two store implementations to have 
> the same behavior, and the in-memory store is probably technically more 
> correct.
> {quote}
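
The ordering difference described in the quote can be sketched as follows. This is a minimal illustration only: `SketchStore`, `RocksDbLikeStore` and `InMemoryLikeStore` are hypothetical stand-ins, not the actual Kafka Streams store classes, and `register()` merely records whether the store was already open when it was called.

```java
import java.util.ArrayList;
import java.util.List;

class InitOrderSketch {
    // Hypothetical minimal store for illustration; not Kafka's StateStore API.
    abstract static class SketchStore {
        final List<String> events = new ArrayList<>();
        boolean open = false;

        // Stands in for the #register call; records whether the store was open at that time.
        void register() { events.add("register(open=" + open + ")"); }

        void markOpen() { open = true; events.add("open"); }

        abstract void init();
    }

    // Order attributed to RocksDB stores in the quote: open first, then register.
    static class RocksDbLikeStore extends SketchStore {
        @Override void init() { markOpen(); register(); }
    }

    // Order attributed to the in-memory store: register first, then mark open.
    static class InMemoryLikeStore extends SketchStore {
        @Override void init() { register(); markOpen(); }
    }

    public static void main(String[] args) {
        SketchStore rocks = new RocksDbLikeStore();
        SketchStore inMemory = new InMemoryLikeStore();
        rocks.init();
        inMemory.init();
        System.out.println("RocksDB-like:   " + rocks.events);
        System.out.println("in-memory-like: " + inMemory.events);
    }
}
```

Aligning the two implementations would amount to picking one of these two `init()` bodies for both store types.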





[jira] [Updated] (KAFKA-16295) Align RocksDB and in-memory store init() sequence

2024-02-21 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16295:
---
Summary: Align RocksDB and in-memory store init() sequence  (was: Align 
RocksDB and in-memory store inti() sequence)

> Align RocksDB and in-memory store init() sequence
> -
>
> Key: KAFKA-16295
> URL: https://issues.apache.org/jira/browse/KAFKA-16295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>





[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2024-02-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818599#comment-17818599
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14597:


Hey [~atuljainiitk], I just came across this ticket for the first time, but for 
the most part I think your analysis makes sense. Are you still interested in 
picking this up? Specifically, reverting the linked change and getting the true 
system time for terminal nodes? It's been a while since that change was made 
and the KIP was implemented, so I don't remember all the context, but fetching 
the current time at terminal nodes sounds reasonable to me. Clearly the metric 
is otherwise useless, so we should either update it to be correct (and monitor 
for any potential performance impact) or just remove it entirely. Fixing it is 
obviously preferable, at least unless we know for sure that it does hurt 
performance. I'm optimistic, though.

cc also [~talestonini] [~cadonna]
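
To make the difference concrete, here is a minimal arithmetic sketch. It assumes (this is not confirmed in the ticket) that the metric is currently computed from a timestamp sampled before processing starts and reused at the terminal node. The record timestamp and the 422 ms pickup delay are illustrative values, and `e2eLatencyMs` is a hypothetical helper, not a Kafka Streams method.

```java
class E2eLatencySketch {
    // End-to-end latency as used in this sketch: measurement time minus record timestamp.
    static long e2eLatencyMs(long recordTimestampMs, long measuredAtMs) {
        return measuredAtMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTimestampMs = 0L;   // hypothetical record timestamp
        long pickedUpAtMs = 422L;      // hypothetical: record reaches the source node 422 ms later
        long processingMs = 3_000L;    // the 3-second sleep from the reported topology

        // Assumption: the time is sampled once, before processing, and reused at the sink.
        long withCachedTime = e2eLatencyMs(recordTimestampMs, pickedUpAtMs);
        // Alternative discussed above: read the clock again at the terminal node.
        long withTrueTimeAtSink = e2eLatencyMs(recordTimestampMs, pickedUpAtMs + processingMs);

        System.out.println("cached time:       " + withCachedTime + " ms");
        System.out.println("true time at sink: " + withTrueTimeAtSink + " ms");
    }
}
```

Under these assumptions only the second measurement includes the processing delay, at the cost of one extra clock read per record at each terminal node.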

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Assignee: Tales Tonini
>Priority: Major
> Attachments: image-2023-03-21-15-07-24-352.png, 
> image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, 
> image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, 
> record-e2e-latency-max.jpg
>
>
> I was following the KIP documentation 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
>  and the Kafka Streams documentation 
> ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
> Based on this documentation, *record-e2e-latency-max* should monitor the 
> full end-to-end latency, which includes both *consumption latencies* and 
> {*}processing delays{*}.
> However, based on my observations, record-e2e-latency-max seems to measure 
> only the consumption latencies. Processing delays can be measured using 
> *process-latency-max*. I am checking all this using a simple topology 
> consisting of a source, processors and a sink (code added). I have added a 
> sleep of 3 seconds in one of the processors to ensure some delay in the 
> processing logic. This delay is not accounted for in record-e2e-latency-max 
> but is accounted for in process-latency-max. 
> process-latency-max was observed to be 3002 ms, which accounts for the sleep 
> time of 3 seconds. However, record-e2e-latency-max observed in jconsole is 
> 422 ms, which does not account for the 3 seconds of sleep time.
>  
> Code describing my topology:
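> {code:java}
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Produced;
>
> // Builds a source -> peek -> mapValues (3 s sleep) -> peek -> sink topology.
> // `log` is assumed to be a logger field of the enclosing class.
> static Topology buildTopology(String inputTopic, String outputTopic) {
>     log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>     Serde<String> stringSerde = Serdes.String();
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>         .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
>         .mapValues(s -> {
>             try {
>                 // Artificial processing delay used to compare the two metrics.
>                 System.out.println("sleeping for 3 seconds");
>                 Thread.sleep(3000);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>             return s.toUpperCase();
>         })
>         .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
>         .to(outputTopic, Produced.with(stringSerde, stringSerde));
>     return builder.build();
> }
> {code}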
>  





[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818594#comment-17818594
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16277:


Hey [~credpath-seek] – first up, while it's been a while since I've looked at 
the sticky assignor code, I'm not too surprised that this might be the case. 
The obvious emphasis (per the name) was put on "stickiness" and 
partition-number balance, with good data parallelism, i.e. topic-level balance, 
being best-effort at most. 

That said, I suspect the assignor could be making a better effort. Presumably 
what is happening is that during the phase where it attempts to re-assign 
previously-owned partitions back to their former owner, we make a pass over a 
sorted list of previously-owned partitions that is grouped by topic. The 
assignor will then assign partitions from this list one-by-one to their 
previous owner until that owner hits its expected total number of partitions. 
So in the scenario you describe, it's basically looping over (t1p0, t1p1, t1p2, 
t1p3...t1pN, t2p0, t2p1, t2p2...t2pN) and assigning the first N partitions to 
the first consumer, which would be everything from topic 1, then just dumping 
the remaining partitions – all of which belong to topic 2 – onto the new 
consumer. 

The fix should be fairly simple – we just need to order this sorted list by 
partition number rather than by topic, so that the topics are interleaved 
(i.e. t1p0, t2p0, t1p1, t2p1...t1pN, t2pN). Would you be interested in 
submitting a patch for this? 
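
The effect of the two orderings can be sketched with a simplified model. This is not the actual assignor code: `TP` is a hypothetical stand-in for a topic-partition, and `split` simply lets the previous owner keep the first `quota` entries of the list while the new consumer receives the rest, using the two 12-partition topics from the ticket.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StickyOrderSketch {
    // Hypothetical stand-in for a topic-partition; not Kafka's TopicPartition class.
    static final class TP {
        final String topic;
        final int partition;
        TP(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    // Order described in the comment: t1p0..t1pN, then t2p0..t2pN.
    static List<TP> groupedByTopic(List<String> topics, int partitionsPerTopic) {
        List<TP> out = new ArrayList<>();
        for (String t : topics)
            for (int p = 0; p < partitionsPerTopic; p++)
                out.add(new TP(t, p));
        return out;
    }

    // Proposed order: topics interleaved by partition number (t1p0, t2p0, t1p1, t2p1, ...).
    static List<TP> groupedByPartition(List<String> topics, int partitionsPerTopic) {
        List<TP> out = new ArrayList<>();
        for (int p = 0; p < partitionsPerTopic; p++)
            for (String t : topics)
                out.add(new TP(t, p));
        return out;
    }

    // Simplified model: the previous owner keeps the first `quota` entries and the
    // new consumer gets the remainder. Returns partitions per topic for each consumer.
    static Map<String, Map<String, Integer>> split(List<TP> ordered, int quota) {
        Map<String, Map<String, Integer>> counts = new LinkedHashMap<>();
        for (int i = 0; i < ordered.size(); i++) {
            String consumer = i < quota ? "consumer-1" : "consumer-2";
            counts.computeIfAbsent(consumer, c -> new LinkedHashMap<>())
                  .merge(ordered.get(i).topic, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topic-1", "topic-2");
        // Grouped by topic: consumer-1 keeps all of topic-1, consumer-2 gets all of topic-2.
        System.out.println(split(groupedByTopic(topics, 12), 12));
        // Interleaved: each consumer ends up with 6 partitions of each topic.
        System.out.println(split(groupedByPartition(topics, 12), 12));
    }
}
```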

As for what you can do right now: even if a fix for this were merged, you'd 
have to wait for the next release. However, the assignment is completely 
customizable, so in theory you could just copy/paste all the code from the 
patched assignor into a custom ConsumerPartitionAssignor implementation and 
then plug that in instead of the "cooperative-sticky" assignment strategy.
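
Plugging in such a custom assignor is a matter of consumer configuration. The sketch below only builds the properties, using the `partition.assignment.strategy` key named in the ticket. The class name `com.example.InterleavedStickyAssignor` and the group id are hypothetical placeholders, and the class itself (an implementation of ConsumerPartitionAssignor) is not shown.

```java
import java.util.Properties;

class AssignorConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Hypothetical group id, for illustration only.
        props.setProperty("group.id", "my-consumer-group");
        // Hypothetical class name: a custom ConsumerPartitionAssignor implementation
        // containing the copied (patched) assignor code.
        props.setProperty("partition.assignment.strategy",
                "com.example.InterleavedStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```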

Otherwise, the workaround you suggest is a reasonable backup. The obvious 
downside is that the two threads will have an unbalanced load between them, 
but at least the overall node-level workload will be more even.

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Priority: Major
> Attachments: image-2024-02-19-13-00-28-306.png
>
>

[jira] [Commented] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816234#comment-17816234
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16241:


Off-topic but I opened the logs for a second and there is a huge amount of 
spam. I know everyone is probably aware of the "Detected out-of-order KTable 
update" logspam by now, but there was over 100MB from that one log message 
alone (for a single node)...yikes

Something that's new, or new to me at least, is this:
{code:java}
[2024-02-09 03:23:55,168] WARN [i-0fede2697f39580f9-StreamThread-1] 
stream-thread [i-0fede2697f39580f9-StreamThread-1] Detected that shutdown was 
requested. All clients in this app will now begin to shutdown 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2024-02-09 03:23:55,168] INFO [i-0fede2697f39580f9-StreamThread-1] [Consumer 
instanceId=ip-172-31-14-207-1, 
clientId=i-0fede2697f39580f9-StreamThread-1-consumer, groupId=stream-soak-test] 
Request joining group due to: Shutdown requested 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) {code}
Those two lines are logged repeatedly (and excessively) in a very tight loop at 
the end of the logs. Filtering those two lines out dropped the file size by 
another 200MB...that's a lot!! And it's all from about 10 seconds of real time. 
So it's a VERY busy loop...

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip, streams-3.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.





[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-02-07 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-16055:
--

Assignee: Kohei Nozaki  (was: Kohei Nozaki)

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Assignee: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.8.0
>
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to make this part of the code thread-safe by replacing the HashMap 
> with a ConcurrentHashMap and/or using an existing locking mechanism.
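
A minimal sketch of the ConcurrentHashMap option, assuming a simplified registry: `StoreProviderRegistrySketch`, its method names and its String key/value types are hypothetical and do not mirror the real QueryableStoreProvider. One thread adds and removes entries (as a removeStreamThread() call might) while another iterates over the map (as a store lookup might).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StoreProviderRegistrySketch {
    // Hypothetical stand-in for the map behind QueryableStoreProvider#storeProviders.
    // ConcurrentHashMap iterators are weakly consistent, so iterating while another
    // thread mutates the map does not throw ConcurrentModificationException.
    private final Map<String, String> storeProviders = new ConcurrentHashMap<>();

    void add(String threadName, String provider) { storeProviders.put(threadName, provider); }

    void remove(String threadName) { storeProviders.remove(threadName); }

    // Iterates over the current providers, as a lookup across all threads would.
    int count() {
        int n = 0;
        for (String ignored : storeProviders.values()) n++;
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        StoreProviderRegistrySketch registry = new StoreProviderRegistrySketch();
        Thread mutator = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                registry.add("StreamThread-" + i, "provider-" + i);
                registry.remove("StreamThread-" + i);
            }
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) registry.count();
        });
        mutator.start();
        reader.start();
        mutator.join();
        reader.join();
        System.out.println("providers left: " + registry.count());
    }
}
```

Swapping the map type makes individual reads and writes safe. Compound operations that must be atomic across several calls would still need the locking mechanism mentioned in the ticket.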





[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-02-07 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-16055:
--

Assignee: Kohei Nozaki

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Assignee: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.8.0
>
>





[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-02-07 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16055:
---
Fix Version/s: 3.8.0

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.8.0
>
>





[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-24 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14404:
---
Fix Version/s: 3.8.0

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
> Fix For: 3.8.0
>
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partition assignor (Consumer) and the partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.





[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16089:
---
Fix Version/s: 3.8.0

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Critical
> Fix For: 3.8.0
>
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  





[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-11 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805801#comment-17805801
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16089:


Yeah, nice investigation! That was a tricky one

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Critical
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>


[jira] [Commented] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2024-01-08 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804498#comment-17804498
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16025:


FYI [~sabitn] I added you as a Jira contributor so you should now be able to 
self-assign any tickets you are working on. Thanks again for the fix!

> Streams StateDirectory has orphaned locks after rebalancing, blocking future 
> rebalancing
> 
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
> Environment: Linux
>Reporter: Sabit
>Assignee: Sabit
>Priority: Major
> Fix For: 3.8.0
>
>
> Hello,
>  
> We are encountering an issue where during rebalancing, we see streams threads 
> on one client get stuck in rebalancing. Upon enabling debug logs, we saw that 
> some tasks were having issues initializing due to failure to grab a lock in 
> the StateDirectory:
>  
> {{2023-12-14 22:51:57.352000Z stream-thread 
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] 
> Failed to lock the state directory for task 0_51; will retry}}
>  
> We were able to reproduce this behavior reliably on 3.4.0. This is the 
> sequence that triggers the bug.
> Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), 
> each with 5 threads (1-5), and the consumer is using stateful tasks which 
> have state stores on disk. There are 10 active tasks and 10 standby tasks.
>  # Instance A is deactivated
>  # As an example, let's say task 0_1, previously on instance B, moves to 
> instance C
>  # Task 0_1 leaves behind its state directory on Instance B's disk, 
> currently unused, and no lock for it exists in Instance B's StateDirectory 
> in-memory lock tracker
>  # Instance A is re-activated
>  # Streams thread 1 on Instance B is asked to re-join the consumer group due 
> to a new member being added
>  # As part of re-joining, thread 1 lists non-empty state directories in order 
> to report the offsets it has in its state stores as part of its metadata. 
> Thread 1 sees that the directory for 0_1 is not empty.
>  # The cleanup thread on instance B runs. The cleanup thread locks state 
> store 0_1, sees the directory for 0_1 was last modified more than 
> `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
>  # Thread 1 takes a lock on directory 0_1 due to it being found not-empty 
> before, unaware that the cleanup has run between the time of the check and 
> the lock. It tracks this lock in its own in-memory store, in addition to 
> StateDirectory's in-memory lock store
>  # Thread 1 successfully joins the consumer group
>  # After every consumer in the group joins the group, assignments are 
> calculated, and then every consumer calls sync group to receive the new 
> assignments
>  # Thread 1 on Instance B calls sync group but gets an error - the group 
> coordinator has triggered a new rebalance and all members must rejoin the 
> group
>  # Thread 1 again lists non-empty state directories in order to report the 
> offsets it has in its state stores as part of its metadata. Prior to doing 
> so, it clears its in-memory store tracking the locks it has taken for the 
> purpose of gathering rebalance metadata
>  # Thread 1 no longer takes a lock on 0_1 as it is empty
>  # However, that lock on 0_1 owned by Thread 1 remains in StateDirectory
>  # All consumers re-join and sync successfully, receiving their new 
> assignments
>  # Thread 2 on Instance B is assigned task 0_1
>  # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is 
> still being held by Thread 1
>  # Thread 2 remains in rebalancing state, and cannot make progress on task 
> 0_1, or any other tasks it has assigned.
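
The sequence described in the ticket can be replayed in a small single-threaded model. This is only a sketch under assumptions: ToyStateDirectory, its methods, and the thread names are hypothetical stand-ins, not the actual Kafka Streams StateDirectory API. It shows how a lock taken from a stale directory listing can outlive the per-thread bookkeeping that was meant to release it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the orphaned-lock sequence; all names are hypothetical, not Kafka Streams classes. */
public class OrphanedLockSketch {

    /** Stand-in for the shared in-memory lock tracker: task id -> owning thread. */
    static final class ToyStateDirectory {
        private final Map<String, String> lockOwners = new HashMap<>();
        private final Set<String> nonEmptyDirs = new HashSet<>();

        /** Succeeds if the task is unlocked or already owned by the caller. */
        synchronized boolean lock(String task, String thread) {
            String owner = lockOwners.putIfAbsent(task, thread);
            return owner == null || owner.equals(thread);
        }

        /** Releases the lock only if the caller is the current owner. */
        synchronized void unlock(String task, String thread) {
            lockOwners.remove(task, thread);
        }

        synchronized Set<String> listNonEmptyDirs() { return new HashSet<>(nonEmptyDirs); }
        synchronized void createDir(String task) { nonEmptyDirs.add(task); }
        synchronized void deleteDir(String task) { nonEmptyDirs.remove(task); }
        synchronized String ownerOf(String task) { return lockOwners.get(task); }
    }

    /** Replays steps 3 to 17 in order and reports "owner of 0_1/whether thread 2 got the lock". */
    static String replay() {
        ToyStateDirectory dir = new ToyStateDirectory();
        dir.createDir("0_1");                        // step 3: leftover directory, no lock held
        Set<String> thread1Locks = new HashSet<>();  // thread 1's own lock bookkeeping

        Set<String> seen = dir.listNonEmptyDirs();   // step 6: thread 1 sees 0_1 as non-empty

        dir.lock("0_1", "cleanup");                  // step 7: cleanup locks, deletes, unlocks
        dir.deleteDir("0_1");
        dir.unlock("0_1", "cleanup");

        for (String task : seen) {                   // step 8: lock based on the stale listing
            if (dir.lock(task, "thread-1")) thread1Locks.add(task);
        }

        thread1Locks.clear();                        // step 12: bookkeeping cleared before re-listing
        for (String task : dir.listNonEmptyDirs()) { // step 13: 0_1 is gone, nothing is locked
            if (dir.lock(task, "thread-1")) thread1Locks.add(task);
        }

        boolean thread2GotLock = dir.lock("0_1", "thread-2"); // step 17: blocked by thread 1
        return dir.ownerOf("0_1") + "/" + thread2GotLock;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "thread-1/false"
    }
}
```

In this model the orphan appears because the per-thread set is cleared without releasing the corresponding entries in the shared tracker. Unlocking everything in the set before clearing it would be one way to avoid that, though the fix actually merged for this ticket may differ.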





[jira] [Assigned] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2024-01-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-16025:
--

Assignee: Sabit



[jira] [Updated] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2024-01-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16025:
---
Fix Version/s: 3.8.0

> Streams StateDirectory has orphaned locks after rebalancing, blocking future 
> rebalancing
> 
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
> Environment: Linux
>Reporter: Sabit
>Priority: Major
> Fix For: 3.8.0
>
>


[jira] [Commented] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2024-01-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801927#comment-17801927
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16025:


Nice find and writeup of the race condition – I'll take a look at the fix

> Streams StateDirectory has orphaned locks after rebalancing, blocking future 
> rebalancing
> 
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
> Environment: Linux
>Reporter: Sabit
>Priority: Major
>


[jira] [Commented] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-01-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801924#comment-17801924
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16055:


 See this discussion on the user mailing list for additional context: 
[https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to somehow make this part of the code thread-safe by replacing it 
> with a ConcurrentHashMap and/or using an existing locking mechanism.
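
As a rough illustration of the first option mentioned in the ticket, here is a minimal sketch of a registry backed by a ConcurrentHashMap. ProviderRegistrySketch and its String-valued map are hypothetical stand-ins, not the real QueryableStoreProvider; the point is only that one thread can remove entries while another iterates without risking a ConcurrentModificationException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a provider registry; not the real QueryableStoreProvider. */
public class ProviderRegistrySketch {
    // ConcurrentHashMap iterators are weakly consistent, so concurrent removal is safe.
    private final Map<String, String> providers = new ConcurrentHashMap<>();

    void add(String threadName, String provider) { providers.put(threadName, provider); }

    void remove(String threadName) { providers.remove(threadName); }

    /** Iterates over the current view of the map and counts the entries it sees. */
    int countProviders() {
        int n = 0;
        for (String ignored : providers.values()) n++;
        return n;
    }

    /** Removes entries on one thread while the caller keeps reading; returns the final count. */
    static int demo() {
        ProviderRegistrySketch registry = new ProviderRegistrySketch();
        for (int i = 0; i < 1000; i++) registry.add("thread-" + i, "provider-" + i);

        Thread remover = new Thread(() -> {
            for (int i = 0; i < 1000; i++) registry.remove("thread-" + i);
        });
        remover.start();
        while (remover.isAlive()) registry.countProviders(); // reads overlap with removals
        try {
            remover.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return registry.countProviders();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 0 once every entry has been removed
    }
}
```

A concurrent map only protects individual operations. If callers need a consistent snapshot across several reads, the "existing locking mechanism" route from the ticket may still be required.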





[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-01-02 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16055:
---
Labels: newbie newbie++  (was: )



[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()

2023-11-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789901#comment-17789901
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15798:


cc [~mjsax] [~wcarlson5] – maybe some motivation to remove, or at least block, 
the named topologies feature in 3.7

> Flaky Test 
> NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
> -
>
> Key: KAFKA-15798
> URL: https://issues.apache.org/jira/browse/KAFKA-15798
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I saw a few examples recently. 2 have the same error, but the third is 
> different
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
>  
> The failure is like
> {code:java}
> java.lang.AssertionError: Did not receive all 5 records from topic 
> output-stream-1 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <5> but: <0> was less than <5>{code}
> The other failure was
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
> {code:java}
> java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code}





[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()

2023-11-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789898#comment-17789898
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15798:


Took a quick look at this in the name of running down some of the worst flaky 
tests in Streams. I think it's pretty clear that this is failing because of the 
state updater thread (see below), but it's not as clear to me whether this 
hints at a real bug with the state updater thread or whether it only broke the 
named topologies feature.

If it's the latter, we should probably block people from using named topologies 
when the state updater thread is enabled in 3.7. Although I'm actually leaning 
towards going a step further and just taking out the named topologies 
altogether – we can just remove the "public" API classes for now, as extracting 
all the internal logic is somewhat of a bigger project that we shouldn't rush.

Of course, this is all assuming there is something about the state updater that 
broke named topologies – someone more familiar with the state updater should 
definitely verify that this isn't a real bug in normal Streams first! cc 
[~cadonna] [~lucasb] 

Oh, and this is how I know the state updater thread is responsible: if you look 
at [the graph of failure rates for this 
test|https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.streams.integration.NamedTopologyIntegrationTest=FLAKY=shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()],
 you'll see it goes from literally zero flakiness to the 2nd most commonly 
failing test in all of Streams on Oct 4th. This is the day we turned on the 
state updater thread by default.

(It's also a bit concerning that we didn't catch this sooner. The uptick in 
failure rate of this test is actually quite sudden. Would be great if we could 
somehow manually alert on this sort of thing)



[jira] [Commented] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787681#comment-17787681
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15843:


Hey [~lianetm] I worked on the old ConsumerRebalanceListener a lot and can 
provide some context here. The reason #onPartitionsAssigned is still called on 
an empty set of partitions is largely historical, and the tl;dr is that it's 
probably ok to change this behavior in the new consumer if it won't impact the 
older one. 

For some background, in the old days of eager rebalancing (which is still the 
default protocol in the original consumer client), we would always invoke both 
#onPartitionsRevoked and #onPartitionsAssigned at the start and end of a 
rebalance, respectively. And since all partitions are revoked and re-assigned 
with eager rebalancing, there (usually) was a non-empty set of partitions 
passed into each of these.

Then came incremental cooperative rebalancing: we no longer revoked & 
reassigned all the partitions and instead acted only on the incremental change 
in partition assignment. So #onPartitionsRevoked only gets the subset of 
partitions that are being migrated to a different consumer, and 
#onPartitionsAssigned only gets newly-added partitions. Also, with the 
cooperative protocol, #onPartitionsRevoked would be invoked at the _end_ of a 
rebalance, rather than at the beginning.

However, we still had to maintain compatibility across the two protocols for 
those implementing ConsumerRebalanceListener. And it was common to use the 
rebalance listener not just to listen in on the partition assignment, but to 
notify about the start and end of a rebalance. Therefore we decided to 
guarantee that #onPartitionsAssigned would still be invoked at the end of every 
rebalance, in case of users relying on this callback to detect the end of a 
rebalance. However, since #onPartitionsRevoked is no longer even invoked at the 
start of a cooperative rebalance, it can't be used to detect the start of one 
anymore and there was no reason to continue invoking it on every rebalance 
unless there were actually some partitions that were revoked. You'll notice 
that if the eager protocol is still enabled, the #onPartitionsRevoked callback 
actually is still invoked regardless of whether there's a non-empty set of 
partitions passed into it or not.

#onPartitionsLost is a bit of a special case, since (a) it was only added 
around the time cooperative rebalancing was implemented, so there was no old 
behavior for us to maintain compatibility with, and (b) it doesn't happen 
during a regular rebalance but is instead only used to notify the rebalance 
listener of a special case, i.e. that it has lost ownership of these 
partitions (but for that exact reason cannot commit offsets for them, as would 
normally occur in #onPartitionsRevoked). If there aren't any lost partitions, 
there's no reason to invoke this callback (and it would be misleading to do so).

My understanding is that there is no "eager" or "cooperative" protocol in the 
new consumer; it's an entirely new protocol, so I would assume you're not 
obligated to maintain compatibility for existing ConsumerRebalanceListener 
implementations. In that case, it probably does not make sense to guarantee 
that #onPartitionsAssigned is invoked on every rebalance regardless, even if no 
new partitions are added. I'm not super familiar with the KIP-848 
implementation details, but I would assume that users can still use the 
ConsumerPartitionAssignor callbacks to effectively detect the start and end of 
a rebalance (via #subscriptionUserdata and #onAssignment)

Of course, if you intend to change the behavior in a way that would affect the 
old consumer as well, then you'll need to give Kafka Streams time to adopt a 
new approach since we currently still rely on #onPartitionsAssigned to notify 
us when a rebalance ends. I'm pretty sure we don't plan on using the new 
consumer right away though, since we'll need to make a number of changes like 
this one before we can do so.
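
To make the callback rules described above concrete, here is a small sketch that models which listener callbacks fire under each protocol. It is a simplification under assumptions: RebalanceCallbackSketch, the Protocol enum, and the string encoding of calls are hypothetical and do not use the real consumer classes, and #onPartitionsLost is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of which rebalance callbacks fire; names are hypothetical, not the Kafka client API. */
public class RebalanceCallbackSketch {
    enum Protocol { EAGER, COOPERATIVE }

    /** Returns the callbacks, in order, for a rebalance moving from 'owned' to 'assigned'. */
    static List<String> callbacks(Protocol protocol, Set<Integer> owned, Set<Integer> assigned) {
        List<String> calls = new ArrayList<>();
        if (protocol == Protocol.EAGER) {
            // Eager: everything is revoked up front and re-assigned at the end,
            // and both callbacks fire even when the sets are empty.
            calls.add("revoked" + new TreeSet<>(owned));
            calls.add("assigned" + new TreeSet<>(assigned));
        } else {
            // Cooperative: only the incremental change is passed to the callbacks.
            Set<Integer> revoked = new TreeSet<>(owned);
            revoked.removeAll(assigned);
            Set<Integer> added = new TreeSet<>(assigned);
            added.removeAll(owned);
            if (!revoked.isEmpty()) calls.add("revoked" + revoked); // skipped when nothing moved
            calls.add("assigned" + added); // always fires, possibly with an empty set
        }
        return calls;
    }

    public static void main(String[] args) {
        // Unchanged assignment under cooperative: only an empty "assigned" call.
        System.out.println(callbacks(Protocol.COOPERATIVE, Set.of(0, 1), Set.of(0, 1)));
        // Partition 0 moves away and partition 2 arrives.
        System.out.println(callbacks(Protocol.COOPERATIVE, Set.of(0, 1), Set.of(1, 2)));
        // Eager with nothing owned yet: "revoked" still fires with an empty set.
        System.out.println(callbacks(Protocol.EAGER, Set.of(), Set.of(0)));
    }
}
```

The first case is the one this ticket asks about: the always-present empty "assigned" call is what lets a listener detect the end of a rebalance.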

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
> is not the case when triggering onPartitionsRevoked or Lost). This is the 
> behaviour of the legacy coordinator, and the new consumer implementation 
> maintains the same principle. We should review this to fully understand if it 
> is really needed to call 

[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787679#comment-17787679
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:


Found the ticket: https://issues.apache.org/jira/browse/KAFKA-9398

And yes, it's still unresolved. 

Given all the above, I think we can honestly just disable/remove the test, as 
the named topologies feature was never made into a real public API. I do know 
of a few people who are using it anyway but they're aware it was only an 
experimental feature and not fully supported by Streams. So imo we don't need 
to go out of our way to fix any flaky tests, provided we can demonstrate that 
the issue is specific to named topologies and not potentially an issue with 
Streams itself. Of course in this case it's actually the latter, but we've 
recognized the root cause as a known issue, so I don't think there's anything 
more this test can do for us besides be flaky and annoy everyone.

Thanks for digging into this! 

> Subscribing to non-existent topic blocks StreamThread from stopping
> ---
>
> Key: KAFKA-15834
> URL: https://issues.apache.org/jira/browse/KAFKA-15834
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
>  a topology is created which references an input topic which does not exist. 
> The test as-written passes, but the KafkaStreams#close(Duration) at the end 
> times out, and leaves StreamThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, the 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer 
> clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer,
>  
> groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics]
>  Received unknown topic or partition error in fetch for partition 
> unique_topic_prefix-topology-1-store-repartition-0 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamThread 
> that is in an extended rebalance state during shutdown.





[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787678#comment-17787678
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:


I just checked the current code and it looks like we do still respect the 
guarantee of invoking #onPartitionsAssigned in all cases. So I don't think step 
#4 is correct. Did you happen to see anything in the logs that would suggest 
the StreamThread was continuing in its regular loop and never stopping due to 
the rebalanceInProgress flag? Or is it possible that it's hanging somewhere in 
the shutdown process (or even in the rebalance itself)?

I'm just wondering if it might be related to the Producer, not the Consumer. I 
know we had some issues with the Producer#close hanging in the past, and that 
it was related to users deleting topics from under the app, which would be a 
similar situation to what you found here. I'm not sure if we ever fixed that, 
maybe [~mjsax] will remember the ticket for the Producer issue?



[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787677#comment-17787677
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:


Yeah great analysis, thanks [~gharris1727] 

I'm a bit confused by point #4, however – is this a change in behavior 
(possibly related to KIP-848)? It's my understanding that the 
#onPartitionsAssigned callback is guaranteed to always be invoked regardless of 
whether the set of partitions being newly assigned is non-empty or not. This is 
in contrast with the #onPartitionsRevoked and #onPartitionsLost callbacks, 
which are only invoked when the set of partitions to act upon is non-empty.

I think one could argue that this inconsistency is not ideal, but the behavior 
of always invoking #onPartitionsAssigned is a stated guarantee in the public 
contract of ConsumerRebalanceListener. See [this 
paragraph|https://github.com/apache/kafka/blob/254335d24ab6b6d13142dcdb53fec3856c16de9e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L67]
 of the javadocs. In other words, I don't think we can change this without a 
KIP, and if this behavior was modified recently then we need to revert that 
change until a KIP is accepted.
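
To make the asymmetry concrete, here is a minimal self-contained model of the contract as I described it above. Note that RebalanceCallbacks, ModelCoordinator and FlagTrackingListener are hypothetical stand-ins written for this comment, not the real ConsumerRebalanceListener or coordinator code; the only thing taken from the discussion is the rule that the assigned callback always fires while revoked/lost fire only for non-empty sets.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for the three rebalance callbacks discussed above.
interface RebalanceCallbacks {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsLost(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical model of a coordinator that follows the described contract.
class ModelCoordinator {
    static void completeRebalance(RebalanceCallbacks callbacks,
                                  Collection<String> revoked,
                                  Collection<String> assigned) {
        // The revoked callback (and, likewise, lost) fires only for non-empty sets.
        if (!revoked.isEmpty()) {
            callbacks.onPartitionsRevoked(revoked);
        }
        // The assigned callback always fires, even when nothing new was assigned.
        callbacks.onPartitionsAssigned(assigned);
    }
}

// Listener that relies on the assigned callback to clear a rebalance flag.
class FlagTrackingListener implements RebalanceCallbacks {
    boolean rebalanceInProgress = true; // assumed to be set when the rebalance started
    int revokedCalls = 0;
    int lostCalls = 0;

    @Override public void onPartitionsRevoked(Collection<String> partitions) { revokedCalls++; }
    @Override public void onPartitionsLost(Collection<String> partitions) { lostCalls++; }
    @Override public void onPartitionsAssigned(Collection<String> partitions) {
        rebalanceInProgress = false;
    }
}

class ListenerContractDemo {
    public static void main(String[] args) {
        FlagTrackingListener listener = new FlagTrackingListener();
        // Rebalance that neither revokes nor assigns anything.
        ModelCoordinator.completeRebalance(
                listener, Collections.emptyList(), Collections.emptyList());
        System.out.println("rebalanceInProgress=" + listener.rebalanceInProgress
                + ", revokedCalls=" + listener.revokedCalls);
    }
}
```

In this model the flag is cleared even for an empty assignment, which is why a listener can safely depend on the assigned callback as its "rebalance finished" signal as long as the guarantee holds.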



[jira] [Commented] (KAFKA-15326) Decouple Processing Thread from Polling Thread

2023-11-06 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783436#comment-17783436
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15326:


[~lbrutschy] it looks like there are (at least) 10 PRs associated with this 
ticket/change, according to the naming sequence at least. But only half of them 
are linked in this ticket. Can you locate the remaining 5 and attach them to 
this ticket so the entire implementation is contained here?

> Decouple Processing Thread from Polling Thread
> --
>
> Key: KAFKA-15326
> URL: https://issues.apache.org/jira/browse/KAFKA-15326
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>
> As part of an ongoing effort to implement a better threading architecture in 
> Kafka Streams, we decouple N stream threads into N polling threads and N 
> processing threads. The effort to consolidate the N polling threads into a 
> single thread is a follow-up to this ticket. 





[jira] [Created] (KAFKA-15782) Establish concrete project conventions to define public APIs that require a KIP

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15782:
--

 Summary: Establish concrete project conventions to define public 
APIs that require a KIP
 Key: KAFKA-15782
 URL: https://issues.apache.org/jira/browse/KAFKA-15782
 Project: Kafka
  Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman


There seems to be no concrete definition that establishes project-specific 
conventions for what is and is not considered a public API change that requires 
a KIP. This results in frequent drawn-out debates that revisit the same topic 
and slow things down, and often ends up forcing trivial changes through the KIP 
process. For a recent example, KIP-998 was required for a one-line change just 
to add the "protected" access modifier to an otherwise package-private 
constructor. 
See [this comment 
thread|https://github.com/apache/kafka/pull/14681#discussion_r1378591228] for 
the full debate on this subject.

It would be beneficial, and in the long run save us all time, to just sit down 
and hash out the project conventions, such as whether a 
package-private/protected method on a non-final Java class is to be considered 
a public API, even if the method itself is/was never a public method. This will 
of course require a KIP, but should help to establish some ground rules to 
avoid any more superfluous KIPs in the future.





[jira] [Updated] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15781:
---
Description: 
See [https://github.com/apache/kafka/pull/14681]

 

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access

  was:See https://github.com/apache/kafka/pull/14681


> Change ProducerConfig(props, doLog) constructor to protected
> 
>
> Key: KAFKA-15781
> URL: https://issues.apache.org/jira/browse/KAFKA-15781
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
>
> See [https://github.com/apache/kafka/pull/14681]
>  
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access





[jira] [Created] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15781:
--

 Summary: Change ProducerConfig(props, doLog) constructor to 
protected
 Key: KAFKA-15781
 URL: https://issues.apache.org/jira/browse/KAFKA-15781
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman


See https://github.com/apache/kafka/pull/14681





[jira] [Updated] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances

2023-10-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14419:
---
Summary: Failed SyncGroup leading to partitions lost due to processing 
during rebalances  (was: Same message consumed again by the same stream task 
after partition is lost and reassigned)

> Failed SyncGroup leading to partitions lost due to processing during 
> rebalances
> ---
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a 
> total of 8 active and 8 standby stream tasks for the same stream topology, 
> consuming from an input topic with 8 partitions. Sometimes a handful of 
> messages are consumed twice by one of the stream tasks when stream tasks on 
> another application instance join the consumer group after an application 
> instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed 
> on the same four application instances. I have verified that each message is 
> only produced once by enabling debug logging in the topology flow right 
> before producing each message to the topic.
> Logs from stream thread with duplicate consumption:
>  
> {code:java}
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is 
> already rebalancing
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> Input records consumed for the first time
> 2022-11-21 15:09:33,919 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Successfully joined group with 
> generation Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,920 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began 
> another rebalance. Need to re-join the group. Sent generation was 
> Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,923 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as 
> lost since 

[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

2023-10-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780119#comment-17780119
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14419:


[~Carlstedt] first off, do you by any chance have especially long processing 
latencies? For example, iterating over a large range of a state store, making a 
blocking remote call, etc.

Since you're using the default max poll interval, I might recommend starting 
out by lowering the max.poll.records first, as you generally don't want to make 
the max.poll.interval too small – ideally that would be the last resort.

IIRC the default max.poll.records in Streams is set to 1,000 – so maybe try 
cutting that down to just 100? You also can (should) experiment a bit to find a 
good balance for your app in the steady state. Though I suppose if you're 
seeing this frequently enough/able to reproduce it reliably, you could set it 
to something extremely low as a test, like 10 let's say, just to see if that 
solves the issue.

I'll try to put together a PR for this sometime soon, maybe early next week, so 
you can also wait for that and try running with a patched version of 
Streams. (Would a trunk/3.7 patched version be alright? I'm happy to create a 
branch with the fix ported to an earlier version of Kafka Streams if you'd 
prefer, just let me know which version you need)

Btw: I'm going to update the ticket title if you don't mind, so that it 
reflects the bug described in my last response. The current title is just 
describing the correct behavior of Kafka Streams working as intended.


[jira] [Comment Edited] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

2023-10-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779731#comment-17779731
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-14419 at 10/26/23 10:49 PM:
---

Hey, sorry for the long delay – I'm still trying to refresh my memory of this 
ticket and the related one, but after looking at it again with fresh eyes I 
think I figured out what's going on here. If I'm reading this situation 
correctly, it does seem like there is some less-than-ideal behavior that we 
might be able to improve. 

Based on your recent logs, I think the root cause here is basically the same as 
what I fixed in 
https://github.com/apache/kafka/pull/12869,
 just to a lesser degree. The issue in that patch was that Streams would 
sometimes trigger a followup rebalance even while the current rebalance was 
still going on, which led some members to drop out of the group upon hitting a 
REBALANCE_IN_PROGRESS error during the SyncGroup phase. The fix basically just 
made the StreamThread wait until the rebalance was over before triggering a 
followup.

This should have been sufficient, but I suppose it is still theoretically 
possible to run into the same issue. Taking a deeper look at the original 
issue, it would only arise because of how Streams uses a non-blocking poll 
which allows it to return to its main loop and continue processing in the 
background during a rebalance. A lot of things happen throughout the loop, but 
the relevant operations here are as such:
 # Check the rebalance "schedule" and trigger one if:
 ## it has been requested for a time equal to or less than the current time
 ## the consumer is not actively participating in a rebalance (ie sometime 
after a SyncGroup response is received but before sending a new JoinGroup 
request)
 # Poll for more records, during which time one or more of the following may 
occur:
 ## consumer enters a new rebalance by sending a JoinGroup request
 ## consumer participates in a rebalance by receiving the JoinGroup response 
and sending a SyncGroup request
 ## consumer completes an ongoing rebalance by receiving a SyncGroup response, 
after which it can commit offsets for revoked tasks and initialize new ones
 # Process more records, which might have been either:
 ## Newly-consumed during the last poll call, or
 ## Left over from a previous batch that could not be fully processed before 
needing to return to poll due to running out of time in the max.poll.interval

So here's what I'm imagining: let's say we have two consumers, A and B, with A 
being the group leader/assignor.
 # A new rebalance begins, and both threads send their JoinGroup requests 
before returning to process some records
 # A doesn't have many records left to process, so it quickly returns to the 
poll call in step 2 of the loop. However B is still processing a large backlog
 # A performs the assignment and determines that a followup rebalance is 
needed, so it sets the rebalance schedule to 
 # After the assignment, A sends it out in the SyncGroup request and exits the 
poll call
 # A does some processing (or not) before returning to the poll and receiving 
the SyncGroup response
 # A exits the poll again, and this time when it reaches step 1 of the loop, it 
is now able to trigger the new rebalance
 # After A has requested a new rebalance, it finally returns to the poll call 
one more time, and rejoins the group/sends a JoinGroup request to kick it off
 # This whole time, B has had a large backlog of records, or a very high 
max.poll.interval, or a long GC pause – you get the idea. It's stuck in step 3
 # B finally finishes processing and leaves step 3, returning to the poll call 
during which it sends a very late SyncGroup request.
 # When the SyncGroup response is eventually received, B gets the 
REBALANCE_IN_PROGRESS error and fails its rebalance since the generation is 
stale

The fundamental issue here is that B is theoretically able to spend up to the 
max.poll.interval between sending its SyncGroup request and returning to poll 
to process the SyncGroup response, but A might be able to process its SyncGroup 
response, process its records, and then trigger a new rebalance all in that 
timeframe. This could happen when the task assignment is heavily imbalanced, 
for example. 
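
The ordering problem in the scenario above can be reproduced with a toy model. Everything here is a simplification I made up for illustration: ModelGroup is a hypothetical three-state stand-in for the broker-side group state, not the actual group coordinator, and the error is represented as a plain string. It only shows that B's SyncGroup outcome depends on whether A's followup JoinGroup lands first.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the group state on the broker side.
class ModelGroup {
    enum State { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    State state = State.STABLE;
    int generation = 0;

    // Any JoinGroup on a settled group kicks off a new rebalance.
    void joinGroup() {
        state = State.PREPARING_REBALANCE;
    }

    // Called once every member has joined: the new generation is fixed.
    int completeJoinPhase() {
        generation++;
        state = State.COMPLETING_REBALANCE;
        return generation;
    }

    // A SyncGroup succeeds only if no newer rebalance has started since.
    String syncGroup(int memberGeneration) {
        if (state == State.PREPARING_REBALANCE || memberGeneration != generation) {
            return "REBALANCE_IN_PROGRESS";
        }
        state = State.STABLE;
        return "NONE";
    }
}

class LateSyncGroupDemo {
    static List<String> run(boolean leaderTriggersFollowupBeforeBSyncs) {
        List<String> log = new ArrayList<>();
        ModelGroup group = new ModelGroup();

        group.joinGroup();                          // A sends JoinGroup
        group.joinGroup();                          // B sends JoinGroup
        int gen = group.completeJoinPhase();

        log.add("A sync: " + group.syncGroup(gen)); // A is back in poll quickly

        if (leaderTriggersFollowupBeforeBSyncs) {
            group.joinGroup();                      // A starts the followup rebalance
        }
        log.add("B sync: " + group.syncGroup(gen)); // B returns to poll late
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [A sync: NONE, B sync: REBALANCE_IN_PROGRESS]
        System.out.println(run(false)); // [A sync: NONE, B sync: NONE]
    }
}
```

The only difference between the two runs is whether A's followup arrives inside the window in which B is still busy processing, which is the window that max.poll.interval bounds.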

I can see a few potential paths forward here, and a fourth option that is more 
of a temporary workaround for [~Carlstedt] if you're still encountering this. 
None of them are really a guarantee, but they would help. For the most 
comprehensive fix we might want to consider doing two or even all three of 
these:

Option 1: add a small delay to the Streams followup rebalance trigger to help 
the entire group finish the SyncGroup phase before beginning the next rebalance.
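
For Option 1, something along these lines is what I have in mind. This is a hypothetical sketch only: FollowupRebalanceSchedule is not an existing Streams class, and the size of the delay is an open question, so it is just a constructor parameter here.

```java
// Hypothetical sketch of Option 1; names are illustrative, not Streams internals.
class FollowupRebalanceSchedule {
    private static final long NOT_SCHEDULED = Long.MAX_VALUE;

    private final long delayMs;
    private long scheduledAtMs = NOT_SCHEDULED;

    FollowupRebalanceSchedule(long delayMs) {
        this.delayMs = delayMs;
    }

    // The assignor asks for a followup at some time; push it out by the delay
    // to give slower members a chance to finish the SyncGroup phase.
    void request(long requestedTimeMs) {
        scheduledAtMs = requestedTimeMs + delayMs;
    }

    // Checked once per loop iteration (step 1 of the loop described above).
    boolean shouldTrigger(long nowMs, boolean rebalanceInProgress) {
        if (rebalanceInProgress || nowMs < scheduledAtMs) {
            return false;
        }
        scheduledAtMs = NOT_SCHEDULED; // fire at most once per request
        return true;
    }
}
```

With a 500 ms delay, a followup requested for t=1000 would not fire at t=1000, would still be held back at t=1500 if a rebalance were in progress, and would fire on the first check at or after t=1500 once the consumer is no longer rebalancing. As noted, this narrows the window rather than closing it.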

Option 2: set a shorter upper bound on the maximum time a StreamThread 

[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779731#comment-17779731
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14419:


Hey, sorry for the long delay – I'm still trying to refresh my memory of this 
ticket and the related one, but after looking at it again with fresh eyes I 
think I figured out what's going on here. If I'm reading this situation 
correctly, it does seem like there is some less-than-ideal behavior that we 
might be able to improve. 

Based on your recent logs, I think the root cause here is basically the same as 
what I fixed in 
https://github.com/apache/kafka/pull/12869,
 just to a lesser degree. The issue in that patch was that Streams would 
sometimes trigger a followup rebalance even while the current rebalance was 
still going on, which led some members to drop out of the group upon hitting a 
REBALANCE_IN_PROGRESS error during the SyncGroup phase. The fix basically just 
made the StreamThread wait until the rebalance was over before triggering a 
followup.

This should have been sufficient, but I suppose it is still theoretically 
possible to run into the same issue. Taking a deeper look at the original 
issue, it would only arise because of how Streams uses a non-blocking poll 
which allows it to return to its main loop and continue processing in the 
background during a rebalance. A lot of things happen throughout the loop, but 
the relevant operations here are as such:
 # Check the rebalance "schedule" and trigger one if:
 ## it has been requested for a time equal to or less than the current time
 ## the consumer is not actively participating in a rebalance (ie sometime 
after a SyncGroup response is received but before sending a new JoinGroup 
request)
 # Poll for more records, during which time one or more of the following may 
occur:
 ## consumer enters a new rebalance by sending a JoinGroup request
 ## consumer participates in a rebalance by receiving the JoinGroup response 
and sending a SyncGroup request
 ## consumer completes an ongoing rebalance by receiving a SyncGroup response, 
after which it can commit offsets for revoked tasks and initialize new ones
 # Process more records, which might have been either:
 ## Newly-consumed during the last poll call, or
 ## Left over from a previous batch that could not be fully processed before 
needing to return to poll due to running out of time in the max.poll.interval

So here's what I'm imagining: let's say we have two consumers, A and B, with A 
being the group leader/assignor.
 # A new rebalance begins, and both threads send their JoinGroup requests 
before returning to process some records
 # A doesn't have many records left to process, so it quickly returns to the 
poll call in step 2 of the loop. However B is still processing a large backlog
 # A performs the assignment and determines that a followup rebalance is 
needed, so it sets the rebalance schedule to 
 # After the assignment, A sends it out in the SyncGroup request and exits the 
poll call
 # A does some processing (or not) before returning to the poll and receiving 
the SyncGroup response
 # A exits the poll again, and this time when it reaches step 1 of the loop, it 
is now able to trigger the new rebalance
 # After A has requested a new rebalance, it finally returns to the poll call 
one more time, and rejoins the group/sends a JoinGroup request to kick it off
 # This whole time, B has had a large backlog of records, or a very high 
max.poll.interval, or a long GC pause – you get the idea. It's stuck in step 3
 # B finally finishes processing and leaves step 3, returning to the poll call 
during which it sends a very late SyncGroup request.
 # When the SyncGroup response is eventually received, B gets the 
REBALANCE_IN_PROGRESS error and fails its rebalance since the generation is 
stale
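
The sequence above can be replayed against a toy coordinator. This is a deliberately simplified sketch, not the broker's group coordinator: the class ToyCoordinator, its states, and the lateSyncScenario helper are all assumptions made for illustration. The only behavior it models is that a SyncGroup arriving after a new rebalance has started is answered with REBALANCE_IN_PROGRESS.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy group coordinator; a simplification for illustration, not broker code. */
final class ToyCoordinator {
    enum State { STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE }
    enum SyncResult { OK, REBALANCE_IN_PROGRESS }

    private final int groupSize;
    private final Set<String> joined = new HashSet<>();
    private State state = State.STABLE;
    private int generation = 0;

    ToyCoordinator(int groupSize) { this.groupSize = groupSize; }

    /** JoinGroup: starts a new rebalance if none is being prepared; bumps the generation once all have joined. */
    void joinGroup(String member) {
        if (state != State.PREPARING_REBALANCE) {
            state = State.PREPARING_REBALANCE;
            joined.clear();
        }
        joined.add(member);
        if (joined.size() == groupSize) {
            generation++;
            state = State.COMPLETING_REBALANCE;
        }
    }

    /** SyncGroup: rejected once a newer rebalance has begun; the leader's sync stabilizes the group. */
    SyncResult syncGroup(String member, boolean isLeader) {
        if (state == State.PREPARING_REBALANCE) {
            return SyncResult.REBALANCE_IN_PROGRESS;
        }
        if (isLeader) {
            state = State.STABLE;
        }
        return SyncResult.OK;
    }

    int generation() { return generation; }

    /** Replays the A/B sequence and returns what B's late SyncGroup receives. */
    static SyncResult lateSyncScenario() {
        ToyCoordinator group = new ToyCoordinator(2);
        group.joinGroup("A");
        group.joinGroup("B");        // both have joined: generation 1
        group.syncGroup("A", true);  // A sends the assignment, group is stable
        group.joinGroup("A");        // A triggers the followup rebalance
        return group.syncGroup("B", false); // B is still on generation 1
    }
}
```

If B's SyncGroup is sent before A rejoins, the same model returns OK, which is why delaying the followup trigger helps without being a guarantee.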

The fundamental issue here is that B is theoretically able to spend up to the 
max.poll.interval between sending its SyncGroup request and returning to poll 
to process the SyncGroup response, but A might be able to process its SyncGroup 
response, process its records, and then trigger a new rebalance all in that 
timeframe. This could happen when the task assignment is heavily imbalanced, 
for example. 

I can see a few potential paths forward here, and a fourth option that is more 
of a temporary workaround for [~Carlstedt] if you're still encountering this. 
None of them are really a guarantee, but they would help. For the most 
comprehensive fix we might want to consider doing two or even all three of 
these:

Option 1: add a small delay to the Streams followup rebalance trigger to help 
the entire group finish the SyncGroup phase before beginning the next rebalance.

Option 2: set a shorter upper bound on the maximum time a StreamThread can 
spend processing records while there is an 

[jira] [Resolved] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15116.

Resolution: Not A Problem

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779725#comment-17779725
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15116:


I tend to agree with Matthias here – this is definitely not a bug in Kafka 
Streams, and the behavior you describe in the most recent message sounds like 
it is working as intended. You can certainly disable cooperative rebalancing by 
setting the UPGRADE_FROM config to 2.3 or below, but as [~mjsax] 
mentioned, we plan to remove support for eager rebalancing entirely in an 
upcoming release so this would only be a temporary fix at best.
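
For reference, a minimal sketch of that temporary workaround using plain string keys. The config in question is exposed as StreamsConfig.UPGRADE_FROM_CONFIG (key "upgrade.from"); the class name, method name, application id and bootstrap address below are placeholders invented for this example.

```java
import java.util.Properties;

/** Hypothetical helper showing the "upgrade.from" workaround; not part of Kafka. */
final class EagerRebalancingWorkaround {
    /** Builds Streams properties that keep the application on the eager rebalancing protocol. */
    static Properties eagerProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Any value of 2.3 or below tells Streams to stay on eager rebalancing.
        props.setProperty("upgrade.from", "2.3");
        return props;
    }
}
```

These properties would then be passed to the KafkaStreams constructor as usual; removing the "upgrade.from" entry restores the default cooperative protocol.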

I say "at best" because it sounds to me like even without cooperative 
rebalancing, you're relying on implementation details that are not part of the 
public contract. That's not an accusation, it just means you're putting 
yourself at risk of breaking changes to your application, like cooperative 
rebalancing, because the semantics around when to commit are considered 
internal to Kafka Streams and can change at any time.

That said, I do agree that it would be nice if Streams had better support for 
blocking operations or long-running RPCs. Unfortunately that's just not the 
case at the moment, so you kind of have to take what you can get, or else 
rewrite as a plain producer/consumer app. Honestly, while I'm a big believer 
that almost any kind of application that follows the consume-process-produce 
pattern can and should be represented as a Streams app, I think this might be 
one of the few exceptions where it really does make more sense to implement 
with the plain clients. It sounds like part of the application logic is already 
driven by an external consumer, which together with the fact that you have 
strict requirements around committing as well as a shared state with external 
calls, indicates to me that Streams may not be the right tool for the job. Sort 
of by definition Streams is supposed to abstract away all the client 
interactions and all the offset commit logic, in addition to being shardable 
such that everything you need to process a stream of records for a given 
partition is local to that shard. So it's hard to imagine how to fit your 
application logic into a Streams app in a completely "safe" way.

Again, this doesn't mean you can't/shouldn't try to stretch the boundaries of 
Kafka Streams, but I think it makes sense to close this bug ticket as "Not A 
Problem" given that everything is working as intended. However I'd encourage you to consider 
filing a "New Feature" ticket for some specific functionality that would help 
with your use case. I believe there is already one for better blocking API/RPC 
support, but I think the ability to pause processing in Kafka Streams would be 
nice to have in general, and could be used to pause Streams during a rebalance 
to solve the issue you're facing.

Hope that all makes sense! If you do want to consider rewriting this with the 
plain clients, I'm happy to give some pointers. It sounds like your Streams 
application is very complex already so I have to wonder if it might be 
simpler to write up outside the framework of Kafka Streams.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12550.

Resolution: Won't Fix

Closing this out since its usefulness is preempted by the StateUpdaterThread 
and having moved restoration out of the main StreamThread

> Introduce RESTORING state to the KafkaStreams FSM
> -
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING
> This would cover the time between the completion of a stable rebalance and 
> the completion of restoration across the client. Currently, Streams will 
> report the state during this time as REBALANCING even though it is generally 
> spending much more time restoring than rebalancing in most cases.
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover 
> all aspects of rebalancing -> task initialization -> restoring has been a 
> common source of confusion in the past. It’s also proved to be a time sink 
> for us, during escalations, incidents, mailing list questions, and bug 
> reports. It often adds latency to escalations in particular as we have to go 
> through GTS and wait for the customer to clarify whether their “Kafka Streams 
> is stuck rebalancing” ticket means that it’s literally rebalancing, or just 
> in the REBALANCING state and actually stuck elsewhere in Streams
> # Prereq for global thread improvements: for example [KIP-406: 
> GlobalStreamThread should honor custom reset policy 
> |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy]
>  was ultimately blocked on this as we needed to pause the Streams app while 
> the global thread restored from the appropriate offset. Since there’s 
> absolutely no rebalancing involved in this case, piggybacking on the 
> REBALANCING state would just be shooting ourselves in the foot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12550:
---
Fix Version/s: (was: 4.0.0)




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15463.

Resolution: Not A Problem

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> After the application had been working fine for some time, it started to throw:
>  
> This is a Spring Boot application that runs in Kubernetes as a stateful pod.
>  
>  
>  
> {code:java}
> Exception in thread "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown node
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>   at myclass1.java:28)
>   at myclass2.java:48)
>   at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>   at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>   at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>   at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>   at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>   at myclass3.java:48)
>   at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>   at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>   at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>   at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>   at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>   at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>   at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
> {code}
>  
> stream-thread 
> [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> The Transformer is a prototype-scoped bean; the supplier returns a new 
> instance of the Transformer:
>  
>  
> {code:java}
> @Override public Transformer> get() 
> {     return ctx.getBean(MyTransformer.class); }{code}
>  
>  
> The only way to recover is to delete all topics used by Kafka Streams; even if 
> the application is restarted, the same exception is thrown.
> If messages in the internal 'store-changelog' topics are deleted or their 
> offsets manipulated, can that cause the issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779715#comment-17779715
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15463:


[~yevsh]  Well the intended pattern is to get the StateStore from the context 
during init and then save a reference to the store for your 
processor/transformer. That's why we pass the context in to #init but not to 
#process (also, while it's not a significant overhead, it's definitely more 
efficient to do the state store lookup only once, during init, and not on every 
single record that's processed)

That said, I think it's a fair question as to why this is causing an error, and 
why the error doesn't happen every time. The fact that it generally only 
appears when you have more than one partition/task per application instance 
makes me think there is some state that is somehow being shared between the 
different tasks. This would definitely be explained by returning the same 
transformer instance each time, but based on your latest update, you are 
definitely returning a new transformer each time and still seeing the issue, 
right?

Assuming so, I'm inclined to believe the issue is with Spring. I vaguely recall 
a similar problem being reported in the past, which was ultimately because a 
Spring object was unexpectedly/unknowingly acting as a static/singleton class. 
This was resulting in the context – of which there is supposed to be exactly 
one per task – being shared between different instances of the task/processor 
nodes.

I'm pretty sure that's what is going on here as well. I'm guessing that the 
MyService class is a Spring bean? If so, then it's effectively a singleton and 
will be shared by each of the individual Transformer instances in your 
application, meaning the different tasks will be overwriting each other's 
context when invoking #init on this transformer. So the context you retrieve 
from the myServiceBean during #process may not be the same as the context you 
saved to it in #init, causing it to throw this error since only the context 
corresponding to the task that is currently being processed will have the 
currentNode set to a non-null value.
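
The overwriting described above can be reproduced without Kafka or Spring. The following is a minimal sketch under that assumption: ToyContext, SharedService and ToyTransformer are hypothetical stand-ins for the ProcessorContext, the singleton bean and the transformer, and the task ids are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a per-task ProcessorContext; hypothetical, for illustration only. */
final class ToyContext {
    final String taskId;
    ToyContext(String taskId) { this.taskId = taskId; }
}

/** Stand-in for a singleton-scoped bean that (incorrectly) stores per-task state. */
final class SharedService {
    private ToyContext context;
    void setContext(ToyContext context) { this.context = context; } // last writer wins
    String currentTask() { return context.taskId; }
}

/** Stand-in for a transformer; one instance is created per task. */
final class ToyTransformer {
    private final SharedService service;
    private final String ownTask;

    ToyTransformer(SharedService service, String ownTask) {
        this.service = service;
        this.ownTask = ownTask;
    }

    /** Saves the context into the shared service, as the problematic code does in #init. */
    void init(ToyContext context) { service.setContext(context); }

    /** Returns true if the context read back during processing belongs to this task. */
    boolean seesOwnContext() { return service.currentTask().equals(ownTask); }
}

final class SingletonDemo {
    /** Initializes two tasks against one shared service and reports what each one reads back. */
    static List<Boolean> run() {
        SharedService singleton = new SharedService();
        ToyTransformer t0 = new ToyTransformer(singleton, "0_0");
        ToyTransformer t1 = new ToyTransformer(singleton, "0_1");
        t0.init(new ToyContext("0_0"));
        t1.init(new ToyContext("0_1")); // overwrites the context saved by t0
        List<Boolean> result = new ArrayList<>();
        result.add(t0.seesOwnContext());
        result.add(t1.seesOwnContext());
        return result;
    }
}
```

Even though each task gets its own transformer instance, the first task ends up reading the second task's context, which matches the symptom of the error appearing only with more than one task per instance.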

Even if you made the change I originally suggested but saved the StateStore 
reference by passing it to the MyService bean, it wouldn't work – it might not 
throw this error but you would potentially be reading and writing to the wrong 
copy of a state store for a given task, which is even worse. The only way to 
solve this is by removing the Spring bean entirely or at least refactoring it 
so that it doesn't hold any internal state and has to have the full application 
state for that task passed in to it every time – in other words you just need 
to make sure to keep all the objects used by a given transformer completely 
local to that instance. Here is my recommendation for how to implement your 
transformer class – hope this helps!


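private final MyService myServiceBean;
private StateStore myStore;

@Override
public void init(ProcessorContext context) {
    // look up the store once, during init, and keep a task-local reference
    myStore = context.getStateStore(STORE_NAME);
}

@Override
public KeyValue transform(String key, MyItem myItem) {
    myServiceBean.process(myItem, myStore);
    return null; // or return the KeyValue you want to forward downstream
}
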
Basically modify the MyService bean to accept the StateStore to operate on as a 
parameter to its #process method. And definitely keep the fix in which you 
return a new Transformer instance each time instead of reusing the same one.

Let me know if you have any questions! I'm going to close the ticket since I'm 
fairly confident in this solution having seen the same problem before, but 
definitely please do reopen it if you implement the suggested fix and still 
encounter an error. Good luck!


[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779695#comment-17779695
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13152:


Hey [~sagarrao] – I'm happy to try and help you get this into 3.7. As a 
disclaimer I do have a few other KIPs I've already promised to help land in 3.7 
so this would be 3rd on my list, but we still have a good amount of time and 
given the KIP itself is already accepted, I think we can make it.

Just give me a ping on the PR when it's ready for me to take a look. And just 
to refresh my memory, all the caching work – both config and metrics – is 
already merged, so the only thing remaining is to add (and implement) the new 
input.buffer.max.bytes config. Does that sound right?

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to bookkeep; once it is exceeded we pause fetching from 
> this partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage for this buffering. We should 
> consider replacing that config with a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .
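
To make the first issue concrete, here is a small arithmetic sketch. The helper name and all numbers are illustrative assumptions, not measurements: it only shows that a per-partition record limit yields a worst-case byte bound that scales with the current assignment and the largest record, whereas a global byte limit is a fixed number by construction.

```java
/** Hypothetical helper for the worst-case arithmetic; not part of Kafka. */
final class BufferBound {
    /** Worst-case bytes buffered when the only limit is a per-partition record count. */
    static long perPartitionBound(int assignedPartitions, int recordsPerPartition, long maxRecordBytes) {
        return (long) assignedPartitions * recordsPerPartition * maxRecordBytes;
    }
}
```

For example, with a limit of 1000 records per partition and records of up to 1 KiB, perPartitionBound(10, 1000, 1024) is 10,240,000 bytes, but after a rebalance that assigns 40 partitions to the same instance the bound becomes 40,960,000 bytes without any config change.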



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15571:
---
Fix Version/s: 3.5.2
   3.7.0
   3.6.1

> StateRestoreListener#onRestoreSuspended is never called because wrapper 
> DelegatingStateRestoreListener doesn't implement onRestoreSuspended
> ---
>
> Key: KAFKA-15571
> URL: https://issues.apache.org/jira/browse/KAFKA-15571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0, 3.6.0, 3.5.1
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> With https://issues.apache.org/jira/browse/KAFKA-10575 
> `StateRestoreListener#onRestoreSuspended` was added. But local tests show 
> that it is never called because `DelegatingStateRestoreListener` was not 
> updated to call a new method.
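
The failure mode can be illustrated with a self-contained sketch. It assumes the new callback was added to the interface as a no-op default method, so that a wrapper which does not override it still compiles; the interface and class names below are hypothetical stand-ins, not the Streams classes themselves.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy listener interface; the method names mirror the ticket, the types are hypothetical. */
interface ToyRestoreListener {
    void onRestoreStart(String store);

    /** Added later with a no-op default so existing implementations keep compiling. */
    default void onRestoreSuspended(String store) { }
}

/** Wrapper written before onRestoreSuspended existed: it never forwards the new callback. */
final class StaleDelegatingListener implements ToyRestoreListener {
    private final ToyRestoreListener user;
    StaleDelegatingListener(ToyRestoreListener user) { this.user = user; }

    @Override public void onRestoreStart(String store) { user.onRestoreStart(store); }
    // onRestoreSuspended is inherited as a no-op, so the user's override is never reached
}

/** Fixed wrapper: explicitly forwards the new callback as well. */
final class FixedDelegatingListener implements ToyRestoreListener {
    private final ToyRestoreListener user;
    FixedDelegatingListener(ToyRestoreListener user) { this.user = user; }

    @Override public void onRestoreStart(String store) { user.onRestoreStart(store); }
    @Override public void onRestoreSuspended(String store) { user.onRestoreSuspended(store); }
}

/** User listener that records every callback it receives. */
final class RecordingListener implements ToyRestoreListener {
    final List<String> events = new ArrayList<>();

    @Override public void onRestoreStart(String store) { events.add("start:" + store); }
    @Override public void onRestoreSuspended(String store) { events.add("suspended:" + store); }
}
```

Wrapping a RecordingListener in StaleDelegatingListener silently drops the suspended event, while FixedDelegatingListener delivers both callbacks.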



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15571.

Resolution: Fixed




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15520) Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on all partitions

2023-09-29 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770635#comment-17770635
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15520:


{quote}Also added
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
CooperativeStickyAssignor.class.getName());

to enable CooperativeStickyAssignor
{quote}
To clarify, Streams will always override and ignore this setting because it 
needs to plug in the StreamsPartitionAssignor – so you don't need this, at 
least not for any Streams applications. It should use cooperative rebalancing 
by default as of version 2.4 and above.

I guess the one thing to watch out for is that cooperative rebalancing will not 
be enabled if you upgrade from a lower version and forget/skip the step to 
remove the StreamsConfig.UPGRADE_FROM property. So just make sure that's not 
being set anywhere (at least, not set to a version that's lower than 2.4)
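
A quick way to sanity-check a config for this is sketched below. The helper class and method are hypothetical; it only inspects the "upgrade.from" key (the string behind StreamsConfig.UPGRADE_FROM_CONFIG) and assumes the value is a dotted version such as "2.3" or "0.10.2".

```java
import java.util.Properties;

/** Hypothetical config check; not part of Kafka. */
final class UpgradeFromCheck {
    /** True if "upgrade.from" is set to a version below 2.4, which keeps eager rebalancing on. */
    static boolean forcesEagerRebalancing(Properties props) {
        String upgradeFrom = props.getProperty("upgrade.from");
        if (upgradeFrom == null) {
            return false; // not set: nothing is holding Streams back from cooperative rebalancing
        }
        String[] parts = upgradeFrom.trim().split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major < 2 || (major == 2 && minor < 4);
    }
}
```

Running this against the properties passed to KafkaStreams at startup would flag a leftover "upgrade.from" entry from an earlier rolling upgrade.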

> Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on 
> all partitions
> 
>
> Key: KAFKA-15520
> URL: https://issues.apache.org/jira/browse/KAFKA-15520
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Rohit Bobade
>Priority: Major
>
> Kafka broker version: 2.8.0 Kafka Streams client version: 2.6.2
> I am running kafka streams stateful aggregations on K8s statefulset with 
> persistent volume attached to each pod. I have also specified
> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, podName);
> which makes sure it gets the sticky partition assignment.
> Enabled standby replica - 
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> and set props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "0");
> However, I'm seeing that when pods restart, it triggers rebalances and 
> causes processing to be paused on all pods while the rebalance and state 
> restore are in progress.
> My understanding is that even if there is a rebalance - only the partitions 
> that should be moved around will be restored in a cooperative way and not 
> pause all the processing. Also, it should failover to standby replica in this 
> case and avoid state restoring on other pods.
> I have increased session timeout to 480 seconds and max poll interval to 15 
> mins to minimize rebalances.
> Also added
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CooperativeStickyAssignor.class.getName());
> to enable CooperativeStickyAssignor
> could someone please help if I'm missing something?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-09-22 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768141#comment-17768141
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15463:


[~yevsh] try moving the line
KeyValueStore store = context.getStateStore("storeName");
from the #process method to the #init method of your processor.

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> After some time application was working fine, starting to get:
>  
> This is springboot application runs in kubernetes as stateful pod.
>  
>  
>  
> {code:java}
> Exception in thread "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown node
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>     at myclass1.java:28)
>     at myclass2.java:48)
>     at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>     at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>     at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>     at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>     at myclass3.java:48)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
> {code}
>  
> stream-thread [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
>  
>  
> The Transformer is a prototype bean; the supplier supplies a new instance of 
> the Transformer:
>  
>  
> {code:java}
> @Override
> public Transformer> get() {
>     return ctx.getBean(MyTransformer.class);
> }{code}
>  
>  
> The only way to recover is to delete all topics used by Kafka Streams; even if 
> the application is restarted, the same exception is thrown.
> If messages in the internal 'store-changelog' topics are deleted or their 
> offsets are manipulated, can that cause the issue?





[jira] [Commented] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed

2023-08-30 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17760632#comment-17760632
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14294:


I'm not sure that workaround would help, as it follows the same mechanism for 
requesting a commit as the actual invocation of the punctuator, which is what 
is "broken" by this bug. Specifically, both a punctuator invocation and the 
`context.commit` API just set the `commitNeeded` flag. However the bug here is 
that this flag is never checked at all, and we skip over the commit entirely 
when there are no new input records processed. The only reliable remedy is to 
upgrade to a version that includes the fix, i.e. 3.4 or above.
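
To illustrate the mechanism, here is a toy model and not the actual Streams code: ToyTask, maybeCommitBuggy, and maybeCommitFixed are hypothetical names. The only details taken from the comment above are that a punctuator invocation and `context.commit` both just set a `commitNeeded` flag, and that the buggy path never consults that flag when no records were processed.

```java
// Toy model of the commit decision; not the real StreamThread/TaskManager logic.
final class ToyTask {
    boolean commitNeeded = false;
    int commits = 0;

    // Both a punctuator invocation and context.commit() only set the flag.
    void requestCommit() {
        commitNeeded = true;
    }

    // Buggy variant: returns before the flag is ever looked at.
    void maybeCommitBuggy(int recordsProcessed) {
        if (recordsProcessed == 0) {
            return;
        }
        commit();
    }

    // Fixed variant: a pending commit request is honoured even without input.
    void maybeCommitFixed(int recordsProcessed) {
        if (recordsProcessed > 0 || commitNeeded) {
            commit();
        }
    }

    private void commit() {
        commits++;
        commitNeeded = false;
    }
}
```

In this model, calling requestCommit() followed by maybeCommitBuggy(0) leaves the request pending forever, which is why requesting a commit from the punctuator does not work around the bug.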

> Kafka Streams should commit transaction when no records are processed
> -
>
> Key: KAFKA-14294
> URL: https://issues.apache.org/jira/browse/KAFKA-14294
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0, 3.2.1
>Reporter: Vicky Papavasileiou
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.4.0
>
> Attachments: image-2023-02-08-10-22-20-456.png
>
>
> Currently, if there are no records to process in the input topic, a 
> transaction does not commit. If custom punctuator code is writing to a 
> state store (which is common practice), the producer gets fenced when trying 
> to write to the changelog topic. This throws a TaskMigratedException and 
> causes a rebalance. 
> A better approach would be to commit a transaction even when there are no 
> records processed, so as to allow the punctuator to make progress. 





[jira] [Updated] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed

2023-08-30 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14294:
---
Affects Version/s: 3.1.0



[jira] [Assigned] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS

2023-08-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-15308:
--

Assignee: Rohan Desai

> Wipe Stores upon OffsetOutOfRangeException in ALOS
> --
>
> Key: KAFKA-15308
> URL: https://issues.apache.org/jira/browse/KAFKA-15308
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Colt McNealy
>Assignee: Rohan Desai
>Priority: Minor
>
> As per this [Confluent Community Slack 
> Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ],
>  Streams currently does not wipe away RocksDB state upon encountering an 
> `OffsetOutOfRangeException` in ALOS.
>  
> `OffsetOutOfRangeException` is a rare case that occurs when a standby task 
> requests offsets that no longer exist in the topic. We should wipe the store 
> for three reasons:
>  # Not wiping the store can be a violation of ALOS since some of the 
> now-missing offsets could have contained tombstone records.
>  # Wiping the store has no performance cost since we need to replay the 
> entirety of what's in the changelog topic anyways.
>  # I have heard (not yet confirmed myself) that we wipe the store in EOS 
> anyways, so fixing this bug could remove a bit of complexity from supporting 
> EOS and ALOS.





[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order

2023-08-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750495#comment-17750495
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15297:


Were you able to (re)produce this issue? I'm a bit surprised because I always 
thought the state stores were maintained in strict topological order, both when 
building them initially and then when registering them.

The stores are flushed in the order that they are registered, which corresponds 
to the order they are added to the topology, which in turn *should* reflect the 
topological order of the attached processor nodes.

The original ordering comes from InternalTopologyBuilder#build and the 
topologically-sorted nodeFactories map. This builds up the stateStoreMap in 
topological order, no?
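
The ordering argument can be pictured with a small stand-alone model. ToyStoreRegistry is a hypothetical class, not a Streams internal; it only assumes, as described above, that stores are flushed in the order in which they were registered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy registry: flush order is simply registration (insertion) order.
final class ToyStoreRegistry {
    // LinkedHashMap iterates in insertion order.
    private final Map<String, Runnable> flushers = new LinkedHashMap<>();
    private final List<String> flushLog = new ArrayList<>();

    void register(String storeName) {
        flushers.put(storeName, () -> flushLog.add(storeName));
    }

    // Flushes every store and returns the names in the order they were flushed.
    List<String> flushAll() {
        flushLog.clear();
        for (Runnable flusher : flushers.values()) {
            flusher.run();
        }
        return new ArrayList<>(flushLog);
    }
}
```

In such a model the flush order is only as topological as the registration order: if a downstream store happens to be registered first, it is also flushed first.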

> Cache flush order might not be topological order 
> -
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Bruno Cadonna
>Priority: Major
> Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means that, during a commit, records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the 
> cache. However, the downstream caches have already been flushed and they will 
> not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are 
> blocked in the downstream caches are committed during the same commit and so 
> they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> // Right branch: a pass-through processor that is later connected to stateStoreC.
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
>     .process(
>         () -> new Processor<String, String, String, String>() {
>             private ProcessorContext<String, String> context;
>             @Override
>             public void init(ProcessorContext<String, String> context) {
>                 this.context = context;
>             }
>             @Override
>             public void process(Record<String, String> record) {
>                 context.forward(record);
>             }
>             @Override
>             public void close() {}
>         },
>         Named.as("processor1")
>     )
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> // Left branch: three chained tables materialized as stateStoreA, B and C.
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
>     .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .toStream()
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache of {{stateStoreC}} is flushed before the caches of 
> {{stateStoreA}} and {{stateStoreB}}.





[jira] [Assigned] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-07-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-14976:
--

Assignee: Almog Gavra

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: needs-kip
>
> It appears that we only give the illusion of full customizability when it 
> comes to the state stores of a windowed join. This arose due to an 
> [optimization|https://github.com/apache/kafka/pull/11252] for the performance 
> of the spurious results fix, and means that these joins now come with one 
> additional, and possibly unexpected, state store:
>  
> {code:java}
> final StoreBuilder, 
> LeftOrRightValue>> builder =
>             new ListValueStoreBuilder<>(
>          |--[   persistent ? 
> this-->  |         Stores.persistentKeyValueStore(storeName) : 
>          |--[      Stores.inMemoryKeyValueStore(storeName),
>                 timestampedKeyAndJoinSideSerde,
>                 leftOrRightValueSerde,
>                 Time.SYSTEM
>             ); {code}
>  
> where persistent is defined above that as
> {code:java}
> final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null 
> || streamJoinedInternal.thisStoreSupplier().get().persistent(); {code}
>  
> This means regardless of whether a custom state store implementation was 
> passed in to the join, we will still insert one of our RocksDB or InMemory 
> state stores. Which might be very surprising since the API makes it seem like 
> the underlying stores are fully configurable.
> I'm adding a warning line for this in PR 
> [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336]
>  but we should really make this hidden state store fully configurable like 
> the window stores currently are (which will require a KIP)
>  





[jira] [Updated] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-07-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14976:
---
Labels: kip  (was: needs-kip)

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: kip
>


[jira] [Commented] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-07-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748314#comment-17748314
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14976:


Will be addressed by 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>


[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746236#comment-17746236
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


[~jwreschnig] I'm sure we can find someone to pick up the KIP for a 
full-fledged fix like I proposed, so no worries – it seems reasonable to me 
for us to just reuse the static membership group.instance.id for the time 
being, if it's set, and punt on the generalized feature for now. Would you be 
interested in just doing a small PR for this case instead? Happy to review such 
a thing if so.

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID in the 
> state directory (KAFKA-10716) does not help us avoid shuffling lots of tasks 
> during restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.





[jira] [Created] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-07-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15215:
--

 Summary: The default.dsl.store config is not compatible with 
custom state stores
 Key: KAFKA-15215
 URL: https://issues.apache.org/jira/browse/KAFKA-15215
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: Almog Gavra


Sort of a bug, sort of a new/missing feature. When we added the long-awaited 
default.dsl.store config, it was decided to scope the initial KIP to just the 
two out-of-the-box state store types offered by Streams, rocksdb and 
in-memory. The reasoning was that this would address a large number of the 
relevant use cases, and could always be followed up with another KIP for custom 
state stores if/when the demand arose.

Of course, since rocksdb is the default anyway, the only beneficiaries of this 
KIP right now are the people who specifically want only in-memory stores – yet 
custom state store users are probably by far the ones with the greatest need 
for an easier way to configure the store type across an entire application. And 
unfortunately, because the config currently relies on enum definitions for the 
known OOTB store types, there's not really any way to extend this feature as it 
is to work with custom implementations.

I think this is a great feature, which is why I hope to see it extended to the 
broader user base. Most likely we'll want to introduce a new config for this, 
though whether it replaces the old default.dsl.store config or complements it 
will have to be decided during the KIP discussion.
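
For reference, a minimal sketch of how the existing config is set today. It uses plain java.util.Properties with string literals so that it runs without the Streams jar. The key default.dsl.store comes from the text above; the values "rocksDB" and "in_memory" are, to the best of my knowledge, the two accepted settings (exposed as StreamsConfig.ROCKS_DB and StreamsConfig.IN_MEMORY). DslStoreConfigSketch and inMemoryDefaults are hypothetical names.

```java
import java.util.Properties;

final class DslStoreConfigSketch {
    // Builds a minimal Streams configuration that opts in to in-memory DSL stores.
    static Properties inMemoryDefaults(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Switch DSL-created stores from the default (rocksDB) to in-memory.
        props.setProperty("default.dsl.store", "in_memory");
        return props;
    }
}
```

There is no value here that could name a custom store implementation, which is the gap this ticket describes.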





[jira] [Updated] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-07-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15215:
---
Labels: needs-kip  (was: )

> The default.dsl.store config is not compatible with custom state stores
> ---
>
> Key: KAFKA-15215
> URL: https://issues.apache.org/jira/browse/KAFKA-15215
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: needs-kip
>


[jira] [Updated] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15190:
---
Labels: needs-kip  (was: kip)

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>


[jira] [Updated] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15190:
---
Labels: kip  (was: )

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: kip
>


[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743313#comment-17743313
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


I would be a bit hesitant to overly rely on the group.instance.id because not 
everyone uses or wants static membership, and that config is completely coupled 
to the feature. Perhaps we can reuse the group.instance.id as the process id 
only if/when static membership is already being used, which would not 
necessarily even require a KIP (maybe), but we'd still need to introduce a new 
config for the general use case.

It's a bummer because of course, practically speaking, this new config would 
have exactly the same meaning as the group.instance.id – a unique, persistent 
identifier for each client. It would have been the perfect config for this use 
case if not for Kafka's habit of being overly clever about reusing configs to 
enable/disable the related feature, in addition to their actual usage.

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>


[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743311#comment-17743311
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


I'm all for this, though it will need a KIP. Would you be interested in writing 
one? Happy to help you with the process if so.

As for the meantime, perhaps you guys can get some relief by just writing this 
processId directly upon setup, before starting the Streams app? I believe it 
just expects a plain UUID at the moment, so you should be able to write a 
function that hashes this container id to something of that form and then 
persist it to disk in exactly the same way as Streams.
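
One way to do the hashing step, sketched with the JDK only. UUID.nameUUIDFromBytes produces a deterministic name-based (version 3) UUID, so the same container ID always maps to the same value. ProcessIdSketch and processIdFor are hypothetical names, and the on-disk format that Streams expects is deliberately not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class ProcessIdSketch {
    // Maps a stable container ID (e.g. a StatefulSet pod name) to a stable UUID.
    static UUID processIdFor(String containerId) {
        return UUID.nameUUIDFromBytes(containerId.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because the mapping is deterministic, a restarted container with the same ID would arrive at the same UUID without needing persistent storage to remember it.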

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>


[jira] [Commented] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742151#comment-17742151
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15178:


Nice catch!

> Poor performance of ConsumerCoordinator with many TopicPartitions
> -
>
> Key: KAFKA-15178
> URL: https://issues.apache.org/jira/browse/KAFKA-15178
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: easyfix, patch-available
> Attachments: pollPhase.png
>
>
> Doing some profiling of my Kafka Streams application, I noticed that the 
> {{pollPhase}} suffers from a minor performance issue.
> See the pink tree on the left of the flame graph below.  
> !pollPhase.png|width=1028,height=308!
> {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which 
> checks the current {{metadataSnapshot}} against the 
> {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if 
> there's a large number of topic-partitions being consumed by the application, 
> then this comparison can perform poorly.
> I suspect this can be trivially addressed with a {{boolean}} flag that 
> indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
> actually needs to be checked, since most of the time it should be identical 
> to {{{}assignmentSnapshot{}}}.
> I plan to raise a PR with this optimization to address this issue.
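
The dirty-flag idea in the description can be sketched roughly as follows. This is a simplified model, not the ConsumerCoordinator code: the class, the `Map<String, Integer>` snapshot type, and the `deepComparisons` counter are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class DirtyFlagSketch {
    private Map<String, Integer> metadataSnapshot = new HashMap<>();
    private Map<String, Integer> assignmentSnapshot = new HashMap<>();
    private boolean metadataDirty = false;
    int deepComparisons = 0; // counts how often the expensive check actually runs

    // Called whenever new metadata arrives; marks the snapshot as needing a check.
    void updateMetadata(final Map<String, Integer> partitionsPerTopic) {
        metadataSnapshot = new HashMap<>(partitionsPerTopic);
        metadataDirty = true;
    }

    // Called when an assignment completes; the two snapshots are now identical.
    void onAssignment() {
        assignmentSnapshot = new HashMap<>(metadataSnapshot);
        metadataDirty = false;
    }

    // Only falls back to the deep-equality check when the flag is set.
    boolean rejoinNeeded() {
        if (!metadataDirty) {
            return false;
        }
        deepComparisons++;
        final boolean changed = !metadataSnapshot.equals(assignmentSnapshot);
        if (!changed) {
            metadataDirty = false; // verified identical, no need to compare again
        }
        return changed;
    }
}
```

In this model, repeated calls from the poll loop cost a single boolean read until the metadata is next updated.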





[jira] [Created] (KAFKA-15045) Move Streams task assignor to public configs

2023-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15045:
--

 Summary: Move Streams task assignor to public configs
 Key: KAFKA-15045
 URL: https://issues.apache.org/jira/browse/KAFKA-15045
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman








[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted

2023-05-10 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721546#comment-17721546
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14981:


Have we resolved all the known issues with static membership? IIRC there were 
some that required broker-side changes. Could we accidentally introduce 
correctness-related bugs in Streams applications running against older 
clusters? 

Maybe I'm being paranoid, but I thought we had begun to recommend against using 
static membership in Streams (due to the above and/or general stability issues, 
though perhaps most/all of those have been sorted out by now – do we know?)

> Set `group.instance.id` in streams consumer so that rebalance will not happen 
> if a instance is restarted
> 
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is 
> restarted within `session.timeout.ms`, a rebalance will not be triggered and 
> the original assignment can be returned directly from the broker. We can set this 
> id in Kafka Streams using `threadId` so that no rebalance is triggered within 
> `session.timeout.ms`
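
For reference, the two settings named in the description can be sketched as plain consumer properties. This only illustrates the config keys; it does not answer the stability question raised in the comment above. The helper name and the instance ID value are hypothetical.

```java
import java.util.Properties;

final class StaticMembershipSketch {

    // Builds consumer overrides that enable static membership.
    // instanceId must be unique per consumer and stable across restarts.
    static Properties consumerOverrides(final String instanceId, final int sessionTimeoutMs) {
        final Properties props = new Properties();
        props.setProperty("group.instance.id", instanceId);
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(final String[] args) {
        // "my-app-0-StreamThread-1" is a made-up, threadId-style value.
        System.out.println(consumerOverrides("my-app-0-StreamThread-1", 45000));
    }
}
```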





[jira] [Created] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-05-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14976:
--

 Summary: Left/outer stream-stream joins create KV stores that 
aren't customizable
 Key: KAFKA-14976
 URL: https://issues.apache.org/jira/browse/KAFKA-14976
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


It appears that we only give the illusion of full customizability when it comes 
to the state stores of a windowed join. This arose due to an 
[optimization|https://github.com/apache/kafka/pull/11252] for the performance 
of the spurious results fix, and means that these joins now come with one 
additional, and possibly unexpected, state store:

 
{code:java}
final StoreBuilder, 
LeftOrRightValue>> builder =
            new ListValueStoreBuilder<>(
         |--[   persistent ? 
this-->  |         Stores.persistentKeyValueStore(storeName) : 
         |--[      Stores.inMemoryKeyValueStore(storeName),
                timestampedKeyAndJoinSideSerde,
                leftOrRightValueSerde,
                Time.SYSTEM
            ); {code}
 

where persistent is defined above that as
{code:java}
final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || 
streamJoinedInternal.thisStoreSupplier().get().persistent(); {code}
 
This means that regardless of whether a custom state store implementation was 
passed in to the join, we will still insert one of our RocksDB or InMemory state 
stores, which might be very surprising since the API makes it seem like the 
underlying stores are fully configurable.
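
To make the consequence concrete, here is a stripped-down model of that selection logic. `StoreSupplier` is a hypothetical stand-in for the real supplier type, collapsing the `.get().persistent()` call into one method, and the returned strings only label which built-in store would be chosen.

```java
final class HiddenStoreSketch {

    // Hypothetical stand-in for the store supplier passed to the join.
    interface StoreSupplier {
        boolean persistent();
    }

    // Mirrors the `persistent` boolean above: the custom supplier only
    // influences which built-in store is picked, it is never used itself.
    static String hiddenStoreKind(final StoreSupplier thisStoreSupplier) {
        final boolean persistent = thisStoreSupplier == null || thisStoreSupplier.persistent();
        return persistent ? "built-in persistent (RocksDB)" : "built-in in-memory";
    }

    public static void main(final String[] args) {
        System.out.println(hiddenStoreKind(null));        // no supplier given
        System.out.println(hiddenStoreKind(() -> true));  // custom persistent store
        System.out.println(hiddenStoreKind(() -> false)); // custom non-persistent store
    }
}
```

In all three cases the extra store is one of the two built-in kinds, which is the surprise described above.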

I'm adding a warning line for this in PR 
[#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336]
 but we should really make this hidden state store fully configurable like the 
window stores currently are (which will require a KIP)

 





[jira] [Updated] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-05-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14976:
---
Labels: needs-kip  (was: )

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> It appears that we only give the illusion of full customizability when it 
> comes to the state stores of a windowed join. This arose due to an 
> [optimization|https://github.com/apache/kafka/pull/11252] for the performance 
> of the spurious results fix, and means that these joins now come with one 
> additional, and possibly unexpected, state store:
>  
> {code:java}
> final StoreBuilder, 
> LeftOrRightValue>> builder =
>             new ListValueStoreBuilder<>(
>          |--[   persistent ? 
> this-->  |         Stores.persistentKeyValueStore(storeName) : 
>          |--[      Stores.inMemoryKeyValueStore(storeName),
>                 timestampedKeyAndJoinSideSerde,
>                 leftOrRightValueSerde,
>                 Time.SYSTEM
>             ); {code}
>  
> where persistent is defined above that as
> {code:java}
> final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null 
> || streamJoinedInternal.thisStoreSupplier().get().persistent(); {code}
>  
> This means that regardless of whether a custom state store implementation was 
> passed in to the join, we will still insert one of our RocksDB or InMemory 
> state stores, which might be very surprising since the API makes it seem like 
> the underlying stores are fully configurable.
> I'm adding a warning line for this in PR 
> [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336]
>  but we should really make this hidden state store fully configurable like 
> the window stores currently are (which will require a KIP)
>  





[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-05-04 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719572#comment-17719572
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13891:


Thanks [~pnee] – just to round out the above, the actual fix was in 
[pull/13550|https://github.com/apache/kafka/pull/13550]

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed with 
> a rebalanceInProgress error. So the previous bug still exists, and it may cause 
> the consumer to rebalance for many rounds before finally stabilizing.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced the group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # A new rebalance started with generation 2, and consumer A joined successfully, 
> but for some reason consumer A didn't send out sync group immediately
>  # The other consumers completed sync group successfully in generation 2, except 
> consumer A.
>  # After consumer A sent out sync group, a new rebalance started, with 
> generation 3. So consumer A got a REBALANCE_IN_PROGRESS error in the sync group 
> response
>  # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with 
> generation 3, with the assignment (ownedPartition) from generation 1.
>  # So now we have sent an out-of-date ownedPartition, with unexpected 
> results
>  # *After the generation-3 rebalance, consumer A got the P3/P4 partitions. The 
> ownedPartition is ignored because of the old generation.*
>  # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
>  # *If some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue will happen again and result in many rounds of 
> rebalance before the group is stable*
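
The "ownedPartition is ignored because of old generation" step can be sketched as below. This is a simplified model of the idea, not the actual assignor code: the `Member` class and `validClaims` helper are hypothetical, and the rule shown is only "claims from a generation older than the newest one are dropped".

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class OwnedPartitionsSketch {

    // Hypothetical view of what a member reports when it joins.
    static final class Member {
        final String id;
        final int generation;
        final Set<String> ownedPartitions;

        Member(final String id, final int generation, final Set<String> ownedPartitions) {
            this.id = id;
            this.generation = generation;
            this.ownedPartitions = ownedPartitions;
        }
    }

    // Keeps a member's claim only if it comes from the newest generation seen.
    static Map<String, Set<String>> validClaims(final List<Member> members) {
        final int maxGeneration = members.stream().mapToInt(m -> m.generation).max().orElse(-1);
        final Map<String, Set<String>> claims = new TreeMap<>();
        for (final Member m : members) {
            claims.put(m.id, m.generation == maxGeneration ? m.ownedPartitions : Set.of());
        }
        return claims;
    }
}
```

With consumer A rejoining on generation 1 while the rest of the group reports generation 2, A's P1/P2 claim is dropped, which matches the sequence in the list above.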
>  





[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2023-05-04 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719570#comment-17719570
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14016:


Nice job [~pnee] – thanks for taking the time to patch this finally. To 
clarify, since there are a lot of tickets and PRs floating around here, the 
assignor fix is in [pull/13550|https://github.com/apache/kafka/pull/13550]

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
>  Labels: new-rebalance-should-fix
> Fix For: 3.5.0, 3.4.1
>
> Attachments: CooperativeStickyAssignorBugReproduction.java
>
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (not the memberId) when sync group 
> fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of the partitions with cooperative enabled, because if a consumer 
> does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1 
>  # New consumer B1 joined and started a rebalance
>  # all consumers joined successfully and then A1 needs to revoke a partition to 
> transfer to B1
>  # A1 does a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1's re-join, so after they send 
> syncGroup, they will get REBALANCE_IN_PROGRESS
>  # A2-A10 will revoke their partitions and re-join
> So in this rebalance almost every partition is revoked, which greatly decreases 
> the benefit of Cooperative rebalance 
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # In Server Coordinator handleSyncGroup, when the generationId is checked and the 
> group state is PreparingRebalance, we can send the assignment along with the error 
> code REBALANCE_IN_PROGRESS. (I think it's safe since we verified the 
> generation first)
>  # When getting the REBALANCE_IN_PROGRESS error in the client, try to apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately 





[jira] [Updated] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop

2023-01-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14553:
---
Fix Version/s: 3.4.0

> RecordAccumulator hangs in infinite NOP loop
> 
>
> Key: KAFKA-14553
> URL: https://issues.apache.org/jira/browse/KAFKA-14553
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.1
> Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>Reporter: Viczai Gábor
>Priority: Minor
> Fix For: 3.4.0, 3.3.2
>
>
> *Summary:*
> There is an infinite loop in RecordAccumulator, if stickyBatchSize is 
> configured to be 0 in BuiltinPartitioner.
> (Which is the default case when using KafkaSender's default Builder.)
> *Details:*
> The infinite loop is caused by this while:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
> and this continue particularly:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
> because the partitionChanged() call in the condition always returns true if 
> batchSize is 0.
> So program flow never reaches this point:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
> Thus no span data is ever sent to Kafka.
> The problematic line in partitionChanged() is when it calls an update on the 
> BuiltInPartitioner:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
> which in fact always updates the partition because of this condition:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> therefore the next condition in RecordAccumulator will evaluate to true also:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
> thus returning 'true' and forcing the 'continue' in the while(true) loop.
> *Suggested fix:*
> I think these conditions should be changed:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> The equal signs should be removed from the conditions:
> {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
> stickyBatchSize * 2) {{code}
> (Btw: line 213 also needs this modification.)
> *Note:*
> The problem arises because KafkaSender sets the batchSize to 0.
> https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
> *Workaround:*
> Simply set the batch size greater than zero.
> {code:java}@Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }{code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
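> {code:xml}        <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-autoconfigure</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-starter-actuator</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-registry-prometheus</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-tracing-bridge-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-reporter-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-sender-kafka</artifactId>
>         </dependency>{code}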
> Everything is on default settings, except a KafkaSender is explicitly 
> created as illustrated above. (No autoconfiguration available for Kafka 
> sender.)
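
The effect of the equal signs in the reporter's condition can be checked in isolation. The two methods below are a sketch that copies only the boolean expression quoted in the suggested fix; the class and method names are hypothetical and nothing else from BuiltInPartitioner is modelled.

```java
final class StickyBatchSketch {

    // The condition with ">=": per the description, this is what 3.3.1 uses.
    static boolean switchWithEquals(final int producedBytes, final int stickyBatchSize, final boolean enableSwitch) {
        return producedBytes >= stickyBatchSize && enableSwitch || producedBytes >= stickyBatchSize * 2;
    }

    // The reporter's suggested variant with the equal signs removed.
    static boolean switchStrict(final int producedBytes, final int stickyBatchSize, final boolean enableSwitch) {
        return producedBytes > stickyBatchSize && enableSwitch || producedBytes > stickyBatchSize * 2;
    }

    public static void main(final String[] args) {
        // With stickyBatchSize = 0 and nothing produced yet:
        System.out.println(switchWithEquals(0, 0, false)); // true: 0 >= 0 * 2
        System.out.println(switchStrict(0, 0, false));     // false: 0 > 0 does not hold
    }
}
```

With a batch size of 0 the `>=` form is true before any bytes are produced, which is consistent with the partition being switched on every pass through the loop.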





[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680403#comment-17680403
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14533:


I'll give it a few more days before re-enabling both parameters, but so far 
I've got a few runs in with only the `false` parameter enabled and this seems 
to have fixed the flakiness.

Can't really envision why the state updater would/could cause the listOffsets 
request to fail in the way shown above, but it really does appear to be something 
about enabling this that so badly broke the SmokeTestDriverIntegrationTest

Definitely need to look into this before we consider publicly releasing the 
state updater feature cc [~cadonna] [~lbrutschy] [~guozhang] 

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete 
> its asynchronous close, and taking significantly longer to do so (600s 
> instead of 60s) than a typical local test execution time.





[jira] [Created] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14650:
--

 Summary: IQv2 can throw ConcurrentModificationException when 
accessing Tasks 
 Key: KAFKA-14650
 URL: https://issues.apache.org/jira/browse/KAFKA-14650
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.4.0
Reporter: A. Sophie Blee-Goldman


From failure in *[PositionRestartIntegrationTest.verifyStore[cache=false, log=true, supplier=IN_MEMORY_WINDOW, kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]*
java.util.ConcurrentModificationException
at java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
at java.base/java.util.HashMap.putMapEntries(HashMap.java:508)
at java.base/java.util.HashMap.putAll(HashMap.java:781)
at org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361)
at org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537)
at org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278)
at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921)
at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168)
at org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438)
at org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423)
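
The top frames of the trace show a TreeMap iterator noticing a structural change while the map is being copied. The sketch below reproduces the same fail-fast behaviour in a single thread (standing in for the cross-thread race) and contrasts it with a weakly consistent map. The class and method names are hypothetical, and the concurrent map is shown only for comparison, not as the fix applied in Kafka.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class TreeMapCmeSketch {

    // Returns true if iteration finished, false if the iterator failed fast.
    // New keys are negative so they sort before the current position.
    static boolean mutateWhileIterating(final Map<Integer, String> tasks) {
        try {
            for (final Integer id : tasks.keySet()) {
                tasks.put(-id, "added"); // structural modification mid-iteration
            }
            return true;
        } catch (final ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        final Map<Integer, String> initial = Map.of(1, "a", 2, "b", 3, "c");
        System.out.println(mutateWhileIterating(new TreeMap<>(initial)));
        System.out.println(mutateWhileIterating(new ConcurrentSkipListMap<>(initial)));
    }
}
```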





[jira] [Commented] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680141#comment-17680141
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14639:


Thanks for the additional logs, that does indeed verify that both consumers 
participated in the same rebalance. My next guess would be that for some 
reason, consumer-3 had "lost" its partitions prior to joining the group in gen 
640, in which case they would be allowed to be freely given away to another 
consumer in that same generation. Can you check and/or provide all the logs 
from consumer-3 between gen 639 and gen 640? Is there anything in there about 
resetting the generation, dropping out of the group, resetting the member id, 
anything at all like that? 

I also notice that the assignment changes drastically between gen 639 and 640, 
it's not "sticky" at all which should have been easy for the assignor to do if 
the previous assignment was something relatively simple like each consumer 
claiming exactly one or two partitions (only) and all from the same topic. 
Something fishy is definitely going on.

The only other thing off the top of my head to check would be that every single 
consumer was configured with (only) the CooperativeStickyAssignor over the full 
period from gen 639 through the end of gen 640, or at least check the group 
leader (consumer-5 I think?) and consumers 3 & 4.

I'll take a look at the assignor logic and see if anything jumps out at me on 
my end, but I have to say the complete lack of stickiness in the assignment 
from 639 to 640 is fairly perplexing and something I have never seen before 
with the CooperativeStickyAssignor. There have been some recent bugs related 
to rebalancing edge cases that have been fixed over the past few versions, so I 
may go back over those and see if anything was messed up by them. I did in fact 
discover one bug affecting rebalancing/assignment in the past few months which 
had been introduced by that series of fixes, so perhaps there is another.
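
For anyone following along, the rule being discussed (a partition still owned by one consumer should not be handed to another within the same generation, unless the owner had already "lost" it) can be sketched like this. It is a simplified model using the consumer and partition numbers from this ticket, not the actual CooperativeStickyAssignor code, and the `adjust` helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CooperativeAdjustSketch {

    // owned:  what each consumer reported as owned when joining
    // target: the balanced assignment the assignor would like to reach
    // Returns the assignment that may actually be sent out in this generation.
    static Map<String, List<Integer>> adjust(final Map<String, List<Integer>> owned,
                                             final Map<String, List<Integer>> target) {
        final Map<Integer, String> currentOwner = new HashMap<>();
        owned.forEach((consumer, partitions) -> partitions.forEach(p -> currentOwner.put(p, consumer)));

        final Map<String, List<Integer>> result = new TreeMap<>();
        target.forEach((consumer, partitions) -> {
            final List<Integer> granted = new ArrayList<>();
            for (final Integer p : partitions) {
                final String owner = currentOwner.get(p);
                // Withhold partitions that another consumer still owns; they
                // can be assigned in the follow-up rebalance after revocation.
                if (owner == null || owner.equals(consumer)) {
                    granted.add(p);
                }
            }
            result.put(consumer, granted);
        });
        return result;
    }
}
```

If consumer-3 reports partition 74 as owned, consumer-4 does not receive it in this round. If consumer-3 reports nothing as owned, 74 is free and can move immediately, which is the scenario guessed at above.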

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> 
>
> Key: KAFKA-14639
> URL: https://issues.apache.org/jira/browse/KAFKA-14639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Bojan Blagojevic
>Priority: Major
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if the consumer loses a partition in one 
> rebalance cycle, the partition should be assigned in the next rebalance cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
>  documentation and few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Mind that the log is in reverse(bottom to top)
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> 

[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680134#comment-17680134
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14533:


FYI after disabling the `false` parameter I immediately saw another failure, 
which points to the state updater as the culprit after all.

 

I did one final PR to verify this by disabling the `true` build and enabling 
the `false` build again – https://github.com/apache/kafka/pull/13156

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete 
> its asynchronous close, and taking significantly longer to do so (600s 
> instead of 60s) than a typical local test execution time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-01-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679735#comment-17679735
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-14533 at 1/23/23 11:37 PM:
--

First PR to disable the `false` build parameter: 
[https://github.com/apache/kafka/pull/13147]

Followup PR to re-enable it: 
[https://github.com/apache/kafka/pull/13155]


was (Author: ableegoldman):
https://github.com/apache/kafka/pull/13147

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete 
> its asynchronous close, and taking significantly longer to do so (600s 
> instead of 60s) than a typical local test execution time.





[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-01-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679735#comment-17679735
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14533:


https://github.com/apache/kafka/pull/13147

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>





[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-01-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679729#comment-17679729
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14533:


Not sure why this is happening, but the cause seems to be that the listOffsets 
request we make during the rebalance is failing:
{code:java}
[2023-01-20 23:34:28,256] WARN The listOffsets request failed. 
(org.apache.kafka.streams.processor.internals.ClientUtils:154)
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:152)
at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:690)
 {code}
I see that at the beginning of the logs in all of the failed runs so far and in 
none of the passing ones. Unfortunately I haven't been able to reproduce it 
locally, so I have to go by the Jenkins build logs, which truncate everything 
in the middle and don't show what happens next. But presumably this call is 
failing repeatedly, or potentially other calls are failing as well: whatever 
is wrong keeps happening for 10 minutes until the global timeout is triggered.
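
The stack trace above shows the broker error surfacing from a future's get() 
as an ExecutionException. A minimal, stdlib-only sketch of that wrapping and 
of a bounded retry around it follows. It is only an illustration under stated 
assumptions: the class, the `endOffsetWithRetry` helper and the stand-in 
exception type are hypothetical, and this is not how ClientUtils.getEndOffsets 
is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

final class EndOffsetRetrySketch {
    // Stand-in for the broker error seen in the log; NOT the Kafka class.
    static final class UnknownTopicOrPartition extends RuntimeException {
        UnknownTopicOrPartition(String message) { super(message); }
    }

    /**
     * Issues the (hypothetical) request up to maxAttempts times.
     * Returns the offset, or -1 if every attempt failed with the stand-in error.
     */
    static long endOffsetWithRetry(Supplier<CompletableFuture<Long>> request, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.get().get();
            } catch (ExecutionException e) {
                // get() wraps the real failure; the cause carries the broker error.
                if (!(e.getCause() instanceof UnknownTopicOrPartition)) {
                    throw new IllegalStateException("unexpected failure", e.getCause());
                }
                // Treated as retriable in this sketch: go on to the next attempt.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds, to mimic a partition the broker does not host yet.
        Supplier<CompletableFuture<Long>> flaky = () -> {
            if (++calls[0] < 3) {
                return CompletableFuture.failedFuture(
                    new UnknownTopicOrPartition("This server does not host this topic-partition."));
            }
            return CompletableFuture.completedFuture(42L);
        };
        System.out.println(endOffsetWithRetry(flaky, 5));
    }
}
```

Without a cap such as maxAttempts, a request that never succeeds would only 
stop when some outer limit fires, which would be consistent with the 600 second 
test timeout described in the ticket.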

I also noticed that it seems to be almost always the stateUpdaterEnabled = true 
builds that fail. That parameter was added just a few weeks before this 
ticket was filed. I doubt it's actually the state updater causing this, but I'm 
going to disable the `false` parameter temporarily to see whether it's actually 
the "true" run that's broken or whether it has to do with the parametrization. 
I also need to get a clean build for the 3.4 release, and this is failing at 
least once in almost every single build :/

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> 

[jira] [Commented] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle

2023-01-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678981#comment-17678981
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14639:


There's definitely something off. It almost looks like consumer-3 actually 
missed the gen 640 rebalance altogether, which would explain the symptom you 
describe: partition 74 would be free to assign to consumer-4 if consumer-3 
didn't join the group in time to report partition-74 among its 
"ownedPartitions". 
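
The ownership rule described above can be sketched as follows. This is a 
simplified, stdlib-only model and not the CooperativeStickyAssignor code; the 
class name, the `adjust` method and the map shapes are made up for 
illustration. A partition that changes owner is withheld from its new owner 
for one round only if the previous owner reported it as owned; if nobody 
reported it, nothing prevents an immediate reassignment.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class CooperativeAdjustSketch {
    /**
     * @param owned  partitions each member reported as owned when it joined
     * @param target assignment the assignor would like to reach
     * @return assignment actually handed out in this round
     */
    static Map<String, Set<Integer>> adjust(Map<String, Set<Integer>> owned,
                                            Map<String, Set<Integer>> target) {
        // Index the reported ownership by partition.
        Map<Integer, String> ownerOf = new HashMap<>();
        owned.forEach((member, parts) -> parts.forEach(p -> ownerOf.put(p, member)));

        Map<String, Set<Integer>> result = new HashMap<>();
        target.forEach((member, parts) -> {
            Set<Integer> granted = new HashSet<>();
            for (int p : parts) {
                String previous = ownerOf.get(p);
                // Grant only if unowned or already owned by this member;
                // otherwise withhold it until the previous owner has revoked it.
                if (previous == null || previous.equals(member)) {
                    granted.add(p);
                }
            }
            result.put(member, granted);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> target = Map.of(
            "consumer-3", Set.of(59),
            "consumer-4", Set.of(74));

        // consumer-3 reports partition 74 as owned: consumer-4 has to wait a round.
        System.out.println(adjust(Map.of("consumer-3", Set.of(26, 74)), target));

        // consumer-3's ownership is never reported: 74 goes to consumer-4 at once.
        System.out.println(adjust(Map.of(), target));
    }
}
```

The second call models the suspected situation in this ticket, where 
consumer-3's ownership of partition 74 was not visible to the assignor in 
generation 640.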

Reading from the bottom up, the first 3 lines show consumer-4 going through the 
entire rebalance and getting partition-74 assigned, all before consumer-3 even 
joins the group in gen 640. So that all adds up so far. What seems odd to me is 
that we then see consumer-3 appear to "successfully join the group" AND 
"successfully sync the group" for that same generation, which should not be the 
case if it missed the rebalance entirely – it should fail on JoinGroup if it 
truly did not manage to join the gen 640 rebalance in time, and certainly 
should fail the SyncGroup. So that does seem weird to me.

If you're able to reproduce this, could you try doing so with DEBUG logging 
enabled? In the meantime, I might be able to make more sense of what's going on 
if you can upload the full logs over this period of time.

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> 
>
> Key: KAFKA-14639
> URL: https://issues.apache.org/jira/browse/KAFKA-14639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Bojan Blagojevic
>Priority: Major
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if the consumer looses partition in one 
> rebalance cycle, the partition should be assigned in the next rebalance cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
>  documentation and few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Note that the log is in reverse order (bottom to top).
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-26, partition-74] as indicated by the current assignment and 
> re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer 

[jira] [Commented] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop

2022-12-31 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653273#comment-17653273
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14553:


For some reason the release script was complaining about this ticket being 
marked for 3.4 & not resolved, so I'm just going to remove the 3.4 from the Fix 
Version while I try to generate the 3.4 RC. Not sure what's going on but I'll 
put things back as they were when it's done.

> RecordAccumulator hangs in infinite NOP loop
> 
>
> Key: KAFKA-14553
> URL: https://issues.apache.org/jira/browse/KAFKA-14553
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.1
> Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>Reporter: Viczai Gábor
>Priority: Minor
> Fix For: 3.3.2
>
>
> *Summary:*
> There is an infinite loop in RecordAccumulator if stickyBatchSize is 
> configured to be 0 in BuiltInPartitioner.
> (Which is the default case when using KafkaSender's default Builder.)
> *Details:*
> The infinite loop is caused by this while:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
> and this continue particularly:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
> because the partitionChanged() call in the condition always returns true if 
> batchSize is 0.
> So program flow never reaches this point:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
> Thus no span data is ever sent to Kafka.
> The problematic line in partitionChanged() is when it calls an update on the 
> BuiltInPartitioner:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
> which in fact always updates the partition because of this condition:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> therefore the next condition in RecordAccumulator will also evaluate to true:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
> thus returning 'true' and forcing the 'continue' in the while(true) loop.
> *Suggested fix:*
> I think these conditions should be changed:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> The equal signs should be removed from the conditions:
> {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
> stickyBatchSize * 2) {{code}
> (Btw: line 213 also needs this modification.)
> *Note:*
> The problem arises because KafkaSender sets the batchSize to 0.
> https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
> *Workaround:*
> Simply set the batch size greater than zero.
> {code:java}@Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }{code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
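> {code:xml}        <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-autoconfigure</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-starter-actuator</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-registry-prometheus</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-tracing-bridge-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-reporter-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-sender-kafka</artifactId>
>         </dependency>{code}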
> Everything is on default settings, except that a KafkaSender is explicitly 
> created as illustrated above. (No autoconfiguration is available for the 
> Kafka sender.)
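
The switch condition discussed in the report can be checked in isolation. 
Below is a minimal sketch, assuming the 3.3.1 condition uses `>=` as the 
report implies; the class and method names are illustrative and this is not 
the BuiltInPartitioner source. With a stickyBatchSize of 0, the `>=` form is 
true even when nothing has been produced, while the suggested `>` form is not.

```java
final class StickySwitchSketch {
    // Condition as the report describes it for 3.3.1 (reconstructed; an assumption).
    static boolean switchWithEquals(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        return producedBytes >= stickyBatchSize && enableSwitch
            || producedBytes >= stickyBatchSize * 2;
    }

    // Condition from the report's suggested fix (equal signs removed).
    static boolean switchStrict(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        return producedBytes > stickyBatchSize && enableSwitch
            || producedBytes > stickyBatchSize * 2;
    }

    public static void main(String[] args) {
        // stickyBatchSize == 0, nothing produced yet, switching not enabled.
        System.out.println(switchWithEquals(0, 0, false)); // true: 0 >= 0 * 2
        System.out.println(switchStrict(0, 0, false));     // false: 0 > 0 does not hold
        // With a positive batch size, neither form switches before bytes accumulate.
        System.out.println(switchWithEquals(0, 1, false)); // false
        System.out.println(switchStrict(0, 1, false));     // false
    }
}
```

This only isolates the comparison; whether removing the equal signs is the 
right fix for the partitioner as a whole is a separate question.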





[jira] [Updated] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop

2022-12-31 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14553:
---
Fix Version/s: (was: 3.4.0)

> RecordAccumulator hangs in infinite NOP loop
> 
>
> Key: KAFKA-14553
> URL: https://issues.apache.org/jira/browse/KAFKA-14553
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.1
> Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>Reporter: Viczai Gábor
>Priority: Minor
> Fix For: 3.3.2
>
>




