[ 
https://issues.apache.org/jira/browse/KAFKA-9184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976648#comment-16976648
 ] 

Timur Rubeko edited comment on KAFKA-9184 at 11/18/19 3:50 PM:
---------------------------------------------------------------

Hello. SO question author here.

Following is an example of a sequence of events that typically leads to the 
redundant task creation. Set-up: three workers and three connectors. Relevant 
logs:

 

*Worker A*:
{code:java}
[2019-11-03 11:07:26,912] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 639 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[mqtt-source], 
taskIds=[mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], 
delay=300000} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,192] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 640 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, 
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, 
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 641 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, 
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, 
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 642 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, 
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, 
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
 

*Worker B*:
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 639 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, 
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], 
revokedConnectorIds=[], revokedTaskIds=[], delay=300000} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,041] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id 
connect-1-bf534716-be2f-4cb3-9f26-521023c6b504 is not valid. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,251] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 641 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, 
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:03,150] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id 
connect-1-c930bdb9-eedf-4313-95e0-4a6927836094 is not valid. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 642 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, 
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
 

*Worker C*:
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 639 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], 
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=300000} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,006] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id 
connect-1-d9a36d68-ab64-4404-a2aa-e5eaf3249ec4 is not valid. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 641 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], 
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 642 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], 
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
 

My interpretation of these events:
 - Generation 639: tasks are evenly distributed between workers.
 - Generation 640: workers B and C are missing from generation 640 (presumably 
because they fail to heartbeat - `Attempt to heartbeat failed for since member 
id connect-1-c930bdb9-eedf-4313-95e0-4a6927836094 is not valid` messages). Only 
worker A is left in generation 640 and it is assigned all of the tasks, 
including those that were previously assigned to workers B and C (`hdfs-sink-0` 
and `another-hdfs-sink-0` in this example).
 - Generation 641: this generation is created just 3 seconds after the 
generation 640. Workers B and C are *not releasing* the tasks that were 
reassigned to worker A, neither worker A releases those tasks. At this point 
these tasks are effectively "duplicated".
 - Subsequent generations: further rebalancing does not fix the issue - 
redundant tasks are kept (generation 642 is given in these logs as an example).

The only remedy is to restart *all* workers. In my tests restarting one worker 
only didn't fix the issue (after restart it got assigned same tasks back, even 
though they are also present on another worker).

I have noticed that in all worker logs workers are logged as "Worker 
clientId=connect-1, groupId=ingest-sources-cluster" and assumed this (same 
client id) may be an issue. However, setting different client ids explicitly 
doesn't fix the issue. (According to the documentation worker IP makes part of 
the worker identification anyway, so it makes sense that explicit client ids 
had no effect).


was (Author: trubeko):
Hello. SO question author here.

Following is an example of a sequence of events that typically leads to the 
redundant task creation. Set-up: three workers and 3 connectors. Relevant logs:

*Worker A*:
{code:java}
[2019-11-03 11:07:26,912] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 639 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[mqtt-source], 
taskIds=[mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], 
delay=300000} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,192] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 640 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, 
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, 
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 641 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, 
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, 
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 642 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink, 
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0, 
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
*Worker B*:

 
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 639 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, 
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], 
revokedConnectorIds=[], revokedTaskIds=[], delay=300000} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,041] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id 
connect-1-bf534716-be2f-4cb3-9f26-521023c6b504 is not valid. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,251] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 641 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, 
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:03,150] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id 
connect-1-c930bdb9-eedf-4313-95e0-4a6927836094 is not valid. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 642 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, 
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
 

*Worker C*:

 
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 639 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], 
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=300000} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,006] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id 
connect-1-d9a36d68-ab64-4404-a2aa-e5eaf3249ec4 is not valid. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 641 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], 
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1, 
groupId=ingest-sources-cluster] Joined group at generation 642 and got 
assignment: Assignment{error=0, 
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f', 
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink], 
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
 

My interpretation of these events:
 - Generation 639: tasks are evenly distributed between workers.
 - Generation 640: workers B and C are missing from generation 640 (presumably 
because they fail to heartbeat - `Attempt to heartbeat failed for since member 
id connect-1-c930bdb9-eedf-4313-95e0-4a6927836094 is not valid` messages). Only 
worker A is left in generation 640 and it is assigned all of the tasks, 
including those that were previously assigned to workers B and C (`hdfs-sink-0` 
and `another-hdfs-sink-0` in this example).
 - Generation 641: this generation is created just 3 seconds after the 
generation 640. Workers B and C are *not releasing* the tasks that were 
reassigned to worker A, neither worker A releases those tasks. At this point 
these tasks are effectively "duplicated".
 - Subsequent generations: further rebalancing does not fix the issue - 
redundant tasks are kept (generation 642 is given in these logs as an example).

The only remedy is to restart *all* workers. In my tests restarting one worker 
only didn't fix the issue (after restart it got assigned same tasks back, even 
though they are also present on another worker).

I have noticed that in all worker logs workers are logged as "Worker 
clientId=connect-1, groupId=ingest-sources-cluster" and assumed this (same 
client id) may be an issue. However, setting different client ids explictly 
doesn't fix the issue. (According to the documentation worker IP makes part of 
the worker identification anyway, so it makes sense that exlicit client ids had 
no effect).

> Redundant task creation after worker fails to join a specific group generation
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-9184
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9184
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.3.2
>            Reporter: Konstantine Karantasis
>            Assignee: Konstantine Karantasis
>            Priority: Blocker
>             Fix For: 2.3.2
>
>
> First reported here: 
> https://stackoverflow.com/questions/58631092/kafka-connect-assigns-same-task-to-multiple-workers
> There seems to be an issue with task reassignment when a worker rejoins after 
> an unsuccessful join request. The worker seems to be outside the group for a 
> generation but when it joins again the same task is running in more than one 
> worker



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to