I don't think you're right to assert that this is "expected behaviour":

> the tasks are divided in below pattern when they are first time
> registered

Kafka Connect task allocation is non-deterministic.

I'm still not clear whether you're solving for a theoretical problem or an
actual one. If this is an actual problem that you're encountering and need
a solution to, then since the task allocation is not deterministic, it sounds
like you need to deploy separate worker clusters based on the workload
patterns that you are seeing and the machine resources available.
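
To see how tasks are actually spread before deciding on separate clusters,
a small script along these lines could tally tasks per worker from the
connector status output. This is only a sketch: the function names are made
up for illustration, and the sample data is copied from the second pair of
status results quoted below (as returned by GET /connectors/<name>/status),
with the emphasis asterisks removed.

```python
from collections import Counter


def tasks_per_worker(statuses):
    """Map each connector name to a {worker_id: task_count} dict."""
    return {
        status["name"]: dict(Counter(t["worker_id"] for t in status["tasks"]))
        for status in statuses
    }


def worker_totals(statuses):
    """Total number of tasks on each worker across all connectors."""
    totals = Counter()
    for status in statuses:
        totals.update(t["worker_id"] for t in status["tasks"])
    return dict(totals)


# Sample payloads shaped like the status results quoted in this thread
# (state after the second worker was stopped and restarted).
statuses = [
    {
        "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
        "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8080"},
        "tasks": [
            {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8080"},
            {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080"},
        ],
        "type": "sink",
    },
    {
        "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
        "connector": {"state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        "tasks": [
            {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
            {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        ],
        "type": "sink",
    },
]

summary = tasks_per_worker(statuses)
totals = worker_totals(statuses)
print(summary)
print(totals)
```

For this sample the per-worker totals are equal (two tasks each) even though
each connector's tasks all sit on a single worker, which is the distinction
being discussed here: the cluster balances task counts, not per-connector
placement.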


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Wed, 20 May 2020 at 21:29, Deepak Raghav <deepakragha...@gmail.com>
wrote:

> Hi Robin
>
> I had gone through the link you provided; it is not helpful in my case.
> Apart from this, I am not getting why the tasks are divided in below
> pattern when they are first time registered, which is expected behavior.
> Is there any parameter which we can pass in the worker properties file to
> control the task assignment strategy, like the range or round-robin
> assignors we have for consumer groups?
>
> connector rest status api result after first registration :
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.5:8080"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:8080"
>     }
>   ],
>   "type": "sink"
> }
>
> and
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:8078"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:8080"
>     }
>   ],
>   "type": "sink"
> }
>
>
> But when I stop the second worker process, wait for the
> scheduled.rebalance.max.delay.ms time (i.e. 5 min) to elapse, and start the
> process again, the result is different.
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.5:8080"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:8080"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:8080"
>     }
>   ],
>   "type": "sink"
> }
>
> and
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:8078"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     }
>   ],
>   "type": "sink"
> }
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io> wrote:
>
> > Thanks for the clarification. If this is an actual problem that you're
> > encountering and need a solution to, then since the task allocation is not
> > deterministic, it sounds like you need to deploy separate worker clusters
> > based on the workload patterns that you are seeing and the machine
> > resources available.
> >
> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> >
> >
> > On Wed, 20 May 2020 at 16:39, Deepak Raghav <deepakragha...@gmail.com>
> > wrote:
> >
> > > Hi Robin
> > >
> > > Replying to your query i.e
> > >
> > > > One thing I'd ask at this point is though if it makes any difference
> > > > where the tasks execute?
> > >
> > > > It actually makes a difference to us. We have 16 connectors and, as I
> > > > described the task division earlier, the tasks of the first 8
> > > > connectors are assigned to the first worker process and the tasks of
> > > > the other connectors to the other worker process. These 16 connectors
> > > > are all sink connectors, and each sink connector consumes messages from
> > > > a different topic. There may be a case when messages are arriving only
> > > > on the topics of the first 8 connectors; because all the tasks of these
> > > > connectors are assigned to the first worker, the load on it would be
> > > > high while the other set of connectors on the other worker would be idle.
> > >
> > > > Instead, if the tasks had been divided evenly, this case would have
> > > > been avoided, because the tasks of each connector would be present in
> > > > both worker processes, like below:
> > >
> > > *W1*                       *W2*
> > >  C1T1                    C1T2
> > > >  C2T1                    C2T2
> > >
> > > > I hope I have answered your question.
> > >
> > >
> > > Regards and Thanks
> > > Deepak Raghav
> > >
> > >
> > >
> > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io>
> > > > wrote:
> > >
> > > > OK, I understand better now.
> > > >
> > > > > You can read more about the guts of the rebalancing protocol that
> > > > > Kafka Connect uses as of Apache Kafka 2.3 and onwards here:
> > > > >
> > > > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > >
> > > > > One thing I'd ask at this point is though if it makes any difference
> > > > > where the tasks execute? The point of a cluster is that Kafka Connect
> > > > > manages the workload allocation. If you need workload separation and
> > > > > guaranteed execution locality I would suggest separate Kafka Connect
> > > > > distributed clusters.
> > > >
> > > >
> > > > --
> > > >
> > > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > > >
> > > >
> > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <deepakragha...@gmail.com>
> > > > > wrote:
> > > >
> > > > > Hi Robin
> > > > >
> > > > > Thanks for your reply.
> > > > >
> > > > > > We have two workers on different IPs. The example I gave you was
> > > > > > just an example. We are using Kafka version 2.3.1.
> > > > > >
> > > > > > Let me explain again with a simple example.
> > > > > >
> > > > > > Suppose we have two EC2 nodes, N1 and N2, with worker processes W1
> > > > > > and W2 running in distributed mode with the same group ID, i.e. in
> > > > > > the same cluster, and two connectors with two tasks each, i.e.:
> > > > >
> > > > > Node N1: W1 is running
> > > > > Node N2 : W2 is running
> > > > >
> > > > > > First Connector (C1): Task 1 with id C1T1 and task 2 with id C1T2
> > > > > > Second Connector (C2): Task 1 with id C2T1 and task 2 with id C2T2
> > > > >
> > > > > > Now suppose both worker processes W1 and W2 are running and I
> > > > > > register connectors C1 and C2 one after another, i.e. sequentially,
> > > > > > on either worker process. The tasks are divided between the worker
> > > > > > nodes as below, which is expected.
> > > > >
> > > > > *W1*                       *W2*
> > > > > C1T1                    C1T2
> > > > > > C2T1                    C2T2
> > > > >
> > > > > > Now, suppose I stop one worker process, e.g. W2, and start it again
> > > > > > after some time. The task division changes as below, i.e. the first
> > > > > > connector's tasks move to W1 and the second connector's tasks move
> > > > > > to W2:
> > > > >
> > > > > *W1*                       *W2*
> > > > > C1T1                    C2T1
> > > > > C1T2                    C2T2
> > > > >
> > > > >
> > > > > > Please let me know whether this is understandable.
> > > > > >
> > > > > > Note: In production, we are going to have 16 connectors with 10
> > > > > > tasks each and two worker nodes. In the above scenario, the tasks of
> > > > > > the first 8 connectors move to W1 and the tasks of the next 8
> > > > > > connectors move to W2, which is not expected.
> > > > >
> > > > >
> > > > > Regards and Thanks
> > > > > Deepak Raghav
> > > > >
> > > > >
> > > > >
> > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io>
> > > > > > wrote:
> > > > >
> > > > > > > So you're running two workers on the same machine (10.0.0.4), is
> > > > > > > that correct? Normally you'd run one worker per machine unless
> > > > > > > there was a particular reason otherwise.
> > > > > > > What version of Apache Kafka are you using?
> > > > > > > I'm not clear from your question if the distribution of tasks is
> > > > > > > presenting a problem to you (if so please describe why), or if
> > > > > > > you're just interested in the theory behind the rebalancing
> > > > > > > protocol?
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > > > > >
> > > > > >
> > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <deepakragha...@gmail.com>
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi
> > > > > > >
> > > > > > > Please, can anybody help me with this?
> > > > > > >
> > > > > > > Regards and Thanks
> > > > > > > Deepak Raghav
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <deepakragha...@gmail.com>
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Team
> > > > > > > >
> > > > > > > > > We have two worker nodes in a cluster and 2 connectors with 10
> > > > > > > > > tasks each.
> > > > > > > > >
> > > > > > > > > Now, suppose we have two Kafka Connect processes, W1 (port 8080)
> > > > > > > > > and W2 (port 8078), already started in distributed mode, and we
> > > > > > > > > then register the connectors. The 10 tasks of each connector are
> > > > > > > > > divided equally between the two workers, i.e. the first task of
> > > > > > > > > connector A goes to worker node W1 and the second task of
> > > > > > > > > connector A to worker node W2; similarly, the first task of
> > > > > > > > > connector B goes to W1 and the second task of connector B to W2.
> > > > > > > >
> > > > > > > > e.g
> > > > > > > > *#First Connector : *
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:*8080*"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:*8078*"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > > > *#Sec connector*
> > > > > > > >
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > > > But I have seen strange behavior: when I shut down the W2 worker
> > > > > > > > > node and start it again, the tasks are divided in a different
> > > > > > > > > way, i.e. all the tasks of connector A go to the W1 node and the
> > > > > > > > > tasks of connector B go to the W2 node.
> > > > > > > > >
> > > > > > > > > Can you please have a look at this?
> > > > > > > >
> > > > > > > > *#First Connector*
> > > > > > > >
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > > *#Second Connector *:
> > > > > > > >
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards and Thanks
> > > > > > > > Deepak Raghav
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
