The KIP for the current rebalancing protocol is probably a good reference: https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
--
Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff

On Tue, 26 May 2020 at 14:25, Deepak Raghav <deepakragha...@gmail.com> wrote:

Hi Robin

Thanks for the clarification.

As you suggested, task allocation between the workers is non-deterministic. I have shared this information within my team, but there are some other parties with whom I need to share it as the explanation for the issue they raised, and I cannot show this mail as a reference.

It would be great if you could share any link or discussion reference on the same.

Regards and Thanks
Deepak Raghav

On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io> wrote:

I don't think you're right to assert that this is "expected behaviour":

> the tasks are divided in below pattern when they are first time registered

Kafka Connect task allocation is non-deterministic.

I'm still not clear if you're solving for a theoretical problem or an actual one. If this is an actual problem that you're encountering and need a solution to, then since the task allocation is not deterministic it sounds like you need to deploy separate worker clusters based on the workload patterns that you are seeing and the machine resources available.

--
Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff

On Wed, 20 May 2020 at 21:29, Deepak Raghav <deepakragha...@gmail.com> wrote:

Hi Robin

I have gone through the link you provided; it is not helpful in my case. Apart from this, I do not understand why the tasks are divided in the pattern below when they are first registered (which is the behaviour I expect) but differently after a worker restart. Is there any parameter we can pass in the worker property file that controls the task assignment strategy, like the range or round-robin assignors in a consumer group?

Connector REST status API result after first registration:

{
    "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8080"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080"}
    ],
    "type": "sink"
}

and

{
    "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.4:8078"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080"}
    ],
    "type": "sink"
}

But when I stop the second worker process, wait for the scheduled.rebalance.max.delay.ms interval (i.e. 5 minutes) to elapse, and start the process again, the result is different:

{
    "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8080"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8080"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080"}
    ],
    "type": "sink"
}

and

{
    "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.4:8078"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8078"}
    ],
    "type": "sink"
}

Regards and Thanks
Deepak Raghav
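As an aside, the per-worker spread shown in status output like the above can be tallied across all connectors using the stock Connect REST endpoints, GET /connectors and GET /connectors/<name>/status. A minimal Python sketch; the base URL is a placeholder and would be any worker's REST address:

from collections import Counter

import requests

# Any worker's REST address; placeholder value, not from the thread.
BASE = "http://10.0.0.5:8080"

def tasks_per_worker(base_url):
    """Tally task placements per worker_id across all connectors."""
    counts = Counter()
    for name in requests.get(f"{base_url}/connectors").json():
        status = requests.get(f"{base_url}/connectors/{name}/status").json()
        for task in status["tasks"]:
            counts[task["worker_id"]] += 1
    return counts

if __name__ == "__main__":
    for worker, n in sorted(tasks_per_worker(BASE).items()):
        print(f"{worker}: {n} task(s)")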
On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io> wrote:

Thanks for the clarification. If this is an actual problem that you're encountering and need a solution to, then since the task allocation is not deterministic it sounds like you need to deploy separate worker clusters based on the workload patterns that you are seeing and the machine resources available.

--
Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff

On Wed, 20 May 2020 at 16:39, Deepak Raghav <deepakragha...@gmail.com> wrote:

Hi Robin

Replying to your query:

> One thing I'd ask at this point is though if it makes any difference where the tasks execute?

It actually makes a difference to us. We have 16 connectors, all sinks, and as I stated earlier the first 8 connectors' tasks are assigned to the first worker process and the other 8 connectors' tasks to the second worker process. Each sink connector consumes messages from a different topic. There may be a case where messages arrive only for the first 8 connectors' topics; because all the tasks of those connectors are assigned to the first worker, the load on it would be high while the other set of connectors on the other worker sits idle.

Instead, if the tasks were divided evenly, this case would be avoided, because tasks of each connector would be present in both worker processes, like below:

W1      W2
C1T1    C1T2
C2T1    C2T2

I hope that answers your question.

Regards and Thanks
Deepak Raghav

On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io> wrote:

OK, I understand better now.

You can read more about the guts of the rebalancing protocol that Kafka Connect uses as of Apache Kafka 2.3 and onwards here:
https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/

One thing I'd ask at this point, though, is whether it makes any difference where the tasks execute. The point of a cluster is that Kafka Connect manages the workload allocation. If you need workload separation and guaranteed execution locality, I would suggest separate Kafka Connect distributed clusters.

--
Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
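To make the non-determinism point concrete: both layouts described in this thread give each worker the same number of tasks, so both are equally balanced from the protocol's point of view; per-connector interleaving is not something the protocol promises. A toy Python illustration (this is not the actual KIP-415 assignor, only a sketch of how the ordering of the task list changes the grouping):

from itertools import cycle

def round_robin(tasks, workers):
    """Deal tasks out to workers in whatever order the task list is in."""
    assignment = {w: [] for w in workers}
    for task, worker in zip(tasks, cycle(workers)):
        assignment[worker].append(task)
    return assignment

workers = ["W1", "W2"]
# Same four tasks, two different list orders -- both results are balanced
# (two tasks per worker), but only one interleaves each connector:
print(round_robin(["C1T1", "C1T2", "C2T1", "C2T2"], workers))
# {'W1': ['C1T1', 'C2T1'], 'W2': ['C1T2', 'C2T2']}
print(round_robin(["C1T1", "C2T1", "C1T2", "C2T2"], workers))
# {'W1': ['C1T1', 'C1T2'], 'W2': ['C2T1', 'C2T2']}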
On Wed, 20 May 2020 at 10:24, Deepak Raghav <deepakragha...@gmail.com> wrote:

Hi Robin

Thanks for your reply.

We are running the two workers on different IPs; the example I gave was just an example. We are using Kafka version 2.3.1.

Let me explain again with a simple example.

Suppose we have two EC2 nodes, N1 and N2, with worker processes W1 and W2 running in distributed mode with the same group.id (i.e. in the same cluster), and two connectors with two tasks each:

Node N1: W1 is running
Node N2: W2 is running

First connector (C1): task 1 with id C1T1 and task 2 with id C1T2
Second connector (C2): task 1 with id C2T1 and task 2 with id C2T2

Now, if both worker processes W1 and W2 are running and I register connectors C1 and C2 one after another (i.e. sequentially) on either worker process, the tasks are divided between the worker nodes like below, which is expected:

W1      W2
C1T1    C1T2
C2T1    C2T2

Now, suppose I stop one worker process, e.g. W2, and start it again after some time. The task division changes: the first connector's tasks move to W1 and the second connector's tasks move to W2.

W1      W2
C1T1    C2T1
C1T2    C2T2

Please let me know if that is clear.

Note: in production we are going to have 16 connectors with 10 tasks each and two worker nodes. With the above scenario, the first 8 connectors' tasks move to W1 and the next 8 connectors' tasks move to W2, which is not what we expect.

Regards and Thanks
Deepak Raghav
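For completeness, registering the two connectors sequentially as described can be done against either worker's REST endpoint with POST /connectors. A hedged Python sketch; the worker address, connector class, and topic names below are placeholders, not taken from the thread:

import requests

# Placeholder worker address; either worker in the cluster would do.
BASE = "http://10.0.0.4:8080"

for name in ["C1", "C2"]:
    resp = requests.post(
        f"{BASE}/connectors",
        json={
            "name": name,
            "config": {
                # Hypothetical sink connector config -- substitute real values.
                "connector.class": "org.example.MySinkConnector",
                "tasks.max": "2",
                "topics": f"{name.lower()}-topic",
            },
        },
    )
    resp.raise_for_status()
    print(f"registered {name}: {resp.status_code}")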
On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io> wrote:

So you're running two workers on the same machine (10.0.0.4), is that correct? Normally you'd run one worker per machine unless there is a particular reason otherwise.
What version of Apache Kafka are you using?
I'm not clear from your question whether the distribution of tasks is presenting a problem to you (if so, please describe why), or whether you're just interested in the theory behind the rebalancing protocol.

--
Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff

On Wed, 20 May 2020 at 06:46, Deepak Raghav <deepakragha...@gmail.com> wrote:

Hi

Please, can anybody help me with this?

Regards and Thanks
Deepak Raghav

On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <deepakragha...@gmail.com> wrote:

Hi Team

We have two worker nodes in a cluster and 2 connectors with 10 tasks each.

Now, suppose we have two Kafka Connect processes, W1 (port 8080) and W2 (port 8078), already started in distributed mode, and we then register the connectors. The 10 tasks of each connector are divided equally between the two workers, alternating between them, i.e. one task of connector A goes to the W1 worker node and the next task of connector A to the W2 worker node; similarly for connector B.

e.g.

#First connector:

{
    "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.4:8080"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 2, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 3, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 4, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 5, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 6, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 7, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 8, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 9, "state": "RUNNING", "worker_id": "10.0.0.4:8080"}
    ],
    "type": "sink"
}
"RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8080" > > > > > > > > > > } > > > > > > > > > > ], > > > > > > > > > > "type": "sink" > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > *#Sec connector* > > > > > > > > > > > > > > > > > > > > { > > > > > > > > > > "name": "REGION_CODE_UPPER-Cdb_Neatransaction", > > > > > > > > > > "connector": { > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8078" > > > > > > > > > > }, > > > > > > > > > > "tasks": [ > > > > > > > > > > { > > > > > > > > > > "id": 0, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8078" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 1, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8080" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 2, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8078" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 3, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8080" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 4, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8078" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 5, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8080" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 6, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8078" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 7, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8080" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 8, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8078" > > > > > > > > > > }, > > > > > > > > > > { > > > > > > > > > > "id": 9, > > > > > > > > > > "state": "RUNNING", > > > > > > > > > > "worker_id": "10.0.0.4:8080" > > > > > > > > > > } > > > > > > > > > > ], > > > > > > > > > > "type": "sink" > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > But I have seen a strange behavior, when I just shutdown > W2 > > > > > worker > > > > > > > node > > > > > > > > > > and start it again task are divided but in diff way i.e > all > > > the > > > > > > tasks > > > > > > > > of > > > > > > > > > A > > > > > > > > > > connector will get into W1 node and tasks of B Connector > > into > > > > W2 > > > > > > > node. > > > > > > > > > > > > > > > > > > > > Can you please have a look for this. 
#First connector:

{
    "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.4:8080"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 2, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 3, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 4, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 5, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 6, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 7, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 8, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        {"id": 9, "state": "RUNNING", "worker_id": "10.0.0.4:8080"}
    ],
    "type": "sink"
}

#Second connector:

{
    "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.4:8078"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 2, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 3, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 4, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 5, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 6, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 7, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 8, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 9, "state": "RUNNING", "worker_id": "10.0.0.4:8078"}
    ],
    "type": "sink"
}

Regards and Thanks
Deepak Raghav
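A small follow-up in the same vein: the regrouping described above (every task of a connector on a single worker) can be spotted programmatically. A Python sketch using the same status endpoint; the worker address is a placeholder:

import requests

BASE = "http://10.0.0.4:8080"  # placeholder worker address

# Flag connectors whose tasks have all collapsed onto one worker.
for name in requests.get(f"{BASE}/connectors").json():
    status = requests.get(f"{BASE}/connectors/{name}/status").json()
    workers = {t["worker_id"] for t in status["tasks"]}
    if len(status["tasks"]) > 1 and len(workers) == 1:
        print(f"{name}: all {len(status['tasks'])} tasks on {workers.pop()}")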