Yep, I increase the number of tasks and the umber of nodes executing the
tasks, the above issues appears. It creates the topics ( so non issue
reaching the clusters ) but does not create  the MirrorSourceConnector/s .
The process does launch the MirrorCheckPointConnector, one for each node. I
took a thread dump and the only thread I see on poll is I think for the
config  org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:262)




On Thu, Oct 24, 2019 at 10:36 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I might have created a build from the trunk, rather then the 2.4 branch ,
> but will confirm.
>
> On Thu, Oct 24, 2019 at 4:44 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> The above may not be an issue as in  it just uses  the returned class
>> loader to resolve the Connector I think .  What is not obvious, why it does
>> not go ahead and consume ..
>>
>> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,571] INFO refreshing
>> known target topics took 15 ms
>> (org.apache.kafka.connect.mirror.Scheduler:95)
>>
>> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Started
>> MirrorSourceConnector with 120 topic-partitions.
>> (org.apache.kafka.connect.mirror.MirrorSourceConnector:121)
>>
>> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Starting
>> MirrorSourceConnector took 160 ms.
>> (org.apache.kafka.connect.mirror.MirrorSourceConnector:122)
>>
>> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,577] INFO Finished
>> creating connector MirrorSourceConnector
>> (org.apache.kafka.connect.runtime.Worker:272)
>>
>> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,578] ERROR Plugin class
>> loader for connector:
>> 'org.apache.kafka.connect.mirror.MirrorSourceConnector' was not found.
>> Returning:
>> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621
>> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
>>
>> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,579] INFO
>> SourceConnectorConfig values:
>>
>> [mm2-dev-749469cf68-vpm2l] config.action.reload = restart
>>
>> [mm2-dev-749469cf68-vpm2l] connector.class =
>> org.apache.kafka.connect.mirror.MirrorSourceConnector
>>
>> [mm2-dev-749469cf68-vpm2l] errors.log.enable = false
>>
>> [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false
>>
>> [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000
>>
>> [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0
>>
>> [mm2-dev-749469cf68-vpm2l] errors.tolerance = none
>>
>> [mm2-dev-749469cf68-vpm2l] header.converter = null
>>
>> [mm2-dev-749469cf68-vpm2l] key.converter = null
>>
>> [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector
>>
>> [mm2-dev-749469cf68-vpm2l] tasks.max = 48
>>
>> [mm2-dev-749469cf68-vpm2l] transforms = []
>>
>> [mm2-dev-749469cf68-vpm2l] value.converter = null
>>
>> [mm2-dev-749469cf68-vpm2l]
>> (org.apache.kafka.connect.runtime.SourceConnectorConfig:347)
>>
>> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,580] INFO
>> EnrichedConnectorConfig values:
>>
>> [mm2-dev-749469cf68-vpm2l] config.action.reload = restart
>>
>> [mm2-dev-749469cf68-vpm2l] connector.class =
>> org.apache.kafka.connect.mirror.MirrorSourceConnector
>>
>> [mm2-dev-749469cf68-vpm2l] errors.log.enable = false
>>
>> [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false
>>
>> [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000
>>
>> [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0
>>
>> [mm2-dev-749469cf68-vpm2l] errors.tolerance = none
>>
>> [mm2-dev-749469cf68-vpm2l] header.converter = null
>>
>> [mm2-dev-749469cf68-vpm2l] key.converter = null
>>
>> [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector
>>
>> [mm2-dev-749469cf68-vpm2l] tasks.max = 48
>>
>> [mm2-dev-749469cf68-vpm2l] transforms = []
>>
>> [mm2-dev-749469cf68-vpm2l] value.converter = null
>>
>> [mm2-dev-749469cf68-vpm2l]
>> (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
>>
>>
>>
>> And from there on nothing..
>>
>>
>>
>>
>> On Thu, Oct 24, 2019 at 3:02 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey Ryanne,
>>>
>>>            Seeing the below ERROR in the logs  and then, it seems the
>>> process does not consume ( it does not exit with any errors ) . And this is
>>> intermittent. As in do it enough times. that does relaunch :)  Is this
>>> something a known bug
>>>
>>>          [mm2-dev-58bf5df684-ln9k2] [2019-10-24 18:41:03,067] ERROR
>>> Plugin class loader for connector:
>>> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found.
>>> Returning:
>>> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621
>>> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Oct 21, 2019 at 5:16 PM Ryanne Dolan <ryannedo...@gmail.com>
>>> wrote:
>>>
>>>> Vishal, the number of tasks created per source->target herder is
>>>> determined
>>>> by both tasks.max and the total number of topic-partitions being
>>>> replicated. In order to use all 12 worker nodes, you'd need tasks.max
>>>> >= 12
>>>> and number of topic-partitions >= 12. From previous emails it sounds
>>>> like
>>>> you have a small number of topic-partitions total (i.e. a small number
>>>> of
>>>> topics with a small number of partitions per topic), so I'm guessing
>>>> that's
>>>> the reason you aren't seeing more tasks being created.
>>>>
>>>> Ryanne
>>>>
>>>> On Sat, Oct 19, 2019 at 1:28 AM Vishal Santoshi <
>>>> vishal.santo...@gmail.com>
>>>> wrote:
>>>>
>>>> > Here is what I see
>>>> >
>>>> > * The max tasks are a a cap  on a Connector across the cluster. If
>>>> have 8
>>>> > VMs but 8 max tasks my assumption  that there would be 8 * 8 = 72 task
>>>> > threads was
>>>> > wring. The logs showed that the partitions were consumed by  8
>>>> threads on
>>>> > the 8 VMs ( 1 per VM ) which was highly un optimal.  When I scaled the
>>>> > VMs to 12, it did not matter, as the max tasks still prevented any
>>>> further
>>>> > distribution.
>>>> >
>>>> > *  If I cancel/resume the cluster with a max task of 48 ( keeping the
>>>> same
>>>> > job name and thus connector definition   the max tasks does not
>>>> change, as
>>>> > in
>>>> >  it seems to keep the same number of max task  threads limit ( as in
>>>> 8 )
>>>> >
>>>> > * I can bring down a VM and see the task migrate to a free VM but the
>>>> > overall count of task threads remain the same.
>>>> >
>>>> >
>>>> > In essence, the num of tasks is a cap on threads in the cluster per
>>>> > connector, A connector is a source->sink pair that spans a cluster.
>>>> Thus if
>>>> > we have a
>>>> > A->B DAG and  max tasks of 8, then there will be no more that 8 Source
>>>> > Tasks  ( threads ) no matter how big the cluster is, It thus makes
>>>> sense to
>>>> > over provision ( within limits of a single VM ) on the max tasks to
>>>> allow
>>>> > for adding more VMs for scale up.....
>>>> >
>>>> >
>>>> >
>>>> > On Fri, Oct 18, 2019 at 8:04 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com
>>>> > >
>>>> > wrote:
>>>> >
>>>> > > I misspoke
>>>> > >
>>>> > > >> I now have 8 VMs 8 cpus with 48 max tasks and it did spread to
>>>> the the
>>>> > > 8  VMs. I then upscaled to 12 VMs and the tasks *have not *migrated
>>>> as I
>>>> > > would expect .
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > > On Fri, Oct 18, 2019 at 8:00 PM Vishal Santoshi <
>>>> > vishal.santo...@gmail.com>
>>>> > > wrote:
>>>> > >
>>>> > >> OK, You will have to explain :)
>>>> > >>
>>>> > >> I had 12 VMs with 8 cpus and 8 max tasks.  I thought let me give a
>>>> CPU
>>>> > to
>>>> > >> each task, which I presumed is a java thread ( even though I know
>>>> the
>>>> > >> thread would be mostly ip bound ). . I saw the issue I pointed up.
>>>> > >> *I now have 8 VMs 8 cpus with 48 max tasks and it did spread to
>>>> the the
>>>> > >> 8  VMs. I then upscaled to 12 VMs and the tasks migrated as I would
>>>> > expect
>>>> > >> .*
>>>> > >>
>>>> > >>  I know that a VM will have MirrorSourceConnector and
>>>> > >> MirrorHeartbeatConnector tasks up till  tasks.max.  So a few
>>>> questions
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> * When we say there are 48 max tasks, are we saying there are  48
>>>> > threads
>>>> > >> ( in fact 96, each for the 2 groups above,  worst case  + 2 ) ?
>>>> > >> * When we talk about Connector, are we talking about a JVM
>>>> process, as
>>>> > in
>>>> > >> a Connector is a JVM process ?
>>>> > >> * Why larger number of tasks.max help the spread  ?  As in I would
>>>> > assume
>>>> > >> there are up till 8 tasks ( or 16 )  per VM but how that should
>>>> not have
>>>> > >> prevented  re assignment  on a scale up ( as it clearly did ) ?
>>>> > >>
>>>> > >> The reason I ask is that I plan to run mm2 cluster on  k8s and I
>>>> want to
>>>> > >> make sure that I use the version of JVM that is more docker
>>>> friendly
>>>> > vis a
>>>> > >> vis, how many cpus it believes it has  and as explained here
>>>> > >>
>>>> >
>>>> https://blog.softwaremill.com/docker-support-in-new-java-8-finally-fd595df0ca54
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> On Fri, Oct 18, 2019 at 4:15 PM Ryanne Dolan <
>>>> ryannedo...@gmail.com>
>>>> > >> wrote:
>>>> > >>
>>>> > >>> What is tasks.max? Consider bumping to something like 48 if you're
>>>> > >>> running
>>>> > >>> on a dozen nodes.
>>>> > >>>
>>>> > >>> Ryanne
>>>> > >>>
>>>> > >>> On Fri, Oct 18, 2019, 1:43 PM Vishal Santoshi <
>>>> > vishal.santo...@gmail.com
>>>> > >>> >
>>>> > >>> wrote:
>>>> > >>>
>>>> > >>> > Hey Ryanne,
>>>> > >>> >
>>>> > >>> >
>>>> > >>> >             I see a definite issue. I am doing an intense test
>>>> and I
>>>> > >>> bring
>>>> > >>> > up 12 VMs ( they are 12 pods with 8 cpus each ), replicating
>>>> about
>>>> > 1200
>>>> > >>> > plus topics ( fairly heavy 100mbps ) ... They are acquired and
>>>> are
>>>> > >>> > staggered as they come up..I see a fraction of these nodes not
>>>> > >>> assigned any
>>>> > >>> > replication....There is plenty to go around. ( more then a
>>>> couple of
>>>> > >>> > thousand partitions ) .   is there something I am missing....
>>>> As in
>>>> > my
>>>> > >>> > current case 5 of the 12 VMs are idle..
>>>> > >>> >
>>>> > >>> > Vishal
>>>> > >>> >
>>>> > >>> > On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi <
>>>> > >>> vishal.santo...@gmail.com
>>>> > >>> > >
>>>> > >>> > wrote:
>>>> > >>> >
>>>> > >>> > > Oh sorry a. COUNTER... is more like it....
>>>> > >>> > >
>>>> > >>> > > On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi <
>>>> > >>> vishal.santo...@gmail.com
>>>> > >>> > >
>>>> > >>> > > wrote:
>>>> > >>> > >
>>>> > >>> > >> Will do
>>>> > >>> > >>     One more thing the age/latency metrics seem to be
>>>> analogous as
>>>> > >>> in
>>>> > >>> > >> they seem to be calculated using similar routines.  I would
>>>> think
>>>> > a
>>>> > >>> > metric
>>>> > >>> > >> tracking
>>>> > >>> > >> the number of flush failures ( as a GAUGE )  given
>>>> > >>> > >> offset.flush.timeout.ms would be highly beneficial.
>>>> > >>> > >>
>>>> > >>> > >> Regards..
>>>> > >>> > >>
>>>> > >>> > >>
>>>> > >>> > >> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan <
>>>> > >>> ryannedo...@gmail.com>
>>>> > >>> > >> wrote:
>>>> > >>> > >>
>>>> > >>> > >>> Ah, I see you are correct. Also I misspoke saying "workers"
>>>> > >>> earlier, as
>>>> > >>> > >>> the
>>>> > >>> > >>> consumer is not created by the worker, but the task.
>>>> > >>> > >>>
>>>> > >>> > >>> I suppose the put() could be changed to putIfAbsent() here
>>>> to
>>>> > >>> enable
>>>> > >>> > this
>>>> > >>> > >>> property to be changed. Maybe submit a PR?
>>>> > >>> > >>>
>>>> > >>> > >>> Ryanne
>>>> > >>> > >>>
>>>> > >>> > >>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi <
>>>> > >>> > >>> vishal.santo...@gmail.com>
>>>> > >>> > >>> wrote:
>>>> > >>> > >>>
>>>> > >>> > >>> > Hmm  ( I did both )
>>>> > >>> > >>> >
>>>> > >>> > >>> > another->another_test.enabled = true
>>>> > >>> > >>> >
>>>> > >>> > >>> > another->another_test.topics = act_post
>>>> > >>> > >>> >
>>>> > >>> > >>> > another->another_test.emit.heartbeats.enabled = false
>>>> > >>> > >>> >
>>>> > >>> > >>> > another->another_test.consumer.auto.offset.reset = latest
>>>> > >>> > >>> >
>>>> > >>> > >>> > another->another_test.sync.topic.acls.enabled = false
>>>> > >>> > >>> >
>>>> > >>> > >>> > another.consumer.auto.offset.reset = latest
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> > When I grep for the ConsumerConfig ( and there are 8
>>>> instances,
>>>> > >>> this
>>>> > >>> > >>> topic
>>>> > >>> > >>> > has 4 partitions )
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> >  [2019-10-17 14:01:21,879] INFO ConsumerConfig values:
>>>> > >>> > >>> >
>>>> > >>> > >>> > allow.auto.create.topics = true
>>>> > >>> > >>> >
>>>> > >>> > >>> > auto.commit.interval.ms = 5000
>>>> > >>> > >>> >
>>>> > >>> > >>> > *auto.offset.reset* = earliest
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> > I am now using the 2.4 branch from kafka trunk
>>>> > >>> > >>> > https://github.com/apache/kafka/tree/2.4/connect/mirror
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> > This code change works and makes sense.. I think all other
>>>> > >>> settings
>>>> > >>> > >>> will be
>>>> > >>> > >>> > fine ( as can be overridden )  but for the 2 below..
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> > *---
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
>>>> > >>> > >>> >
>>>> > >>> > >>> > *+++
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
>>>> > >>> > >>> >
>>>> > >>> > >>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig
>>>> extends
>>>> > >>> > >>> > AbstractConfig {
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> >
>>>> >
>>>> props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
>>>> > >>> > >>> >
>>>> > >>> > >>> >          props.put("enable.auto.commit", "false");
>>>> > >>> > >>> >
>>>> > >>> > >>> > -        props.put("auto.offset.reset", "earliest");
>>>> > >>> > >>> >
>>>> > >>> > >>> > +        props.put("auto.offset.reset", "latest");
>>>> > >>> > >>> >
>>>> > >>> > >>> >          return props;
>>>> > >>> > >>> >
>>>> > >>> > >>> >      }
>>>> > >>> > >>> >
>>>> > >>> > >>> >
>>>> > >>> > >>> > Regards.
>>>> > >>> > >>> >
>>>> > >>> > >>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan <
>>>> > >>> ryannedo...@gmail.com>
>>>> > >>> > >>> > wrote:
>>>> > >>> > >>> >
>>>> > >>> > >>> > > Vishal, you should be able to override the properties
>>>> passed
>>>> > >>> to the
>>>> > >>> > >>> > > internal workers using properties like
>>>> > >>> > >>> A->B.consumer.auto.offset.reset or
>>>> > >>> > >>> > > A.consumer.auto.offset.reset in the mm2.properties file.
>>>> > >>> Certain
>>>> > >>> > >>> > top-level
>>>> > >>> > >>> > > properties like tasks.max are honored without the A->B
>>>> or A
>>>> > >>> prefix,
>>>> > >>> > >>> but
>>>> > >>> > >>> > > auto.offset.reset is not one of them.
>>>> > >>> > >>> > >
>>>> > >>> > >>> > > Ryanne
>>>> > >>> > >>> > >
>>>> > >>> > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi <
>>>> > >>> > >>> > vishal.santo...@gmail.com
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > > wrote:
>>>> > >>> > >>> > >
>>>> > >>> > >>> > > > Hey Ryanne,
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > > >     How do I override auto.offset.reset = latest for
>>>> > >>> consumers
>>>> > >>> > >>> through
>>>> > >>> > >>> > > > mm2.properties. I have tried straight up .
>>>> > auto.offset.reset
>>>> > >>> and
>>>> > >>> > >>> > > consumer.
>>>> > >>> > >>> > > > auto.offset.reset  but it defaults to earliest.. I do
>>>> have
>>>> > a
>>>> > >>> > query
>>>> > >>> > >>> in
>>>> > >>> > >>> > > > another thread but though you might know off hand..
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > > > I would imagine there is some way in general of
>>>> overriding
>>>> > >>> > >>> consumer and
>>>> > >>> > >>> > > > producer configs through mm2.properties in MM2 ?
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > > > Regards.
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi <
>>>> > >>> > >>> > > vishal.santo...@gmail.com
>>>> > >>> > >>> > > > >
>>>> > >>> > >>> > > > wrote:
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > > > > Thank you so much for all your help.  Will keep you
>>>> > posted
>>>> > >>> on
>>>> > >>> > >>> tests I
>>>> > >>> > >>> > > > do..
>>>> > >>> > >>> > > > > I hope this is helpful to other folks too..
>>>> > >>> > >>> > > > >
>>>> > >>> > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan <
>>>> > >>> > >>> ryannedo...@gmail.com>
>>>> > >>> > >>> > > > > wrote:
>>>> > >>> > >>> > > > >
>>>> > >>> > >>> > > > >> That's right. MM2 is at-least-once for now, same as
>>>> > legacy
>>>> > >>> > >>> > > MirrorMaker.
>>>> > >>> > >>> > > > >> You
>>>> > >>> > >>> > > > >> can follow
>>>> > >>> https://issues.apache.org/jira/browse/KAFKA-6080
>>>> > >>> > for
>>>> > >>> > >>> > > updates
>>>> > >>> > >>> > > > >> on
>>>> > >>> > >>> > > > >> exactly-once semantics in Connect.
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > > >> Ryanne
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi <
>>>> > >>> > >>> > > > >> vishal.santo...@gmail.com>
>>>> > >>> > >>> > > > >> wrote:
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > > >> >   >> You are correct. I'm working on a KIP and
>>>> PoC to
>>>> > >>> > >>> introduce
>>>> > >>> > >>> > > > >> > transactions to
>>>> > >>> > >>> > > > >> > >> Connect for this exact purpose :)
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > That is awesome. Any time frame ?
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > In the mean time the SLA as of now
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > 1. It is conceivable that we flush the producer
>>>> to the
>>>> > >>> > target
>>>> > >>> > >>> > > cluster
>>>> > >>> > >>> > > > >> but
>>>> > >>> > >>> > > > >> > fail to offset commit. If there was a restart
>>>> before
>>>> > the
>>>> > >>> > next
>>>> > >>> > >>> > > > successful
>>>> > >>> > >>> > > > >> > offset commit, there will be  duplicates  and a
>>>> part
>>>> > of
>>>> > >>> data
>>>> > >>> > >>> is
>>>> > >>> > >>> > > > >> replayed (
>>>> > >>> > >>> > > > >> > at least once ) ?
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > 2. The same can be said about  partial flushes,
>>>> though
>>>> > >>> am
>>>> > >>> > not
>>>> > >>> > >>> sure
>>>> > >>> > >>> > > > about
>>>> > >>> > >>> > > > >> > how kafka addresses flush ( Is a flush either
>>>> success
>>>> > >>> or a
>>>> > >>> > >>> > failure,
>>>> > >>> > >>> > > > and
>>>> > >>> > >>> > > > >> > nothing in between )
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > Thanks..
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan <
>>>> > >>> > >>> > > ryannedo...@gmail.com>
>>>> > >>> > >>> > > > >> > wrote:
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > > Hey Vishal, glad to hear you're making
>>>> progress.
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> > > > 1. It seems though that flushing [...] the
>>>> > producer
>>>> > >>> and
>>>> > >>> > >>> > setting
>>>> > >>> > >>> > > > the
>>>> > >>> > >>> > > > >> > > > offset to the compacting topic is not
>>>> atomic  OR
>>>> > >>> do we
>>>> > >>> > >>> use
>>>> > >>> > >>> > > > >> > > > transactions here  ?
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> > > You are correct. I'm working on a KIP and PoC
>>>> to
>>>> > >>> introduce
>>>> > >>> > >>> > > > >> transactions
>>>> > >>> > >>> > > > >> > to
>>>> > >>> > >>> > > > >> > > Connect for this exact purpose :)
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz
>>>> num.tasks=4 ),
>>>> > >>> and I
>>>> > >>> > >>> have
>>>> > >>> > >>> > 2
>>>> > >>> > >>> > > > >> topics
>>>> > >>> > >>> > > > >> > > with
>>>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as
>>>> in
>>>> > >>> there
>>>> > >>> > are
>>>> > >>> > >>> 4
>>>> > >>> > >>> > > > consumer
>>>> > >>> > >>> > > > >> > > groups
>>>> > >>> > >>> > > > >> > > > ( on CG per thread ) ...
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> > > Some details here:
>>>> > >>> > >>> > > > >> > > - tasks.max controls the maximum number of
>>>> tasks
>>>> > >>> created
>>>> > >>> > per
>>>> > >>> > >>> > > > Connector
>>>> > >>> > >>> > > > >> > > instance. Both MirrorSourceConnector and
>>>> > >>> > >>> > MirrorCheckpointConnector
>>>> > >>> > >>> > > > >> will
>>>> > >>> > >>> > > > >> > > create multiple tasks (up to tasks.max), but
>>>> > >>> > >>> > > > MirrorHeartbeatConnector
>>>> > >>> > >>> > > > >> > only
>>>> > >>> > >>> > > > >> > > ever creates a single task. Moreover, there
>>>> cannot
>>>> > be
>>>> > >>> more
>>>> > >>> > >>> tasks
>>>> > >>> > >>> > > > than
>>>> > >>> > >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or
>>>> > >>> consumer
>>>> > >>> > >>> groups
>>>> > >>> > >>> > > (for
>>>> > >>> > >>> > > > >> > > MirrorCheckpointConnector). So if you have two
>>>> > topics
>>>> > >>> with
>>>> > >>> > >>> one
>>>> > >>> > >>> > > > >> partition
>>>> > >>> > >>> > > > >> > > each and 1 consumer group total, you'll have
>>>> two
>>>> > >>> > >>> > > > MirrorSourceConnector
>>>> > >>> > >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and
>>>> one
>>>> > >>> > >>> > > > >> > MirrorCheckpointConnector
>>>> > >>> > >>> > > > >> > > tasks, for a total of four. And that's in one
>>>> > >>> direction
>>>> > >>> > >>> only: if
>>>> > >>> > >>> > > you
>>>> > >>> > >>> > > > >> have
>>>> > >>> > >>> > > > >> > > multiple source->target herders enabled, each
>>>> will
>>>> > >>> create
>>>> > >>> > >>> tasks
>>>> > >>> > >>> > > > >> > > independently.
>>>> > >>> > >>> > > > >> > > - There are no consumer groups in MM2,
>>>> technically.
>>>> > >>> The
>>>> > >>> > >>> Connect
>>>> > >>> > >>> > > > >> framework
>>>> > >>> > >>> > > > >> > > uses the Coordinator API and internal topics to
>>>> > divide
>>>> > >>> > tasks
>>>> > >>> > >>> > among
>>>> > >>> > >>> > > > >> > workers
>>>> > >>> > >>> > > > >> > > -- not a consumer group per se. The MM2
>>>> connectors
>>>> > >>> use the
>>>> > >>> > >>> > > assign()
>>>> > >>> > >>> > > > >> API,
>>>> > >>> > >>> > > > >> > > not the subscribe() API, so there are no
>>>> consumer
>>>> > >>> groups
>>>> > >>> > >>> there
>>>> > >>> > >>> > > > >> either. In
>>>> > >>> > >>> > > > >> > > fact, they don't commit() either. This is
>>>> nice, as
>>>> > it
>>>> > >>> > >>> > eliminates a
>>>> > >>> > >>> > > > >> lot of
>>>> > >>> > >>> > > > >> > > the rebalancing problems legacy MirrorMaker
>>>> has been
>>>> > >>> > plagued
>>>> > >>> > >>> > with.
>>>> > >>> > >>> > > > >> With
>>>> > >>> > >>> > > > >> > > MM2, rebalancing only occurs when the number of
>>>> > >>> workers
>>>> > >>> > >>> changes
>>>> > >>> > >>> > or
>>>> > >>> > >>> > > > >> when
>>>> > >>> > >>> > > > >> > the
>>>> > >>> > >>> > > > >> > > assignments change (e.g. new topics are
>>>> discovered).
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> > > Ryanne
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal
>>>> Santoshi <
>>>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com>
>>>> > >>> > >>> > > > >> > > wrote:
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> > > > Hey Ryanne,
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >            The test was on topics that had a
>>>> 7 day
>>>> > >>> > >>> retention.
>>>> > >>> > >>> > > > Which
>>>> > >>> > >>> > > > >> > > > generally implies that the batch size for
>>>> flush is
>>>> > >>> > pretty
>>>> > >>> > >>> > high (
>>>> > >>> > >>> > > > >> till
>>>> > >>> > >>> > > > >> > the
>>>> > >>> > >>> > > > >> > > > consumption becomes current ). The
>>>> > >>> > >>> offset.flush.timeout.ms
>>>> > >>> > >>> > > > >> defaults
>>>> > >>> > >>> > > > >> > to
>>>> > >>> > >>> > > > >> > > 5
>>>> > >>> > >>> > > > >> > > > seconds and the code will not send in the
>>>> offsets
>>>> > >>> if the
>>>> > >>> > >>> flush
>>>> > >>> > >>> > > is
>>>> > >>> > >>> > > > >> not
>>>> > >>> > >>> > > > >> > > > complete. Increasing that time out did solve
>>>> the
>>>> > >>> "not
>>>> > >>> > >>> sending
>>>> > >>> > >>> > > the
>>>> > >>> > >>> > > > >> > offset
>>>> > >>> > >>> > > > >> > > to
>>>> > >>> > >>> > > > >> > > > topic" issue.
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > Two questions ( I am being greedy here :) )
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > 1. It seems though that flushing the
>>>> flushing the
>>>> > >>> > >>> producer and
>>>> > >>> > >>> > > > >> setting
>>>> > >>> > >>> > > > >> > > the
>>>> > >>> > >>> > > > >> > > > offset to the compacting topic is not
>>>> atomic  OR
>>>> > >>> do we
>>>> > >>> > >>> use
>>>> > >>> > >>> > > > >> > > > transactions here  ?
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > 2. I see
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> WorkerSourceTask{id=MirrorHeartbeatConnector-0}
>>>> > >>> > flushing
>>>> > >>> > >>> > 956435
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorSourceConnector-1}
>>>> > >>> flushing
>>>> > >>> > >>> 356251
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> WorkerSourceTask{id=MirrorCheckpointConnector-2}
>>>> > >>> > >>> flushing 0
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> WorkerSourceTask{id=MirrorCheckpointConnector-3}
>>>> > >>> > >>> flushing 0
>>>> > >>> > >>> > > > >> > outstanding
>>>> > >>> > >>> > > > >> > > > messages
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz
>>>> num.tasks=4 ),
>>>> > >>> and I
>>>> > >>> > >>> have
>>>> > >>> > >>> > 2
>>>> > >>> > >>> > > > >> topics
>>>> > >>> > >>> > > > >> > > with
>>>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as
>>>> in
>>>> > >>> there
>>>> > >>> > are
>>>> > >>> > >>> 4
>>>> > >>> > >>> > > > consumer
>>>> > >>> > >>> > > > >> > > groups
>>>> > >>> > >>> > > > >> > > > ( on CG per thread ) ...
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > THANKS A LOT
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > Vishal.
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan
>>>> <
>>>> > >>> > >>> > > > ryannedo...@gmail.com
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >> > > > wrote:
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > > > > >  timed out
>>>> > >>> > >>> > > > >> > > > > while waiting for producer to flush
>>>> outstanding
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > > > Yeah, that's what I'd expect to see if
>>>> Connect
>>>> > was
>>>> > >>> > >>> unable to
>>>> > >>> > >>> > > > send
>>>> > >>> > >>> > > > >> > > records
>>>> > >>> > >>> > > > >> > > > > to the downstream remote topics, e.g. if
>>>> > >>> > >>> > min.in-sync.replicas
>>>> > >>> > >>> > > > were
>>>> > >>> > >>> > > > >> > > > > misconfigured. Given some data seems to
>>>> arrive,
>>>> > >>> it's
>>>> > >>> > >>> > possible
>>>> > >>> > >>> > > > that
>>>> > >>> > >>> > > > >> > > > > everything is configured correctly but
>>>> with too
>>>> > >>> much
>>>> > >>> > >>> latency
>>>> > >>> > >>> > > to
>>>> > >>> > >>> > > > >> > > > > successfully commit within the default
>>>> timeouts.
>>>> > >>> You
>>>> > >>> > may
>>>> > >>> > >>> > want
>>>> > >>> > >>> > > to
>>>> > >>> > >>> > > > >> > > increase
>>>> > >>> > >>> > > > >> > > > > the number of tasks substantially to
>>>> achieve
>>>> > more
>>>> > >>> > >>> > parallelism
>>>> > >>> > >>> > > > and
>>>> > >>> > >>> > > > >> > > > > throughput.
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > > > Ryanne
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal
>>>> Santoshi <
>>>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > > > wrote:
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > > > > Aah no.. this is more to  it. Note sure
>>>> if
>>>> > >>> related
>>>> > >>> > to
>>>> > >>> > >>> the
>>>> > >>> > >>> > > > above.
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > > Is timing out based on
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR
>>>> > >>> > >>> > > > >> > > > > >
>>>> WorkerSourceTask{id=MirrorSourceConnector-0}
>>>> > >>> Failed
>>>> > >>> > to
>>>> > >>> > >>> > > flush,
>>>> > >>> > >>> > > > >> timed
>>>> > >>> > >>> > > > >> > > out
>>>> > >>> > >>> > > > >> > > > > > while waiting for producer to flush
>>>> > outstanding
>>>> > >>> > 36478
>>>> > >>> > >>> > > messages
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal
>>>> > Santoshi
>>>> > >>> <
>>>> > >>> > >>> > > > >> > > > > vishal.santo...@gmail.com
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > wrote:
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > > > I think this might be it.. Could you
>>>> > confirm.
>>>> > >>> It
>>>> > >>> > >>> seems
>>>> > >>> > >>> > to
>>>> > >>> > >>> > > be
>>>> > >>> > >>> > > > >> on
>>>> > >>> > >>> > > > >> > the
>>>> > >>> > >>> > > > >> > > > > path
>>>> > >>> > >>> > > > >> > > > > > > to commit the offsets.. but not sure...
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR
>>>> Scheduler
>>>> > for
>>>> > >>> > >>> > > > >> > MirrorSourceConnector
>>>> > >>> > >>> > > > >> > > > > > caught
>>>> > >>> > >>> > > > >> > > > > > > exception in scheduled task: syncing
>>>> topic
>>>> > >>> ACLs
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> (org.apache.kafka.connect.mirror.Scheduler:102)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >
>>>> java.util.concurrent.ExecutionException:
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > org.apache.kafka.common.errors.SecurityDisabledException:
>>>> > >>> > >>> > > No
>>>> > >>> > >>> > > > >> > > > Authorizer
>>>> > >>> > >>> > > > >> > > > > > is
>>>> > >>> > >>> > > > >> > > > > > > configured on the broker
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >>
>>>> > >>> > org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > >
>>>> > >>> > >>>
>>>> > >>>
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >>
>>>> > >>> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >         at
>>>> > >>> java.lang.Thread.run(Thread.java:748)
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > > Caused by:
>>>> > >>> > >>> > > > >> > >
>>>> > >>> org.apache.kafka.common.errors.SecurityDisabledException:
>>>> > >>> > >>> > > > >> > > > No
>>>> > >>> > >>> > > > >> > > > > > > Authorizer is configured on the broker
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne
>>>> > Dolan
>>>> > >>> <
>>>> > >>> > >>> > > > >> > > ryannedo...@gmail.com
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > > > > > wrote:
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > > >> > I do not have a single record in the
>>>> > >>> offsets
>>>> > >>> > >>> topic
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > > >> That's definitely not normal. You are
>>>> > correct
>>>> > >>> > that
>>>> > >>> > >>> > > without
>>>> > >>> > >>> > > > >> > records
>>>> > >>> > >>> > > > >> > > > in
>>>> > >>> > >>> > > > >> > > > > > that
>>>> > >>> > >>> > > > >> > > > > > >> topic, MM2 will restart from
>>>> EARLIEST. The
>>>> > >>> > offsets
>>>> > >>> > >>> > should
>>>> > >>> > >>> > > > be
>>>> > >>> > >>> > > > >> > > stored
>>>> > >>> > >>> > > > >> > > > > > >> periodically and whenever the
>>>> connectors
>>>> > >>> > gracefully
>>>> > >>> > >>> > > > shutdown
>>>> > >>> > >>> > > > >> or
>>>> > >>> > >>> > > > >> > > > > restart.
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > > >> Is it possible the topics don't have
>>>> > required
>>>> > >>> > ACLs
>>>> > >>> > >>> or
>>>> > >>> > >>> > > > >> something?
>>>> > >>> > >>> > > > >> > > > Also
>>>> > >>> > >>> > > > >> > > > > > >> note:
>>>> > >>> > >>> > > > >> > > > > > >> Connect wants the offsets topic to
>>>> have a
>>>> > >>> large
>>>> > >>> > >>> number
>>>> > >>> > >>> > of
>>>> > >>> > >>> > > > >> > > partitions
>>>> > >>> > >>> > > > >> > > > > and
>>>> > >>> > >>> > > > >> > > > > > >> to
>>>> > >>> > >>> > > > >> > > > > > >> be compacted. Though I can't imagine
>>>> either
>>>> > >>> would
>>>> > >>> > >>> > prevent
>>>> > >>> > >>> > > > >> > commits
>>>> > >>> > >>> > > > >> > > > from
>>>> > >>> > >>> > > > >> > > > > > >> being sent.
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > > >> Ryanne
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM
>>>> Vishal
>>>> > >>> Santoshi
>>>> > >>> > <
>>>> > >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com>
>>>> > >>> > >>> > > > >> > > > > > >> wrote:
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > > >> > 2nd/restore issue  ( I think I need
>>>> to
>>>> > >>> solve
>>>> > >>> > the
>>>> > >>> > >>> > > offsets
>>>> > >>> > >>> > > > >> topic
>>>> > >>> > >>> > > > >> > > > issue
>>>> > >>> > >>> > > > >> > > > > > >> > before I go with the scale up and
>>>> down
>>>> > >>> issue )
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > As you had indicated, I went ahead
>>>> and
>>>> > >>> created
>>>> > >>> > >>> the
>>>> > >>> > >>> > > > offsets
>>>> > >>> > >>> > > > >> > > topic.
>>>> > >>> > >>> > > > >> > > > > The
>>>> > >>> > >>> > > > >> > > > > > >> > status of the cluster  (
>>>> destination ) is
>>>> > >>> thus
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > opic# Partitions# BrokersBrokers
>>>> Spread
>>>> > >>> > %Brokers
>>>> > >>> > >>> Skew
>>>> > >>> > >>> > > > >> %Brokers
>>>> > >>> > >>> > > > >> > > > > Leader
>>>> > >>> > >>> > > > >> > > > > > >> > Skew %# ReplicasUnder Replicated
>>>> %Leader
>>>> > >>> > >>> SizeProducer
>>>> > >>> > >>> > > > >> > > > > > Message/SecSummed
>>>> > >>> > >>> > > > >> > > > > > >> > Recent Offsets
>>>> > >>> > >>> > > > >> > > > > > >> > s8k.checkpoints.internal
>>>> > >>> > >>> > > > >> > > > > > >> > <
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
>>>> > >>> > >>> > > > >> > > > > > >> > s8k.act_search_page
>>>> > >>> > >>> > > > >> > > > > > >> > <
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 6675.30 4,166,842
>>>> > >>> > >>> > > > >> > > > > > >> > s8k.act_reach
>>>> > >>> > >>> > > > >> > > > > > >> > <
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 20657.92 11,579,529
>>>> > >>> > >>> > > > >> > > > > > >> > mm2-status.s8k.internal
>>>> > >>> > >>> > > > >> > > > > > >> > <
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > 5 5 100 0 0 3 0 0.00 10
>>>> > >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal
>>>> > >>> > >>> > > > >> > > > > > >> > <
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
>>>> > >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k.internal
>>>> > >>> > >>> > > > >> > > > > > >> > <
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
>>>> > >>> > >>> > > > >> > > > > > >> > mm2-configs.s8k.internal
>>>> > >>> > >>> > > > >> > > > > > >> > <
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 13
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > You can see . that we have the  5 (
>>>> I
>>>> > >>> created
>>>> > >>> > >>> bot the
>>>> > >>> > >>> > > > >> offsets,
>>>> > >>> > >>> > > > >> > > to
>>>> > >>> > >>> > > > >> > > > be
>>>> > >>> > >>> > > > >> > > > > > >> safe
>>>> > >>> > >>> > > > >> > > > > > >> > for the below )
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *clusters = s8k, s8k_test*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *s8k.bootstrap.servers = .....*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *s8k_test.bootstrap.servers =
>>>> ......*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *# only allow replication dr1 ->
>>>> dr2*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.enabled = true*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.topics =
>>>> > >>> > >>> act_search_page|act_reach*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> *s8k->s8k_test.emit.heartbeats.enabled =
>>>> > >>> false*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.enabled = false*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> *s8k_test->s8k.emit.heartbeats.enabled =
>>>> > >>> false*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *s8k_test.replication.factor = 3*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *s8k.replication.factor = 3*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *offsets.storage.replication.factor
>>>> = 3*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *replication.factor = 3*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *replication.policy.separator = .*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > *tasks.max = 4*
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > What seems strange is that I do not
>>>> have
>>>> > a
>>>> > >>> > single
>>>> > >>> > >>> > > record
>>>> > >>> > >>> > > > in
>>>> > >>> > >>> > > > >> > the
>>>> > >>> > >>> > > > >> > > > > > offsets
>>>> > >>> > >>> > > > >> > > > > > >> > topic.. Is that normal ?   I would
>>>> > imagine
>>>> > >>> that
>>>> > >>> > >>> > > without a
>>>> > >>> > >>> > > > >> > > record,
>>>> > >>> > >>> > > > >> > > > > > there
>>>> > >>> > >>> > > > >> > > > > > >> is
>>>> > >>> > >>> > > > >> > > > > > >> > no way that a restore would
>>>> happen....
>>>> > And
>>>> > >>> that
>>>> > >>> > >>> is
>>>> > >>> > >>> > > > obvious
>>>> > >>> > >>> > > > >> > when
>>>> > >>> > >>> > > > >> > > I
>>>> > >>> > >>> > > > >> > > > > > >> restart
>>>> > >>> > >>> > > > >> > > > > > >> > the mm2 instance... Find the
>>>> screenshot
>>>> > >>> > >>> attached. In
>>>> > >>> > >>> > > > >> essence
>>>> > >>> > >>> > > > >> > the
>>>> > >>> > >>> > > > >> > > > > > latency
>>>> > >>> > >>> > > > >> > > > > > >> > avg lag is reset \when the mm2
>>>> instance
>>>> > is
>>>> > >>> > reset
>>>> > >>> > >>> > > > >> indicating no
>>>> > >>> > >>> > > > >> > > > > restore
>>>> > >>> > >>> > > > >> > > > > > >> but
>>>> > >>> > >>> > > > >> > > > > > >> > restart from EARLIEST... I must be
>>>> > missing
>>>> > >>> some
>>>> > >>> > >>> thing
>>>> > >>> > >>> > > > >> simple
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM
>>>> Ryanne
>>>> > >>> Dolan <
>>>> > >>> > >>> > > > >> > > > ryannedo...@gmail.com
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > > > >> > wrote:
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> Vishal, the first issue is easy:
>>>> you
>>>> > must
>>>> > >>> set
>>>> > >>> > >>> > > tasks.max
>>>> > >>> > >>> > > > to
>>>> > >>> > >>> > > > >> > > > > something
>>>> > >>> > >>> > > > >> > > > > > >> above
>>>> > >>> > >>> > > > >> > > > > > >> >> 1 (the default) in order to
>>>> achieve any
>>>> > >>> > >>> parallelism.
>>>> > >>> > >>> > > > This
>>>> > >>> > >>> > > > >> > > > property
>>>> > >>> > >>> > > > >> > > > > is
>>>> > >>> > >>> > > > >> > > > > > >> >> passed along to the internal
>>>> Connect
>>>> > >>> workers.
>>>> > >>> > >>> It's
>>>> > >>> > >>> > > > >> > unfortunate
>>>> > >>> > >>> > > > >> > > > that
>>>> > >>> > >>> > > > >> > > > > > >> >> Connect
>>>> > >>> > >>> > > > >> > > > > > >> >> is not smart enough to default this
>>>> > >>> property
>>>> > >>> > to
>>>> > >>> > >>> the
>>>> > >>> > >>> > > > >> number of
>>>> > >>> > >>> > > > >> > > > > > workers.
>>>> > >>> > >>> > > > >> > > > > > >> I
>>>> > >>> > >>> > > > >> > > > > > >> >> suspect that will improve before
>>>> long.
>>>> > >>> > >>> > > > >> > > > > > >> >>
>>>> > >>> > >>> > > > >> > > > > > >> >> For the second issue, is it
>>>> possible you
>>>> > >>> are
>>>> > >>> > >>> missing
>>>> > >>> > >>> > > the
>>>> > >>> > >>> > > > >> > > offsets
>>>> > >>> > >>> > > > >> > > > > > >> topic? It
>>>> > >>> > >>> > > > >> > > > > > >> >> should exist alongside the config
>>>> and
>>>> > >>> status
>>>> > >>> > >>> topics.
>>>> > >>> > >>> > > > >> Connect
>>>> > >>> > >>> > > > >> > > > should
>>>> > >>> > >>> > > > >> > > > > > >> create
>>>> > >>> > >>> > > > >> > > > > > >> >> this topic, but there are various
>>>> > reasons
>>>> > >>> this
>>>> > >>> > >>> can
>>>> > >>> > >>> > > fail,
>>>> > >>> > >>> > > > >> e.g.
>>>> > >>> > >>> > > > >> > > if
>>>> > >>> > >>> > > > >> > > > > the
>>>> > >>> > >>> > > > >> > > > > > >> >> replication factor is
>>>> misconfigured. You
>>>> > >>> can
>>>> > >>> > try
>>>> > >>> > >>> > > > creating
>>>> > >>> > >>> > > > >> > this
>>>> > >>> > >>> > > > >> > > > > topic
>>>> > >>> > >>> > > > >> > > > > > >> >> manually or changing
>>>> > >>> > >>> > > offsets.storage.replication.factor.
>>>> > >>> > >>> > > > >> > > > > > >> >>
>>>> > >>> > >>> > > > >> > > > > > >> >> Ryanne
>>>> > >>> > >>> > > > >> > > > > > >> >>
>>>> > >>> > >>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM
>>>> Vishal
>>>> > >>> Santoshi
>>>> > >>> > <
>>>> > >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com>
>>>> > >>> > >>> > > > >> > > > > > >> >> wrote:
>>>> > >>> > >>> > > > >> > > > > > >> >>
>>>> > >>> > >>> > > > >> > > > > > >> >> > Using
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > https://github.com/apache/kafka/tree/trunk/connect/mirror
>>>> > >>> > >>> > > > >> > > > > > as a
>>>> > >>> > >>> > > > >> > > > > > >> >> > guide,
>>>> > >>> > >>> > > > >> > > > > > >> >> > I have build from source the
>>>> > >>> origin/KIP-382
>>>> > >>> > of
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> https://github.com/apache/kafka.git.
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > I am seeing 2 issues
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2
>>>> > >>> different
>>>> > >>> > >>> nodes (
>>>> > >>> > >>> > > they
>>>> > >>> > >>> > > > >> are
>>>> > >>> > >>> > > > >> > > > > actually
>>>> > >>> > >>> > > > >> > > > > > >> >> pods on
>>>> > >>> > >>> > > > >> > > > > > >> >> > k8s but that should not matter
>>>> ). They
>>>> > >>> share
>>>> > >>> > >>> the
>>>> > >>> > >>> > > > >> > > mm2.properties
>>>> > >>> > >>> > > > >> > > > > > file
>>>> > >>> > >>> > > > >> > > > > > >> and
>>>> > >>> > >>> > > > >> > > > > > >> >> > are replicating ( 1-way ) 3
>>>> topics
>>>> > with
>>>> > >>> 8
>>>> > >>> > >>> > partitions
>>>> > >>> > >>> > > > in
>>>> > >>> > >>> > > > >> > > total.
>>>> > >>> > >>> > > > >> > > > > > That
>>>> > >>> > >>> > > > >> > > > > > >> >> seems
>>>> > >>> > >>> > > > >> > > > > > >> >> > to be the way to create a
>>>> standalone
>>>> > mm2
>>>> > >>> > >>> cluster.
>>>> > >>> > >>> > I
>>>> > >>> > >>> > > do
>>>> > >>> > >>> > > > >> not
>>>> > >>> > >>> > > > >> > > > > however
>>>> > >>> > >>> > > > >> > > > > > >> see(
>>>> > >>> > >>> > > > >> > > > > > >> >> at
>>>> > >>> > >>> > > > >> > > > > > >> >> > least the mbeans do not show )
>>>> any
>>>> > >>> attempt
>>>> > >>> > to
>>>> > >>> > >>> > > > rebalance.
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >>
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> >
>>>> > >>>
>>>> >
>>>> https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>>>> > >>> > >>> > > > >> > > > > > >> >> > mbeans
>>>> > >>> > >>> > > > >> > > > > > >> >> > are all on a single node
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > * I restart the processes on the
>>>> 2
>>>> > >>> nodes (
>>>> > >>> > >>> hard
>>>> > >>> > >>> > stop
>>>> > >>> > >>> > > > ans
>>>> > >>> > >>> > > > >> > > start
>>>> > >>> > >>> > > > >> > > > ).
>>>> > >>> > >>> > > > >> > > > > > The
>>>> > >>> > >>> > > > >> > > > > > >> >> > offsets for replication seem to
>>>> be
>>>> > >>> reset to
>>>> > >>> > >>> the
>>>> > >>> > >>> > > > >> earliest,
>>>> > >>> > >>> > > > >> > as
>>>> > >>> > >>> > > > >> > > if
>>>> > >>> > >>> > > > >> > > > > it
>>>> > >>> > >>> > > > >> > > > > > >> is a
>>>> > >>> > >>> > > > >> > > > > > >> >> > brand new mirroring. It is also
>>>> > obvious
>>>> > >>> from
>>>> > >>> > >>> the
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > "record-age-ms-avg|replication-latency-ms-avg"
>>>> > >>> > >>> > > > >> > > > > > >> >> > which I track through the
>>>> restart.
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > This implies that
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing
>>>> is
>>>> > not
>>>> > >>> > >>> working. I
>>>> > >>> > >>> > > > >> cannot
>>>> > >>> > >>> > > > >> > > scale
>>>> > >>> > >>> > > > >> > > > > up
>>>> > >>> > >>> > > > >> > > > > > or
>>>> > >>> > >>> > > > >> > > > > > >> >> down
>>>> > >>> > >>> > > > >> > > > > > >> >> > by adding nodes to the mm2
>>>> cluster or
>>>> > >>> > removing
>>>> > >>> > >>> > them.
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not
>>>> working.
>>>> > >>> If
>>>> > >>> > the
>>>> > >>> > >>> MM2
>>>> > >>> > >>> > > > >> cluster
>>>> > >>> > >>> > > > >> > is
>>>> > >>> > >>> > > > >> > > > > > brought
>>>> > >>> > >>> > > > >> > > > > > >> >> down,
>>>> > >>> > >>> > > > >> > > > > > >> >> > it does not start mirroring from
>>>> the
>>>> > >>> last
>>>> > >>> > >>> known
>>>> > >>> > >>> > > > state. I
>>>> > >>> > >>> > > > >> > see
>>>> > >>> > >>> > > > >> > > > the,
>>>> > >>> > >>> > > > >> > > > > > >> >> > state/config topics etc created
>>>> as
>>>> > >>> > expected..
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > The mm2.properties is pretty
>>>> mimimal
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *clusters = a , b*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *a.bootstrap.servers = k.....*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *b.bootstrap.servers = k.....*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *# only allow replication dr1 ->
>>>> dr2*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *a->b.enabled = true*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *a->b.topics = act_search_page*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled =
>>>> false*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *b->a..enabled = false*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled =
>>>> false*
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > What do you think is the issue ?
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >> > Thanks
>>>> > >>> > >>> > > > >> > > > > > >> >> >
>>>> > >>> > >>> > > > >> > > > > > >> >>
>>>> > >>> > >>> > > > >> > > > > > >> >
>>>> > >>> > >>> > > > >> > > > > > >>
>>>> > >>> > >>> > > > >> > > > > > >
>>>> > >>> > >>> > > > >> > > > > >
>>>> > >>> > >>> > > > >> > > > >
>>>> > >>> > >>> > > > >> > > >
>>>> > >>> > >>> > > > >> > >
>>>> > >>> > >>> > > > >> >
>>>> > >>> > >>> > > > >>
>>>> > >>> > >>> > > > >
>>>> > >>> > >>> > > >
>>>> > >>> > >>> > >
>>>> > >>> > >>> >
>>>> > >>> > >>>
>>>> > >>> > >>
>>>> > >>> >
>>>> > >>>
>>>> > >>
>>>> >
>>>>
>>>

Reply via email to