[ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16147.
---------------------------------
    Fix Version/s: 3.8.0
       Resolution: Fixed

> Partition is assigned to two members at the same time
> -----------------------------------------------------
>
>                 Key: KAFKA-16147
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16147
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Emanuele Sabellico
>            Assignee: David Jacot
>            Priority: Major
>             Fix For: 3.8.0
>
>         Attachments: broker1.log, broker2.log, broker3.log, librdkafka.log, 
> server.properties, server1.properties, server2.properties
>
>
> While running [test 0113 of 
> librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384],
>  the subtest _u_multiple_subscription_changes_ received this error, which says 
> that a partition is assigned to two members at the same time.
> {code:java}
> Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] 
> which is already assigned to consumer C_5#consumer-8 {code}
> I've reconstructed this sequence:
> C_5 SUBSCRIBES TO T1
> {noformat}
> %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id 
> "(null)", current assignment "", subscribe topics 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat}
> C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12
> {noformat}
> [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, 
> targetMemberEpoch=6, state=assigning, assignedPartitions={}, 
> partitionsPendingRevocation={}, 
> partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat}
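
The CurrentAssignment(...) records in these coordinator logs are regular enough to extract with a couple of regular expressions. What follows is a minimal sketch for working with the attached logs, not code from Kafka; the function name parse_current_assignment and the returned dict layout are assumptions made for illustration.

```python
import re

# One of the three partition maps, e.g. assignedPartitions={topicId=[7, 8, 12]}
FIELD_RE = re.compile(
    r"(assignedPartitions|partitionsPendingRevocation|partitionsPendingAssignment)"
    r"=\{(.*?)\}"
)
# One topic entry inside a map, e.g. IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]
TOPIC_RE = re.compile(r"([\w-]+)=\[([\d, ]*)\]")


def parse_current_assignment(text):
    """Parse one CurrentAssignment(...) string into a plain dict (hypothetical helper)."""
    parsed = {
        # Lowercase "memberEpoch" does not match previousMemberEpoch/targetMemberEpoch.
        "memberEpoch": int(re.search(r"\bmemberEpoch=(\d+)", text).group(1)),
        "state": re.search(r"\bstate=(\w+)", text).group(1),
    }
    for name, body in FIELD_RE.findall(text):
        parsed[name] = {
            topic_id: [int(p) for p in partitions.split(",") if p.strip()]
            for topic_id, partitions in TOPIC_RE.findall(body)
        }
    return parsed


# The "to" state of the transition logged above, joined onto one line.
record = (
    "CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, "
    "targetMemberEpoch=14, state=stable, "
    "assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, "
    "partitionsPendingRevocation={}, partitionsPendingAssignment={})"
)
parsed = parse_current_assignment(record)
print(parsed["state"], parsed["assignedPartitions"])
```

The sketch expects the record on a single line, so the "> " quote prefixes and the line wrapping of this email have to be removed first.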
>  
> C_5 RECEIVES TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
>  
> C_5 ACKS TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", 
> subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
>  
> C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are 
> pending 
> {noformat}
> %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment "NULL", subscribe topics 
> "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated 
> its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=16, state=revoking, assignedPartitions={}, 
> partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> %7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment ""{noformat}
> C_5 FINISHES REVOCATION
> {noformat}
> %7|1705403452.618|CGRPJOINSTATE|C_5#consumer-8| [thrd:main]: Group 
> "rdkafkatest_rnd53b4eb0c2de343_0113u" changed join state wait-assign-call -> 
> steady (state up){noformat}
> C_5 ACKS REVOCATION, RECEIVES T2-P0,T2-P1,T2-P2
>  
> {noformat}
> %7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment "", subscribe topics "NULL"
> [2024-01-16 12:10:52,618] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=16, state=revoking, assignedPartitions={}, 
> partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}) to 
> CurrentAssignment(memberEpoch=16, previousMemberEpoch=14, 
> targetMemberEpoch=16, state=assigning, 
> assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2]}, 
> partitionsPendingRevocation={}, 
> partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[8, 9]}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> %7|1705403452.620|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(EnZMikZURKiUoxZf0rozaA)[0], (null)(EnZMikZURKiUoxZf0rozaA)[1], 
> (null)(EnZMikZURKiUoxZf0rozaA)[2]"
> %7|1705403452.621|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 16, group instance id 
> "NULL", current assignment 
> "rdkafkatest_rnd550f20623daba04c_0113u_2(EnZMikZURKiUoxZf0rozaA)[0], 
> rdkafkatest_rnd550f20623daba04c_0113u_2(EnZMikZURKiUoxZf0rozaA)[1], 
> rdkafkatest_rnd550f20623daba04c_0113u_2(EnZMikZURKiUoxZf0rozaA)[2]", 
> subscribe topics "NULL"
> %7|1705403452.622|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(EnZMikZURKiUoxZf0rozaA)[0], (null)(EnZMikZURKiUoxZf0rozaA)[1], 
> (null)(EnZMikZURKiUoxZf0rozaA)[2]"
> {noformat}
> LATER C_6 SUBSCRIBES TO T2,T1 AND RECEIVES T1-P8,T1-P13,T1-P15 AND T2-P0. 
> T2-P0 is assigned without being revoked from C_5.
> {noformat}
> %7|1705403453.612|HEARTBEAT|C_6#consumer-9| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "ss189cxiQ4q_cyNB1aNrfA", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment "NULL", subscribe topics 
> "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> [2024-01-16 12:10:53,612] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member ss189cxiQ4q_cyNB1aNrfA updated 
> its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:53,613] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Bumped group epoch to 17. 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:53,614] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Computed a new target assignment for 
> epoch 17 with 'uniform' assignor: {SWlBlUvBQpqdl93hD6SJOA=Assignment(error=0, 
> partitions={IKXGrFR1Rv-Qes7Ummas6A=[2, 4, 5]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0])), RaTCu6RXQH-FiSl95iZzdw=Assignment(error=0, 
> partitions={EnZMikZURKiUoxZf0rozaA=[1, 2, 8, 9, 10]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0])), mhzGxxF2R-Oa3BQGotzx7A=Assignment(error=0, 
> partitions={EnZMikZURKiUoxZf0rozaA=[3, 4, 5, 6, 7]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0])), i7qqdVw4S0KZR5J5Z0XvoQ=Assignment(error=0, 
> partitions={IKXGrFR1Rv-Qes7Ummas6A=[1, 3, 7]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0])), d6cbfWsfTt-7XdYU23MNaw=Assignment(error=0, 
> partitions={IKXGrFR1Rv-Qes7Ummas6A=[10, 11, 14]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0])), ss189cxiQ4q_cyNB1aNrfA=Assignment(error=0, 
> partitions={IKXGrFR1Rv-Qes7Ummas6A=[8, 13, 15], EnZMikZURKiUoxZf0rozaA=[0]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0])), 2vIQ0F6NRlOd9RgnVKtuDg=Assignment(error=0, 
> partitions={IKXGrFR1Rv-Qes7Ummas6A=[0, 6, 9, 12]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0])), pOvUx45IT-CJDvBQlfvVbQ=Assignment(error=0, 
> partitions={EnZMikZURKiUoxZf0rozaA=[11, 12, 13, 14, 15]}, 
> metadata=VersionedMetadata(version=0, metadata=java.nio.HeapByteBuffer[pos=0 
> lim=0 cap=0]))}. (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:53,614] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member ss189cxiQ4q_cyNB1aNrfA 
> transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=9, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[13, 15]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
> CurrentAssignment(memberEpoch=17, previousMemberEpoch=14, 
> targetMemberEpoch=17, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[8, 13, 15], 
> EnZMikZURKiUoxZf0rozaA=[0]}, partitionsPendingRevocation={}, 
> partitionsPendingAssignment={}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> %7|1705403453.616|HEARTBEAT|C_6#consumer-9| [thrd:main]: 
> GroupCoordinator/1: Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[8], (null)(IKXGrFR1Rv+Qes7Ummas6A)[13], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[15], 
> (null)(EnZMikZURKiUoxZf0rozaA)[0]"{noformat}
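
The conflict can be checked mechanically from the logged state. The following is a minimal sketch, not coordinator code: it takes each member's assignedPartitions, copied from the CurrentAssignment log lines in this report, and lists every topic-partition held by more than one member. The function name find_double_assignments and the dict layout are assumptions made for illustration.

```python
from collections import defaultdict


def find_double_assignments(assigned):
    """Return {(topic_id, partition): [member_id, ...]} for every partition
    that appears in more than one member's assignment (hypothetical helper)."""
    owners = defaultdict(list)
    for member_id, topics in assigned.items():
        for topic_id, partitions in topics.items():
            for partition in partitions:
                owners[(topic_id, partition)].append(member_id)
    return {tp: members for tp, members in owners.items() if len(members) > 1}


# assignedPartitions as logged by the coordinator; the layout is an assumption.
assigned = {
    # C_5 after the transition to member epoch 16
    "RaTCu6RXQH-FiSl95iZzdw": {"EnZMikZURKiUoxZf0rozaA": [0, 1, 2]},
    # C_6 after the transition to member epoch 17
    "ss189cxiQ4q_cyNB1aNrfA": {
        "IKXGrFR1Rv-Qes7Ummas6A": [8, 13, 15],
        "EnZMikZURKiUoxZf0rozaA": [0],
    },
}

conflicts = find_double_assignments(assigned)
for (topic_id, partition), members in conflicts.items():
    print(f"{topic_id}[{partition}] is assigned to {members}")
```

With the two states above, the only conflict reported is partition 0 of topic id EnZMikZURKiUoxZf0rozaA (T2), which matches the librdkafka error at the top of this report.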
> Consecutive broker log lines also show that T2-P0, as well as T2-P1 and 
> T2-P2, is assigned to C_5 (RaTCu6RXQH-FiSl95iZzdw) while still pending 
> revocation from a third member (member id mhzGxxF2R-Oa3BQGotzx7A):
> {noformat}
> [2024-01-16 12:10:52,618] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=16, state=revoking, assignedPartitions={}, 
> partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}) to 
> CurrentAssignment(memberEpoch=16, previousMemberEpoch=14, 
> targetMemberEpoch=16, state=assigning, 
> assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2]}, 
> partitionsPendingRevocation={}, 
> partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[8, 9]}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:52,620] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member mhzGxxF2R-Oa3BQGotzx7A 
> transitioned from CurrentAssignment(memberEpoch=15, previousMemberEpoch=11, 
> targetMemberEpoch=15, state=stable, 
> assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
> CurrentAssignment(memberEpoch=15, previousMemberEpoch=11, 
> targetMemberEpoch=16, state=revoking, 
> assignedPartitions={EnZMikZURKiUoxZf0rozaA=[3, 4, 5, 6, 7]}, 
> partitionsPendingRevocation={EnZMikZURKiUoxZf0rozaA=[0, 1, 2]}, 
> partitionsPendingAssignment={}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat}
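
The overlap in these two log lines can be expressed as a small check. This is a sketch under one stated assumption: a partition listed in partitionsPendingRevocation is treated as still held by that member until the revocation is acknowledged, which is my reading of the state names rather than something the logs confirm. The helper name owned_partitions and the dict layout are hypothetical.

```python
def owned_partitions(state):
    """Partitions a member may still hold: assigned plus pending revocation.
    Assumption: pending-revocation partitions count as held until acked."""
    owned = set()
    for field in ("assignedPartitions", "partitionsPendingRevocation"):
        for topic_id, partitions in state.get(field, {}).items():
            owned.update((topic_id, p) for p in partitions)
    return owned


T2 = "EnZMikZURKiUoxZf0rozaA"

# C_5 (RaTCu6RXQH-FiSl95iZzdw) after the 12:10:52,618 transition
c5 = {
    "assignedPartitions": {T2: [0, 1, 2]},
    "partitionsPendingRevocation": {},
}
# Third member (mhzGxxF2R-Oa3BQGotzx7A) after the 12:10:52,620 transition
third = {
    "assignedPartitions": {T2: [3, 4, 5, 6, 7]},
    "partitionsPendingRevocation": {T2: [0, 1, 2]},
}

overlap = sorted(owned_partitions(c5) & owned_partitions(third))
print(overlap)
```

Under that assumption the overlap is T2 partitions 0, 1 and 2, the same partitions that C_5 received two milliseconds before the third member entered the revoking state.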
> NOTE: error handling in my current branch is disabled because it still needs 
> to be integrated. An assert fails the tests if any error happens, so the 
> effects of retriable errors can be excluded.
> The coordinator was broker1.
> I attach the broker properties too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
