[ https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17079011#comment-17079011 ]
ASF GitHub Bot commented on KAFKA-9841:
---------------------------------------

Lucent-Wong commented on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…
URL: https://github.com/apache/kafka/pull/8453

   KAFKA-9841: Connector and Task duplicated when a worker join with old generation assignment

   Detect duplicated connectors/tasks in IncrementalCooperativeAssignor, then revoke all of them to trigger a reassignment in the next round.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Connector and Task duplicated when a worker join with old generation assignment
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9841
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9841
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.0, 2.4.1
>            Reporter: Yu Wang
>            Priority: Major
>
> When IncrementalCooperativeAssignor.class is used to assign connectors and
> tasks, suppose a worker 'W' has a connection issue with the coordinator.
> During the connection issue, the connectors/tasks on 'W' are assigned to the
> other workers.
> When the connection issue disappears, 'W' joins the group with an old
> generation assignment. The group leader then sees duplicated
> connectors/tasks in the metadata sent by the workers, but the duplicated
> connectors/tasks are not revoked.
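
The detect-then-revoke idea from the pull request description can be illustrated with a minimal, standalone sketch. This is not the code from pull request #8453 and it does not use any Kafka Connect classes: the class name DuplicateAssignmentSketch and the methods findDuplicates and toRevoke are hypothetical, and the leader's view is assumed to be a plain map from worker name to the IDs that worker reports as owned. The sample data mirrors the Generation 5 task assignments in the logs of this issue.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; not the implementation in PR #8453.
final class DuplicateAssignmentSketch {

    // Returns every connector/task ID that is reported more than once across all workers.
    static Set<String> findDuplicates(Map<String, ? extends Collection<String>> ownedByWorker) {
        Set<String> seen = new TreeSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (Collection<String> owned : ownedByWorker.values()) {
            for (String id : owned) {
                if (!seen.add(id)) {
                    duplicates.add(id); // already reported earlier
                }
            }
        }
        return duplicates;
    }

    // For each worker, the duplicated IDs it holds. Revoking them from every
    // holder leaves the IDs unassigned so a later round can place them once.
    static Map<String, Set<String>> toRevoke(Map<String, ? extends Collection<String>> ownedByWorker) {
        Set<String> duplicates = findDuplicates(ownedByWorker);
        Map<String, Set<String>> revocations = new LinkedHashMap<>();
        for (Map.Entry<String, ? extends Collection<String>> entry : ownedByWorker.entrySet()) {
            Set<String> revoked = new TreeSet<>(entry.getValue());
            revoked.retainAll(duplicates);
            if (!revoked.isEmpty()) {
                revocations.put(entry.getKey(), revoked);
            }
        }
        return revocations;
    }

    public static void main(String[] args) {
        // Task IDs per worker as logged at generation 5 in this issue.
        Map<String, List<String>> owned = new LinkedHashMap<>();
        owned.put("Worker1", Arrays.asList("misc-0"));
        owned.put("Worker2", Arrays.asList("misc-4"));
        owned.put("Worker3", Arrays.asList("misc-3"));
        owned.put("Worker4", Arrays.asList("misc-3", "misc-1"));
        owned.put("Worker5", Arrays.asList("misc-5", "misc-2"));

        System.out.println(findDuplicates(owned)); // prints [misc-3]
        System.out.println(toRevoke(owned));       // prints {Worker3=[misc-3], Worker4=[misc-3]}
    }
}
```

The sketch revokes the duplicated ID from every worker that reports it, rather than picking a winner, which matches the "revoke all of them" wording of the pull request; how the real assignor schedules the follow-up rebalance is outside the scope of this example.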
>
> Generation 3:
> Worker1:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0',
> leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[],
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0',
> leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[],
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0',
> leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[],
> taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0',
> leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[misc],
> taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0',
> leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[],
> taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
> with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>
> Generation 4:
> Worker1:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[],
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[],
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:32:35,489] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Group coordinator xxxxxx:9092 (id:
> 2147483631 rack: null) is unavailable or invalid, will attempt rediscovery
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:35,590] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Discovered group coordinator
> xxxxxx:9092 (id: 2147483631 rack: null)
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} Committing
> offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} flushing 86
> outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Attempt to heartbeat failed since
> group is rebalancing
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Rebalance started
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] (Re-)joining group
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Worker4:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc],
> taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
> with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[],
> taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
> with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>
> Generation 5:
> Worker1:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[],
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[],
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[],
> taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with
> rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc],
> taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
> with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1,
> groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with
> protocol version 2 and got assignment: Assignment{error=0,
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c',
> leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[],
> taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
> with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)