[ https://issues.apache.org/jira/browse/KAFKA-9184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976648#comment-16976648 ]
Timur Rubeko edited comment on KAFKA-9184 at 11/18/19 3:50 PM:
---------------------------------------------------------------

Hello. SO question author here.

Following is an example of a sequence of events that typically leads to the redundant task creation.

Set-up: three workers and three connectors.

Relevant logs:

*Worker A*:
{code:java}
[2019-11-03 11:07:26,912] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 639 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[mqtt-source], taskIds=[mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=300000} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,192] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 640 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 641 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 642 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}

*Worker B*:
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 639 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=300000} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,041] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id connect-1-bf534716-be2f-4cb3-9f26-521023c6b504 is not valid. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,251] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 641 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:03,150] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id connect-1-c930bdb9-eedf-4313-95e0-4a6927836094 is not valid. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 642 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}

*Worker C*:
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 639 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=300000} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,006] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id connect-1-d9a36d68-ab64-4404-a2aa-e5eaf3249ec4 is not valid. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 641 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, groupId=ingest-sources-cluster] Joined group at generation 642 and got assignment: Assignment{error=0, leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}

My interpretation of these events:
- Generation 639: tasks are evenly distributed between workers.
- Generation 640: workers B and C are missing from this generation (presumably because they failed to heartbeat; see the `Attempt to heartbeat failed for since member id connect-1-c930bdb9-eedf-4313-95e0-4a6927836094 is not valid` messages). Only worker A is left in generation 640, and it is assigned all of the tasks, including those that were previously assigned to workers B and C (`hdfs-sink-0` and `another-hdfs-sink-0` in this example).
- Generation 641: this generation is created just 3 seconds after generation 640. Workers B and C do *not release* the tasks that were reassigned to worker A, nor does worker A release them. At this point these tasks are effectively "duplicated".
- Subsequent generations: further rebalancing does not fix the issue; the redundant tasks are kept (generation 642 is given in these logs as an example).

The only remedy is to restart *all* workers.
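The duplication described in the interpretation above can also be checked mechanically. What follows is a minimal Python sketch, not part of Kafka Connect: it parses the `Joined group at generation ...` lines of each worker and reports tasks that more than one worker holds in the same generation. The function names are made up for illustration, and the entries in `SAMPLE_LOGS` are abbreviated from the logs above (timestamps and fields such as `leader` and `offset` are dropped). The regular expression assumes each log entry sits on a single line.

```python
import re
from collections import defaultdict

# Matches the "Joined group" entries in the format seen in the logs above.
# Lowercase "taskIds=[" does not collide with "revokedTaskIds=[".
ASSIGNMENT_RE = re.compile(
    r"Joined group at generation (\d+) and got assignment: "
    r"Assignment\{.*?taskIds=\[([^\]]*)\]"
)


def parse_assignments(log_text):
    """Return {generation: set of task ids} for a single worker's log."""
    assignments = {}
    for match in ASSIGNMENT_RE.finditer(log_text):
        generation = int(match.group(1))
        tasks = {t.strip() for t in match.group(2).split(",") if t.strip()}
        assignments[generation] = tasks
    return assignments


def find_duplicates(logs_by_worker):
    """Return {generation: {task: sorted worker names}} for tasks held by 2+ workers."""
    owners = defaultdict(lambda: defaultdict(set))
    for worker, log_text in logs_by_worker.items():
        for generation, tasks in parse_assignments(log_text).items():
            for task in tasks:
                owners[generation][task].add(worker)
    duplicates = {}
    for generation, tasks in owners.items():
        shared = {t: sorted(w) for t, w in tasks.items() if len(w) > 1}
        if shared:
            duplicates[generation] = shared
    return duplicates


# Abbreviated excerpts of the logs above (hypothetical shortening, same field order).
SAMPLE_LOGS = {
    "A": (
        "Joined group at generation 639 and got assignment: Assignment{error=0, connectorIds=[mqtt-source], taskIds=[mqtt-source-0], revokedTaskIds=[], delay=300000}\n"
        "Joined group at generation 640 and got assignment: Assignment{error=0, connectorIds=[hdfs-sink, another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, mqtt-source-0], revokedTaskIds=[], delay=0}\n"
        "Joined group at generation 641 and got assignment: Assignment{error=0, connectorIds=[hdfs-sink, another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, mqtt-source-0], revokedTaskIds=[], delay=0}\n"
    ),
    "B": (
        "Joined group at generation 639 and got assignment: Assignment{error=0, connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], revokedTaskIds=[], delay=300000}\n"
        "Joined group at generation 641 and got assignment: Assignment{error=0, connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], revokedTaskIds=[], delay=0}\n"
    ),
    "C": (
        "Joined group at generation 639 and got assignment: Assignment{error=0, connectorIds=[hdfs-sink], taskIds=[hdfs-sink-0], revokedTaskIds=[], delay=300000}\n"
        "Joined group at generation 641 and got assignment: Assignment{error=0, connectorIds=[hdfs-sink], taskIds=[hdfs-sink-0], revokedTaskIds=[], delay=0}\n"
    ),
}

if __name__ == "__main__":
    for generation, tasks in sorted(find_duplicates(SAMPLE_LOGS).items()):
        for task, workers in sorted(tasks.items()):
            print(f"generation {generation}: {task} assigned to workers {workers}")
```

On this excerpt only generation 641 is reported: `hdfs-sink-0` is held by workers A and C, and `another-hdfs-sink-0` by workers A and B. Generation 640 is not flagged because only worker A logged an assignment for it; the sketch compares logged assignments only and cannot tell whether B and C were still running their tasks at that moment.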
In my tests, restarting only one worker didn't fix the issue (after the restart it was assigned the same tasks again, even though they were also present on another worker).

I have noticed that in all worker logs the workers are logged as "Worker clientId=connect-1, groupId=ingest-sources-cluster" and assumed that this (the same client id) might be the issue. However, setting different client ids explicitly doesn't fix the issue. (According to the documentation, the worker IP is part of the worker identification anyway, so it makes sense that explicit client ids had no effect.)

> Redundant task creation after worker fails to join a specific group generation
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-9184
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9184
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.3.2
>            Reporter: Konstantine Karantasis
>            Assignee: Konstantine Karantasis
>            Priority: Blocker
>             Fix For: 2.3.2
>
>
> First reported here:
> https://stackoverflow.com/questions/58631092/kafka-connect-assigns-same-task-to-multiple-workers
> There seems to be an issue with task reassignment when a worker rejoins after
> an unsuccessful join request. The worker seems to be outside the group for a
> generation but when it joins again the same task is running in more than one
> worker

--
This message was sent by Atlassian Jira
(v8.3.4#803005)