Thanks for the clarification. If this is an actual problem that you're
encountering and need a solution to, then, since the task allocation is not
deterministic, it sounds like you need to deploy separate worker clusters
based on the workload patterns that you are seeing and the machine resources
available.
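
If it helps to confirm the skew before splitting things up, here is a minimal
sketch (not part of the Kafka Connect tooling) that counts tasks per worker
from status documents shaped like the ones you pasted. The function names and
the trimmed two-task sample data are made up for illustration; in practice you
would feed it the JSON that each connector's status call returns.

```python
from collections import Counter


def tasks_per_worker(status):
    """Count how many tasks of one connector run on each worker_id."""
    return Counter(task["worker_id"] for task in status.get("tasks", []))


def skewed_connectors(statuses, workers):
    """Return names of connectors that have no task on at least one worker."""
    skewed = []
    for status in statuses:
        counts = tasks_per_worker(status)
        # A Counter returns 0 for a worker that holds none of the tasks.
        if any(counts[worker] == 0 for worker in workers):
            skewed.append(status["name"])
    return skewed


# Hypothetical sample, trimmed to two tasks per connector, mirroring the
# layout reported after W2 was restarted.
WORKERS = ["10.0.0.4:8080", "10.0.0.4:8078"]
after_restart = [
    {
        "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
        "tasks": [
            {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
            {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8080"},
        ],
    },
    {
        "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
        "tasks": [
            {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
            {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        ],
    },
]

if __name__ == "__main__":
    for status in after_restart:
        print(status["name"], dict(tasks_per_worker(status)))
    print("skewed:", skewed_connectors(after_restart, WORKERS))
```

Any connector it reports has all of its tasks on a subset of the workers,
which is the situation you describe after restarting W2.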


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Wed, 20 May 2020 at 16:39, Deepak Raghav <deepakragha...@gmail.com>
wrote:

> Hi Robin
>
> Replying to your query, i.e.
>
> One thing I'd ask at this point is though if it makes any difference where
> the tasks execute?
>
> It does make a difference to us. We have 16 connectors and, as in the task
> division I described earlier, the tasks of the first 8 connectors are
> assigned to the first worker process and the tasks of the other 8
> connectors to the other worker process. All 16 connectors are sink
> connectors, and each sink connector consumes messages from a different
> topic. There may be a case where messages arrive only on the topics of the
> first 8 connectors; because all the tasks of these connectors are assigned
> to the first worker, the load on it would be high while the other set of
> connectors on the other worker sits idle.
>
> Instead, if the tasks had been divided evenly, this case would have been
> avoided, because the tasks of each connector would be present in both
> worker processes, like below:
>
> *W1*                       *W2*
>  C1T1                    C1T2
>  C2T1                    C2T2
>
> I hope that answers your question.
>
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io> wrote:
>
> > OK, I understand better now.
> >
> > You can read more about the guts of the rebalancing protocol that Kafka
> > Connect uses as of Apache Kafka 2.3 and onwards here:
> >
> > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> >
> > One thing I'd ask at this point is though if it makes any difference where
> > the tasks execute? The point of a cluster is that Kafka Connect manages the
> > workload allocation. If you need workload separation and
> > guaranteed execution locality I would suggest separate Kafka Connect
> > distributed clusters.
> >
> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> >
> >
> > On Wed, 20 May 2020 at 10:24, Deepak Raghav <deepakragha...@gmail.com>
> > wrote:
> >
> > > Hi Robin
> > >
> > > Thanks for your reply.
> > >
> > > We have two workers on different IPs. The example I gave you earlier
> > > was just an example. We are using Kafka version 2.3.1.
> > >
> > > Let me tell you again with a simple example.
> > >
> > > Suppose we have two EC2 nodes, N1 and N2, with worker processes W1 and
> > > W2 running in distributed mode with the same groupId, i.e. in the same
> > > cluster, and two connectors with two tasks each, i.e.
> > >
> > > Node N1: W1 is running
> > > Node N2 : W2 is running
> > >
> > > First Connector (C1) : Task1 with id : C1T1 and task 2 with id : C1T2
> > > Second Connector (C2) : Task1 with id : C2T1 and task 2 with id : C2T2
> > >
> > > Now suppose both the W1 and W2 worker processes are running and I
> > > register connectors C1 and C2 one after another, i.e. sequentially, on
> > > either of the worker processes. The tasks are divided between the
> > > worker nodes as below, which is expected.
> > >
> > > *W1*                       *W2*
> > > C1T1                    C1T2
> > > C2T1                    C2T2
> > >
> > > Now, suppose I stop one worker process, e.g. W2, and start it again
> > > after some time. The task division changes as below, i.e. the first
> > > connector's tasks move to W1 and the second connector's tasks move to
> > > W2:
> > >
> > > *W1*                       *W2*
> > > C1T1                    C2T1
> > > C1T2                    C2T2
> > >
> > >
> > > Please let me know whether this is understandable.
> > >
> > > Note: In production, we are going to have 16 connectors with 10 tasks
> > > each and two worker nodes. In the above scenario, the tasks of the first
> > > 8 connectors move to W1 and the tasks of the next 8 connectors move to
> > > W2, which is not expected.
> > >
> > >
> > > Regards and Thanks
> > > Deepak Raghav
> > >
> > >
> > >
> > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io>
> > > wrote:
> > >
> > > > So you're running two workers on the same machine (10.0.0.4), is
> > > > that correct? Normally you'd run one worker per machine unless there
> > > > was a particular reason otherwise.
> > > > What version of Apache Kafka are you using?
> > > > I'm not clear from your question if the distribution of tasks is
> > > > presenting a problem to you (if so please describe why), or if you're
> > > > just interested in the theory behind the rebalancing protocol?
> > > >
> > > >
> > > > --
> > > >
> > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > > >
> > > >
> > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <deepakragha...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > Please, can anybody help me with this?
> > > > >
> > > > > Regards and Thanks
> > > > > Deepak Raghav
> > > > >
> > > > >
> > > > >
> > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <deepakragha...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Team
> > > > > >
> > > > > > We have two worker nodes in a cluster and 2 connectors with 10
> > > > > > tasks each.
> > > > > >
> > > > > > Now, suppose we have two Kafka Connect processes, W1 (port 8080)
> > > > > > and W2 (port 8078), already started in distributed mode, and then
> > > > > > register the connectors. The 10 tasks of each connector are
> > > > > > divided equally between the two workers, i.e. the first task of
> > > > > > connector A goes to worker node W1 and the second task of
> > > > > > connector A to worker node W2; similarly, the first task of
> > > > > > connector B goes to node W1 and the second task of connector B to
> > > > > > node W2.
> > > > > >
> > > > > > e.g
> > > > > > *#First Connector : *
> > > > > > {
> > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > >   "connector": {
> > > > > >     "state": "RUNNING",
> > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > >   },
> > > > > >   "tasks": [
> > > > > >     {
> > > > > >       "id": 0,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 1,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 2,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 3,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 4,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 5,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 6,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 7,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 8,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 9,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     }
> > > > > >   ],
> > > > > >   "type": "sink"
> > > > > > }
> > > > > >
> > > > > >
> > > > > > *#Second Connector*
> > > > > >
> > > > > > {
> > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > >   "connector": {
> > > > > >     "state": "RUNNING",
> > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > >   },
> > > > > >   "tasks": [
> > > > > >     {
> > > > > >       "id": 0,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 1,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 2,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 3,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 4,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 5,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 6,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 7,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 8,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 9,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     }
> > > > > >   ],
> > > > > >   "type": "sink"
> > > > > > }
> > > > > >
> > > > > > But I have seen a strange behavior: when I shut down the W2
> > > > > > worker node and start it again, the tasks are divided in a
> > > > > > different way, i.e. all the tasks of connector A go to node W1
> > > > > > and the tasks of connector B to node W2.
> > > > > >
> > > > > > Can you please have a look at this?
> > > > > >
> > > > > > *#First Connector*
> > > > > >
> > > > > > {
> > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > >   "connector": {
> > > > > >     "state": "RUNNING",
> > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > >   },
> > > > > >   "tasks": [
> > > > > >     {
> > > > > >       "id": 0,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 1,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 2,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 3,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 4,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 5,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 6,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 7,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 8,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 9,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > >     }
> > > > > >   ],
> > > > > >   "type": "sink"
> > > > > > }
> > > > > >
> > > > > > *#Second Connector *:
> > > > > >
> > > > > > {
> > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > >   "connector": {
> > > > > >     "state": "RUNNING",
> > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > >   },
> > > > > >   "tasks": [
> > > > > >     {
> > > > > >       "id": 0,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 1,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 2,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 3,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 4,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 5,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 6,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 7,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 8,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     },
> > > > > >     {
> > > > > >       "id": 9,
> > > > > >       "state": "RUNNING",
> > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > >     }
> > > > > >   ],
> > > > > >   "type": "sink"
> > > > > > }
> > > > > >
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
