[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-12 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603328#comment-17603328
 ] 

Jason Gustafson commented on KAFKA-14196:
-

> Also, this is currently marked as a blocker. Is there a crisp description of 
> the regression?

With eager rebalance strategies, the consumer attempts to auto-commit offsets 
before revoking its partitions and rejoining the group. Originally this logic was 
synchronous, so there was no opportunity for additional data to be returned before 
the revocation completed. That changed when we introduced the asynchronous offset 
commit logic: any progress made between the time the asynchronous commit was sent 
and the time the revocation completed would be lost, resulting in duplicate 
consumption.
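
To make that window concrete, here is a tiny self-contained toy model of the 
sequencing (made-up class name and numbers; it only mimics the ordering and is 
not consumer code):

{code:java}
import java.util.concurrent.CompletableFuture;

// Toy model of the race described above, not Kafka code. The auto-commit is sent
// asynchronously, the poll loop keeps consuming while it is in flight, and the
// revocation only waits for that in-flight commit, so the extra records are never
// committed and are re-delivered after the rebalance.
public class EagerRevocationRace {
    public static void main(String[] args) {
        long consumedPosition = 3739;                       // position when the rebalance starts
        final long positionAtCommitTime = consumedPosition;

        // 1. onJoinPrepare sends the auto-commit of the *current* position asynchronously
        CompletableFuture<Long> asyncCommit =
                CompletableFuture.supplyAsync(() -> positionAtCommitTime);

        // 2. while the commit request is in flight, poll() keeps returning records
        consumedPosition += 7;

        // 3. revocation proceeds once the old commit completes; nothing newer was committed
        long committedOffset = asyncCommit.join();
        System.out.printf("committed=%d consumed=%d re-delivered after rebalance=%d%n",
                committedOffset, consumedPosition, consumedPosition - committedOffset);
    }
}
{code}

The numbers mirror the log excerpt in the issue description below: records up to 
offset 3745 are handed to the application, but only the position at commit time 
(3739) is committed before the partition is given away, so offsets 3739-3745 come 
back after the rebalance.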

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.2
>
>
> Several flaky tests under OffsetValidationTest indicate a potential consumer 
> duplication issue when autocommit is enabled.  I believe this affects *3.2* and 
> onward.  The failure message looks like this:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that, for those failing tests, the 
> data consumed between the start of a rebalance event and the completion of the 
> async commit was never committed and was therefore re-delivered.  In the example 
> below, the rebalance event kicks in at around 1662054846995 (first record), and 
> the async commit of offset 3739 completes at around 1662054847015 (right before 
> partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoked callback seems to 
> alleviate the issue (see the sketch after this list).
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
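> A minimal illustration of workaround 1, using only the public consumer API (the 
> `consumer` instance and the topic name are assumed here, with autocommit left enabled):
> {code:java}
> // Fragment: `consumer` is an already-configured KafkaConsumer<String, String>.
> consumer.subscribe(Collections.singletonList("test_topic"), new ConsumerRebalanceListener() {
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         // Synchronously flush everything returned by poll() so far, before the
>         // partitions are handed off; this closes the window left open by the
>         // asynchronous auto-commit.
>         consumer.commitSync();
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         // nothing to do
>     }
> });
>  {code}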
> The experiments above seem to suggest that the contract between poll() and 
> commitAsync() is broken.  AFAIK, we implicitly use poll() to ack the previously 
> fetched data, and the consumer would (try to) commit these offsets in the 
> current poll() loop.  However, it seems that as the poll continues to loop, the 
> "acked" data isn't being committed.
>  
> I believe this could have been introduced in KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit completes, due to the previous incomplete 
> commit.  However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, but if 
> we do that, we are back to KAFKA-14024, where the consumer will hit the 
> rebalance timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partitions, and this has to block.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run the following several times (to save time, run only the autocommit-enabled 
> cases in consumer_test.py):
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the docker instance that has the partition getting revoked and 
> rejoined.  Observe the offsets before and after.
> *Proposed Fixes:*
>  TBD
>  
> https://github.com/apache/kafka/pull/12603

[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-11 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17602912#comment-17602912
 ] 

Philip Nee commented on KAFKA-14196:


[~ijuma] - I think that's right, according to the release notes.  I can add the 
description, but I'm not sure where; do you mean updating the description/title 
of this ticket?


[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-11 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17602906#comment-17602906
 ] 

Ismael Juma commented on KAFKA-14196:
-

To clarify, this was introduced in 3.2.1 (not 3.2.0), correct?



[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17602049#comment-17602049
 ] 

Guozhang Wang commented on KAFKA-14196:
---

Thanks Philip, and regarding your two questions above I agree with [~showuon]'s 
thoughts as well. Especially for 1), I think even if subscriptions change between 
consecutive onJoinPrepare calls, as long as they do not change the assigned 
partitions (i.e. as long as `assignFromSubscribed()` has not been called), we are 
fine, since the returned records depend on those assigned partitions.



[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601609#comment-17601609
 ] 

Luke Chen commented on KAFKA-14196:
---

# Should we consider the difference between the cooperative and eager protocols?  
Because cooperative doesn't revoke all partitions.  However, I worry that the 
subscription might change during onJoinPrepare, so I meant there could be edge 
cases we need to handle here.

--> I think we should consider the difference between the cooperative and eager 
protocols, because one of the purposes of cooperative rebalancing is to allow the 
"non-revoking" partitions to keep processing during the rebalance. About the edge 
case, I think that's fine because in your PR we'll check and pause the partitions 
each time we enter onJoinPrepare, right? So even if the subscription changes while 
we're waiting for commitAsync, we can pause the updated subscription's partitions 
in onJoinPrepare each time. WDYT?
 # I believe this only applies when autocommit is enabled.  I think for the 
non-autocommit case, the user should handle the offsets during revocation, so we 
are good there?

--> Yes, we only need to worry about the autocommit-enabled case.


[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-07 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601523#comment-17601523
 ] 

Philip Nee commented on KAFKA-14196:


[~showuon] and [~guozhang] - I think pausing should probably work, and it's 
also kind of convenient because the partition revocation will unpause these 
partitions automatically.  Let me know if you think the draft is ok; I'll add 
tests later on: https://github.com/apache/kafka/pull/12603
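
For context, a rough sketch of that pausing idea (assumed names, illustrative 
only; this is not the actual code in the PR):

{code:java}
// Illustrative sketch only (assumed helper names), not the actual ConsumerCoordinator
// code: while the pre-rebalance auto-commit is still in flight, pause the partitions
// that are about to be revoked so that poll() stops returning records for them.
boolean onJoinPrepare() {
    Set<TopicPartition> revoking = partitionsToRevoke();   // assumed helper
    if (autoCommitEnabled) {
        revoking.forEach(subscriptions::pause);            // no more records handed out
        if (!maybeAutoCommitOffsetsAsync().isDone())       // assumed to return a future
            return false;                                  // retry on the next poll iteration
    }
    // Revocation proceeds once the commit resolves; it also clears the pause flags,
    // which is the convenience mentioned above.
    return true;
}
{code}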



[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-07 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601379#comment-17601379
 ] 

Philip Nee commented on KAFKA-14196:


Thanks Luke and GW, it looks like we could just pause it, but I'll test it out 
to see if that does what we want... I'll get back to you guys soon. :)



[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-06 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601095#comment-17601095
 ] 

Luke Chen commented on KAFKA-14196:
---

[~pnee] 
 # I don't think we need to pause the fetch if the previous async commit 
(autocommit) hasn't yet gone through, for the normal situation (not rebalancing)? 
Because as long as we are sending out the commit, I think we could tentatively 
assume the acked data has been committed. Am I right?

 --> Correct. For the normal situation (not rebalancing), we don't pause anything.
 # I think we only need to pause the fetch if there's a rebalance process 
taking place, because it only waits for the current in-flight commit, then 
revokes the partitions.  Once the partitions are revoked, I don't think we can do 
anything about the uncommitted data.

--> Correct.

 

[~guozhang] , thanks for the suggestion.

> I suggested we add a TODO there indicating it's sub-optimal but is allowed 
> under at least once semantics.

Agree!

> we still commit async, while at the same time mark those revoking partitions 
> as "not retrievable" to not return any more data

Sounds good to me!

 

From Philip:

> And because this regression was caused by the "rebalancing internal state" 
> (pardon me if the wording is confusing), do you think it might be worth 
> exposing the rebalance internal states?

I think we can just `pause` the SubscriptionState of the partitions that we're 
going to revoke. From the javadoc:
{code:java}
/**
 * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
 * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
 * Note that this method does not affect partition subscription. In particular, it does not cause a group
 * rebalance when automatic assignment is used.
 *
 * Note: Rebalance will not preserve the pause/resume state.
 * @param partitions The partitions which should be paused
 * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
 */
@Override
public void pause(Collection<TopicPartition> partitions) {{code}
 

I think that's what we want, right?
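
As a small user-level illustration of those semantics (assumed consumer setup; 
unrelated to the coordinator-internal change being discussed):

{code:java}
// User-level illustration of pause()/resume() semantics (assumed consumer setup).
Set<TopicPartition> assigned = consumer.assignment();
consumer.pause(assigned);                                  // poll() returns no records for paused partitions
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
System.out.println(records.isEmpty());                     // expected: true while everything is paused
consumer.resume(assigned);                                 // fetching resumes
{code}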


[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-06 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600985#comment-17600985
 ] 

Guozhang Wang commented on KAFKA-14196:
---

[~pnee] Thanks for reporting this. While reviewing KAFKA-13310 I realized this as 
well, but as Luke said it is not an entirely new regression (we could potentially 
have duplicates even before this change: when we committed synchronously and the 
commit failed, we still logged a warning and moved forward with the revocation, in 
which case we would also have duplicates), so I suggested we add a TODO there 
indicating it's sub-optimal but allowed under at-least-once semantics.

I think in the long run, as we move all the rebalancing-related procedures to 
the background thread, this would no longer be an issue: between the time the 
background thread receives a response telling it to start rebalancing (whose 
first step is potentially revoking partitions in `onJoinPrepare`) and the time 
the auto-commit completes, the background thread could simply mark those revoking 
partitions as "not retrievable" so that the calling thread's `poll` calls would 
not return any more data for those partitions. Right?

If that's the case, then we only need to decide what to do until that work lands. 
Like I said, the behaviors so far are 1) we commit sync, and even if it fails we 
still move forward, which can cause duplicates, or 2) we commit async so that the 
`poll` timeout is respected, but we would still potentially return data for the 
revoking partitions. I'm thinking about just taking the middle ground: we still 
commit async, while at the same time marking those revoking partitions as "not 
retrievable" so that no more data is returned for them. Note this would still not 
prevent duplicates completely, but it would basically take us back to the previous 
likelihood of duplicates. And then we rely on the threading remodeling (there's a 
WIP page that Philip will be sending out soon) to completely resolve this issue.


[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-06 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600897#comment-17600897
 ] 

Philip Nee commented on KAFKA-14196:


Thanks Luke. Regarding your suggestion, could you elaborate more on the reason to 
terminate the poll?

I've got a few questions to clarify here:
 # I don't think we need to pause the fetch if the previous async commit (of 
course autocommit) hasn't yet gone through, for the normal situation (not 
rebalancing)? Because as long as we are sending out the commit, I think we 
could tentatively assume the acked data has been committed. Am I right?
 # I think we only need to pause the fetch if there's a rebalance process 
taking place, because it only waits for the current in-flight commit, then 
revokes the partitions.  Once the partitions are revoked, I don't think we can do 
anything about the uncommitted data.

And because this regression was caused by the "rebalancing internal state" 
(pardon me if the wording is confusing), I think it might be worth representing 
the rebalancing progress using some state (or states) and exposing that state 
via the coordinator, so that the fetcher can look at the state and decide whether 
to proceed with the fetch or not.

So here is my proposal (a rough sketch follows below):
 # When onJoinPrepare is invoked, set the state variable 
onJoinPrepareCommitAsync = true, and let the async commit happen
 # In the Fetcher: if (coordinator.isOnJoinPrepareCommitAsync()), skip fetching
 # If the async commit completes onSuccess, set 
onJoinPrepareCommitAsync = false
 # If the async commit fails, set onJoinPrepareCommitAsync = false and throw a 
new KafkaException()
 # If the async commit timer expires, throw a new TimeoutException()
 # Also reset onJoinPrepareCommitAsync on close, etc.

We will need to amend the current javaDoc for this though, in order for the 
user to handle these exceptions.

WDYT?
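
A rough sketch of what the proposal above could look like (field and method names 
are made up for illustration; this is not the actual patch):

{code:java}
// Illustration of the proposal only: assumed names, not the actual
// ConsumerCoordinator/Fetcher code.

// ConsumerCoordinator side
private volatile boolean onJoinPrepareCommitAsync = false;

void onJoinPrepare() {
    onJoinPrepareCommitAsync = true;                     // step 1: mark the pre-rebalance commit in flight
    commitOffsetsAsync(allConsumedOffsets(), (offsets, error) -> {
        onJoinPrepareCommitAsync = false;                // steps 3/4: clear the flag when it resolves
        if (error != null)
            throw new KafkaException("Pre-rebalance auto-commit failed", error);
    });
}

boolean isOnJoinPrepareCommitAsync() {
    return onJoinPrepareCommitAsync;
}

// Fetcher side (step 2): skip sending fetches while the flag is set
if (coordinator != null && coordinator.isOnJoinPrepareCommitAsync())
    return;                                              // don't return records that would go uncommitted
{code}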


[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-06 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600675#comment-17600675
 ] 

Luke Chen commented on KAFKA-14196:
---

[~pnee], thanks for the analysis. Yes, we forgot that during the following poll 
the offsets might advance while we're waiting for the old async offset commit to 
complete. 

Actually, while checking the code, even without the change for KAFKA-14024 and 
KAFKA-13310 (that is, changing the sync commit to an async commit), the issue can 
still happen, just not as easily. The issue is that in the consumer#poll process 
we do onJoinPrepare (i.e. commit the offsets) and then fetch new records. I'm 
thinking we should have a way to terminate the poll process so that it stops 
fetching new records and returns.

 

Maybe in `KafkaConsumer#updateAssignmentMetadataIfNeeded` we could pass in a 
parameter that allows the `onJoinPrepare` method to set a flag notifying us that 
we need to terminate the poll and not fetch records. WDYT?

cc [~guozhang] [~aiquestion]
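
Very roughly, that suggestion might take the following shape (assumed names and 
signatures, not the actual KafkaConsumer internals):

{code:java}
// Sketch of the suggestion above (assumed names/signatures, not the actual
// KafkaConsumer code): let the metadata-update step report that a rebalance is
// preparing to revoke partitions, and skip fetching in that case.
private ConsumerRecords<K, V> pollOnce(Timer timer) {
    boolean rebalancePreparing = updateAssignmentMetadataIfNeeded(timer);  // assumed to surface the flag
    if (rebalancePreparing) {
        // Don't return more records for partitions that are about to be revoked;
        // anything consumed now would not be covered by the pre-rebalance commit.
        return ConsumerRecords.empty();
    }
    return pollForFetches(timer);                                          // assumed helper
}
{code}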


[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-05 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600552#comment-17600552
 ] 

Philip Nee commented on KAFKA-14196:


Kind of originated from this commit: 
https://github.com/apache/kafka/pull/12349/files




--
This message was sent by Atlassian Jira
(v8.20.10#820010)